Thursday, December 3, 2020

CompletableFuture.allOf Method - Returns a New CompletableFuture

Introduction
The description is as follows.
Completes when all of several CompletableFutures completes.
One shortcoming of this method is described below. In short, there is no direct way to get the results of all the tasks; the returned future only carries Void.
Additionally, built-in Java 8 concurrency classes provides pretty inconvenient API to combine several CompletionStage-s. CompletableFuture.allOf/CompletableFuture.anyOf methods accept only CompletableFuture as arguments; you have no mechanism to combine arbitrary CompletionStage-s without converting them to CompletableFuture first. Also, the return type of the aforementioned CompletableFuture.allOf is declared as CompletableFuture<Void>. Hence, you are unable to extract conveniently individual results of each future supplied. CompletableFuture.anyOf is even worse in this regard;...
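The limitation described in the quote can be worked around with a small helper. The following is a minimal sketch, not a JDK API: the class name AllOfResults and the method allOfList are hypothetical names chosen for illustration. It wraps allOf() and, once every future is done, collects the individual results into a list in the original order.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllOfResults {

  // Hypothetical helper (not part of the JDK): completes when every future
  // has completed and yields their results in the order of the input list.
  static <T> CompletableFuture<List<T>> allOfList(List<CompletableFuture<T>> futures) {
    CompletableFuture<Void> all =
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    // thenApply runs only after 'all' completed normally, so every future
    // is already done and join() does not block here.
    return all.thenApply(v -> futures.stream()
        .map(CompletableFuture::join)
        .collect(Collectors.toList()));
  }

  public static void main(String[] args) {
    List<CompletableFuture<String>> futures = Arrays.asList(
        CompletableFuture.supplyAsync(() -> "a"),
        CompletableFuture.supplyAsync(() -> "b"),
        CompletableFuture.supplyAsync(() -> "c"));
    System.out.println(allOfList(futures).join()); // prints [a, b, c]
  }
}
```

If any input future completes exceptionally, the future returned by allOfList also completes exceptionally, because allOf() propagates the failure.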
General Usage
Example
To wait for all tasks to finish, we do the following.
CompletableFuture.allOf(IntStream.rangeClosed(1, 10)
  .boxed()
  .map(this::userFlow)
  .toArray(CompletableFuture[]::new)
).get();

CompletableFuture<String> userFlow(int user) {
  ... 
}
Example
To wait for all tasks to finish, but only up to a timeout, we do the following.
int TIMEOUT_IN_MILLIS = 100;

@Test
public void allOfOrTimeout() throws InterruptedException, ExecutionException,
    TimeoutException {
  getAllOfFuture().get(TIMEOUT_IN_MILLIS, TimeUnit.MILLISECONDS);
}

private CompletableFuture<Void> getAllOfFuture() {
  return CompletableFuture.allOf(
    CompletableFuture.runAsync(() -> sleep(1)),
    CompletableFuture.runAsync(() -> sleep(2)),
    CompletableFuture.runAsync(() -> sleep(3)),
    CompletableFuture.runAsync(() -> sleep(4)),
    CompletableFuture.runAsync(() -> sleep(5)),
    CompletableFuture.runAsync(() -> sleep(6)),
    CompletableFuture.runAsync(() -> sleep(7)),
    CompletableFuture.runAsync(() -> sleep(8))
  );
}

public static void sleep(int millis) {
  try {
    Thread.sleep(millis);
    System.out.format("Had a nap for %s milliseconds.\r\n", millis);
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
}
The output looks like this (the tasks run asynchronously, so the order of the lines is not guaranteed).
Had a nap for 1 milliseconds.
Had a nap for 2 milliseconds.
Had a nap for 3 milliseconds.
Had a nap for 4 milliseconds.
Had a nap for 5 milliseconds.
Had a nap for 6 milliseconds.
Had a nap for 7 milliseconds.
Had a nap for 8 milliseconds.
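On Java 9 and later, the timeout can also be attached to the future itself with orTimeout() instead of being passed to get(). The following is a minimal sketch under that assumption; the class name AllOfWithTimeout and the method completesWithin are hypothetical and exist only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class AllOfWithTimeout {

  // Hypothetical helper: returns true if all futures finished within the
  // limit and false if the limit was exceeded.
  static boolean completesWithin(long timeoutMillis, CompletableFuture<?>... futures) {
    try {
      CompletableFuture.allOf(futures)
          .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS) // requires Java 9+
          .join();
      return true;
    } catch (CompletionException e) {
      // On timeout, join() throws a CompletionException caused by TimeoutException.
      if (e.getCause() instanceof TimeoutException) {
        return false;
      }
      throw e; // one of the tasks failed for a different reason
    }
  }

  public static void main(String[] args) {
    CompletableFuture<String> fast = CompletableFuture.completedFuture("done");
    CompletableFuture<String> never = new CompletableFuture<>(); // never completed
    System.out.println(completesWithin(100, fast));        // prints true
    System.out.println(completesWithin(100, fast, never)); // prints false
  }
}
```

Note that the timeout only completes the combined future exceptionally; it does not cancel or interrupt the underlying tasks.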
1. Getting All Results by Calling join on Each CompletableFuture
Example
We do the following.
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

// Create a list of CompletableFuture objects representing the parallel computations
List<CompletableFuture<Integer>> futures = numbers.stream()
  .map(number -> CompletableFuture.supplyAsync(() -> compute(number)))
  .collect(Collectors.toList());

// Combine all CompletableFuture objects into a single CompletableFuture
CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures
  .toArray(new CompletableFuture[0]));

// Wait for all computations to complete
allFutures.join();

// Aggregate the results
int sum = futures.stream()
  .map(CompletableFuture::join)
  .reduce(0, Integer::sum);

// Print the final result
System.out.println("Sum of numbers: " + sum);
2. thenApply + Getting All Results by Calling join on Each CompletableFuture
Example
We do the following.
@Service
public class AggregatorService {
  @Autowired
  private RestTemplate restTemplate;

  public CompletableFuture<AggregatedResponse> getAggregatedResponse() {
    CompletableFuture<User[]> usersFuture = CompletableFuture.supplyAsync(() -> {
      return restTemplate.getForObject("http://localhost:8080/users", User[].class);
    });
        
    CompletableFuture<Product[]> productsFuture = CompletableFuture.supplyAsync(() -> {
      return restTemplate.getForObject("http://localhost:8080/products", Product[].class);
    });
        
    CompletableFuture<Order[]> ordersFuture = CompletableFuture.supplyAsync(() -> {
      return restTemplate.getForObject("http://localhost:8080/orders", Order[].class);
    });
        
    return CompletableFuture.allOf(usersFuture, productsFuture, ordersFuture)
      .thenApply(v -> new AggregatedResponse(
        usersFuture.join(),
        productsFuture.join(), 
        ordersFuture.join()));
  }
}
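The same pattern can be tried without Spring. The following is a minimal sketch in which the REST calls are replaced by futures passed in as parameters; the names AggregationSketch, Summary and aggregate are hypothetical stand-ins and not part of the original service.

```java
import java.util.concurrent.CompletableFuture;

class AggregationSketch {

  // Hypothetical stand-in for AggregatedResponse.
  static final class Summary {
    final int userCount;
    final int productCount;
    final int orderCount;

    Summary(int userCount, int productCount, int orderCount) {
      this.userCount = userCount;
      this.productCount = productCount;
      this.orderCount = orderCount;
    }
  }

  // Combines three independent futures without blocking the caller.
  static CompletableFuture<Summary> aggregate(
      CompletableFuture<Integer> users,
      CompletableFuture<Integer> products,
      CompletableFuture<Integer> orders) {
    return CompletableFuture.allOf(users, products, orders)
        // All three futures are complete here, so join() returns immediately.
        .thenApply(v -> new Summary(users.join(), products.join(), orders.join()));
  }

  public static void main(String[] args) {
    Summary s = aggregate(
        CompletableFuture.supplyAsync(() -> 3),
        CompletableFuture.supplyAsync(() -> 5),
        CompletableFuture.supplyAsync(() -> 7)).join();
    System.out.println(s.userCount + ", " + s.productCount + ", " + s.orderCount); // prints 3, 5, 7
  }
}
```

Unlike thenRun(), thenApply() lets the combined value travel on in the returned future, so the caller decides when (or whether) to block.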
3. thenRun + Getting All Results by Calling join on Each CompletableFuture
Example - CompletableFuture[] + thenRun
Since allOf() only returns Void, there is no direct way to get the results of all the tasks. thenRun() can be used to combine the results of the individual futures. We do the following.
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    // Some long-running operation
    return "Result 1";
});

CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    // Some long-running operation
    return "Result 2";
});

CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
    // Some long-running operation
    return "Result 3";
});

CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);

allFutures.thenRun(() -> {
    // All futures completed
    String result1 = future1.join();
    String result2 = future2.join();
    String result3 = future3.join();
    System.out.println(result1 + ", " + result2 + ", " + result3);
});
Example
We do the following.
CompletableFuture.allOf(
  CompletableFuture.runAsync(() -> foo()), // make it synchronized call
  CompletableFuture.runAsync(() -> bar())  // make it synchronized call
).thenRun(() -> {
  // runs after both async calls have completed
}).get();
Example - Works Incorrectly
We do the following. Because allOf() returns Void, there is no way to supply a default value here even when one of the futures completes exceptionally; exceptionally() can only return null. thenRun() then still runs, and calling join() on the failed future throws an exception there as well.
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
    // Some long-running operation
    return 10;
});

CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
    int result = 10 / 0; // Causes an ArithmeticException
    return result;
});

CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> {
    // Some long-running operation
    return 20;
});

CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);

allFutures.exceptionally(ex -> {
    System.out.println("Exception occurred: " + ex.getMessage());
    return null; // Default value to return if there's an exception
}).thenRun(() -> {
    // All futures completed
    int result1 = future1.join();
    int result2 = future2.join();
    int result3 = future3.join();
    System.out.println(result1 + ", " + result2 + ", " + result3);
});
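One way to avoid this problem is to attach exceptionally() to each individual future before passing it to allOf(), since each of those futures has a real result type that a default value can fit. The following is a minimal sketch of that idea; the class name AllOfWithDefaults, the method joinAllWithDefault and the fallback value -1 are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllOfWithDefaults {

  static final int DEFAULT_VALUE = -1; // hypothetical fallback for failed tasks

  static int divide(int a, int b) {
    return a / b; // throws ArithmeticException when b == 0
  }

  // Hypothetical helper: waits for all futures and substitutes DEFAULT_VALUE
  // for every future that completed exceptionally.
  static List<Integer> joinAllWithDefault(List<CompletableFuture<Integer>> futures) {
    // Recover each future individually, before handing them to allOf().
    List<CompletableFuture<Integer>> safe = futures.stream()
        .map(f -> f.exceptionally(ex -> DEFAULT_VALUE))
        .collect(Collectors.toList());

    return CompletableFuture.allOf(safe.toArray(new CompletableFuture[0]))
        .thenApply(v -> safe.stream()
            .map(CompletableFuture::join) // cannot throw: failures were replaced above
            .collect(Collectors.toList()))
        .join();
  }

  public static void main(String[] args) {
    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10);
    CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> divide(10, 0));
    CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> 20);
    System.out.println(joinAllWithDefault(Arrays.asList(future1, future2, future3)));
    // prints [10, -1, 20]
  }
}
```

The original futures are left untouched; exceptionally() returns new futures, so callers that still hold future2 continue to see the ArithmeticException.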
