Thursday, December 3, 2020

CompletableFuture.thenCombine Method - Two Futures Run in Parallel and Are Combined Once Both Complete

Introduction
The explanation is as follows. Once two futures have both completed, one future is combined with the other. This lets us take their results and merge them into a single result.
If the dependent CompletableFuture is intended to combine the results of two previous CompletableFutures by applying a function on them and returning a result, we can use the method thenCombine()
Schematically, it looks like this.


Difference Between thenCombine and thenCompose
The explanation is as follows.
While thenCompose() is used to chain one future dependent on the other, thenCombine combines two independent futures when they are both done
Their signatures are as follows.
<U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,
 BiFunction<? super T,? super U,? extends V> fn)

<U> CompletableFuture<U> thenCompose(Function<? super T,? extends CompletionStage<U>> fn)
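Here thenCombine() waits for two tasks that return T and U to complete, passes both results to a BiFunction, and returns a future of V. In other words, it is for tasks that run in parallel.

thenCompose(), on the other hand, waits for the task that returns T to complete, passes its result to a function that starts a new asynchronous task, and returns a future of U. In other words, it is for sequential steps.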
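The difference can be sketched with a small self-contained program. The class name CombineVsCompose and the methods price, quantity, and describe are hypothetical and exist only for illustration; this is a minimal sketch under those assumptions, not a definitive pattern.

```java
import java.util.concurrent.CompletableFuture;

class CombineVsCompose {

  // Hypothetical independent tasks: neither needs the other's result
  static CompletableFuture<Integer> price() {
    return CompletableFuture.supplyAsync(() -> 20);
  }

  static CompletableFuture<Integer> quantity() {
    return CompletableFuture.supplyAsync(() -> 3);
  }

  // Hypothetical dependent task: it cannot start until it is given a total
  static CompletableFuture<String> describe(int total) {
    return CompletableFuture.supplyAsync(() -> "total=" + total);
  }

  // thenCombine: both futures are already running; the BiFunction runs once both complete
  static CompletableFuture<Integer> total() {
    return price().thenCombine(quantity(), (p, q) -> p * q);
  }

  // thenCompose: the second future is created only after the first one completes
  static CompletableFuture<String> totalDescription() {
    return total().thenCompose(CombineVsCompose::describe);
  }

  public static void main(String[] args) {
    System.out.println(total().join());
    System.out.println(totalDescription().join());
  }
}
```

In this sketch price() and quantity() can run at the same time, whereas describe() is only invoked after the total is known, which is why the two methods are not interchangeable.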

Example
Suppose we want to download Post objects from many sites. We do it as follows.
public CompletableFuture<Collection<Post>> fetchPosts() {
  final List<CompletableFuture<Collection<Post>>> futures = ...//Start fetching
  return futures.stream()
    .reduce(combineApiCalls())
    .orElse(CompletableFuture.completedFuture(emptyList()));
}


private BinaryOperator<CompletableFuture<Collection<Post>>> combineApiCalls() {
  return (c1, c2) -> c1.thenCombine(c2, (posts1, posts2) -> {
      return Stream.concat(posts1.stream(), posts2.stream())
        .collect(toList());
    });
}

CompletableFuture<Collection<Post>> future = fetchPosts(); //wrap everything
Collection<Post> list = future.join();
The explanation is as follows.
What we’re doing here is wrapping everything in one single CompletableFuture that the client will trigger on their side when subscribing to it, for example by using get or join.
We could have decided to organise our CompletableFutures in a different way, for example by using thenCompose; however, we have to remember from my last post that with thenCompose we wait for the previous CompletableFuture to complete. That would mean that we’d be executing our tasks sequentially again and this would be inefficient.
This is why it is so important to understand how each method works.
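The reduce-based merge described above can be tried out with a self-contained sketch. It assumes plain strings instead of Post objects, and the names ReduceWithThenCombine, mergeAll, and demo are hypothetical. Futures created with supplyAsync begin running as soon as they are created, so thenCombine here only wires their results together.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ReduceWithThenCombine {

  // Merges a list of already-started futures into one future holding all elements
  static CompletableFuture<List<String>> mergeAll(List<CompletableFuture<List<String>>> futures) {
    return futures.stream()
      .reduce((f1, f2) -> f1.thenCombine(f2, (a, b) ->
          Stream.concat(a.stream(), b.stream()).collect(Collectors.toList())))
      .orElse(CompletableFuture.completedFuture(List.of()));
  }

  // Hypothetical sources standing in for the remote sites
  static List<String> demo() {
    CompletableFuture<List<String>> siteA = CompletableFuture.supplyAsync(() -> List.of("a1", "a2"));
    CompletableFuture<List<String>> siteB = CompletableFuture.supplyAsync(() -> List.of("b1"));
    CompletableFuture<List<String>> siteC = CompletableFuture.supplyAsync(() -> List.of("c1", "c2"));
    List<CompletableFuture<List<String>>> futures = List.of(siteA, siteB, siteC);
    // join() blocks until every combined future has completed
    return mergeAll(futures).join();
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

Because the stream is sequential, the reduction folds from left to right, so the merged list keeps the order of the input futures regardless of which one finishes first.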
Example
We do it as follows.
CompletionStage<String> stage1 = supplyAsync(() -> sleepAndGet("parallel1"));
CompletionStage<String> stage2 = supplyAsync(() -> sleepAndGet("parallel2"));
CompletionStage<String> stage = stage1.thenCombine(stage2,
       (s1, s2) -> (s1 + " " + s2).toUpperCase());
assertEquals("PARALLEL1 PARALLEL2", stage.toCompletableFuture().get());
Example
We do it as follows.
CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> task.compute(1));
CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> task.compute(2));
List<Integer> list = a.thenCombine(b, List::of).join();
Example - Future + Callback
We do it as follows. The callback is declared so that its parameters match the result types of the two futures.
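// A dedicated pool created for these calls; despite its name it is not ForkJoinPool.commonPool()
ForkJoinPool commonPool = new ForkJoinPool(2000);

// Inside a method that returns CompletableFuture<String> (signature omitted)
return CompletableFuture.supplyAsync(() -> serviceA(user), commonPool)
  .thenCombine(CompletableFuture.supplyAsync(() -> serviceB(user), commonPool),
    this::persist);

}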

private String persist(String serviceA, String serviceB) {
    return "...";
}
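A runnable variant of this callback pattern might look like the sketch below. It makes several assumptions: serviceA and serviceB are hypothetical stand-ins that just build strings, the user is a plain String, and a small fixed thread pool replaces the ForkJoinPool from the snippet above so that the pool can be shut down cleanly.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CombineWithCallback {

  // Hypothetical stand-ins for the two remote services
  static String serviceA(String user) { return "A:" + user; }
  static String serviceB(String user) { return "B:" + user; }

  // Callback whose parameters match the result types of the two futures
  static String persist(String a, String b) { return a + "|" + b; }

  static String process(String user) {
    // Assumption: a fixed pool of two threads, one per service call
    ExecutorService pool = Executors.newFixedThreadPool(2);
    try {
      return CompletableFuture.supplyAsync(() -> serviceA(user), pool)
        .thenCombine(CompletableFuture.supplyAsync(() -> serviceB(user), pool),
          CombineWithCallback::persist)
        .join();
    } finally {
      // Release the worker threads once the combined result is available
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(process("alice"));
  }
}
```

The method reference fits because persist takes two String arguments and returns a String, which matches the BiFunction shape that thenCombine expects for two String futures.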
