
Monday, March 6, 2023

Kafka Streams API KTable Class

Introduction
We include the following line
import org.apache.kafka.streams.kstream.KTable;
Its description is below
A KTable is an abstraction of a changelog stream, where each data record represents an update. More precisely, the value in a data record is interpreted as an “UPDATE” of the last value for the same record key.

A record with a null as value represents a “DELETE” or tombstone for the record’s key.
Another description:
KTable uses local state on each Kafka Streams instance, so if one of the instances goes down, the data persisted to that local instance is lost, because a new instance is formed on a new machine.
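The update-per-key and tombstone semantics described above can be sketched with a plain map. This is a simplified illustration of the idea only, not how Kafka Streams implements it:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified illustration of KTable changelog semantics:
// each record is an upsert, and a null value is a tombstone (delete).
public class KTableSketch {
  static void apply(Map<String, String> table, String key, String value) {
    if (value == null) {
      table.remove(key);      // tombstone deletes the key
    } else {
      table.put(key, value);  // any other value overwrites the previous one
    }
  }

  public static void main(String[] args) {
    Map<String, String> table = new HashMap<>();
    apply(table, "alice", "Paris");
    apply(table, "alice", "Berlin"); // an UPDATE of the last value for the same key
    apply(table, "bob", "Rome");
    apply(table, "bob", null);       // a DELETE
    System.out.println(table);       // {alice=Berlin}
  }
}
```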
join method
Example - KTable-KTable Join
We do it like this
// Key=null,Value=10:00:00AM_GMT
KTable<String, String> table1 = builder.table("kafka-left-topic");

// Key=10:00:00AM, Value=task_1_completed
KTable<String, String> table2 = builder.table("kafka-right-topic");

KTable<String, String> joinedTable = table1.join(table2,
  (value1, value2) -> value1 + "," + value2
);

joinedTable.toStream().foreach((key, value) -> System.out.println(key + ": " + value));
Example - KTable + Foreign-Key Extractor
For a foreign-key join (Kafka 2.4+), the extractor is a Function from the left table's value to the right table's key. We do it like this
Function<String, String> foreignKeyExtractor =
    value -> value.split("_")[0];

KTable<String, String> joinedTable = table1.join(table2, foreignKeyExtractor,
  (value1, value2) -> value1 + "," + value2
);
Example - KStream + KTable
We do it like this
KStream<String, GenericRecord> ordersStream = ...
KTable<String, GenericRecord> customersTable = ...

// Re-key the orders stream by customer_id so that it matches the customers table's key
KStream<String, GenericRecord> ordersByCustomer = ordersStream
    .selectKey((orderId, order) -> order.get("customer_id").toString());

// Perform the join between the re-keyed orders stream and the customers table
KStream<String, EnrichedOrder> enrichedOrdersStream = ordersByCustomer
    .leftJoin(customersTable, (order, customer) -> new EnrichedOrder(order, customer));
toStream method
Converts the KTable into a KStream; the stream can then be written to a new topic with to()
Example
We do it like this
import org.apache.kafka.streams.kstream.Printed;

KTable<String, Long> kt0 = ...
kt0.toStream().print(Printed.toSysOut());
Example
We do it like this
StreamsBuilder builder = new StreamsBuilder();
Pattern pattern = Pattern.compile("\\W+"); // split on non-word characters
KStream<String, String> textLines = builder.stream("streams-plaintext-input");
KTable<String, Long> wordCounts = textLines
  .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
  .groupBy((key, value) -> value)
  .count();
wordCounts.toStream().to("streams-wordcount-output",
  Produced.with(Serdes.String(), Serdes.Long()));

Kafka Streams API KafkaStreams Class

Introduction
We include the following line
import org.apache.kafka.streams.KafkaStreams;
Runs the given Topology object

start method
Example
We do it like this
Properties prop = new Properties();
prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-lambda-example");
prop.put(StreamsConfig.CLIENT_ID_CONFIG, "wordcount-lambda-example-client");
prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
  Serdes.String().getClass().getName());
prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
  Serdes.String().getClass().getName());
prop.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/test");

StreamsBuilder builder = new StreamsBuilder();
...
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), prop);
kafkaStreams.start();

Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));

Wednesday, February 22, 2023

Kafka Connect API

Using a Kafka Connect Task from My Own Code - Why would I want to do this?
For example, I don't want to write and maintain code to access some external source; ready-made Kafka Connect already does that for me. The only difference is that a Kafka Connect Source writes the data into a Kafka topic, whereas I want to do something with it myself

Maven
We include the following dependency
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>connect-api</artifactId>
  <version>2.8.2</version>
</dependency>
Gradle
We do it like this
implementation group: 'org.apache.kafka', name: 'connect-api', version: '...'
Example
We do it like this
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.OffsetStorageReader;

public class MyKafkaConnectSource<T> {

  private final SourceConnector connector;
  private final SourceTask task;
  private final Map<String, String> taskConfig;
  private boolean taskInit;

  public void process() {
    if (!taskInit) {
      task.initialize(...);
      task.start(taskConfig);
      taskInit = true;
    }
    try {
      List<SourceRecord> records = task.poll();
      if (records == null) {
        return;
      }

      for (SourceRecord record : records) {
        ...
      }
    } catch (InterruptedException e) {
      ...
    }
  }
}

Wednesday, November 30, 2022

Confluent SchemaRegistryRestApplication Class

Introduction
We include the following line
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication;
Maven
We include the following dependency
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-schema-registry</artifactId>
  <version>6.2.2</version>
  <scope>test</scope>
</dependency>
constructor
We do it like this
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication;

Properties properties = new Properties();
//properties.put("listeners", "http://0.0.0.0:0");
properties.put(SchemaRegistryConfig.KAFKASTORE_BOOTSTRAP_SERVERS_CONFIG, ...);
properties.put(SchemaRegistryConfig.KAFKASTORE_TIMEOUT_CONFIG, "5000");
SchemaRegistryConfig config = new SchemaRegistryConfig(properties);
SchemaRegistryRestApplication schemaRegistryApplication =
  new SchemaRegistryRestApplication(config);
schemaRegistry = schemaRegistryApplication.createServer();
schemaRegistry.start();
Unit Test
schemaregistry-junit can be used

Example - JUnit 5
We do it like this
@RegisterExtension
@Order(1)
static final SharedKafkaTestResource kafka = new
  SharedKafkaTestResource().withBrokers(1);
@RegisterExtension
@Order(2)
static final SharedSchemaRegistryTestResource schemaRegistry =
    new SharedSchemaRegistryTestResource()
        .withBootstrapServers(kafka::getKafkaConnectString);


Tuesday, November 22, 2022

Kafka Admin API

Admin Class
We include the following line
import org.apache.kafka.clients.admin.Admin;
createTopics method
Example
We do it like this
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.CreateTopicsResult;

public void createTopic(String topicId, int partitionCount) {
  List<NewTopic> newTopics = Collections.singletonList(new NewTopic(topicId, 
     partitionCount, (short) 1));
  CreateTopicsResult createTopicsResult = admin.createTopics(newTopics);
  try {
    createTopicsResult.all().get();
  } catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
  }
}


Tuesday, November 30, 2021

Kafka Streams API StreamsBuilder Class

Introduction
We include the following line
import org.apache.kafka.streams.StreamsBuilder;
This class creates a Topology object. The Topology object is needed to create the KafkaStreams instance.
This class also provides the stream() method, which creates a KStream that processes the events of a topic.
build method
Example
We do it like this
Topology createTopology() {
  StreamsBuilder builder = new StreamsBuilder();
  // Add your streams here.
  TradeStream.build(builder);
  Topology topology = builder.build();
  System.out.println(topology.describe());
  return topology;
}
Adding a new stream looks like this
public class TradeStream {
  private final static String TRADE_TOPIC = "ARCTYPE.public.trade";

  public static void build(StreamsBuilder builder) {
    Serde<TradeModel> tradeModelSerde = SerdeFactory.createSerdeFor(TradeModel.class,
      true);
    Serde<String> idSerde = Serdes.serdeFrom(new IdSerializer(), new IdDeserializer());

    KStream<String, TradeModel> tradeModelKStream =
      builder.stream(TRADE_TOPIC, Consumed.with(idSerde, tradeModelSerde));

    tradeModelKStream.peek((key, value) -> {
      System.out.println(key.toString());
      System.out.println(value.toString());
    });
    tradeModelKStream.map((id, trade) -> {
      TradeModel tradeDoubled = new TradeModel();
      tradeDoubled.price = trade.price * 2;
      tradeDoubled.quantity = trade.quantity;
      tradeDoubled.ticker = trade.ticker;
      return new KeyValue<>(id, tradeDoubled);
    }).to("ARCTYPE.doubled-trades", Produced.with(idSerde, tradeModelSerde));
  }
}
The deserializer for the key SerDe is as follows. The serializer is written similarly
public class IdDeserializer implements Deserializer<String> {
  private ObjectMapper objectMapper = new ObjectMapper();

  @Override
  public void configure(Map<String, ?> props, boolean isKey) { }

  @Override
  public void close() { }

  @Override
  public String deserialize(String topic, byte[] bytes) {
    if (bytes == null)
      return null;

    String id;
    try {
      Map payload = objectMapper.readValue(new String(bytes), Map.class);
      id = String.valueOf(payload.get("id"));
    } catch (Exception e) {
      throw new SerializationException(e);
    }
    return id;
  }
}
The SerdeFactory class for the value SerDe is as follows
public class SerdeFactory {
  public static <T> Serde<T> createSerdeFor(Class<T> clazz, boolean isKey) {
    Map<String, Object> serdeProps = new HashMap<>();
    serdeProps.put("Class", clazz);

    Serializer<T> ser = new JsonSerializer<>();
    ser.configure(serdeProps, isKey);

    Deserializer<T> de = new JsonDeserializer<>();
    de.configure(serdeProps, isKey);

    return Serdes.serdeFrom(ser, de);
  }
}
The JsonDeserializer is as follows
public class JsonDeserializer<T> implements Deserializer<T> {
  private ObjectMapper objectMapper = new ObjectMapper();
  private Class<T> clazz;

  @Override
  public void configure(Map<String, ?> props, boolean isKey) {
    clazz = (Class<T>) props.get("Class");
  }
  @Override
  public void close() { }
  @Override
  public T deserialize(String topic, byte[] bytes) {
    if (bytes == null)
      return null;

    T data;
    Map payload;
    try {
      payload = objectMapper.readValue(new String(bytes), Map.class);
      // Debezium updates will contain a key "after" with the latest row contents.
      Map afterMap = (Map) payload.get("after");
      if (afterMap == null) {
        // Non-Debezium payloads
        data = objectMapper.readValue(objectMapper.writeValueAsBytes(payload), clazz);
      } else {
        // Incoming from Debezium
        data = objectMapper.readValue(objectMapper.writeValueAsBytes(afterMap), clazz);
      }

    } catch (Exception e) {
      throw new SerializationException(e);
    }
    return data;
  }
}
The value object is as follows
@JsonIgnoreProperties(ignoreUnknown = true)
public class TradeModel {
    public Integer id;
    public String ticker;
    public Integer price;
    public Integer quantity;
}
stream method
Example
We do it like this
StreamsBuilder streamsBuilder = new StreamsBuilder();
KStream<String,String> ks0 = streamsBuilder.stream(IAppConfigs.ORDER_RECEIVED_TOPIC);
...
Topology topology = streamsBuilder.build();
Properties streamConfig = ...;
KafkaStreams kafkaStreams = new KafkaStreams(topology, streamConfig);
kafkaStreams.start();

Wednesday, November 24, 2021

Kafka Producer API

Serializer and Deserializer Interfaces
Example
Suppose we have the following code
import com.kafka.message.ExchangeProtoMessage.ProtMessage;
import org.apache.kafka.common.serialization.Serializer;

public class ProtMessageSerializer implements Serializer<ProtMessage>{
    @Override
    public byte[] serialize(String topic, ProtMessage data) {
        return data.toByteArray();
    }
}


import com.google.protobuf.InvalidProtocolBufferException;
import com.kafka.message.ExchangeProtoMessage.ProtMessage;
import org.apache.kafka.common.serialization.Deserializer;

public class ProtMessageDeserializer implements Deserializer<ProtMessage>{
    @Override
    public ProtMessage deserialize(String topic, byte[] data) {
        try {
            return ProtMessage.parseFrom(data);
        } catch (InvalidProtocolBufferException e) {
            throw new RuntimeException("exception while parsing", e);
        }
    }
}
On the producer side we do it like this
import com.kafka.message.ExchangeProtoMessage.ProtMessage;
import com.kafka.model.ProtMessageSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;


public class MyKafkaProducerWithProtobufModel {

  public static void main(String[] args) {

   Properties props = new Properties();
   props.put("bootstrap.servers", "localhost:9092");
   props.put("linger.ms", 1);
   // Key/value serializers are passed explicitly to the KafkaProducer
   // constructor below, so they need not be set here

   Producer<Integer, ProtMessage> producer = 
    new KafkaProducer<>(props, new IntegerSerializer(), new ProtMessageSerializer());
   for (int i = 1; i <= 10; i++){
     producer.send(new ProducerRecord<>("myFirstTopic", 0, i, 
      ProtMessage.newBuilder()
        .setId(i)
        .setName(i + "proto value")
        .build()));
   }
   producer.close();
  }
}
On the consumer side we do it like this
import com.kafka.message.ExchangeProtoMessage.ProtMessage;
import com.kafka.model.ProtMessageDeserializer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;

public class MyKafkaConsumerWithProtobufModel {

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "test");
    props.setProperty("enable.auto.commit", "true");
    props.setProperty("auto.commit.interval.ms", "1000");
    // Key/value deserializers are passed explicitly to the KafkaConsumer
    // constructor below, so they need not be set here

    KafkaConsumer<Integer, ProtMessage> consumer = 
      new KafkaConsumer<>(props, new IntegerDeserializer(),
        new ProtMessageDeserializer());
    consumer.subscribe(Arrays.asList("myFirstTopic"));

    while (true) {
      ConsumerRecords<Integer, ProtMessage> records = consumer
        .poll(Duration.ofMillis(100));
      for (ConsumerRecord<Integer, ProtMessage> record : records) {
        System.out.println("Received message: (" + record.key() +  ", " + 
          record.value().toString() + ") at offset " + record.offset());
      }
    }
  }
}


ProducerRecord Class
We include the following line
import org.apache.kafka.clients.producer.ProducerRecord;
constructor
Example - topic + key + value
We do it like this
KafkaProducer<Integer, String> producer;

private KafkaProducer<Integer, String> getProducer() {
  if (producer == null) {
    Properties producerProps = new Properties();
    producerProps.setProperty("bootstrap.servers", ...);
    producerProps.setProperty("key.serializer", 
      IntegerSerializer.class.getCanonicalName());
    producerProps.setProperty("value.serializer", 
      StringSerializer.class.getCanonicalName());
    producer = new KafkaProducer<>(producerProps);
  }
  return producer;
}

public Future<RecordMetadata> produce(String topic, Integer key, String value) {
  return getProducer().send(new ProducerRecord<>(topic, key, value));
}
constructor - topic + partition + timestamp + key + value
Example
We do it like this
Future<RecordMetadata> produce(String topic, int partition, Long timestamp, 
  Integer key, String value) {
  return getProducer().send(new ProducerRecord<>(topic, 
    partition, timestamp, key, value));
}



Friday, December 11, 2020

Kafka Consumer API Config Parameters

Introduction
We include the following lines. The config parameters are defined as string constants in the ConsumerConfig class. This article sometimes uses these constants and sometimes the plain string values.
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
When using the Kafka Consumer API, the important thing is getting the consumer config right. I noted below some of the parameters used for config. Only 4 parameters are mandatory. They are
bootstrap.servers, group.id, key.deserializer, value.deserializer
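A minimal configuration with just these four mandatory parameters might look like the sketch below; the broker address and group name are placeholder assumptions:

```java
import java.util.Properties;

// Builds the minimal set of mandatory consumer properties.
// The string keys match the constants defined in ConsumerConfig.
public class MinimalConsumerConfig {
  public static Properties build() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
    props.put("group.id", "my-consumer-group");         // assumed group name
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    return props;
  }

  public static void main(String[] args) {
    System.out.println(MinimalConsumerConfig.build().stringPropertyNames());
  }
}
```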
bootstrap.servers
Its description is below. You don't need to list all the brokers, but at least one is required.
A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form host1:port1,host2:port2,.... Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).
group.id Field
Its description is below. A message written to a topic is distributed among the consumers in the same group
Consumers label themselves with a consumer group name, and each record published to a topic is delivered to one consumer instance within each subscribing consumer group. Consumer instances can be in separate processes or on separate machines.

If all the consumer instances have the same consumer group, then the records will effectively be load balanced over the consumer instances.

If all the consumer instances have different consumer groups, then each record will be broadcast to all the consumer processes.
Another description:
If all consumer instances have the same consumer group, then this works just like a traditional queue balancing load over the consumers. If all consumer instances have different consumer groups, then this works like a publish-subscribe and all messages are broadcast to all the consumers.
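The queue vs publish-subscribe behavior can be sketched in plain Java. This is an illustration only, not the Kafka client; partitions are handed out round-robin here for simplicity:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Within one group, each partition's records go to exactly one member
// (queue behavior); every group independently receives all partitions,
// so across groups the behavior is publish-subscribe.
public class DeliverySketch {
  // Which member of the group consumes each partition (round-robin sketch)
  static Map<Integer, String> memberForPartition(int partitions, List<String> members) {
    Map<Integer, String> owner = new LinkedHashMap<>();
    for (int p = 0; p < partitions; p++) {
      owner.put(p, members.get(p % members.size()));
    }
    return owner;
  }

  public static void main(String[] args) {
    // One group with two members: the four partitions are split between them
    System.out.println(memberForPartition(4, List.of("c1", "c2")));
    // prints {0=c1, 1=c2, 2=c1, 3=c2}

    // A second group with a single member still gets all four partitions,
    // so both groups see every record
    System.out.println(memberForPartition(4, List.of("other-group-c1")));
  }
}
```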
What matters here is the Consumer Rebalance operation. Its description is below. What this long explanation says is: if Kafka somehow detects that a consumer has joined or left, or that a new partition has been added, it performs a rebalance and redistributes the work. While the rebalance is in progress, the whole group stops processing messages and waits.
As we saw in the previous section, consumers in a consumer group share ownership of the partitions in the topics they subscribe to. When we add a new consumer to the group, it starts consuming messages from partitions previously consumed by another consumer. The same thing happens when a consumer shuts down or crashes; it leaves the group, and the partitions it used to consume will be consumed by one of the remaining consumers. Reassignment of partitions to consumers also happen when the topics the consumer group is consuming are modified (e.g., if an administrator adds new partitions).

Moving partition ownership from one consumer to another is called a rebalance. Rebalances are important because they provide the consumer group with high availability and scalability (allowing us to easily and safely add and remove consumers), but in the normal course of events they are fairly undesirable. During a rebalance, consumers can’t consume messages, so a rebalance is basically a short window of unavailability of the entire consumer group. In addition, when partitions are moved from one consumer to another, the consumer loses its current state; if it was caching any data, it will need to refresh its caches—slowing down the application until the consumer sets up its state again. Throughout this chapter we will discuss how to safely handle rebalances and how to avoid unnecessary ones.

The way consumers maintain membership in a consumer group and ownership of the partitions assigned to them is by sending heartbeats to a Kafka broker designated as the group coordinator (this broker can be different for different consumer groups). As long as the consumer is sending heartbeats at regular intervals, it is assumed to be alive, well, and processing messages from its partitions. Heartbeats are sent when the consumer polls (i.e., retrieves records) and when it commits records it has consumed.

If the consumer stops sending heartbeats for long enough, its session will time out and the group coordinator will consider it dead and trigger a rebalance. If a consumer crashed and stopped processing messages, it will take the group coordinator a few seconds without heartbeats to decide it is dead and trigger the rebalance. During those seconds, no messages will be processed from the partitions owned by the dead consumer. When closing a consumer cleanly, the consumer will notify the group coordinator that it is leaving, and the group coordinator will trigger a rebalance immediately, reducing the gap in processing. Later in this chapter we will discuss configuration options that control heartbeat frequency and session timeouts and how to set those to match your requirements.
Rebalance Details
Two roles are needed for a rebalance to take place:
1. Group Coordinator
2. Group Leader

Group Coordinator
The group coordinator is the broker that knows and tracks all the consumers in a group. It is also the broker that starts the rebalance.

Group Leader
The first consumer to join a group. It is what assigns the partitions. Its description is below
When a consumer wants to join a group, it sends a JoinGroup request to the group coordinator. The first consumer to join the group becomes the group leader. The leader receives a list of all consumers in the group from the group coordinator (this will include all consumers that sent a heartbeat recently and which are therefore considered alive) and is responsible for assigning a subset of partitions to each consumer. It uses an implementation of PartitionAssignor to decide which partitions should be handled by which consumer.

Kafka has two built-in partition assignment policies, which we will discuss in more depth in the configuration section. After deciding on the partition assignment, the consumer group leader sends the list of assignments to the GroupCoordinator, which sends this information to all the consumers. Each consumer only sees his own assignment—the leader is the only client process that has the full list of consumers in the group and their assignments. This process repeats every time a rebalance happens.
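The leader's assignment step can be sketched with a simple range-style assignor. This is an illustration only; Kafka's actual built-in policies are RangeAssignor and RoundRobinAssignor:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of range-style assignment: partitions are divided into contiguous
// chunks, one chunk per consumer, with any leftover partitions going to the
// first consumers. This mirrors the idea behind Kafka's RangeAssignor.
public class RangeAssignSketch {
  static Map<String, List<Integer>> assign(int partitionCount, List<String> consumers) {
    Map<String, List<Integer>> assignment = new LinkedHashMap<>();
    int perConsumer = partitionCount / consumers.size();
    int extra = partitionCount % consumers.size();
    int next = 0;
    for (int i = 0; i < consumers.size(); i++) {
      int count = perConsumer + (i < extra ? 1 : 0);
      List<Integer> partitions = new ArrayList<>();
      for (int p = 0; p < count; p++) partitions.add(next++);
      assignment.put(consumers.get(i), partitions);
    }
    return assignment;
  }

  public static void main(String[] args) {
    // 5 partitions over 2 consumers: c1 gets [0, 1, 2], c2 gets [3, 4]
    System.out.println(assign(5, List.of("c1", "c2")));
  }
}
```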
auto.offset.reset Field
Its description is below. It can take the values "latest", "earliest", and "none"
This property controls the behavior of the consumer when it starts reading a partition for which it doesn’t have a committed offset or if the committed offset it has is invalid (usually because the consumer was down for so long that the record with that offset was already aged out of the broker). The default is “latest,” which means that lacking a valid offset, the consumer will start reading from the newest records (records that were written after the consumer started running). The alternative is “earliest,” which means that lacking a valid offset, the consumer will read all the data in the partition, starting from the very beginning. Setting auto.offset.reset to none will cause an exception to be thrown when attempting to consume from invalid offset.
enable.auto.commit Field
Its description is below. Whether this parameter is true or false, we still cannot specify how many records we want to commit. It can cause an "at most once" problem; that is, messages can be lost.
This parameter controls whether the consumer will commit offsets automatically, and defaults to true. Set it to false if you prefer to control when offsets are committed, which is necessary to minimize duplicates and avoid missing data. If you set enable.auto.commit to true, then you might also want to control how frequently offsets will be committed using auto.commit.interval.ms.
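Why auto-commit can lose messages can be simulated in plain Java: if the batch's end offset is committed before processing finishes and the consumer crashes mid-batch, the unprocessed records are skipped on restart. This is a sketch of the failure mode, not the Kafka client:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: the polled batch's end offset is committed up front (as auto-commit
// effectively can do); a crash mid-batch then skips records on restart,
// giving "at most once" delivery.
public class AutoCommitLossSketch {
  int committedOffset = 0;
  List<String> processed = new ArrayList<>();

  // Poll a batch, commit its end offset immediately, then "crash" mid-batch.
  void pollCommitThenCrash(List<String> log, int batchSize) {
    List<String> batch = log.subList(committedOffset,
        Math.min(committedOffset + batchSize, log.size()));
    committedOffset += batch.size();     // offset committed before processing
    for (int i = 0; i < batch.size(); i++) {
      if (i == 1) return;                // simulated crash after one record
      processed.add(batch.get(i));
    }
  }

  public static void main(String[] args) {
    AutoCommitLossSketch consumer = new AutoCommitLossSketch();
    List<String> log = List.of("m0", "m1", "m2", "m3");
    consumer.pollCommitThenCrash(log, 2); // processes m0, crashes before m1
    consumer.pollCommitThenCrash(log, 2); // restart resumes at offset 2: m1 is lost
    System.out.println(consumer.processed); // [m0, m2]
  }
}
```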
max.poll.records Field
We do it like this
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

public Consumer createKafkaConsumer(String groupId, StringDeserializer keyDeserializer,
    JsonDeserializer valueDeserializer) {
  Properties consumerProperties = new Properties();
  consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
  consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
  consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
  consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
  return new KafkaConsumer<>(consumerProperties, keyDeserializer, valueDeserializer);
}
Its description is below
Max poll records config property bounds a maximum number of records returned in a single call. It is set to 1000 because the single message size is pretty small so it can handle a lot of messages in a single poll if needed.

isolation.level Field
If we set this field to "read_committed" and
1. use a transaction-aware producer. We do it like this
producerProps.put("enable.idempotence", "true");
producerProps.put("transactional.id", "prod-1");
2. use a transaction-aware consumer. We do it like this
consumerProps.put("enable.auto.commit", "false");
consumerProps.put("isolation.level", "read_committed");
then, by committing the offsets after each poll(), we get "exactly once" message delivery.

On the consumer side we do it like this
import org.apache.kafka.clients.consumer.OffsetAndMetadata;

KafkaConsumer<String, String> consumer = createKafkaConsumer();
KafkaProducer<String, String> producer = createKafkaProducer();

producer.initTransactions();

while (true) {
  ConsumerRecords<String, String> records = consumer.poll(ofSeconds(60));
  ...
  producer.beginTransaction();
  
  Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
  // Walk over all the partitions we read from
  for (TopicPartition partition : records.partitions()) {
    // Get all the messages read from this partition
    List<ConsumerRecord<String, String>> partitionedRecords = records.records(partition);
    // Find the last offset
    long offset = partitionedRecords.get(partitionedRecords.size() - 1).offset();
    // Commit the next offset (last offset + 1)
    offsetsToCommit.put(partition, new OffsetAndMetadata(offset + 1));
  }

  producer.sendOffsetsToTransaction(offsetsToCommit, CONSUMER_GROUP_ID);
  producer.commitTransaction();

}
An example that does the same thing in a slightly different way is here.