29 Eylül 2022 Perşembe

Pulsar API

Maven
Şu satırı dahil ederiz
<dependency>
  <groupId>org.apache.pulsar</groupId>
  <artifactId>pulsar-client</artifactId>
  <version>2.10.1</version>
</dependency>
Consumer API
Consumer Arayüzü
Örnek
Şöyle yaparız
import org.apache.pulsar.client.api.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class PulsarConfig {
  private final ConsumerProperties consumerProperties;

  public PulsarConfig(ConsumerProperties consumerProperties) {
    this.consumerProperties = consumerProperties;
  }

  @SneakyThrows(PulsarClientException.class)
  @Bean
  public PulsarClient buildClient() {
    return PulsarClient.builder()
      .serviceUrl(consumerProperties.getUrl())
      .build();
  }

  @Bean
  public Consumer<Notification> consumer(PulsarClient client,
                                         MessageListener<Notification> listener) 
    throws PulsarClientException {
      return client.newConsumer(Schema.AVRO(Notification.class))
        .topic(consumerProperties.getTopic())
        .subscriptionName(consumerProperties.getSubscriptionName())
        .messageListener(listener)
        .subscribe();
  }
}
Message Arayüzü
Örnek
Şöyle yaparız
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;

@Component
@Slf4j
public class NotificationMessageListener implements MessageListener<Notification> {
  @Override
  public void received(Consumer<Notification> consumer, Message<Notification> msg) {
    try {
      log.info("Topic Name: {}", msg.getTopicName());
      log.info("Message Id: {}", msg.getMessageId());
      log.info("Producer Name: {}", msg.getProducerName());
      log.info("Publish Time: {}", msg.getPublishTime());

      //log.info("Message received: {}", new String(msg.getData()));
      Notification notification = msg.getValue();
      log.info("Message received => Username: {}, Email: {}, 
               Subject: {}, content: {}", 
               notification.getRecipientName(),
               notification.getRecipientEmail(), 
               notification.getSubject(), 
              StringUtils.abbreviate(notification.getContent(), 100));

      log.info("###########################################################");
      consumer.acknowledge(msg);
    } catch (Exception e) {
      consumer.negativeAcknowledge(msg);
    }
  }
}
Producer API
Producer Arayüzü
Örnek
Şöyle yaparız
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class PulsarConfig {

  private final ProducerProperties producerProperties;

  public PulsarConfig(ProducerProperties producerProperties) {
    this.producerProperties = producerProperties;
  }

  @SneakyThrows(PulsarClientException.class)
  @Bean
  public PulsarClient buildClient() {
    return PulsarClient.builder()
      .serviceUrl(producerProperties.getUrl())
      .build();
  }

  @Bean
  public Producer<Notification> producer(PulsarClient client) 
    throws PulsarClientException {
    
    return client.newProducer(Schema.AVRO(Notification.class))
      .topic(producerProperties.getTopic())
      .producerName(producerProperties.getProducerName())
      .create();
  }
}
send metodu
Örnek
Şöyle yaparız
void sendMsgAsync(Notification notification) {
  producer.sendAsync(notification)
     .thenAccept(msgId -> 
        log.info("Notification message with ID {} successfully sent", msgId));
}




Hiç yorum yok:

Yorum Gönder