30 Kasım 2021 Salı

Kafka Streams API StreamsBuilder Sınıfı

Giriş
Şu satırı dahil ederiz
import org.apache.kafka.streams.StreamsBuilder;
Bu sınıf bir Topology nesnesi yaratır. Topology nesnesi KafkaStreams sınıfını yaratmak için gerekir.
Ayrıca bu sınıf stream() metodu sağlar. Böylece bir topic event'lerini işleyen KStream nesnesi yaratılır.
build metodu
Örnek
Şöyle yaparız
Topology createTopology() {
  StreamsBuilder builder = new StreamsBuilder();
  // Add your streams here.
  TradeStream.build(builder);
  Topology topology = builder.build();
  System.out.println(topology.describe());
  return topology;
}
Yeni stream ekleme şöyledir
public class TradeStream {
  private final static String TRADE_TOPIC = "ARCTYPE.public.trade";

  public static void build(StreamsBuilder builder) {
    Serde<TradeModel> tradeModelSerde = SerdeFactory.createSerdeFor(TradeModel.class,
      true);
    Serde<String> idSerde = Serdes.serdeFrom(new IdSerializer(), new IdDeserializer());

    KStream<String, TradeModel> tradeModelKStream =
      builder.stream(TRADE_TOPIC, Consumed.with(idSerde, tradeModelSerde));

    tradeModelKStream.peek((key, value) -> {
      System.out.println(key.toString());
      System.out.println(value.toString());
    });
    tradeModelKStream.map((id, trade) -> {
      TradeModel tradeDoubled = new TradeModel();
      tradeDoubled.price = trade.price * 2;
      tradeDoubled.quantity = trade.quantity;
      tradeDoubled.ticker = trade.ticker;
      return new KeyValue<>(id, tradeDoubled);
    }).to("ARCTYPE.doubled-trades", Produced.with(idSerde, tradeModelSerde));
  }
}
Key SerDe için deserializer şöyledir. Serializer da benzer şekilde yazılır
public class IdDeserializer implements Deserializer<String> {
  private ObjectMapper objectMapper = new ObjectMapper();

  @Override
  public void configure(Map<String, ?> props, boolean isKey) { }

  @Override
  public void close() { }

  @Override
  public String deserialize(String topic, byte[] bytes) {
    if (bytes == null)
      return null;

    String id;
    try {
      Map payload = objectMapper.readValue(new String(bytes), Map.class);
      id = String.valueOf(payload.get("id"));
    } catch (Exception e) {
      throw new SerializationException(e);
    }
    return id;
  }
}
Value SerDe için SerdeFactory sınıfı şöyledir
public class SerdeFactory {
  public static <T> Serde<T> createSerdeFor(Class<T> clazz, boolean isKey) {
    Map<String, Object> serdeProps = new HashMap<>();
    serdeProps.put("Class", clazz);

    Serializer<T> ser = new JsonSerializer<>();
    ser.configure(serdeProps, isKey);

    Deserializer<T> de = new JsonDeserializer<>();
    de.configure(serdeProps, isKey);

    return Serdes.serdeFrom(ser, de);
  }
}
JsonDeserializer şöyle
public class JsonDeserializer<T> implements Deserializer<T> {
  private ObjectMapper objectMapper = new ObjectMapper();
  private Class<T> clazz;

  @Override
  public void configure(Map<String, ?> props, boolean isKey) {
    clazz = (Class<T>) props.get("Class");
  }
  @Override
  public void close() { }
  @Override
  public T deserialize(String topic, byte[] bytes) {
    if (bytes == null)
      return null;

    T data;
    Map payload;
    try {
      payload = objectMapper.readValue(new String(bytes), Map.class);
      // Debezium updates will contain a key "after" with the latest row contents.
      Map afterMap = (Map) payload.get("after");
      if (afterMap == null) {
        // Non-Debezium payloads
        data = objectMapper.readValue(objectMapper.writeValueAsBytes(payload), clazz);
      } else {
        // Incoming from Debezium
        data = objectMapper.readValue(objectMapper.writeValueAsBytes(afterMap), clazz);
      }

    } catch (Exception e) {
      throw new SerializationException(e);
    }
    return data;
  }
}
Value nesnes şöyle
@JsonIgnoreProperties(ignoreUnknown = true)
public class TradeModel {
    public Integer id;
    public String ticker;
    public Integer price;
    public Integer quantity;
}
stream metodu
Örnek
Şöyle yaparız
StreamsBuilder streamsBuilder = new StreamsBuilder();
KStream<String,String> ks0 = streamsBuilder.stream(IAppConfigs.ORDER_RECEIVED_TOPIC);
...
Topology topology = streamsBuilder.build();
Properties streamConfig = ...;
KafkaStreams kafkaStreams = new KafkaStreams(topology, streamConfig);
kafkaStreams.start();

Hiç yorum yok:

Yorum Gönder