24 Kasım 2021 Çarşamba

Kafka Producer API

Serializer ve Deserializer Arayüzleri
Örnek
Elimizde Protobuf mesajı için şöyle bir Serializer ve Deserializer kodu olsun.
import com.kafka.message.ExchangeProtoMessage.ProtMessage;
import org.apache.kafka.common.serialization.Serializer;

/**
 * Kafka {@link Serializer} that turns a {@link ProtMessage} into the byte array
 * produced by the message's own {@code toByteArray()}.
 */
public class ProtMessageSerializer implements Serializer<ProtMessage>{
    /**
     * Serializes the given message.
     *
     * @param topic topic the record is being sent to (unused)
     * @param data  message to serialize; may be {@code null}
     * @return the serialized bytes, or {@code null} when {@code data} is {@code null}
     */
    @Override
    public byte[] serialize(String topic, ProtMessage data) {
        // The Serializer contract allows null data (e.g. a record without a value);
        // pass it through instead of failing with a NullPointerException.
        if (data == null) {
            return null;
        }
        return data.toByteArray();
    }
}


import com.google.protobuf.InvalidProtocolBufferException;
import com.kafka.message.ExchangeProtoMessage.ProtMessage;
import org.apache.kafka.common.serialization.Deserializer;

/**
 * Kafka {@link Deserializer} that rebuilds a {@link ProtMessage} from raw bytes
 * via {@code ProtMessage.parseFrom}.
 */
public class ProtMessageDeserializer implements Deserializer<ProtMessage>{
    /**
     * Parses the given bytes into a {@link ProtMessage}.
     *
     * @param topic topic the record was read from (used only in the error message)
     * @param data  serialized bytes; may be {@code null}
     * @return the parsed message, or {@code null} when {@code data} is {@code null}
     * @throws RuntimeException if the bytes are not a valid {@code ProtMessage};
     *                          the original parse failure is attached as the cause
     */
    @Override
    public ProtMessage deserialize(String topic, byte[] data) {
        // The Deserializer contract allows null data; return null rather than
        // letting the parser fail on it.
        if (data == null) {
            return null;
        }
        try {
            return ProtMessage.parseFrom(data);
        } catch (InvalidProtocolBufferException e) {
            // Keep the cause so the caller sees why parsing failed; no printStackTrace,
            // the stack trace travels with the rethrown exception.
            throw new RuntimeException(
                "exception while parsing ProtMessage from topic " + topic, e);
        }
    }
}
Producer tarafında şöyle yaparız
import com.kafka.message.ExchangeProtoMessage.ProtMessage;
import com.kafka.model.ProtMessageSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;


/**
 * Example producer that sends ten {@link ProtMessage} records with integer keys
 * to partition 0 of the topic {@code myFirstTopic}.
 */
public class MyKafkaProducerWithProtobufModel {

  /**
   * Builds the producer configuration, sends the records and closes the producer.
   *
   * @param args command-line arguments (unused)
   */
  public static void main(String[] args) {

   Properties props = new Properties();
   props.put("bootstrap.servers", "localhost:9092");
   props.put("linger.ms", 1);
   // key.serializer / value.serializer are intentionally not set here: the
   // serializer instances are handed to the KafkaProducer constructor below, and
   // the previous StringSerializer entries did not match the Integer/ProtMessage types.

   // try-with-resources closes the producer even when send() throws.
   try (Producer<Integer, ProtMessage> producer =
       new KafkaProducer<>(props, new IntegerSerializer(), new ProtMessageSerializer())) {
     for (int i = 1; i <= 10; i++){
       producer.send(new ProducerRecord<>("myFirstTopic", 0, i,
        ProtMessage.newBuilder()
          .setId(i)
          .setName(i + "proto value")
          .build()));
     }
   }
  }
}
Consumer tarafında şöyle yaparız
import com.kafka.message.ExchangeProtoMessage.ProtMessage;
import com.kafka.model.ProtMessageDeserializer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;

