6 Nisan 2023 Perşembe

StructuredTaskScope Sınıfı - Structured Concurrency İçindir

Giriş
Şu satırı dahil ederiz
import jdk.incubator.concurrent.StructuredTaskScope;
Bu sınıfı ilk olarak burada gördüm. Açıklaması şöyle
JDK 19 introduces structured concurrency, which is a multi-threaded programming method designed to simplify multi-threaded programming through the structured concurrency API, not to replace java.util.concurrent.

Structured concurrency treats multiple tasks running in different threads as a single unit of work, simplifying error handling, improving reliability, and enhancing observability. That is, structured concurrency preserves the readability, maintainability, and observability of single-threaded code.
Eskiden şöyle yapardık. Ama bu kullanım zor çünkü her bir thread bağımsız olarak hata döndürebilir ve thread'ler arasında bir ilişki yok.
TrafficResponse retrieveData() throws ExecutionException, InterruptedException {

  // CREATE INDEPENDENT TASK VIA THE EXECUTORSERVICE
  Future<TrafficData> highwayTask =
    this.executorService.submit(this::retrieveHighwayData);
  Future<TrafficData> localTask =
    this.executorService.submit(this::retrieveLocalData);

  // WAIT FOR THE TASKS TO FINISH
  TrafficData highwayTraffic = highwayTask.get();
  TrafficData localTraffic = localTask.get();

  // RETURN COMBINED RESPONSEE
  return new TrafficResponse(highwayTraffic, localTraffic);
}
Kullanım
1. Önce bir StructuredTaskScope yaratılır. Yaratan thread sahibidir. Her zaman try-with-resources ile kullanılır. 
2. StructuredTaskScope taratmak için new StructuredTaskScope.ShutdownOnSuccess veya new StructuredTaskScope.ShutdownOnFailure kullanılır. 

3. fork(Callable) çağrısı yapılır. Bu sanki ExecutorService.submit(Callable) gibidir. Her çağrı yeni bir virtual thread yaratır. StructuredTaskScope.Subtask<T> döndürülür. Bu aynı zamanda Supplier<T> tipindendir.
3. join() çağrısı yapılır. Bu çağrı ya tüm işler başarılı veya başarısız olarak bitinceye kadar veya iş shutdown() ile kapatılıncaya kadar bekler.
Aynı kodu şöyle yaparız
TrafficResponse retrieveData() throws ExecutionException, InterruptedException {

  // CREATE A NEW SCOPE
  try (var scope = new StructuredTaskScope.ShutdownOnFailure) {

    // FORK RELATED SUBTASKS
    Supplier<TrafficData> highwaySubtask = scope.fork(this::retrieveHighwayData);
    Supplier<TrafficData> localSubtask = scope.fork(this::retrieveLocalData);

    // JOIN ALL TASKS IN THE SCOPE
    scope.join()
      .throwIfFailed();  // AND PROPAGATE ERRORS

    // GET THE RESULTS
    return new TrafficResponse(highwaySubtask.get(),
                               localTraffic.get());
  }
}
StructuredTaskScope ve ScopedValue İlişkisi
Örnek
Şöyle yaparız. Burada ScopedValue içinde yaratılan StructuredTaskScope nesnesi ScopedValue değerine erişebiliyor.
private static final ScopedValue<String> HOMETWON = ScopedValue.newInstance();

ScopedValue.runWhere(USERNAME, "Heidelberg", () -> {
  try (var scope = new StructuredTaskScope<String>()) {
    scope.fork(() -> doStuff());
    // ...
  }
});

String doStuff() {
  String name = USERNAME.get();
  // => "Heidelberg"
}

StructuredTaskScope.ShutdownOnFailure Sınıfı
Açıklaması şöyle
StructuredTaskScope.ShutdownOnFailure shuts down the scope on the first failed subtask. If you require all results (“invoke all”), this policy ensures that other subtasks get discarded if any of them fails.
Örnek
Şöyle yaparız
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
  Future<User> mediumUser = scope.fork(() -> getMediumUser());    
  Future<SubscriptionTier> subscriptionTier = scope.fork(() -> getSubscriptionTier());    
  Future<UserInterests> userInterests = scope.fork(() -> getUserInterests());

  scope.join();
  scope.throwIfFailed(IllegalArgumentException::new);
  return new Response(mediumUser.resultNow(),
                      subscriptionTier.resultNow(), 
                      userInterests.resultNow());
}
StructuredTaskScope.ShutdownOnSuccess Sınıfı
Açıklaması şöyle
StructuredTaskScope.ShutdownOnSuccess captures the first successfully completed subtask result and shuts down the scope afterward. This will interrupt any unfinished threads and wake up the scope’s owner. Choose this policy if you only need the result of a singular subtask (“invoke any”).
Örnek
Şöyle yaparız. İlk cevap döndürenin cevabı kabul edilir
public Weather readWeather() 
  throws ExecutionException, InterruptedException, TimeoutException {
  try (var scope = new StructuredTaskScope.ShutdownOnSuccess<Weather>()){

    // need to call fork and pass runnables or callables
    scope.fork(Weather::readWeatherFromServerA());
    scope.fork(Weather::readWeatherFromServerB());
    scope.fork(Weather::readWeatherFromServerC());

    // now we block, blocking is cheap in virtual threas
    // so no issue here
    scope.join();

    Weather weather = scope.result();
    return weather;
}
Eğer hangisinin ilk cevap döndürdüğünü görmek istersek şöyle yaparız
public Weather readWeather() throws InterruptedException, ExecutionException {

  try(var scope = new StructuredTaskScope.ShutdownOnSuccess<Weather>()){

    Future<Weather> futureA = scope.fork(Weather::readWeatherFromServerA);
    Future<Weather> futureB = scope.fork(Weather::readWeatherFromServerB);
    Future<Weather> futureC = scope.fork(Weather::readWeatherFromServerC);

    scope.join();

    System.out.println("futureA.state() = " + futureA.state());
    System.out.println("futureB.state() = " + futureB.state());
    System.out.println("futureC.state() = " + futureC.state());

    var weather = scope.result();

    return weather;
  }
}
Çıktı şuna benzer
WARNING: Using incubator modules: jdk.incubator.concurrent
futureA.state() = FAILED
futureB.state() = FAILED
futureC.state() = SUCCESS
Custom StructuredTaskScope
Örnek
Şöyle yaparız. Burada başarıyla biten işler bir listede toplanıyor
public class OrderedSuccessfulScope<T> extends StructuredTaskScope<T> {

  private final Queue<Subtask<? extends T>> subtasks = new LinkedTransferQueue<>();

  @Override
  protected void handleComplete(Subtask<? extends T> subtask) {
    if (subtask.state() != Subtask.State.SUCCESS) {
      return;
    }

    subtasks.add(subtask);
  }

  // ADDITONAL FUNCTIONALITY PROVIDED BY THE CUSTOM SCOPE
  public Stream<Subtask<? extends T>> completedSuccessfully() {
    super.ensureOwnerAndJoined();
    return subtasks.stream();
  }
}
Açıklaması şöyle
The super.ensureOwnerAndJoined() call ensures that all forked tasks are joined, and the current thread is the owner of the scope, or it throws an Exception.





Hiç yorum yok:

Yorum Gönder