Giriş
Flow.Publisher arayüzünü asenkron olarak gerçekleştirir. Bu sınıfla ilgili bir örnek burada.
constructor
Flow.Publisher arayüzünü asenkron olarak gerçekleştirir. Bu sınıfla ilgili bir örnek burada.
constructor
Şöyle yaparız.
Şöyle yaparız.
Buffer doluysa ve onDrop true dönerse yeniden eklemeye çalışır. false dönerse çalışmaz. Açıklaması şöyle.
Şöyle yaparız.
Şöyle yaparız.
Şöyle yaparız
SubmissionPublisher<String> publisher =
new SubmissionPublisher<>(ForkJoinPool.commonPool(), 2);
close metoduŞöyle yaparız.
publisher.close();
offer metoduBuffer doluysa ve onDrop true dönerse yeniden eklemeye çalışır. false dönerse çalışmaz. Açıklaması şöyle.
ÖrnekonDrop - if non-null, the handler invoked upon a drop to a subscriber, with arguments of the subscriber and item; if it returns true, an offer is re-attempted (once)
Şöyle yaparız.
publisher.offer("item", (subscriber, value) -> false);
ÖrnekŞöyle yaparız.
int result = publisher.offer("item" + i, (subscriber, value) -> {
// sleep for a small period before deciding whether to retry or not
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
return false; // you can switch to true to see that drops are reduced
});
subscribe metoduŞöyle yaparız
// Register Subscriber
publisher.subscribe(new CustomSubscriber<>());
submit metoduKod şöyle. Burada dropHandler null olarak veriliyor.
public int submit(T item) {
return doOffer(item, Long.MAX_VALUE, null);
}
Eğer içerdeki buffer doluysa publisher yer açılıncaya kadar bekler.
Örnek
Elimizde şöyle bir Subscriber kod olsun
Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<>() {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(Integer item) {
System.out.println("Received: " + item + " on " + Thread.currentThread().getName());
try {
Thread.sleep(1000); // Simulate slow processing
} catch (InterruptedException e) {
e.printStackTrace();
}
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.err.println("Error: " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("Completed");
}
};
Şöyle kullanalımSubmissionPublisher<Integer> publisher =
new SubmissionPublisher<>(ForkJoinPool.commonPool(), 10);
System.out.println("getMaxBufferCapacity = " + publisher.getMaxBufferCapacity());
publisher.subscribe(subscriber);
// Publish items
for (int i = 1; i <= 40; i++) {
System.out.println("Submitting " + i);
publisher.submit(i);
}
// Wait for subscriber to finish processing and close the publisher
Thread.sleep(30_000);
publisher.close();
Çıktı şöyle. Submit işlemi içerdeki buffer doluncaya kadar hızlıca başlıyor. Buffer dolunca birer birer yer açıldıkça devam ediyor. Yavaş tüketici her nesneyi geç te olsa okuyabiliyor.
getMaxBufferCapacity = 16
Submitting 1
Submitting 2
Submitting 3
...
Submitting 17
Submitting 18
Received: 1 on ForkJoinPool.commonPool-worker-19
Received: 2 on ForkJoinPool.commonPool-worker-19
Submitting 19
Received: 3 on ForkJoinPool.commonPool-worker-19
Submitting 20
...
Submitting 37
Received: 21 on ForkJoinPool.commonPool-worker-19
Submitting 38
Received: 22 on ForkJoinPool.commonPool-worker-19
Submitting 39
Received: 23 on ForkJoinPool.commonPool-worker-19
Submitting 40
Received: 24 on ForkJoinPool.commonPool-worker-19
Received: 25 on ForkJoinPool.commonPool-worker-19
...
Received: 40 on ForkJoinPool.commonPool-worker-19
Hiç yorum yok:
Yorum Gönder