Introduction
Gradle
We add the following dependencies
compile 'org.apache.kafka:kafka-streams:2.2.0'
compile 'org.slf4j:slf4j-simple:1.7.21'
Steps Required for a Stream
The explanation is as follows
There are 3 major steps we will need to complete to have a running Streams app.
1. We need to set various configuration parameters,
2. build a topology by registering each of our streams,
3. then create and run the Streams app.
For the first step, i.e. the Properties, we do it like this
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

private final static String BOOTSTRAP_SERVERS = "localhost:29092";
private final static String APPLICATION_ID = "arctype-stream";

private static Properties makeProps() {
  final Properties props = new Properties();
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
  // You will learn more about SerDe's soon!
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  // Log records that cannot be deserialized and keep processing
  props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
      LogAndContinueExceptionHandler.class);
  props.put(StreamsConfig.POLL_MS_CONFIG, 100);
  props.put(StreamsConfig.RETRIES_CONFIG, 100);
  return props;
}
For the second step, StreamsBuilder is used.
For the third step, KafkaStreams is used.
Local state vs. external state
The explanation is as follows. This is the difference between KTable and GlobalKTable
Usually, stream processors keep this state locally for faster access. It is written to memory first, then eventually flushed out to a key/value store on the disk, such as RocksDB.
But in some cases, the state is stored in an external place like a database. Although it introduces additional latency, it works well for simple workloads and provides you good scalability.
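The "memory first, then flushed to disk" behaviour described above can be sketched in plain Java. This is only an illustration under stated assumptions: WriteBackStore and its methods are hypothetical names, not part of the Kafka Streams API, and the "disk" map is merely a stand-in for an on-disk key/value store such as RocksDB.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical write-back state store; names are illustrative, not Kafka Streams API.
class WriteBackStore {
    // Recent writes that have not been flushed yet
    private final Map<String, Long> memory = new HashMap<>();
    // Stand-in for an on-disk key/value store such as RocksDB
    private final Map<String, Long> disk = new HashMap<>();

    // Writes go to memory first
    void put(String key, Long value) {
        memory.put(key, value);
    }

    // Reads check memory first, then fall back to the "disk" store
    Long get(String key) {
        Long cached = memory.get(key);
        return cached != null ? cached : disk.get(key);
    }

    // Eventually the buffered writes are flushed to the "disk" store
    void flush() {
        disk.putAll(memory);
        memory.clear();
    }

    int unflushedCount() {
        return memory.size();
    }

    int flushedCount() {
        return disk.size();
    }
}
```

Reads stay fast because the latest value is served from memory, while flush() makes the data durable in the slower store.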
StreamsBuilder Class
I moved this to the StreamsBuilder Class post
We include the following line. Streams are started and stopped by calling the start() and close() methods of the KafkaStreams class
import org.apache.kafka.streams.KafkaStreams;
constructor - Topology + Properties
Example
We do it like this
Properties props = ...
StreamsBuilder builder = new StreamsBuilder();
...
KafkaStreams streams = new KafkaStreams(builder.build(), props);
Example
We do it like this
Properties props = ...;
Topology topology = ...;
KafkaStreams streams = new KafkaStreams(topology, props);
allMetadataForStore method
Example
We do it like this
KafkaStreams streams = ...;
// Find all the locations of local instances of the state store named "word-count"
Collection<StreamsMetadata> wordCountHosts = streams.allMetadataForStore("word-count");
Then, to query one of the StreamsMetadata entries, we do it like this
// Construct the (fictitious) full endpoint URL to query the current remote application instance
String url = "http://" + streamsMetadata.host() + ":" + streamsMetadata.port() + "/word-count/alice";
// Read and return the count for 'alice', if any.
return http.getLong(url);
start method
Example
We do it like this
public static void main(String[] args) {
  Topology topology = ...;
  Properties streamConfig = ...;
  KafkaStreams kafkaStreams = new KafkaStreams(topology, streamConfig);
  kafkaStreams.start();
  // Close the streams cleanly when the JVM shuts down
  Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    logger.info("Shutdown hook invoked... Application shutting down");
    kafkaStreams.close();
  }));
}
Example
We do it like this
public static void main(String[] args) throws Exception {
  Properties props = ...;
  Topology topology = ...;
  KafkaStreams streams = new KafkaStreams(topology, props);
  CountDownLatch latch = new CountDownLatch(1);
  Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
    @Override
    public void run() {
      streams.close();
      latch.countDown();
    }
  });
  try {
    streams.start();
    // Block the main thread until the shutdown hook releases the latch
    latch.await();
  } catch (Throwable e) {
    System.exit(1);
  }
  System.exit(0);
}
KStream Class
I moved this to the KStream Class post
KGroupedStream Class
count method
The explanation is as follows
We can turn a stream into a table by aggregating the stream with operations such as COUNT() or SUM(), for example.
Example
We do it like this. The stream is grouped by value. KGroupedStream.count() is called to count the groups. This call returns a KTable.
KStream<String, String> ks1 = ks0
  .flatMapValues(v -> Arrays.asList(v.toLowerCase().split(" ")));
KGroupedStream<String, String> ks2 = ks1.groupBy((k, v) -> v);
KTable<String, Long> kt0 = ks2.count();
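To see what the pipeline above computes conceptually, here is a minimal plain-Java sketch that folds a list of values into a word-count table. It does not use the Kafka Streams API; WordCountSketch and its count method are hypothetical names chosen for illustration, and a LinkedHashMap stands in for the resulting table.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of "stream -> table by counting"; not the Kafka Streams API.
class WordCountSketch {
    // Splits each value into lowercase words, groups by word, and counts the occurrences.
    static Map<String, Long> count(List<String> values) {
        Map<String, Long> table = new LinkedHashMap<>();
        for (String value : values) {
            for (String word : value.toLowerCase().split(" ")) {
                // Each incoming word updates the running count for its key
                table.merge(word, 1L, Long::sum);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        // prints {hello=2, kafka=1, streams=1}
        System.out.println(count(Arrays.asList("Hello Kafka", "hello streams")));
    }
}
```

Every new record only updates the row for its own key, which is why the result of an aggregation is naturally a table rather than a stream.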
aggregate method
Example - Materialized.with
We do it like this. Here the aggregation result is written to a KTable.
KStream<String, String> metricsStream = ...;
KTable<String, MetricsCountAndSum> countAndSumAggregate = metricsStream
  .groupByKey()
  .aggregate(
    // Initial aggregate: count = 0, sum = 0
    new Initializer<MetricsCountAndSum>() {
      @Override
      public MetricsCountAndSum apply() {
        return new MetricsCountAndSum(0L, 0L);
      }
    },
    // Adds each incoming value to the running count and sum for its key
    new Aggregator<String, String, MetricsCountAndSum>() {
      @Override
      public MetricsCountAndSum apply(String key, String value, MetricsCountAndSum current) {
        Long newCount = current.getCount() + 1;
        Long newSum = current.getSum() + Long.valueOf(value);
        MetricsCountAndSum metricsCountAndSum = new MetricsCountAndSum(newCount, newSum);
        return metricsCountAndSum;
      }
    },
    Materialized.with(Serdes.String(), new MetricsCountAndSumSerde()));
Example - Materialized.as
We do it like this. Here the aggregation result is written to a KTable.
import java.util.function.Function;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
public class QueryProcessor {
  public static final String ITEM_STORE_SUFFIX = "-items-store";

  // Note: a price of exactly 50 matches none of these predicates, so branch() drops that record
  Predicate<String, OrderedItem> isItemCheap = (k, v) -> v.getPrice() < 5;
  Predicate<String, OrderedItem> isItemAffordable = (k, v) -> v.getPrice() >= 5 && v.getPrice() < 50;
  Predicate<String, OrderedItem> isItemExpensive = (k, v) -> v.getPrice() > 50;

  @Bean
  public Function<KStream<String, ValidatedOrder>, KStream<String, OrderedItem>[]> itemProcessor() {
    return validatedOrdersStream -> {
      // group the ordered items by price
      KStream<String, OrderedItem>[] orderedItemsByPriceStream = validatedOrdersStream
        .map(ProcessorUtil::getItem)
        .branch(isItemCheap, isItemAffordable, isItemExpensive);

      // materialize the grouped items into separate state stores.
      // Cheap items:
      orderedItemsByPriceStream[0].groupByKey().aggregate(
        ProcessorUtil::initializeItems,
        ProcessorUtil::aggregateItems,
        Materialized.as(Price.CHEAP.label + ITEM_STORE_SUFFIX));
      // Affordable items:
      orderedItemsByPriceStream[1].groupByKey().aggregate(
        ProcessorUtil::initializeItems,
        ProcessorUtil::aggregateItems,
        Materialized.as(Price.AFFORDABLE.label + ITEM_STORE_SUFFIX));
      // Expensive items:
      orderedItemsByPriceStream[2].groupByKey().aggregate(
        ProcessorUtil::initializeItems,
        ProcessorUtil::aggregateItems,
        Materialized.as(Price.EXPENSIVE.label + ITEM_STORE_SUFFIX));

      return orderedItemsByPriceStream;
    };
  }
}
KTable Class
The explanation is as follows
A KTable is an abstraction of a changelog stream, where each data record represents an update. More precisely, the value in a data record is interpreted as an “UPDATE” of the last value for the same record key. A record with a null as value represents a “DELETE” or tombstone for the record’s key.
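The update and tombstone semantics described above can be illustrated with a small plain-Java sketch. It does not use the Kafka Streams API; ChangelogTableSketch and its methods are hypothetical names, and a HashMap stands in for the table's state.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java illustration of changelog semantics; not the Kafka Streams API.
class ChangelogTableSketch {
    private final Map<String, String> table = new HashMap<>();

    // Applies one changelog record:
    // a non-null value replaces the last value for the key (UPDATE),
    // a null value removes the key (DELETE / tombstone).
    void apply(String key, String value) {
        if (value == null) {
            table.remove(key);
        } else {
            table.put(key, value);
        }
    }

    String get(String key) {
        return table.get(key);
    }

    int size() {
        return table.size();
    }
}
```

For example, applying ("alice", "Berlin"), then ("alice", "Paris"), leaves a single row with the value "Paris"; applying ("alice", null) afterwards removes the row.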
The explanation is as follows
KTable uses the local state of the Kafka instance, so if one of the Kafka instances goes down, you will lose the data persisted to that local instance, because a new Kafka instance is created on a new machine.
toStream method
We do it like this
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Printed;

KTable<String, Long> kt0 = ...
kt0.toStream().print(Printed.toSysOut());
GlobalKTable Class
The explanation is as follows
In the case of KTable, every computer maintains the state of each event local to that computer. Sometimes a solution requires a global dataset across all the tasks, which can be used by all of them. This is where the GlobalKTable comes into the picture.
- Used for small datasets
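The difference between a partitioned table and a global one can be sketched in plain Java. This is only an illustration under stated assumptions: TableViewsSketch and its methods are hypothetical names, nothing here is Kafka Streams API, and partitionFor is a simplified stand-in for the way Kafka actually assigns keys to partitions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of partitioned vs. global table views; not the Kafka Streams API.
class TableViewsSketch {
    // Hypothetical partitioner; Kafka itself hashes the serialized key differently.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // KTable-like view: task i only holds the records of partition i.
    static List<Map<String, String>> partitionedViews(Map<String, String> records, int numTasks) {
        List<Map<String, String>> views = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            views.add(new HashMap<>());
        }
        for (Map.Entry<String, String> e : records.entrySet()) {
            views.get(partitionFor(e.getKey(), numTasks)).put(e.getKey(), e.getValue());
        }
        return views;
    }

    // GlobalKTable-like view: every task holds a full copy of the dataset.
    static List<Map<String, String>> globalViews(Map<String, String> records, int numTasks) {
        List<Map<String, String>> views = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            views.add(new HashMap<>(records));
        }
        return views;
    }
}
```

Because every task keeps a full copy, the memory cost of the global view grows with the number of tasks, which is consistent with the note above that it is used for small datasets.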