Giriş
Embedded bir engine yaratmak için DebeziumEngine nesnesi yaratılır ve bu nesne bir ExecutorService ile çalıştırılır. DebeziumEngine.notifying() ile de bir listener takılır.
Property'ler
database.hostname
database.port
database.user
database.password
database.server.name
table.include.list
connector.class
Örnek - MySQL
Şöyle yaparız
// Define the configuration for the Debezium Engine with MySQL connector...Properties props = config.asProperties(); props.setProperty("name", "engine"); props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore"); props.setProperty("offset.storage.file.filename", "/tmp/offsets.dat"); props.setProperty("offset.flush.interval.ms", "60000"); /* begin connector properties */ props.setProperty("database.hostname", "localhost"); props.setProperty("database.port", "3306"); props.setProperty("database.user", "mysqluser"); props.setProperty("database.password", "mysqlpw"); props.setProperty("database.server.id", "85744"); props.setProperty("database.server.name", "my-app-connector"); props.setProperty("database.history", "io.debezium.relational.history.FileDatabaseHistory"); props.setProperty("database.history.file.filename", "/path/to/storage/dbhistory.dat"); // Create the engine with this configuration ... try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class) .using(props) .notifying(record -> { System.out.println(record); }).build() ) { // Run the engine asynchronously ... ExecutorService executor = Executors.newSingleThreadExecutor(); executor.execute(engine); // Do something else or wait for a signal or an event } // Engine is stopped when the main code is finished
Örnek - PostgreSQL
Şöyle yaparız
/**
 * Spring component that runs an embedded Debezium engine on a single-thread
 * executor. The engine is built and started in {@link #start()} and closed in
 * {@link #shutdown()}.
 */
@Component
public class DebeziumConnector {

  private static final String SCHEMA_PLACEHOLDER = "<schema>";

  private final Logger log = LoggerFactory.getLogger(this.getClass());
  private ExecutorService executor = Executors.newSingleThreadExecutor();
  private DebeziumEngine<ChangeEvent<String, String>> engine;

  // Configuration values; how they are populated is not shown in this excerpt.
  private String databaseHostname;
  private String databaseSchema;
  private String offsetFlushIntervalMs;
  private String heartBeatMs;
  private String pollIntervalMs;

  /**
   * Closes the engine and shuts the executor down.
   *
   * @throws IOException if closing the engine fails
   */
  @PreDestroy
  public void shutdown() throws IOException {
    log.info("Shutting down the CDC engine");
    log.info("Please wait while the current position in the offset log is stored!");
    try {
      // The engine is only assigned in start(); guard against a failed or skipped start.
      if (engine != null) {
        engine.close();
      }
    } finally {
      // Always release the worker thread, even when close() throws.
      executor.shutdown();
    }
  }

  /**
   * Builds the engine from the Debezium properties and submits it to the executor.
   * Offsets are committed after every event when the flush interval is "0",
   * otherwise periodically.
   */
  @PostConstruct
  public void start() {
    log.info("Starting the CDC engine");
    var properties = getDebeziumProperties();
    // Constant-first comparison: an unset flush interval falls back to the
    // periodic policy instead of failing with a NullPointerException.
    var offsetPolicy = "0".equals(offsetFlushIntervalMs)
        ? OffsetCommitPolicy.always()
        : OffsetCommitPolicy.periodic(properties);
    // NOTE(review): eventHandler is not declared in this excerpt.
    this.engine = DebeziumEngine.create(Json.class)
        .using(properties)
        .using(offsetPolicy)
        .notifying(eventHandler)
        .build();
    executor.execute(engine);
  }
}
Properties şöyle olsun
/**
 * Assembles the configuration handed to the Debezium engine: engine settings,
 * optional offset/snapshot/poll settings, database connection settings and the
 * table lists with the schema placeholder substituted.
 *
 * @return the populated properties
 */
