This class creates a Topology object. A Topology object is required to construct a KafkaStreams instance.
The class also provides the stream() method, which creates a KStream object that processes the events of a topic.
build method
Example
We do it like this
Topology createTopology() {
    StreamsBuilder builder = new StreamsBuilder();
    // Add your streams here.
    TradeStream.build(builder);
    Topology topology = builder.build();
    System.out.println(topology.describe());
    return topology;
}
A new stream is added like this
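// TradeStream.java
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class TradeStream {
    private final static String TRADE_TOPIC = "ARCTYPE.public.trade";

    public static void build(StreamsBuilder builder) {
        Serde<TradeModel> tradeModelSerde = SerdeFactory.createSerdeFor(TradeModel.class, true);
        Serde<String> idSerde = Serdes.serdeFrom(new IdSerializer(), new IdDeserializer());

        // Read the trade topic with explicit key and value serdes.
        KStream<String, TradeModel> tradeModelKStream =
            builder.stream(TRADE_TOPIC, Consumed.with(idSerde, tradeModelSerde));

        // Log every record as it passes through.
        tradeModelKStream.peek((key, value) -> {
            System.out.println(key.toString());
            System.out.println(value.toString());
        });

        // Double the price and write the result to a second topic.
        tradeModelKStream.map((id, trade) -> {
            TradeModel tradeDoubled = new TradeModel();
            tradeDoubled.price = trade.price * 2;
            tradeDoubled.quantity = trade.quantity;
            tradeDoubled.ticker = trade.ticker;
            return new KeyValue<>(id, tradeDoubled);
        }).to("ARCTYPE.doubled-trades", Produced.with(idSerde, tradeModelSerde));
    }
}

// IdDeserializer.java
import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

public class IdDeserializer implements Deserializer<String> {
    private ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> props, boolean isKey) { }

    @Override
    public void close() { }

    @Override
    public String deserialize(String topic, byte[] bytes) {
        if (bytes == null)
            return null;
        String id;
        try {
            // Decode explicitly as UTF-8 instead of relying on the platform default charset.
            Map payload = objectMapper.readValue(new String(bytes, StandardCharsets.UTF_8), Map.class);
            id = String.valueOf(payload.get("id"));
        } catch (Exception e) {
            throw new SerializationException(e);
        }
        return id;
    }
}

// SerdeFactory.java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

public class SerdeFactory {
    public static <T> Serde<T> createSerdeFor(Class<T> clazz, boolean isKey) {
        Map<String, Object> serdeProps = new HashMap<>();
        // The target class is handed to the (de)serializer through its configure() call.
        serdeProps.put("Class", clazz);

        Serializer<T> ser = new JsonSerializer<>();
        ser.configure(serdeProps, isKey);

        Deserializer<T> de = new JsonDeserializer<>();
        de.configure(serdeProps, isKey);

        return Serdes.serdeFrom(ser, de);
    }
}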
JsonDeserializer looks like this
import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

public class JsonDeserializer<T> implements Deserializer<T> {
    private ObjectMapper objectMapper = new ObjectMapper();
    private Class<T> clazz;

    @Override
    @SuppressWarnings("unchecked")
    public void configure(Map<String, ?> props, boolean isKey) {
        clazz = (Class<T>) props.get("Class");
    }

    @Override
    public void close() { }

    @Override
    public T deserialize(String topic, byte[] bytes) {
        if (bytes == null)
            return null;
        T data;
        Map payload;
        try {
            // Decode explicitly as UTF-8 instead of relying on the platform default charset.
            payload = objectMapper.readValue(new String(bytes, StandardCharsets.UTF_8), Map.class);
            // Debezium updates will contain a key "after" with the latest row contents.
            Map afterMap = (Map) payload.get("after");
            if (afterMap == null) {
                // Non-Debezium payloads
                data = objectMapper.readValue(objectMapper.writeValueAsBytes(payload), clazz);
            } else {
                // Incoming from Debezium
                data = objectMapper.readValue(objectMapper.writeValueAsBytes(afterMap), clazz);
            }
        } catch (Exception e) {
            throw new SerializationException(e);
        }
        return data;
    }
}
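The branch on the "after" key can be looked at in isolation. What follows is only a minimal sketch that uses plain java.util.Map values instead of Jackson and Kafka types. The class name EnvelopeSketch, the method name unwrap, and the sample field values are made up for illustration and are not part of the original code. It also assumes the envelope is flat, that is, "after" sits at the top level of the payload.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: mirrors the "after" check of JsonDeserializer with plain maps.
final class EnvelopeSketch {

    // Returns the nested "after" map for a Debezium-style envelope,
    // or the payload itself when there is no usable "after" entry.
    // Unlike the cast in the deserializer, a non-map "after" value
    // falls through to the payload instead of throwing.
    @SuppressWarnings("unchecked")
    static Map<String, Object> unwrap(Map<String, Object> payload) {
        Object after = payload.get("after");
        if (after instanceof Map) {
            return (Map<String, Object>) after;
        }
        return payload;
    }

    public static void main(String[] args) {
        // Sample row; the values are invented for the example.
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", 1);
        row.put("ticker", "ABC");
        row.put("price", 10);
        row.put("quantity", 5);

        // Debezium-style change event wrapping the row.
        Map<String, Object> envelope = new LinkedHashMap<>();
        envelope.put("before", null);
        envelope.put("after", row);

        System.out.println(unwrap(envelope)); // the row taken from "after"
        System.out.println(unwrap(row));      // the plain payload, unchanged
    }
}
```

One consequence worth checking: a Debezium delete event carries a null "after", so in the deserializer such an event takes the non-Debezium branch and the whole envelope is mapped onto the target class.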
The value object looks like this
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

@JsonIgnoreProperties(ignoreUnknown = true)
public class TradeModel {
    public Integer id;
    public String ticker;
    public Integer price;
    public Integer quantity;
}
stream method
Example
We do it like this
StreamsBuilder streamsBuilder = new StreamsBuilder();
KStream<String,String> ks0 = streamsBuilder.stream(IAppConfigs.ORDER_RECEIVED_TOPIC);
...
Topology topology = streamsBuilder.build();
Properties streamConfig = ...;
KafkaStreams kafkaStreams = new KafkaStreams(topology, streamConfig);
kafkaStreams.start();
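The stream(topic) call above takes no Consumed argument, so the key and value serdes come from the configuration passed to KafkaStreams. The original leaves the contents of streamConfig out. As a rough sketch of what such a configuration might contain, the code below builds a Properties object with plain string keys. The application id and broker address are placeholders, and the class name StreamConfigSketch is hypothetical. The keys themselves are the standard Kafka Streams configuration names.

```java
import java.util.Properties;

// Hypothetical helper: builds a configuration that a KStream<String, String> topology could run with.
final class StreamConfigSketch {

    static Properties streamConfig() {
        Properties props = new Properties();
        // Placeholder values; replace with the real application id and broker list.
        props.put("application.id", "order-stream-app");
        props.put("bootstrap.servers", "localhost:9092");
        // Default serdes used by stream(topic) when no Consumed is given.
        props.put("default.key.serde",
                "org.apache.kafka.common.serialization.Serdes$StringSerde");
        props.put("default.value.serde",
                "org.apache.kafka.common.serialization.Serdes$StringSerde");
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting key/value pairs for inspection.
        streamConfig().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

In application code the same keys are usually written through the StreamsConfig constants (for example StreamsConfig.APPLICATION_ID_CONFIG) rather than as string literals.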