Friday, December 11, 2020

Kafka Consumer API Config Parameters

Introduction
We include the following lines. The config parameters are defined as string constants in the ConsumerConfig class; this post sometimes uses those constants and sometimes the raw string values.
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
When using the Kafka Consumer API, the important thing is getting the consumer configuration right. I have noted some of the configuration parameters below. Only 4 parameters are mandatory. They are:
bootstrap.servers, group.id, key.deserializer, value.deserializer
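For reference, a minimal configuration using only these four might look like the sketch below; the broker address, group name, and String deserializers are placeholder choices, not requirements.

import java.util.Properties;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed local broker
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");                // hypothetical group name
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);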
bootstrap.servers Field
The explanation is as follows. There is no need to list all the brokers, but at least one is required.
A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form host1:port1,host2:port2,.... Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).
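For example, listing two seed brokers is enough for the client to discover the rest of the cluster; the host names below are made up.

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092");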
group.id Field
The explanation is as follows. A message written to a topic is distributed among the consumers in the same group.
Consumers label themselves with a consumer group name, and each record published to a topic is delivered to one consumer instance within each subscribing consumer group. Consumer instances can be in separate processes or on separate machines.

If all the consumer instances have the same consumer group, then the records will effectively be load balanced over the consumer instances.

If all the consumer instances have different consumer groups, then each record will be broadcast to all the consumer processes.
Another explanation is as follows.
If all consumer instances have the same consumer group, then this works just like a traditional queue balancing load over the consumers. If all consumer instances have different consumer groups, then this works like a publish-subscribe and all messages are broadcast to all the consumers.
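As a small sketch of the two cases, reusing the props built above (topic and group names are made up):

import java.util.Collections;

// Same group.id: the partitions of "orders" are split between the two consumers.
props.put(ConsumerConfig.GROUP_ID_CONFIG, "orders-group");
KafkaConsumer<String, String> consumer1 = new KafkaConsumer<>(props);
KafkaConsumer<String, String> consumer2 = new KafkaConsumer<>(props);
consumer1.subscribe(Collections.singletonList("orders"));
consumer2.subscribe(Collections.singletonList("orders"));
// If consumer2 used a different group.id instead, both consumers would
// receive every record, i.e. publish-subscribe behavior.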
The important thing here is the Consumer Rebalance operation. The explanation is below. What this long explanation says is: if Kafka somehow detects that a Consumer has joined or left, or that a new Partition has been added, it performs a Rebalance and redistributes the work. During the rebalance the whole group stops processing messages and waits.
As we saw in the previous section, consumers in a consumer group share ownership of the partitions in the topics they subscribe to. When we add a new consumer to the group, it starts consuming messages from partitions previously consumed by another consumer. The same thing happens when a consumer shuts down or crashes; it leaves the group, and the partitions it used to consume will be consumed by one of the remaining consumers. Reassignment of partitions to consumers also happen when the topics the consumer group is consuming are modified (e.g., if an administrator adds new partitions).

Moving partition ownership from one consumer to another is called a rebalance. Rebalances are important because they provide the consumer group with high availability and scalability (allowing us to easily and safely add and remove consumers), but in the normal course of events they are fairly undesirable. During a rebalance, consumers can’t consume messages, so a rebalance is basically a short window of unavailability of the entire consumer group. In addition, when partitions are moved from one consumer to another, the consumer loses its current state; if it was caching any data, it will need to refresh its caches—slowing down the application until the consumer sets up its state again. Throughout this chapter we will discuss how to safely handle rebalances and how to avoid unnecessary ones.

The way consumers maintain membership in a consumer group and ownership of the partitions assigned to them is by sending heartbeats to a Kafka broker designated as the group coordinator (this broker can be different for different consumer groups). As long as the consumer is sending heartbeats at regular intervals, it is assumed to be alive, well, and processing messages from its partitions. Heartbeats are sent when the consumer polls (i.e., retrieves records) and when it commits records it has consumed.

If the consumer stops sending heartbeats for long enough, its session will time out and the group coordinator will consider it dead and trigger a rebalance. If a consumer crashed and stopped processing messages, it will take the group coordinator a few seconds without heartbeats to decide it is dead and trigger the rebalance. During those seconds, no messages will be processed from the partitions owned by the dead consumer. When closing a consumer cleanly, the consumer will notify the group coordinator that it is leaving, and the group coordinator will trigger a rebalance immediately, reducing the gap in processing. Later in this chapter we will discuss configuration options that control heartbeat frequency and session timeouts and how to set those to match your requirements.
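On the client side, a consumer can be notified of these rebalances by passing a ConsumerRebalanceListener to subscribe(). A minimal sketch (the topic name is made up):

import java.util.Collection;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

consumer.subscribe(Collections.singletonList("orders"), new ConsumerRebalanceListener() {
  @Override
  public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    // Called before a rebalance takes these partitions away;
    // a good place to commit offsets or flush caches.
  }
  @Override
  public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    // Called after the rebalance, with the partitions we now own.
  }
});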
Rebalance Details
The rebalance operation requires 2 roles:
1. Group Coordinator
2. Group Leader

Group Coordinator
Here we have the group coordinator concept. The group coordinator is the broker that knows about and tracks all the consumers in a group. It is also the broker that starts the Rebalance operation.

Group Leader
This is the first Consumer to join the group, and it is the one that performs the partition assignment. The explanation is as follows.
When a consumer wants to join a group, it sends a JoinGroup request to the group coordinator. The first consumer to join the group becomes the group leader. The leader receives a list of all consumers in the group from the group coordinator (this will include all consumers that sent a heartbeat recently and which are therefore considered alive) and is responsible for assigning a subset of partitions to each consumer. It uses an implementation of PartitionAssignor to decide which partitions should be handled by which consumer.

Kafka has two built-in partition assignment policies, which we will discuss in more depth in the configuration section. After deciding on the partition assignment, the consumer group leader sends the list of assignments to the GroupCoordinator, which sends this information to all the consumers. Each consumer only sees his own assignment—the leader is the only client process that has the full list of consumers in the group and their assignments. This process repeats every time a rebalance happens.
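The assignment policy is configurable on the consumer. A sketch that picks the built-in round-robin assignor instead of the default range assignor:

props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
  "org.apache.kafka.clients.consumer.RoundRobinAssignor");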
auto.offset.reset Field
The explanation is as follows. It can take the values "latest", "earliest", or "none".
This property controls the behavior of the consumer when it starts reading a partition for which it doesn’t have a committed offset or if the committed offset it has is invalid (usually because the consumer was down for so long that the record with that offset was already aged out of the broker). The default is “latest,” which means that lacking a valid offset, the consumer will start reading from the newest records (records that were written after the consumer started running). The alternative is “earliest,” which means that lacking a valid offset, the consumer will read all the data in the partition, starting from the very beginning. Setting auto.offset.reset to none will cause an exception to be thrown when attempting to consume from invalid offset.
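For example, to re-read a topic from the very beginning with a brand-new group (the group name is made up):

props.put(ConsumerConfig.GROUP_ID_CONFIG, "fresh-group");       // no committed offsets exist yet
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // so start from the oldest records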
enable.auto.commit Field
The explanation is as follows. Whether this parameter is true or false, we still cannot specify how many records we want to commit. Leaving auto commit on causes an "at most once" problem; in other words, messages can be lost.
This parameter controls whether the consumer will commit offsets automatically, and defaults to true. Set it to false if you prefer to control when offsets are committed, which is necessary to minimize duplicates and avoid missing data. If you set enable.auto.commit to true, then you might also want to control how frequently offsets will be committed using auto.commit.interval.ms.
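A minimal manual-commit sketch: auto commit is turned off and commitSync() is called only after the records have actually been processed. The topic name and the process() method are made up.

import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer<String, String> manualConsumer = new KafkaConsumer<>(props);
manualConsumer.subscribe(Collections.singletonList("orders"));
while (true) {
  ConsumerRecords<String, String> records = manualConsumer.poll(Duration.ofMillis(500));
  for (ConsumerRecord<String, String> record : records) {
    process(record); // hypothetical business logic
  }
  manualConsumer.commitSync(); // commit only after processing, so records are not lost
}

This gives "at least once" delivery instead: a crash between process() and commitSync() leads to reprocessing rather than loss.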
max.poll.records Field
We do it like this:
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

public Consumer<String, Object> createKafkaConsumer(String groupId,
    StringDeserializer keyDeserializer, JsonDeserializer<Object> valueDeserializer) {
  Properties consumerProperties = new Properties();
  consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
  consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
  consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
  consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
  return new KafkaConsumer<>(consumerProperties, keyDeserializer, valueDeserializer);
}
The explanation is as follows.
Max poll records config property bounds a maximum number of records returned in a single call. It is set to 1000 because the single message size is pretty small so it can handle a lot of messages in a single poll if needed.
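So with the configuration above, a single poll() call returns at most 1000 records:

import java.time.Duration;

ConsumerRecords<String, Object> records = consumer.poll(Duration.ofMillis(500));
// records.count() is at most 1000 because of MAX_POLL_RECORDS_CONFIG above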

isolation.level Field
If we set this field to "read_committed" and:
1. Use a transaction-aware Producer. We do it like this:
producerProps.put("enable.idempotence", "true");
producerProps.put("transactional.id", "prod-1");
2. Use a transaction-aware Consumer. We do it like this:
consumerProps.put("enable.auto.commit", "false");
consumerProps.put("isolation.level", "read_committed");
In other words, if we commit the offsets one by one after every poll(), we get "exactly once" message delivery.

On the consumer side we do the following:
import static java.time.Duration.ofSeconds;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;

KafkaConsumer<String, String> consumer = createKafkaConsumer();
KafkaProducer<String, String> producer = createKafkaProducer();

producer.initTransactions();

while (true) {
  ConsumerRecords<String, String> records = consumer.poll(ofSeconds(60));
  ...
  producer.beginTransaction();
  
  Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
  // walk over every partition we read from
  for (TopicPartition partition : records.partitions()) {
    // take all the messages read from this partition
    List<ConsumerRecord<String, String>> partitionedRecords = records.records(partition);
    // find the last offset
    long offset = partitionedRecords.get(partitionedRecords.size() - 1).offset();
    // commit offset + 1, i.e. the next offset to read
    offsetsToCommit.put(partition, new OffsetAndMetadata(offset + 1));
  }

  producer.sendOffsetsToTransaction(offsetsToCommit, CONSUMER_GROUP_ID);
  producer.commitTransaction();

}
An example that does the same thing as this code in a slightly different way is here.
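The createKafkaConsumer() and createKafkaProducer() helpers are not shown above; a minimal sketch of what they might look like, assuming a local broker and String messages:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

static KafkaConsumer<String, String> createKafkaConsumer() {
  Properties props = new Properties();
  props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
  props.put("group.id", CONSUMER_GROUP_ID);
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  props.put("enable.auto.commit", "false");       // offsets are committed through the transaction
  props.put("isolation.level", "read_committed"); // only read records from committed transactions
  return new KafkaConsumer<>(props);
}

static KafkaProducer<String, String> createKafkaProducer() {
  Properties props = new Properties();
  props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("enable.idempotence", "true");
  props.put("transactional.id", "prod-1"); // must be set before initTransactions()
  return new KafkaProducer<>(props);
}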
