29 Ocak 2021 Cuma

Kafka Avro API

Maven
Şu satırları dahil ederiz
<!-- Apache Avro core library; supplies the org.apache.avro.generic classes
     (GenericRecord, GenericRecordBuilder) used by the producer example. -->
<dependency>
   <groupId>org.apache.avro</groupId>
   <artifactId>avro</artifactId>
   <version>1.8.2</version>
</dependency>
<!-- Confluent Avro serializer/deserializer for Kafka
     (io.confluent.kafka.serializers.KafkaAvroSerializer).
     NOTE(review): io.confluent artifacts are usually resolved from Confluent's
     own Maven repository rather than Maven Central; confirm the POM declares
     a matching <repository> entry.
     NOTE(review): the consumer example calls poll(Duration); verify that the
     kafka-clients version used alongside 3.3.1 provides that overload. -->
<dependency>
   <groupId>io.confluent</groupId>
   <artifactId>kafka-avro-serializer</artifactId>
   <version>3.3.1</version>
</dependency>
Örnek - Producer KafkaAvroSerializer Kullanır
Şu satırları dahil ederiz
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.dataformat.avro.AvroMapper;
import com.fasterxml.jackson.dataformat.avro.AvroSchema;
Şöyle yaparız
String TOPIC_NAME = "customer-topic";
String KAFKA_SERVER_ADDRESS = "localhost:9092";
String AVRO_SERIALIZER_CLASS = "io.confluent.kafka.serializers.KafkaAvroSerializer";
String SCHEMA_REGISTRY_SERVER_URL = "http://localhost:8081";
// Kafka Producer Configurations
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", KAFKA_SERVER_ADDRESS);
kafkaProps.put("key.serializer", AVRO_SERIALIZER_CLASS);
kafkaProps.put("value.serializer", AVRO_SERIALIZER_CLASS);
kafkaProps.put("schema.registry.url", SCHEMA_REGISTRY_SERVER_URL);
   
// Schema Generation For Our Customer Class
AvroMapper avroMapper = new AvroMapper();
AvroSchema schema = avroMapper.schemaFor(Customer.class);
try(final Producer<String, GenericRecord> producer = new KafkaProducer<>(kafkaProps);) {
   
  // Publishing The Messages
  Customer customer = CustomerGenerator.getNext();
  GenericRecordBuilder recordBuilder = new GenericRecordBuilder(schema.getAvroSchema());
  recordBuilder.set("name", customer.getName());
  recordBuilder.set("email", customer.getEmail());
  recordBuilder.set("contactNumber", customer.getContactNumber());
  GenericRecord genericRecord = recordBuilder.build();
    
  ProducerRecord<String, GenericRecord> producerRecord = new ProducerRecord<>(TOPIC_NAME, 
      "customer", genericRecord);
  producer.send(producerRecord);
  
}
Örnek - Consumer
Şöyle yaparız
// Reads Avro-encoded customer records from a Kafka topic and prints them.
String TOPIC_NAME = "customer-topic";
String KAFKA_SERVER_ADDRESS = "localhost:9092";
String SCHEMA_REGISTRY_SERVER_URL = "http://localhost:8081";
String CONSUMER_GROUP_ID = "customer-message-consumer-group";

// Kafka consumer configuration
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER_ADDRESS);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
properties.put("schema.registry.url", SCHEMA_REGISTRY_SERVER_URL);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Auto-commit is off; offsets are committed explicitly after each poll below.
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

// Values are typed as GenericRecord: "specific.avro.reader" is not set, so
// KafkaAvroDeserializer produces generic records, matching what the producer
// example sends. Customer is not an Avro-generated class.
try (final Consumer<String, GenericRecord> consumer = new KafkaConsumer<>(properties)) {
  consumer.subscribe(Collections.singleton(TOPIC_NAME));
  while (true) {
    ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(100));

    for (final ConsumerRecord<String, GenericRecord> record : records) {
      System.out.println(record.value());
    }
    // Commit the offsets returned by the last poll without blocking the loop.
    consumer.commitAsync();
  }
}

Hiç yorum yok:

Yorum Gönder