Maven
Şu satırı dahil ederiz
<dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-client</artifactId><version>2.10.1</version></dependency>
Consumer API
Consumer Arayüzü
Örnek
Şöyle yaparız
import org.apache.pulsar.client.api.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class PulsarConfig {
private final ConsumerProperties consumerProperties;
/**
 * Creates the configuration with the consumer settings read by the bean methods of this class.
 *
 * @param consumerProperties source of the service URL, topic and subscription name
 */
public PulsarConfig(ConsumerProperties consumerProperties) {
this.consumerProperties = consumerProperties;
}
/**
 * Builds the Pulsar client bean connected to the service URL taken from the consumer properties.
 *
 * <p>A {@link PulsarClientException} raised while building the client is rethrown without being
 * declared, via {@code @SneakyThrows}.
 *
 * @return the newly built client
 */
@SneakyThrows(PulsarClientException.class)
@Bean
public PulsarClient buildClient() {
    final String serviceUrl = consumerProperties.getUrl();
    return PulsarClient.builder().serviceUrl(serviceUrl).build();
}
/**
 * Subscribes an Avro-schema consumer for {@code Notification} messages and registers the given
 * listener on it.
 *
 * @param client   Pulsar client used to create the consumer
 * @param listener callback that receives each message
 * @return the subscribed consumer
 * @throws PulsarClientException if the subscription cannot be created
 */
@Bean
public Consumer<Notification> consumer(PulsarClient client,
        MessageListener<Notification> listener) throws PulsarClientException {
    final Schema<Notification> schema = Schema.AVRO(Notification.class);
    final ConsumerBuilder<Notification> builder = client.newConsumer(schema);
    return builder
            .topic(consumerProperties.getTopic())
            .subscriptionName(consumerProperties.getSubscriptionName())
            .messageListener(listener)
            .subscribe();
}
}
Message Arayüzü
Örnek
Şöyle yaparız
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
@Component
@Slf4j
public class NotificationMessageListener implements MessageListener<Notification> {
@Override
public void received(Consumer<Notification> consumer, Message<Notification> msg) {
try {
log.info("Topic Name: {}", msg.getTopicName());
log.info("Message Id: {}", msg.getMessageId());
log.info("Producer Name: {}", msg.getProducerName());
log.info("Publish Time: {}", msg.getPublishTime());
//log.info("Message received: {}", new String(msg.getData()));
Notification notification = msg.getValue();
log.info("Message received => Username: {}, Email: {},
Subject: {}, content: {}",
notification.getRecipientName(),
notification.getRecipientEmail(),
notification.getSubject(),
StringUtils.abbreviate(notification.getContent(), 100));
log.info("###########################################################");
consumer.acknowledge(msg);
} catch (Exception e) {
consumer.negativeAcknowledge(msg);
}
}
}
Producer API
Producer Arayüzü
Örnek
Şöyle yaparız
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class PulsarConfig {
private final ProducerProperties producerProperties;
/**
 * Creates the configuration with the producer settings read by the bean methods of this class.
 *
 * @param producerProperties source of the service URL, topic and producer name
 */
public PulsarConfig(ProducerProperties producerProperties) {
this.producerProperties = producerProperties;
}
/**
 * Builds the Pulsar client bean connected to the service URL taken from the producer properties.
 *
 * <p>A {@link PulsarClientException} raised while building the client is rethrown without being
 * declared, via {@code @SneakyThrows}.
 *
 * @return the newly built client
 */
@SneakyThrows(PulsarClientException.class)
@Bean
public PulsarClient buildClient() {
    final String serviceUrl = producerProperties.getUrl();
    return PulsarClient.builder().serviceUrl(serviceUrl).build();
}
/**
 * Creates an Avro-schema producer for {@code Notification} messages on the configured topic,
 * using the configured producer name.
 *
 * @param client Pulsar client used to create the producer
 * @return the created producer
 * @throws PulsarClientException if the producer cannot be created
 */
@Bean
public Producer<Notification> producer(PulsarClient client) throws PulsarClientException {
    final Schema<Notification> schema = Schema.AVRO(Notification.class);
    final String producerName = producerProperties.getProducerName();
    return client.newProducer(schema)
            .topic(producerProperties.getTopic())
            .producerName(producerName)
            .create();
}
}
send metodu
Örnek
Şöyle yaparız
/**
 * Sends the notification asynchronously and logs the outcome.
 *
 * <p>On success the assigned message ID is logged; on failure the error is logged so that a
 * failed send does not go unnoticed. The method returns without waiting for the send to finish.
 *
 * @param notification payload to publish
 */
void sendMsgAsync(Notification notification) {
    producer.sendAsync(notification)
            .thenAccept(msgId ->
                    log.info("Notification message with ID {} successfully sent", msgId))
            .exceptionally(ex -> {
                // Without this stage a failed future would be dropped silently.
                log.error("Failed to send notification message", ex);
                return null;
            });
}













