Maven
Şu satırı dahil ederiz
<dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> <version>1.8.2</version> </dependency> <dependency> <groupId>io.confluent</groupId> <artifactId>kafka-avro-serializer</artifactId> <version>3.3.1</version> </dependency>
Şu satırı dahil ederiz
Şöyle yaparızimport org.apache.avro.generic.GenericRecord;import org.apache.avro.generic.GenericRecordBuilder;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.ProducerRecord;import com.fasterxml.jackson.databind.JsonMappingException;import com.fasterxml.jackson.dataformat.avro.AvroMapper;import com.fasterxml.jackson.dataformat.avro.AvroSchema;
String TOPIC_NAME = "customer-topic";String KAFKA_SERVER_ADDRESS = "localhost:9092";String AVRO_SERIALIZER_CLASS = "io.confluent.kafka.serializers.KafkaAvroSerializer";String SCHEMA_REGISTRY_SERVER_URL = "http://localhost:8081";// Kafka Producer ConfigurationsProperties kafkaProps = new Properties();kafkaProps.put("bootstrap.servers", KAFKA_SERVER_ADDRESS);kafkaProps.put("key.serializer", AVRO_SERIALIZER_CLASS);kafkaProps.put("value.serializer", AVRO_SERIALIZER_CLASS);kafkaProps.put("schema.registry.url", SCHEMA_REGISTRY_SERVER_URL);// Schema Generation For Our Customer ClassAvroMapper avroMapper = new AvroMapper();AvroSchema schema = avroMapper.schemaFor(Customer.class);try(final Producer<String, GenericRecord> producer = new KafkaProducer<>(kafkaProps);) {// Publishing The MessagesCustomer customer = CustomerGenerator.getNext();GenericRecordBuilder recordBuilder = new GenericRecordBuilder(schema.getAvroSchema());recordBuilder.set("name", customer.getName());recordBuilder.set("email", customer.getEmail());recordBuilder.set("contactNumber", customer.getContactNumber());GenericRecord genericRecord = recordBuilder.build();ProducerRecord<String, GenericRecord> producerRecord = new ProducerRecord<>(TOPIC_NAME,"customer", genericRecord);producer.send(producerRecord);}
Örnek - Consumer
Şöyle yaparız
String TOPIC_NAME = "customer-topic";String KAFKA_SERVER_ADDRESS = "localhost:9092";String SCHEMA_REGISTRY_SERVER_URL = "http://localhost:8081";String CONSUMER_GROUP_ID = "customer-message-consumer-group";// Kafka Consumer ConfigurationsProperties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER_ADDRESS);properties.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);properties.put("schema.registry.url", SCHEMA_REGISTRY_SERVER_URL);properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,KafkaAvroDeserializer.class);properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);try(final Consumer<String, Customer> consumer = new KafkaConsumer<>(properties)){consumer.subscribe(Collections.singleton(TOPIC_NAME));while (true) {ConsumerRecords<String, Customer> records = consumer.poll(Duration.ofMillis(100));for (final ConsumerRecord<String, Customer> record : records) {System.out.println(record.value());}consumer.commitAsync();}