Monday, January 18, 2021

Kafka Streams API

Introduction
An example that uses state is here. The code is here.

Gradle
We include the following lines
compile 'org.apache.kafka:kafka-streams:2.2.0'
compile 'org.slf4j:slf4j-simple:1.7.21'
Steps Required for a Stream
The explanation is as follows
There are 3 major steps we will need to complete to have a running Streams app. 
1. We need to set various configuration parameters, 
2. build a topology by registering each of our streams, 
3. then create and run the Streams app.
For the first step, the Properties, we do the following
private final static String BOOTSTRAP_SERVERS = "localhost:29092";
private final static String APPLICATION_ID = "arctype-stream";

private static Properties makeProps() {
  final Properties props = new Properties();

  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);

  // You will learn more about SerDe's soon!
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

  props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
    LogAndContinueExceptionHandler.class);
  props.put(StreamsConfig.POLL_MS_CONFIG, 100);
  props.put(StreamsConfig.RETRIES_CONFIG, 100);
  return props;
}
For the second step, StreamsBuilder is used.
For the third step, KafkaStreams is used. A sketch tying all three steps together is shown below.
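A minimal sketch of the three steps together, reusing the makeProps() method above; the topic names "input-topic" and "output-topic" are hypothetical:
public static void main(String[] args) {
  // Step 1: configuration
  Properties props = makeProps();

  // Step 2: register streams on a StreamsBuilder and build the Topology
  StreamsBuilder builder = new StreamsBuilder();
  KStream<String, String> source = builder.stream("input-topic"); // hypothetical topic
  source.to("output-topic"); // hypothetical topic
  Topology topology = builder.build();

  // Step 3: create and run the Streams app
  KafkaStreams streams = new KafkaStreams(topology, props);
  streams.start();
}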

Local state vs. external state
The explanation is as follows
Usually, stream processors keep this state locally for faster access. It is written to memory first, then eventually flushed out to a key/value store on the disk, such as RocksDB.

But in some cases, the state is stored in an external place like a database. Although it introduces additional latency, it works well for simple workloads and provides you good scalability.
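A hedged sketch of the local-state case: a count materialized into a named, RocksDB-backed local store and then read from the same application instance. The "word-count" store name and the words stream are assumptions:
// Materialize the aggregation into a local state store named "word-count"
KTable<String, Long> counts = words
  .groupBy((k, v) -> v)
  .count(Materialized.as("word-count"));

// After streams.start(), the store can be queried locally
ReadOnlyKeyValueStore<String, Long> store =
  streams.store("word-count", QueryableStoreTypes.keyValueStore());
Long aliceCount = store.get("alice");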

StreamsBuilder Class
Moved to the StreamsBuilder Class post

KafkaStreams Class
We include the following line. Streams are started by calling this class's start() method and stopped with close()
import org.apache.kafka.streams.KafkaStreams;
constructor - Topology + Properties
Example
We do it like this
Properties props = ...
StreamsBuilder builder = new StreamsBuilder();
...
KafkaStreams streams = new KafkaStreams(builder.build(), props);
Example
We do it like this
Properties props = ...;
Topology topology = ...;
KafkaStreams streams = new KafkaStreams(topology, props);
allMetadataForStore method
Example
We do it like this
KafkaStreams streams = ...;
// Find all the locations of local instances of the state store named "word-count"
Collection<StreamsMetadata> wordCountHosts = streams.allMetadataForStore("word-count");
Then, to query one of these StreamsMetadata instances, we do it like this
// Construct the (fictitious) full endpoint URL to query the current remote application instance
String url = "http://" + streamsMetadata.host() + ":" + streamsMetadata.port() + "/word-count/alice";
// Read and return the count for 'alice', if any
return http.getLong(url);
start method
Example
We do it like this
public static void main(String[] args) {
  Topology topology = ...;
  Properties streamConfig = ...;
  KafkaStreams kafkaStreams = new KafkaStreams(topology, streamConfig);
  kafkaStreams.start();

  Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    logger.info("Shutdown hook invoked... Application shutting down");
    kafkaStreams.close();
  }));
}
Example
We do it like this
public static void main(String[] args) throws Exception {
  Properties props = ...;
  Topology topology = ...;
  KafkaStreams streams = new KafkaStreams(topology, props);

  CountDownLatch latch = new CountDownLatch(1);
  Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
    @Override
    public void run() {
      streams.close();
      latch.countDown();
    }
  });

  try {
    streams.start();
    latch.await();
  } catch (Throwable e) {
    System.exit(1);
  }
  System.exit(0);
}
KStream Class
Moved to the KStream Class post

KGroupedStream Class
count method
The explanation is as follows
We can turn a stream into a table by aggregating the stream with operations such as COUNT() or SUM(), for example.
Example
We do it like this. It groups records by their value. To count the groups, KGroupedStream.count() is called. This call returns a KTable.
KStream<String, String> ks1 = ks0
  .flatMapValues(v -> Arrays.asList(v.toLowerCase().split(" ")));
KGroupedStream<String, String> ks2 = ks1.groupBy((k, v) -> v);

KTable<String, Long> kt0 = ks2.count();
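For context, a hedged sketch of where ks0 might come from and how the counts could be written back out; the topic names "sentences" and "word-counts" are assumptions:
KStream<String, String> ks0 = builder.stream("sentences");
// ... the grouping and counting above ...
kt0.toStream().to("word-counts", Produced.with(Serdes.String(), Serdes.Long()));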
aggregate method
Example - Materialized.with
We do it like this. Here the aggregation result is written to a KTable.
KStream<String, String> metricsStream = ...;

KTable<String, MetricsCountAndSum> countAndSumAggregate = metricsStream.groupByKey()
  .aggregate(
    new Initializer<MetricsCountAndSum>() {
      @Override
      public MetricsCountAndSum apply() {
        return new MetricsCountAndSum(0L, 0L);
      }
    }, 
    new Aggregator<String, String, MetricsCountAndSum>() {
      @Override
      public MetricsCountAndSum apply(String key, String value,
          MetricsCountAndSum current) {
        Long newCount = current.getCount() + 1;
        Long newSum = current.getSum() + Long.valueOf(value);
        MetricsCountAndSum metricsCountAndSum = new MetricsCountAndSum(newCount, newSum);
       
        return metricsCountAndSum;
      }
    },
    Materialized.with(Serdes.String(), new MetricsCountAndSumSerde()));
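MetricsCountAndSum and MetricsCountAndSumSerde are not shown here; a minimal sketch of the value class, assuming it is a simple count/sum pair:
public class MetricsCountAndSum {
  private final Long count;
  private final Long sum;

  public MetricsCountAndSum(Long count, Long sum) {
    this.count = count;
    this.sum = sum;
  }

  public Long getCount() { return count; }
  public Long getSum() { return sum; }
}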
Example - Materialized.as
We do it like this. Here each aggregation result is materialized into a named state store.
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;

@Component
public class QueryProcessor {
  public static final String ITEM_STORE_SUFFIX = "-items-store";

  Predicate<String, OrderedItem> isItemCheap = (k, v) -> v.getPrice() < 5;
  Predicate<String, OrderedItem> isItemAffordable = (k, v) -> v.getPrice() >= 5 && v.getPrice() < 50;
  Predicate<String, OrderedItem> isItemExpensive = (k, v) -> v.getPrice() >= 50; // >= so items priced exactly 50 are not dropped

  @Bean
  public Function<KStream<String, ValidatedOrder>, KStream<String, OrderedItem>[]> itemProcessor() {
    return validatedOrdersStream -> {
      // group the ordered items by price
      KStream<String, OrderedItem>[] orderedItemsByPriceStream = validatedOrdersStream
        .map(ProcessorUtil::getItem)
        .branch(isItemCheap, isItemAffordable, isItemExpensive);

      // materialize the grouped items into separate state stores.
      // Cheap items:
      orderedItemsByPriceStream[0].groupByKey().aggregate(
        ProcessorUtil::initializeItems,
        ProcessorUtil::aggregateItems,
        Materialized.as(Price.CHEAP.label + ITEM_STORE_SUFFIX));
      // Affordable items:
      orderedItemsByPriceStream[1].groupByKey().aggregate(
        ProcessorUtil::initializeItems,
        ProcessorUtil::aggregateItems,
        Materialized.as(Price.AFFORDABLE.label + ITEM_STORE_SUFFIX));
      // Expensive items:
      orderedItemsByPriceStream[2].groupByKey().aggregate(
        ProcessorUtil::initializeItems,
        ProcessorUtil::aggregateItems,
        Materialized.as(Price.EXPENSIVE.label + ITEM_STORE_SUFFIX));

      return orderedItemsByPriceStream;
    };
  }
}
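The named stores can later be queried interactively. A hedged sketch using the Spring Cloud Stream Kafka Streams binder's InteractiveQueryService, assuming the aggregate value type is List<OrderedItem>:
@Autowired
private InteractiveQueryService interactiveQueryService;

public List<OrderedItem> getCheapItems() {
  // Look up the state store materialized above for cheap items
  ReadOnlyKeyValueStore<String, List<OrderedItem>> store =
    interactiveQueryService.getQueryableStore(
      Price.CHEAP.label + ITEM_STORE_SUFFIX, QueryableStoreTypes.keyValueStore());
  List<OrderedItem> items = new ArrayList<>();
  store.all().forEachRemaining(kv -> items.addAll(kv.value));
  return items;
}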
KTable Class
The explanation is as follows
A KTable is an abstraction of a changelog stream, where each data record represents an update. More precisely, the value in a data record is interpreted as an “UPDATE” of the last value for the same record key.

A record with a null as value represents a “DELETE” or tombstone for the record’s key.
The explanation is as follows
KTable uses local state on each instance, so if one of the instances goes down, the data persisted to that local instance is lost, because a new instance is formed on a new machine.
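A KTable is typically created from a topic via StreamsBuilder; a minimal sketch, with a hypothetical topic name:
// Each record is an upsert for its key; a null value is a tombstone (delete)
KTable<String, String> table = builder.table("user-profiles");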
toStream method
We do it like this
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Printed;

KTable<String, Long> kt0 = ...
kt0.toStream().print(Printed.toSysOut());
GlobalKTable Class
The explanation is as follows
In the case of KTable, every instance maintains the state of its events locally. Sometimes a solution requires a global dataset across all the tasks, one that can be used by every task. This is where GlobalKTable comes into the picture.

- Used for small datasets
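A minimal sketch of creating a GlobalKTable and joining a stream against it; the topic names, the orders stream, and the extractCountryCode helper are hypothetical:
GlobalKTable<String, String> countries = builder.globalTable("country-codes");

// Join each order with the fully replicated reference data;
// the KeyValueMapper picks the table key, the ValueJoiner merges the values
KStream<String, String> enriched = orders.join(
  countries,
  (orderId, order) -> extractCountryCode(order), // hypothetical helper
  (order, countryName) -> order + " from " + countryName);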

