6 Eylül 2019 Cuma

Java 9 Flow SubmissionPublisher Sınıfı

Giriş
Flow.Publisher arayüzünü asenkron olarak gerçekleştirir. Bu sınıfla ilgili bir örnek burada.

constructor
Şöyle yaparız.
SubmissionPublisher<String> publisher =
  new SubmissionPublisher<>(ForkJoinPool.commonPool(), 2);
close metodu
Şöyle yaparız.
publisher.close();
offer metodu
Buffer doluysa ve onDrop true dönerse yeniden eklemeye çalışır. false dönerse çalışmaz. Açıklaması şöyle.
 onDrop - if non-null, the handler invoked upon a drop to a subscriber, with arguments of the subscriber and item; if it returns true, an offer is re-attempted (once)
Örnek
Şöyle yaparız.
publisher.offer("item", (subscriber, value) -> false);
Örnek
Şöyle yaparız.
int result = publisher.offer("item" + i, (subscriber, value) -> {
  // sleep for a small period before deciding whether to retry or not
  try {
    Thread.sleep(200);
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  return false;  // you can switch to true to see that drops are reduced
});
subscribe​ metodu
Şöyle yaparız
// Register Subscriber
publisher.subscribe(new CustomSubscriber<>());
submit metodu
Kod şöyle. Burada dropHandler null olarak veriliyor. 
public int submit(T item) {
  return doOffer(item, Long.MAX_VALUE, null);
}
Eğer içerdeki buffer doluysa publisher yer açılıncaya kadar bekler.

Örnek
Elimizde şöyle bir Subscriber kod olsun
Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<>() {
  private Flow.Subscription subscription;    

  @Override
  public void onSubscribe(Flow.Subscription subscription) {
    this.subscription = subscription;
    subscription.request(1);
  }

  @Override
  public void onNext(Integer item) {
    System.out.println("Received: " + item + " on " + Thread.currentThread().getName());
    try {
      Thread.sleep(1000); // Simulate slow processing
    } catch (InterruptedException e) {
       e.printStackTrace();
     }
     subscription.request(1);
  }

  @Override
  public void onError(Throwable throwable) {
    System.err.println("Error: " + throwable.getMessage());
  }

  @Override
  public void onComplete() {
    System.out.println("Completed");
  }
};
Şöyle kullanalım
SubmissionPublisher<Integer> publisher = 
  new SubmissionPublisher<>(ForkJoinPool.commonPool(), 10);
System.out.println("getMaxBufferCapacity = " + publisher.getMaxBufferCapacity());
publisher.subscribe(subscriber);
// Publish items
for (int i = 1; i <= 40; i++) {
  System.out.println("Submitting " + i);
  publisher.submit(i);
}

// Wait for subscriber to finish processing and close the publisher
Thread.sleep(30_000);
publisher.close();
Çıktı şöyle. Submit işlemi içerdeki buffer doluncaya kadar hızlıca başlıyor. Buffer dolunca birer birer yer açıldıkça devam ediyor.  Yavaş tüketici her nesneyi geç te olsa okuyabiliyor.
getMaxBufferCapacity = 16
Submitting 1
Submitting 2
Submitting 3
...
Submitting 17
Submitting 18
Received: 1 on ForkJoinPool.commonPool-worker-19
Received: 2 on ForkJoinPool.commonPool-worker-19
Submitting 19
Received: 3 on ForkJoinPool.commonPool-worker-19
Submitting 20
...
Submitting 37
Received: 21 on ForkJoinPool.commonPool-worker-19
Submitting 38
Received: 22 on ForkJoinPool.commonPool-worker-19
Submitting 39
Received: 23 on ForkJoinPool.commonPool-worker-19
Submitting 40
Received: 24 on ForkJoinPool.commonPool-worker-19
Received: 25 on ForkJoinPool.commonPool-worker-19
...
Received: 40 on ForkJoinPool.commonPool-worker-19


Hiç yorum yok:

Yorum Gönder