Maven
Şu bağımlılığı dahil ederiz
<dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-client</artifactId><version>2.10.1</version></dependency>
Consumer API
Consumer Arayüzü
Örnek
Şöyle yaparız
import org.apache.pulsar.client.api.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Slf4j @Configuration public class PulsarConfig { private final ConsumerProperties consumerProperties; public PulsarConfig(ConsumerProperties consumerProperties) { this.consumerProperties = consumerProperties; } @SneakyThrows(PulsarClientException.class) @Bean public PulsarClient buildClient() { return PulsarClient.builder() .serviceUrl(consumerProperties.getUrl()) .build(); } @Bean public Consumer<Notification> consumer(PulsarClient client, MessageListener<Notification> listener) throws PulsarClientException { return client.newConsumer(Schema.AVRO(Notification.class)) .topic(consumerProperties.getTopic()) .subscriptionName(consumerProperties.getSubscriptionName()) .messageListener(listener) .subscribe(); } }
Message Arayüzü
Örnek
Şöyle yaparız
import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageListener; @Component @Slf4j public class NotificationMessageListener implements MessageListener<Notification> { @Override public void received(Consumer<Notification> consumer, Message<Notification> msg) { try { log.info("Topic Name: {}", msg.getTopicName()); log.info("Message Id: {}", msg.getMessageId()); log.info("Producer Name: {}", msg.getProducerName()); log.info("Publish Time: {}", msg.getPublishTime()); //log.info("Message received: {}", new String(msg.getData())); Notification notification = msg.getValue(); log.info("Message received => Username: {}, Email: {}, Subject: {}, content: {}", notification.getRecipientName(), notification.getRecipientEmail(), notification.getSubject(), StringUtils.abbreviate(notification.getContent(), 100)); log.info("###########################################################"); consumer.acknowledge(msg); } catch (Exception e) { consumer.negativeAcknowledge(msg); } } }
Producer API
Producer Arayüzü
Örnek
Şöyle yaparız
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration for the producing side: exposes a {@link PulsarClient} bean and a
 * {@link Producer} bean for {@code Notification} messages, both configured from
 * {@code ProducerProperties}.
 */
@Configuration
public class PulsarConfig {

    private final ProducerProperties producerProperties;

    public PulsarConfig(ProducerProperties producerProperties) {
        this.producerProperties = producerProperties;
    }

    /**
     * Builds the Pulsar client pointed at the service URL taken from the producer properties.
     *
     * @return the client returned by the builder
     */
    @SneakyThrows(PulsarClientException.class)
    @Bean
    public PulsarClient buildClient() {
        final String serviceUrl = producerProperties.getUrl();
        return PulsarClient.builder()
                .serviceUrl(serviceUrl)
                .build();
    }

    /**
     * Creates a producer that uses an AVRO schema for {@code Notification}, with the topic and
     * producer name read from the producer properties.
     *
     * @param client the Pulsar client used to create the producer
     * @return the producer returned by {@code create()}
     * @throws PulsarClientException if the producer cannot be created
     */
    @Bean
    public Producer<Notification> producer(PulsarClient client) throws PulsarClientException {
        final Schema<Notification> avroSchema = Schema.AVRO(Notification.class);
        return client.newProducer(avroSchema)
                .topic(producerProperties.getTopic())
                .producerName(producerProperties.getProducerName())
                .create();
    }
}
sendAsync metodu
Örnek
Şöyle yaparız
/**
 * Sends the notification asynchronously and logs the outcome.
 *
 * <p>The call returns without waiting for the send to complete. On success the assigned
 * message ID is logged; on failure the error is logged instead of being silently dropped.
 *
 * @param notification the payload handed to {@code producer.sendAsync}
 */
void sendMsgAsync(Notification notification) {
    producer.sendAsync(notification)
            .thenAccept(msgId ->
                    log.info("Notification message with ID {} successfully sent", msgId))
            .exceptionally(ex -> {
                // Without this stage a failed send would complete the future exceptionally
                // with nobody observing it.
                log.error("Failed to send notification message", ex);
                return null;
            });
}