Giriş
Embedded bir engine yaratmak için DebeziumEngine nesnesi yaratılır ve bu nesne bir ExecutorService ile çalıştırılır. DebeziumEngine.notifying() ile de bir listener takılırPropertyler
database.hostnamedatabase.port
database.user
database.password
database.server.name
table.include.list
connector.class
Örnek - MySQL
Şöyle yaparız
// Define the configuration for the Debezium Engine with MySQL connector...Properties props = config.asProperties(); props.setProperty("name", "engine"); props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore"); props.setProperty("offset.storage.file.filename", "/tmp/offsets.dat"); props.setProperty("offset.flush.interval.ms", "60000"); /* begin connector properties */ props.setProperty("database.hostname", "localhost"); props.setProperty("database.port", "3306"); props.setProperty("database.user", "mysqluser"); props.setProperty("database.password", "mysqlpw"); props.setProperty("database.server.id", "85744"); props.setProperty("database.server.name", "my-app-connector"); props.setProperty("database.history", "io.debezium.relational.history.FileDatabaseHistory"); props.setProperty("database.history.file.filename", "/path/to/storage/dbhistory.dat"); // Create the engine with this configuration ... try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class) .using(props) .notifying(record -> { System.out.println(record); }).build() ) { // Run the engine asynchronously ... ExecutorService executor = Executors.newSingleThreadExecutor(); executor.execute(engine); // Do something else or wait for a signal or an event } // Engine is stopped when the main code is finished
Örnek - PostgreSQL
Şöyle yaparız
@Component
public class DebeziumConnector {
private static final String SCHEMA_PLACEHOLDER = "<schema>";
private final Logger log = LoggerFactory.getLogger(this.getClass());
private ExecutorService executor = Executors.newSingleThreadExecutor();
private DebeziumEngine<ChangeEvent<String, String>> engine;
private String databaseHostname;
private String databaseSchema;
private String offsetFlushIntervalMs;
private String heartBeatMs;
private String pollIntervalMs;
@PreDestroy
public void shutdown() throws IOException {
log.info("Shutting down the CDC engine");
log.info("Please wait while the current position in the offset log is stored!");
engine.close();
executor.shutdown();
}
@PostConstruct
public void start() {
log.info("Starting the CDC engine");
var properties = getDebeziumProperties();
var offsetPolicy = offsetFlushIntervalMs.equals("0") ?
OffsetCommitPolicy.always() : OffsetCommitPolicy.periodic(properties);
this.engine = DebeziumEngine.create(Json.class)
.using(properties)
.using(offsetPolicy)
.notifying(eventHandler)
.build();
executor.execute(engine);
}
}Properties şöyle olsun
private Properties getDebeziumProperties() {
var props = new Properties();
var snapshotTables = "<schema>.table_one,<schema>.table_two,<schema>.table_three";
var allTables = snapshotTables + ",<schema>.table_without_snapshot_need";
if (!heartBeatMs.isBlank() && !heartBeatMs.equals("0")) {
allTables = allTables + ",<schema>.cdc_heartbeat";
props.setProperty("heartbeat.interval.ms", heartBeatMs);
props.setProperty(
"heartbeat.action.query",
"insert into <schema>.cdc_heartbeat VALUES (B'1')"
.replace(SCHEMA_PLACEHOLDER, databaseSchema)
);
}
props.setProperty("name", "FooToBarCDC");
props.setProperty("plugin.name", "pgoutput");
props.setProperty("publication.autocreate.mode", "filtered");
setOptionalProperty(s -> props.setProperty("offset.storage", s), offsetStorage);
setOptionalProperty(s -> props.setProperty("offset.flush.interval.ms", s),
offsetFlushIntervalMs);
setOptionalProperty(s -> props.setProperty("snapshot.mode", s), snapshotMode);
setOptionalProperty(s -> props.setProperty("poll.interval.ms", s), pollIntervalMs);
if (FileOffsetBackingStore.class.getName().equals(offsetStorage)) {
props.setProperty("offset.storage.file.filename", offsetStorageFileFilename);
}
props.setProperty("database.hostname", databaseHostname);
props.setProperty("database.port", databasePort);
props.setProperty("database.user", databaseUser);
props.setProperty("database.password", databasePassword);
props.setProperty("database.dbname", databaseDbname);
props.setProperty("database.server.name", "foo");
props.setProperty("table.include.list", allTables
.replace(SCHEMA_PLACEHOLDER, databaseSchema));
props.setProperty("snapshot.include.collection.list", snapshotTables
.replace(SCHEMA_PLACEHOLDER, databaseSchema));
props.setProperty("connector.class",
"io.debezium.connector.postgresql.PostgresConnector");
return props;
}
private void setOptionalProperty(Consumer<String> consumer, String property) {
if (property != null && !property.isBlank()) {
consumer.accept(property);
}
}
Okunan JSON verisini çevireceğimiz sınıf şöyle olsun
public class CdcEvent {
private long id;
private String table;
}Event handler şöyle olsun
@Component
public class SingleEventHandler implements
Consumer<ChangeEvent<String, String>> {
private static final JsonPointer TABLE = JsonPointer.compile("/payload/source/table");
private static final JsonPointer PAYLOAD = JsonPointer.compile("/payload/after/id");
private ObjectMapper objectMapper = JacksonUtil.createObjectMapper();
private Map<String, DataExtractor> dataExtractors;
@Override
public void accept(final ChangeEvent<String, String> event) {
var cdcEvent = parseEvent(event);
...
}
private CdcEvent parseEvent(final ChangeEvent<String, String> event) {
if (event == null || event.value() == null) {
return new CdcEvent();
}
try {
var json = objectMapper.readTree(event.value());
var cdcEvent = new CdcEvent().setId(json.at(PAYLOAD).asLong())
.setTable(json.at(TABLE).asText());
return cdcEvent;
} catch (JsonProcessingException e) {
throw new EventParsingException(e);
}
}
}
Hiç yorum yok:
Yorum Gönder