6 Mart 2023 Pazartesi

Kafka Streams API KTable Sınıfı

Giriş
Şu satırı dahil ederiz
import org.apache.kafka.streams.kstream.KTable;
Açıklaması şöyle
A KTable is an abstraction of a changelog stream, where each data record represents an update. More precisely, the value in a data record is interpreted as an “UPDATE” of the last value for the same record key.

A record with a null as value represents a “DELETE” or tombstone for the record’s key.
Açıklaması şöyle
KTable uses the local state for the Kafka instance, so in this case if the one of the Kafka instance goes down, you will lose the data persisted to the local instance because a new Kafka instance is formed on new machine. 
join metodu
Örnek - KTable KTable
Şöyle yaparız
// Key=null,Value=10:00:00AM_GMT
KTable<String, String> table1 = builder.table("kafka-left-topic");

// Key=10:00:00AM, Value=task_1_completed
KTable<String, String> table2 = builder.table("kafka-right-topic");

KTable<String, String> joinedTable = table1.join(table2,
  (value1, value2) -> value1 + "," + value2
);

joinedTable.toStream().foreach((key, value) -> System.out.println(key + ": " + value));
Örnek - KTable + KeyValueMapper
Şöyle yaparız
KeyValueMapper<String, String, String> foreignKeyExtractor =
    (value1, value2) -> value1.split("_")[0];

KTable<String, String> joinedTable = table1.join(table2,foreignKeyExtractor,
  (value1, value2) -> value1 + "," + value2
);
Örnek - KStream + KTable
Şöyle yaparız
KStream<String, GenericRecord> ordersStream = ...
KTable<String, GenericRecord> customersTable = ..

// Create a foreign key extractor to extract the customer_id from the orders stream
KeyValueMapper<GenericRecord, GenericRecord, String> foreignKeyExtractor =
    (order, customer) -> order.get("customer_id").toString();

// Perform the join operation between the orders stream and the customers table
KStream<String, EnrichedOrder> enrichedOrdersStream = ordersStream
    .leftJoin(customersTable, (order, customer) -> new EnrichedOrder(order, customer))
    .selectKey(foreignKeyExtractor);
toStream metodu
KTable nesnesini yeni bir topic'e yazar
Örnek
Şöyle yaparız
import org.apache.kafka.streams.kstream.Printed;

KTable<String, Long> kt0 = ...
kt0.toStream().print(Printed.toSysOut());
Örnek
Şöyle yaparız
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream("streams-plaintext-input");
KTable<String, Long> wordCounts = textLines.flatMapValues(value -> 
  Arrays.asList(pattern.split(value.toLowerCase())))
  .groupBy((key, value) -> value)
  .count();
  wordCounts.toStream().to("streams-wordcount-output", 
    Produced.with(Serdes.String(), Serdes.Long()));

Hiç yorum yok:

Yorum Gönder