/**
 * Example consumer that subscribes to {@code myFirstTopic} and prints every
 * received {@link ProtMessage} record with its key and offset.
 */
public class MyKafkaConsumerWithProtobufModel {

  /**
   * Builds the consumer configuration and polls in an endless loop.
   *
   * @param args command-line arguments (unused)
   */
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "test");
    props.setProperty("enable.auto.commit", "true");
    props.setProperty("auto.commit.interval.ms", "1000");
    // key.deserializer / value.deserializer are intentionally not set here: the
    // deserializer instances are handed to the KafkaConsumer constructor below, and
    // the previous StringDeserializer entries did not match the Integer/ProtMessage types.

    // try-with-resources closes the consumer when the poll loop is left by an exception.
    try (KafkaConsumer<Integer, ProtMessage> consumer =
        new KafkaConsumer<>(props, new IntegerDeserializer(),
          new ProtMessageDeserializer())) {
      consumer.subscribe(Arrays.asList("myFirstTopic"));

      while (true) {
        ConsumerRecords<Integer, ProtMessage> records = consumer
          .poll(Duration.ofMillis(100));
        for (ConsumerRecord<Integer, ProtMessage> record : records) {
          // Plain concatenation instead of value().toString(): a record whose value
          // is null prints "null" rather than throwing a NullPointerException.
          System.out.println("Received message: (" + record.key() +  ", " +
            record.value() + ") at offset " + record.offset());
        }
      }
    }
  }
}


ProducerRecord Sınıfı
Şu satırı dahil ederiz
import org.apache.kafka.clients.producer.ProducerRecord;
constructor - topic + key + value
Örnek
Şöyle yaparız
// Producer instance shared by the methods below; created on first use by getProducer().
KafkaProducer<Integer, String> producer;

/**
 * Returns the shared producer, creating it on the first call.
 *
 * The producer is configured with an Integer key serializer and a String
 * value serializer.
 *
 * @return the producer stored in the {@code producer} field
 */
private KafkaProducer<Integer, String> getProducer() {
  // Already created: hand back the existing instance.
  if (producer != null) {
    return producer;
  }

  Properties config = new Properties();
  config.setProperty("bootstrap.servers", ...);

  String keySerializerName = IntegerSerializer.class.getCanonicalName();
  config.setProperty("key.serializer", keySerializerName);

  String valueSerializerName = StringSerializer.class.getCanonicalName();
  config.setProperty("value.serializer", valueSerializerName);

  producer = new KafkaProducer<>(config);
  return producer;
}

/**
 * Sends a record built from topic, key and value through the shared producer.
 *
 * @param topic topic to send to
 * @param key   record key
 * @param value record value
 * @return the future returned by the producer's {@code send} call
 */
public Future<RecordMetadata> produce(String topic, Integer key, String value) {
  // Obtain the producer first, then build the record, matching the original order.
  KafkaProducer<Integer, String> sender = getProducer();
  ProducerRecord<Integer, String> outgoing = new ProducerRecord<>(topic, key, value);
  return sender.send(outgoing);
}
constructor - topic + partition + timestamp + key + value
Örnek
Şöyle yaparız
/**
 * Sends a record with an explicit partition and timestamp through the shared producer.
 *
 * @param topic     topic to send to
 * @param partition partition the record is addressed to
 * @param timestamp record timestamp passed to the {@code ProducerRecord} constructor
 * @param key       record key
 * @param value     record value
 * @return the future returned by the producer's {@code send} call
 */
Future<RecordMetadata> produce(String topic, int partition, Long timestamp, 
  Integer key, String value) {
  // Obtain the producer first, then build the record, matching the original order.
  KafkaProducer<Integer, String> sender = getProducer();
  ProducerRecord<Integer, String> outgoing =
    new ProducerRecord<>(topic, partition, timestamp, key, value);
  return sender.send(outgoing);
}



Hiç yorum yok:

Yorum Gönder