6 Mart 2023 Pazartesi

Kafka Streams API KafkaStreams Sınıfı

Giriş
Şu satırı dahil ederiz
import org.apache.kafka.streams.KafkaStreams;
Belirtilen topology nesnesini çalıştırır

start metodu
Örnek
Şöyle yaparız
Properties prop = new Properties();
prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-lambda-example");
prop.put(StreamsConfig.CLIENT_ID_CONFIG, "wordcount-lambda-example-client");
prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
  Serdes.String().getClass().getName());
prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
  Serdes.String().getClass().getName());
prop.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/test");

StreamsBuilder builder = new StreamsBuilder();
...
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), prop);
kafkaStreams.start();

Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));

Hiç yorum yok:

Yorum Gönder