6 Nisan 2020 Pazartesi

CompletableFuture.thenCompose metodu - Nested CompletableFuture Sonuçlarını Düzleştirir

Giriş
Kısaca iki tane işi asenkron olarak arka arkaya çalıştırır.
1. Birinci işi asenkron çalıştırır. 
2. Birinci işin sonucunu ikinci işe girdi olarak verir ve ikinci işi de asenkron çalıştırır 
3. Nihayet ikinci işin sonucunu döner

İmzası şöyle. Yani parametre olarak bir function alır. Bu function genellikle bir lamdadır ve bir başka CompletableFuture nesnesi döner.
<U> CompletionStage<U> thenCompose​(Function<? super T,? extends CompletionStage<U>> fn)
thenCompose  ve thenApply Farkı
Açıklaması şöyle
You should use the thenApply method if you want to transform a CompletionStage with a fast function. You should use the thenCompose method if you want to transform a CompletionStage with a slow function.

You should use the thenCompose method if you want to transform two CompletionStages sequentially. You should use the thenCombine method if you want to transform two CompletionStages in parallel.
Burada iki kullanım çeşidinden bahsediliyor.
1. Bir işin sonucunu başka bir şeye dönüştürmek yani transform() işi. 
2. İki işin sonucunu birleştirmek, yani combine() işi

Benim açıklamam ise şöyle
... thenCompose() is used to chain one future dependent on the other, ...
Yani thenCompose T döndüren iş bitince, bunu asenkrona yeni bir işe verir ve U döndürür. Yani ardışık adımlar içindir.

1. transform() işi
thenCompose() aynı thenApply() gibi birinci işin sonucunu ikinci bir iş ile dönüştürür. Farkı şu.

Zaten bir function varsa ve düz nesne dönüyorsa thenApply() kullanılabilir. Eğer elimizdeki function zaten CompletableFuture dönüyorsa thenCompose() kullanılır. Böylece iç içe iki CompletableFuture<CompletableFuture<Foo>> nesnesi bir kademe düzleştirilerek CompletableFuture<Foo> dönülür. Bu kullanım aynen stream.flatMap() metoduna benziyor

Bu kullanım aynı zamanda otomatik olarak ikinci işin de asenkron çalışacağı anlamına geliyor. Bu fark zaten iki metodun imzasındaki farklılık ile de görülebilir. İmzalar şöyle. thenCompose() metodunun asenkron çalıştığı görülebilir. thenCompose imzasında birinc işin döndürdüğü T tipini alan ve CompletionState dönen bir function görülebilir.
<U> CompletableFuture<U>   thenApply(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenCompose(Function<? super T,? extends CompletionStage<U>> fn)
Örnek
Elimizde şöyle bir kod olsun. Burada thenApply() kullanılıyor ancak sonucu alabilmek için nested completable future içinden erişmek gerekiyor.
CompletionStage<String> stage1 = supplyAsync(() -> sleepAndGet("single"));
CompletionStage<String> stage = stage1.thenApply(s -> s.toUpperCase());
assertEquals("SINGLE", stage.toCompletableFuture().get());
thenCompose() ile her şey daha basit. Şöyle yaparız
CompletionStage<String> stage1 = supplyAsync(() -> sleepAndGet("sequential1"));
CompletionStage<String> stage = stage1.thenCompose(
       s -> supplyAsync(() -> sleepAndGet((s + " " + "sequential2").toUpperCase())));
assertEquals("SEQUENTIAL1 SEQUENTIAL2", stage.toCompletableFuture().get());
Örnek
Şöyle yaparız
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture<String> future2 = future1
  .thenComposeAsync(s -> CompletableFuture.supplyAsync(() -> s + " World"));

future2.thenAccept(result -> System.out.println(result));
Örnek
Elimizde şöyle bir kod olsun.
public CompletableFuture<UserInfo> getUserInfo(int userId)
public CompletableFuture<UserRating> getUserRating(UserInfo userInfo)
Şöyle yaparız. Böylece iç içe iki CompletableFuture<CompletableFuture<UserRating>> nesnesi bir kademe düzleştirilerek CompletableFuture<UserRating> dönülür.
CompletableFuture<CompletableFuture<UserRating>> f =
    getUserInfo(1).thenApply(this::getUserRating);

CompletableFuture<UserRating> relevanceFuture =
    getUserInfo(1).thenCompose(this::getUserRating);
Örnek - 
Şöyle yaparız
CompletableFuture<Integer> future = 
    CompletableFuture.supplyAsync(() -> 1)
                     .thenCompose(x -> CompletableFuture.supplyAsync(() -> x+1));
Örnek 
Şöyle yaparız
public Stream<CompletableFuture<String>> findPricesStream(String product) {
  return shops.stream()
    .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor))
    .map(future -> future.thenApply(Quote::parse))
    .map(future -> future.thenCompose(quote ->
CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor))); }
Şeklen şöyle


2. combine() işi
Örnek
Elimizde şöyle bir kod olsun
CompletableFuture<Void> chain = new CompletableFuture<>();

chain.thenComposeAsync(nil -> createUser())
  .thenAcceptAsync(logNewUserId())
  .thenComposeAsync(nil ->
    registerAddress()
      .thenAcceptBothAsync(registerPaymentDetails(), (address, paymentDetailsSuccess) -> {
       System.out.println("Registered address was : " + address);
       System.out.println("Registered payment details success : " + paymentDetailsSuccess);
      })
  )
  .thenComposeAsync(nil -> sendEmail())
  .thenAcceptAsync(result -> System.out.println("Email sent : " + result));

chain.complete(null);
Açıklaması şöyle
As you can see, we create a chain CompletableFuture to hold all our CompletableFutures. That means that our tasks will start only when we complete this chain CompletableFuture. The createUser task starts right after the chain task is completed and once it completes, we have defined a callback to log the newly generated user id by using thenAcceptAsync.

Right after that, by using thenAcceptBothAsync we combine two CompletableFutures that can be executed at the same time: registerAddress and registerPaymentDetails. This combination depends at the same time on the completion of the createUser CompletableFuture, something that has been done using thenComposeAsync method.

At last, we have the sendEmail CompletableFuture, which depends on the completion of both registerAddress and registerPaymentDetails, and finally we just define its callback by using thenAcceptAsync, which prints the result of the operation.
3. İstisnalar
Aslında thenCompose() ardışık işler için tasarlanmış olsa da thenCombine() gibi de kullanılabilir.

Örnek
Akış olarak şöyle. İki  tane paralel asenkron işin sonucu birleştirmek için kullanılır.
              @Async
              Service A(thread1,thread2) \
MicroService /                             (Merge from Response of ServiceA and ServiceB)
             \ @Async
              Service B(thread1,thread2) /
Şöyle yaparız. Bu bence biraz amacı dışında bir kullanım ama mümkün.
public CompletableFuture<Integer> mergeResult()
  throws InterruptedException, ExecutionException {
  CompletableFuture<Integer> futureData1 = aservice.getData();
  CompletableFuture<Integer> futureData2 = bservice.getData();

  CompletableFuture<Integer> result = futureData1.thenCompose(fd1Value -> 
    futureData2.thenApply(fd2Value -> fd1Value + fd2Value)); // add future integer data
  return result;  
}

Hiç yorum yok:

Yorum Gönder