Introduction
A KTable is an abstraction of a changelog stream, where each data record represents an update. More precisely, the value in a data record is interpreted as an “UPDATE” of the last value for the same record key. A record with a null value represents a “DELETE”, or tombstone, for the record’s key.
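The update and tombstone semantics can be sketched in plain Java by replaying a list of records into a map. This is only an illustration and does not use the Kafka Streams API; the class name ChangelogSketch and the sample keys and values are assumptions made up for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java illustration only; this is not the Kafka Streams API.
final class ChangelogSketch {

    // Applies one changelog record to the table:
    // a non-null value upserts the key, a null value (tombstone) deletes it.
    static void apply(Map<String, String> table, String key, String value) {
        if (value == null) {
            table.remove(key);
        } else {
            table.put(key, value);
        }
    }

    // Replays records of the form {key, value} in order and returns the final table.
    static Map<String, String> replay(String[][] records) {
        Map<String, String> table = new LinkedHashMap<>();
        for (String[] record : records) {
            apply(table, record[0], record[1]);
        }
        return table;
    }

    public static void main(String[] args) {
        String[][] records = {
            {"alice", "Berlin"},
            {"bob", "Paris"},
            {"alice", "Rome"},  // UPDATE: replaces Berlin
            {"bob", null}       // DELETE: tombstone for bob
        };
        System.out.println(replay(records)); // prints {alice=Rome}
    }
}
```

Only the latest value per key survives, and a key that received a tombstone disappears from the table.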
The explanation is as follows
A KTable uses state that is local to the Kafka instance. If one of those instances goes down, you lose the data persisted on that local instance, because the replacement instance is created on a new machine.
join method
Example - KTable + KTable
We do it like this
// Key=null, Value=10:00:00AM_GMT
KTable<String, String> table1 = builder.table("kafka-left-topic");
// Key=10:00:00AM, Value=task_1_completed
KTable<String, String> table2 = builder.table("kafka-right-topic");

KTable<String, String> joinedTable = table1.join(table2,
  (value1, value2) -> value1 + "," + value2);
joinedTable.toStream().foreach((key, value) -> System.out.println(key + ": " + value));
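Example - KTable + foreign key extractor
We do it like this
// The foreign key extractor is a java.util.function.Function applied to the left table's value;
// here it takes the part before "_" (e.g. 10:00:00AM from 10:00:00AM_GMT)
Function<String, String> foreignKeyExtractor = value1 -> value1.split("_")[0];

// Each table1 record is joined with the table2 record whose key equals the extracted foreign key
KTable<String, String> joinedTable = table1.join(table2, foreignKeyExtractor,
  (value1, value2) -> value1 + "," + value2);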
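To make the foreign-key lookup concrete, here is a rough plain-Java sketch that models both tables as maps. It does not use Kafka Streams; the class name ForeignKeyJoinSketch, the keys event-1 and event-2, and the second left-hand value are assumptions made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;
import java.util.function.Function;

// Plain-Java illustration only; this is not the Kafka Streams API.
final class ForeignKeyJoinSketch {

    // Inner join: for every left record, extract the foreign key from its value,
    // look that key up in the right table, and combine both values.
    // The result keeps the left record's key.
    static Map<String, String> join(Map<String, String> left,
                                    Map<String, String> right,
                                    Function<String, String> foreignKeyExtractor,
                                    BiFunction<String, String, String> joiner) {
        Map<String, String> result = new TreeMap<>();
        for (Map.Entry<String, String> entry : left.entrySet()) {
            String foreignKey = foreignKeyExtractor.apply(entry.getValue());
            String rightValue = foreignKey == null ? null : right.get(foreignKey);
            if (rightValue != null) {
                result.put(entry.getKey(), joiner.apply(entry.getValue(), rightValue));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> left = new HashMap<>();
        left.put("event-1", "10:00:00AM_GMT");
        left.put("event-2", "11:00:00AM_GMT"); // no matching key on the right side

        Map<String, String> right = new HashMap<>();
        right.put("10:00:00AM", "task_1_completed");

        Map<String, String> joined = join(left, right,
            value -> value.split("_")[0],
            (l, r) -> l + "," + r);
        System.out.println(joined); // prints {event-1=10:00:00AM_GMT,task_1_completed}
    }
}
```

Because this models an inner join, event-2 is dropped: its extracted foreign key has no counterpart in the right-hand map.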
Example - KStream + KTable
We do it like this
KStream<String, GenericRecord> ordersStream = ...
KTable<String, GenericRecord> customersTable = ..

// Key extractor that pulls the customer_id out of each order
KeyValueMapper<String, GenericRecord, String> foreignKeyExtractor =
  (orderKey, order) -> order.get("customer_id").toString();

// A stream-table join matches on the record key, so re-key the orders by customer_id first
// (this assumes customersTable is keyed by customer_id), then join with the customers table
KStream<String, EnrichedOrder> enrichedOrdersStream = ordersStream
  .selectKey(foreignKeyExtractor)
  .leftJoin(customersTable, (order, customer) -> new EnrichedOrder(order, customer));
toStream method
Converts the KTable into a KStream of its changelog records. The resulting stream can then be printed or written to a new topic with to().
Example
We do it like this
import org.apache.kafka.streams.kstream.Printed;

KTable<String, Long> kt0 = ...
kt0.toStream().print(Printed.toSysOut());
Example
We do it like this
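// Assumption: pattern is not defined in the original snippet; here it splits on non-word characters
Pattern pattern = Pattern.compile("\\W+");

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream("streams-plaintext-input");

// Split each line into lowercase words, group by word, and count the occurrences
KTable<String, Long> wordCounts = textLines
  .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
  .groupBy((key, value) -> value)
  .count();

// Convert the table to its changelog stream and write it to the output topic
wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));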
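What the changelog stream of such a count table looks like can be approximated in plain Java: every time a word's count changes, one update record is emitted. This is only a sketch without Kafka Streams; the class name WordCountChangelogSketch, the sample lines and the split on non-word characters are assumptions, and a real application may emit fewer intermediate updates when record caching is enabled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; this is not the Kafka Streams API.
final class WordCountChangelogSketch {

    // Returns one "word=count" entry for every count update, in arrival order.
    static List<String> countUpdates(List<String> lines) {
        Map<String, Long> counts = new HashMap<>();
        List<String> updates = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.toLowerCase().split("\\W+")) {
                if (word.isEmpty()) {
                    continue; // skip the empty token produced by leading separators
                }
                long newCount = counts.merge(word, 1L, Long::sum);
                updates.add(word + "=" + newCount);
            }
        }
        return updates;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("Hello world", "hello Kafka");
        System.out.println(countUpdates(lines));
        // prints [hello=1, world=1, hello=2, kafka=1]
    }
}
```

The second hello does not add a new row; it emits an updated count for the existing key, which is how a consumer of the output topic would see the table change over time.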