private Properties getDebeziumProperties() {
  final String snapshotTemplate = "<schema>.table_one,<schema>.table_two,<schema>.table_three";
  final StringBuilder includeTemplate = new StringBuilder(snapshotTemplate)
      .append(",<schema>.table_without_snapshot_need");

  final Properties debeziumProps = new Properties();

  // Heartbeat is enabled only for a non-blank interval other than "0".
  final boolean heartbeatEnabled = !(heartBeatMs.isBlank() || heartBeatMs.equals("0"));
  if (heartbeatEnabled) {
    includeTemplate.append(",<schema>.cdc_heartbeat");
    debeziumProps.setProperty("heartbeat.interval.ms", heartBeatMs);
    final String heartbeatQuery = "insert into <schema>.cdc_heartbeat VALUES (B'1')";
    debeziumProps.setProperty(
        "heartbeat.action.query",
        heartbeatQuery.replace(SCHEMA_PLACEHOLDER, databaseSchema));
  }

  // Engine and connector identity.
  debeziumProps.setProperty("name", "FooToBarCDC");
  debeziumProps.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
  debeziumProps.setProperty("plugin.name", "pgoutput");
  debeziumProps.setProperty("publication.autocreate.mode", "filtered");

  // Settings that are applied only when a non-blank value is present.
  setOptionalProperty(value -> debeziumProps.setProperty("offset.storage", value), offsetStorage);
  setOptionalProperty(
      value -> debeziumProps.setProperty("offset.flush.interval.ms", value), offsetFlushIntervalMs);
  setOptionalProperty(value -> debeziumProps.setProperty("snapshot.mode", value), snapshotMode);
  setOptionalProperty(value -> debeziumProps.setProperty("poll.interval.ms", value), pollIntervalMs);

  // The offset file name is set only for the file-based offset store.
  final String fileStoreClassName = FileOffsetBackingStore.class.getName();
  if (fileStoreClassName.equals(offsetStorage)) {
    debeziumProps.setProperty("offset.storage.file.filename", offsetStorageFileFilename);
  }

  // Database connection.
  debeziumProps.setProperty("database.hostname", databaseHostname);
  debeziumProps.setProperty("database.port", databasePort);
  debeziumProps.setProperty("database.user", databaseUser);
  debeziumProps.setProperty("database.password", databasePassword);
  debeziumProps.setProperty("database.dbname", databaseDbname);
  debeziumProps.setProperty("database.server.name", "foo");

  // Table lists, with the placeholder replaced by the configured schema.
  final String includedTables =
      includeTemplate.toString().replace(SCHEMA_PLACEHOLDER, databaseSchema);
  final String snapshotTables = snapshotTemplate.replace(SCHEMA_PLACEHOLDER, databaseSchema);
  debeziumProps.setProperty("table.include.list", includedTables);
  debeziumProps.setProperty("snapshot.include.collection.list", snapshotTables);

  return debeziumProps;
}

/**
 * Passes {@code property} to {@code consumer} unless it is null or blank.
 *
 * @param consumer receiver of the value
 * @param property candidate value; ignored when null or blank
 */
private void setOptionalProperty(Consumer<String> consumer, String property) {
  if (property == null || property.isBlank()) {
    return;
  }
  consumer.accept(property);
}
Okunan JSON verisini çevireceğimiz sınıf şöyle olsun
/**
 * Holds the two values extracted from a change event's JSON: an id and a table name.
 * Setters return {@code this} so calls can be chained, e.g.
 * {@code new CdcEvent().setId(1).setTable("t")}.
 */
public class CdcEvent {

  private long id;
  private String table;

  /** @return the id; {@code 0} when never set */
  public long getId() {
    return id;
  }

  /**
   * @param id the id to store
   * @return this instance, for chaining
   */
  public CdcEvent setId(long id) {
    this.id = id;
    return this;
  }

  /** @return the table name; {@code null} when never set */
  public String getTable() {
    return table;
  }

  /**
   * @param table the table name to store
   * @return this instance, for chaining
   */
  public CdcEvent setTable(String table) {
    this.table = table;
    return this;
  }
}
Event handler şöyle olsun
/**
 * Consumer of Debezium change events whose value is a JSON string. Each event is
 * first reduced to a {@link CdcEvent} carrying the table name and the row id.
 */
@Component
public class SingleEventHandler implements Consumer<ChangeEvent<String, String>> {

  // JSON pointers into the event value: source table name and the "after" row id.
  private static final JsonPointer TABLE = JsonPointer.compile("/payload/source/table");
  private static final JsonPointer PAYLOAD = JsonPointer.compile("/payload/after/id");

  private ObjectMapper objectMapper = JacksonUtil.createObjectMapper();
  private Map<String, DataExtractor> dataExtractors;

  @Override
  public void accept(final ChangeEvent<String, String> event) {
    var cdcEvent = parseEvent(event);
    ...
  }

  /**
   * Reads the id and table name out of the event's JSON value.
   *
   * @param event the change event; may be null or carry a null value
   * @return the parsed event, or an empty {@link CdcEvent} when there is no value
   * @throws EventParsingException if the value is not parseable JSON
   */
  private CdcEvent parseEvent(final ChangeEvent<String, String> event) {
    final boolean hasValue = event != null && event.value() != null;
    if (!hasValue) {
      return new CdcEvent();
    }

    try {
      var root = objectMapper.readTree(event.value());
      // NOTE(review): presumably events without an "after" image (e.g. deletes)
      // end up with the default id here - confirm this is intended.
      var rowId = root.at(PAYLOAD).asLong();
      var tableName = root.at(TABLE).asText();
      return new CdcEvent().setId(rowId).setTable(tableName);
    } catch (JsonProcessingException e) {
      throw new EventParsingException(e);
    }
  }
}
Hiç yorum yok:
Yorum Gönder