Serializer ve Deserializer Arayüzleri
Örnek
Elimizde şöyle bir kod olsun
import com.kafka.message.ExchangeProtoMessage.ProtMessage;
import org.apache.kafka.common.serialization.Serializer;
/**
 * Kafka {@link Serializer} that converts a {@link ProtMessage} into the byte array
 * returned by its {@code toByteArray()} method.
 */
public class ProtMessageSerializer implements Serializer<ProtMessage> {

    /**
     * Serializes the given message.
     *
     * @param topic topic the record is destined for; not used by this implementation
     * @param data  message to serialize; may be {@code null}
     * @return the bytes of {@code data}, or {@code null} when {@code data} is {@code null}
     */
    @Override
    public byte[] serialize(String topic, ProtMessage data) {
        // A record may legitimately carry a null value (e.g. a tombstone); pass it
        // through as null instead of failing with a NullPointerException.
        if (data == null) {
            return null;
        }
        return data.toByteArray();
    }
}
import com.google.protobuf.InvalidProtocolBufferException;
import com.kafka.message.ExchangeProtoMessage.ProtMessage;
import org.apache.kafka.common.serialization.Deserializer;
/**
 * Kafka {@link Deserializer} that rebuilds a {@link ProtMessage} from raw bytes
 * via {@code ProtMessage.parseFrom}.
 */
public class ProtMessageDeserializer implements Deserializer<ProtMessage> {

    /**
     * Parses the record payload into a {@link ProtMessage}.
     *
     * @param topic topic the record was read from; only used in the error message
     * @param data  serialized payload; may be {@code null}
     * @return the parsed message, or {@code null} when {@code data} is {@code null}
     * @throws RuntimeException if the bytes are not a valid {@code ProtMessage};
     *                          the original parsing exception is attached as the cause
     */
    @Override
    public ProtMessage deserialize(String topic, byte[] data) {
        // A null payload (e.g. a tombstone) has nothing to parse; hand null back.
        if (data == null) {
            return null;
        }
        try {
            return ProtMessage.parseFrom(data);
        } catch (InvalidProtocolBufferException e) {
            // Chain the cause so the caller sees why parsing failed, rather than
            // dumping the stack trace to stderr and losing it.
            throw new RuntimeException(
                    "exception while parsing ProtMessage from topic " + topic, e);
        }
    }
}
Producer tarafında şöyle yaparız
import com.kafka.message.ExchangeProtoMessage.ProtMessage;
import com.kafka.model.ProtMessageSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
/**
 * Sample producer that publishes ten {@link ProtMessage} records, keyed by an
 * integer, to partition 0 of the topic {@code myFirstTopic}.
 */
public class MyKafkaProducerWithProtobufModel {

    /**
     * Builds the producer, sends the records with ids 1 to 10 and closes the producer.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("linger.ms", 1);
        // No key.serializer / value.serializer entries here: the serializer
        // instances passed to the constructor are the ones the producer uses, so
        // string-serializer settings would only be misleading.

        // try-with-resources closes the producer even when building or sending a
        // record throws.
        try (Producer<Integer, ProtMessage> producer =
                new KafkaProducer<>(props, new IntegerSerializer(), new ProtMessageSerializer())) {
            for (int i = 1; i <= 10; i++) {
                ProtMessage message = ProtMessage.newBuilder()
                        .setId(i)
                        .setName(i + "proto value")
                        .build();
                // Arguments: topic, partition, key, value.
                producer.send(new ProducerRecord<>("myFirstTopic", 0, i, message));
            }
        }
    }
}
Consumer tarafında şöyle yaparız
import com.kafka.message.ExchangeProtoMessage.ProtMessage;
import com.kafka.model.ProtMessageDeserializer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
/**
 * Sample consumer that subscribes to {@code myFirstTopic} and prints every
 * received record (integer key, {@link ProtMessage} value) to standard output.
 */
public class MyKafkaConsumerWithProtobufModel {

    /**
     * Creates the consumer and polls in an endless loop, printing each record.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "test");
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "1000");
        // No key.deserializer / value.deserializer entries here: the deserializer
        // instances passed to the constructor are the ones the consumer uses, so
        // string-deserializer settings would only be misleading.

        // try-with-resources closes the consumer when the loop is left through an
        // exception.
        try (KafkaConsumer<Integer, ProtMessage> consumer =
                new KafkaConsumer<>(props, new IntegerDeserializer(),
                        new ProtMessageDeserializer())) {
            consumer.subscribe(Arrays.asList("myFirstTopic"));
            while (true) {
                ConsumerRecords<Integer, ProtMessage> records =
                        consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<Integer, ProtMessage> record : records) {
                    // String concatenation renders a null value as "null" instead
                    // of throwing, unlike an explicit toString() call on it.
                    System.out.println("Received message: (" + record.key() + ", "
                            + record.value() + ") at offset " + record.offset());
                }
            }
        }
    }
}
ProducerRecord Sınıfı
Şu satırı dahil ederiz
import org.apache.kafka.clients.producer.ProducerRecord;
constructor
Örnek - topic + key + value
Şöyle yaparız
/** Lazily created producer; remains {@code null} until {@code getProducer()} is first called. */
KafkaProducer<Integer, String> producer;

/**
 * Returns the producer held in the {@code producer} field, creating and
 * configuring it on the first call.
 *
 * <p>No synchronization is done here, so concurrent first calls could each
 * create a producer.
 *
 * @return the producer instance, configured with integer keys and string values
 */
private KafkaProducer<Integer, String> getProducer() {
    if (producer != null) {
        return producer;
    }
    Properties producerProps = new Properties();
    // "..." is the placeholder from the original snippet: supply the broker list here.
    producerProps.setProperty("bootstrap.servers", ...);
    // The serializer settings are class names to be loaded, so use the binary name
    // from getName(); getCanonicalName() differs from it for nested classes.
    producerProps.setProperty("key.serializer", IntegerSerializer.class.getName());
    producerProps.setProperty("value.serializer", StringSerializer.class.getName());
    producer = new KafkaProducer<>(producerProps);
    return producer;
}
/**
 * Sends a record built from topic, key and value only.
 *
 * @param topic topic to publish to
 * @param key   record key
 * @param value record value
 * @return the future returned by the producer's {@code send} call
 */
public Future<RecordMetadata> produce(String topic, Integer key, String value) {
    // Obtain the producer first, matching the original evaluation order.
    KafkaProducer<Integer, String> kafkaProducer = getProducer();
    ProducerRecord<Integer, String> outgoing = new ProducerRecord<>(topic, key, value);
    return kafkaProducer.send(outgoing);
}
constructor - topic + partition + timestamp + key + value
Örnek
Şöyle yaparız
/**
 * Sends a record with an explicit partition and timestamp.
 *
 * @param topic     topic to publish to
 * @param partition partition passed to the record
 * @param timestamp timestamp passed to the record
 * @param key       record key
 * @param value     record value
 * @return the future returned by the producer's {@code send} call
 */
Future<RecordMetadata> produce(String topic, int partition, Long timestamp,
        Integer key, String value) {
    // Obtain the producer first, matching the original evaluation order.
    KafkaProducer<Integer, String> kafkaProducer = getProducer();
    ProducerRecord<Integer, String> outgoing =
            new ProducerRecord<>(topic, partition, timestamp, key, value);
    return kafkaProducer.send(outgoing);
}
Hiç yorum yok:
Yorum Gönder