Friday, October 14, 2022

Using Debezium

Introduction
To create an embedded engine, a DebeziumEngine object is built and then run with an ExecutorService. A listener that receives the change events is attached with the builder's notifying() method.

Properties
database.hostname
database.port
database.user
database.password
database.server.name
table.include.list
connector.class
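
As a rough sketch, the property keys listed above can be checked before the engine is built. The RequiredProperties class and its missing() method are hypothetical helpers, not part of Debezium, and treating every listed key as mandatory is an assumption made only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of Debezium: reports which of the
// property keys listed above are missing or blank.
final class RequiredProperties {

  // Assumption: every key from the list above is treated as mandatory.
  static final List<String> REQUIRED = List.of(
      "connector.class",
      "database.hostname",
      "database.port",
      "database.user",
      "database.password",
      "database.server.name",
      "table.include.list");

  // Returns the required keys that have no non-blank value in props.
  static List<String> missing(Properties props) {
    List<String> missing = new ArrayList<>();
    for (String key : REQUIRED) {
      String value = props.getProperty(key);
      if (value == null || value.isBlank()) {
        missing.add(key);
      }
    }
    return missing;
  }
}
```

Calling RequiredProperties.missing(props) before building the engine gives a list that can be logged or turned into an exception when it is not empty.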

Example - MySQL
We do it like this:
// Define the configuration for the Debezium Engine with MySQL connector...
Properties props = new Properties();
props.setProperty("name", "engine");
props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
props.setProperty("offset.storage.file.filename", "/tmp/offsets.dat");
props.setProperty("offset.flush.interval.ms", "60000");
/* begin connector properties */
props.setProperty("database.hostname", "localhost");
props.setProperty("database.port", "3306");
props.setProperty("database.user", "mysqluser");
props.setProperty("database.password", "mysqlpw");
props.setProperty("database.server.id", "85744");
props.setProperty("database.server.name", "my-app-connector");
props.setProperty("database.history", "io.debezium.relational.history.FileDatabaseHistory");
props.setProperty("database.history.file.filename", "/path/to/storage/dbhistory.dat");

// Create the engine with this configuration ...
try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
    .using(props)
    .notifying(record -> {
      System.out.println(record);
    }).build()
) {
  // Run the engine asynchronously ...
  ExecutorService executor = Executors.newSingleThreadExecutor();
  executor.execute(engine);

  // Do something else or wait for a signal or an event
}
// Engine is stopped when the main code is finished
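
In the example above the engine is closed as soon as the try block ends, so the executor should also be shut down and given time to finish. Below is a minimal sketch of that step. FakeEngine is a hypothetical stand-in for DebeziumEngine (it only implements Runnable and AutoCloseable) so that the code runs without a database, and the stop() helper and its timeout are assumptions too.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class EngineShutdownSketch {

  // Hypothetical stand-in for DebeziumEngine: runs until close() is called.
  static final class FakeEngine implements Runnable, AutoCloseable {
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final AtomicBoolean finished = new AtomicBoolean(false);

    @Override
    public void run() {
      while (running.get()) {
        try {
          Thread.sleep(10); // pretend to poll for change events
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          break;
        }
      }
      finished.set(true);
    }

    @Override
    public void close() {
      running.set(false);
    }

    boolean isFinished() {
      return finished.get();
    }
  }

  // Closes the engine, then waits for the worker thread to finish.
  // Returns true if the executor terminated within the timeout.
  static boolean stop(FakeEngine engine, ExecutorService executor, long timeoutSeconds) {
    engine.close();
    executor.shutdown(); // accept no new tasks; the running one may finish
    try {
      return executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    FakeEngine engine = new FakeEngine();
    executor.execute(engine);
    System.out.println("terminated cleanly: " + stop(engine, executor, 5));
  }
}
```

With a real engine the same order applies: close the engine first, then call shutdown() and awaitTermination() on the executor.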
Example - PostgreSQL
We do it like this:
@Component
public class DebeziumConnector {

  private static final String SCHEMA_PLACEHOLDER = "<schema>";

  private final Logger log = LoggerFactory.getLogger(this.getClass());

  private ExecutorService executor = Executors.newSingleThreadExecutor();

  private DebeziumEngine<ChangeEvent<String, String>> engine;

  private String databaseHostname;

  private String databaseSchema;

  private String offsetFlushIntervalMs;

  private String heartBeatMs;

  private String pollIntervalMs;
  

  @PreDestroy
  public void shutdown() throws IOException {
    log.info("Shutting down the CDC engine");
    log.info("Please wait while the current position in the offset log is stored!");
    engine.close();
    executor.shutdown();
  }

  @PostConstruct
  public void start() {
    log.info("Starting the CDC engine");

    var properties = getDebeziumProperties();
    var offsetPolicy = offsetFlushIntervalMs.equals("0") ? 
      OffsetCommitPolicy.always() : OffsetCommitPolicy.periodic(properties);

    this.engine = DebeziumEngine.create(Json.class)
      .using(properties)
      .using(offsetPolicy)
      .notifying(eventHandler)
      .build();

    executor.execute(engine);
  }
}
The properties are built as follows:
private Properties getDebeziumProperties() {
  var props = new Properties();
  var snapshotTables = "<schema>.table_one,<schema>.table_two,<schema>.table_three";

  var allTables = snapshotTables + ",<schema>.table_without_snapshot_need";

  if (!heartBeatMs.isBlank() && !heartBeatMs.equals("0")) {
    allTables = allTables + ",<schema>.cdc_heartbeat";
    props.setProperty("heartbeat.interval.ms", heartBeatMs);
    props.setProperty(
      "heartbeat.action.query",
      "insert into <schema>.cdc_heartbeat VALUES (B'1')"
          .replace(SCHEMA_PLACEHOLDER, databaseSchema)
    );
  }

  props.setProperty("name", "FooToBarCDC");
  props.setProperty("plugin.name", "pgoutput");
  props.setProperty("publication.autocreate.mode", "filtered");
  setOptionalProperty(s -> props.setProperty("offset.storage", s), offsetStorage);
  setOptionalProperty(s -> props.setProperty("offset.flush.interval.ms", s),
      offsetFlushIntervalMs);
  setOptionalProperty(s -> props.setProperty("snapshot.mode", s), snapshotMode);
  setOptionalProperty(s -> props.setProperty("poll.interval.ms", s), pollIntervalMs);

  if (FileOffsetBackingStore.class.getName().equals(offsetStorage)) {
      props.setProperty("offset.storage.file.filename", offsetStorageFileFilename);
  }

  props.setProperty("database.hostname", databaseHostname);
  props.setProperty("database.port", databasePort);
  props.setProperty("database.user", databaseUser);
  props.setProperty("database.password", databasePassword);
  props.setProperty("database.dbname", databaseDbname);
  props.setProperty("database.server.name", "foo");
  props.setProperty("table.include.list", allTables
      .replace(SCHEMA_PLACEHOLDER, databaseSchema));
  props.setProperty("snapshot.include.collection.list", snapshotTables
      .replace(SCHEMA_PLACEHOLDER, databaseSchema));
  props.setProperty("connector.class", 
      "io.debezium.connector.postgresql.PostgresConnector");        

  return props;
}

private void setOptionalProperty(Consumer<String> consumer, String property) {
  if (property != null && !property.isBlank()) {
    consumer.accept(property);
  }
}
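
The two small techniques used above, replacing the <schema> placeholder and setting a property only when a value is configured, can be tried in isolation. This is a minimal sketch. The OptionalPropertySketch class, its build() method and the argument values are hypothetical, and only setOptionalProperty mirrors the code above.

```java
import java.util.Properties;
import java.util.function.Consumer;

final class OptionalPropertySketch {

  static final String SCHEMA_PLACEHOLDER = "<schema>";

  // Same idea as setOptionalProperty above: call the consumer only for non-blank values.
  static void setOptionalProperty(Consumer<String> consumer, String property) {
    if (property != null && !property.isBlank()) {
      consumer.accept(property);
    }
  }

  // Builds a small Properties object; table names and arguments are illustrative.
  static Properties build(String schema, String snapshotMode, String pollIntervalMs) {
    Properties props = new Properties();
    String tables = "<schema>.table_one,<schema>.table_two";
    // String.replace substitutes every occurrence of the placeholder
    props.setProperty("table.include.list", tables.replace(SCHEMA_PLACEHOLDER, schema));
    setOptionalProperty(s -> props.setProperty("snapshot.mode", s), snapshotMode);
    setOptionalProperty(s -> props.setProperty("poll.interval.ms", s), pollIntervalMs);
    return props;
  }

  public static void main(String[] args) {
    // A blank poll interval is skipped, so the connector default would apply
    System.out.println(build("public", "initial", " "));
  }
}
```

A blank or null value leaves the key unset, so the connector falls back to its own default instead of receiving an empty string.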
The class that the incoming JSON data is mapped to looks like this:
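public class CdcEvent {

  private long id;

  private String table;

  // Fluent setters, because the event handler chains setId(...).setTable(...)
  public CdcEvent setId(long id) { this.id = id; return this; }
  public CdcEvent setTable(String table) { this.table = table; return this; }

  public long getId() { return id; }
  public String getTable() { return table; }
}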
The event handler looks like this:
@Component
public class SingleEventHandler implements 
  Consumer<ChangeEvent<String, String>> {

  private static final JsonPointer TABLE = JsonPointer.compile("/payload/source/table");

  private static final JsonPointer PAYLOAD = JsonPointer.compile("/payload/after/id");

  private ObjectMapper objectMapper = JacksonUtil.createObjectMapper();

  private Map<String, DataExtractor> dataExtractors;

  @Override
  public void accept(final ChangeEvent<String, String> event) {
    var cdcEvent = parseEvent(event);
    ...
  }
   

  private CdcEvent parseEvent(final ChangeEvent<String, String> event) {
    if (event == null || event.value() == null) {
      return new CdcEvent();
    }

    try {
      var json = objectMapper.readTree(event.value());
      var cdcEvent = new CdcEvent().setId(json.at(PAYLOAD).asLong())
                                   .setTable(json.at(TABLE).asText());
      return cdcEvent;
    } catch (JsonProcessingException e) {
      throw new EventParsingException(e);
    }
  }
}
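
The handler above keeps a Map<String, DataExtractor>, but the post does not show how it is used. One possible reading is that the map is keyed by table name and the parsed event is routed to the matching extractor. The sketch below is built on that assumption only. The DataExtractor interface shape, the register() and handle() methods and the table name are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class ExtractorDispatchSketch {

  // Hypothetical shape of DataExtractor; the original post does not show it.
  interface DataExtractor {
    String extract(long id);
  }

  // Assumption: extractors are keyed by table name.
  private final Map<String, DataExtractor> dataExtractors = new HashMap<>();

  void register(String table, DataExtractor extractor) {
    dataExtractors.put(table, extractor);
  }

  // Looks up the extractor for the event's table; unknown tables yield an empty result.
  Optional<String> handle(String table, long id) {
    DataExtractor extractor = dataExtractors.get(table);
    return extractor == null ? Optional.empty() : Optional.of(extractor.extract(id));
  }

  public static void main(String[] args) {
    ExtractorDispatchSketch dispatcher = new ExtractorDispatchSketch();
    dispatcher.register("table_one", id -> "table_one#" + id);
    System.out.println(dispatcher.handle("table_one", 42L));
    System.out.println(dispatcher.handle("unknown", 1L));
  }
}
```

Returning an empty Optional for unknown tables keeps events from tables without an extractor, such as a heartbeat table, from causing errors.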



