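Introduction
In JDK 19 and 20, where the API lives in the incubator module jdk.incubator.concurrent, we include the following line (from JDK 21 the class is a preview API in java.util.concurrent):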
import jdk.incubator.concurrent.StructuredTaskScope;
JDK 19 introduces structured concurrency, an approach to multithreaded programming exposed through the structured concurrency API. It is designed to simplify concurrent code, not to replace java.util.concurrent.
Structured concurrency treats multiple tasks running in different threads as a single unit of work, simplifying error handling, improving reliability, and enhancing observability. That is, structured concurrency preserves the readability, maintainability, and observability of single-threaded code.
Previously, we did it as follows. This approach is hard to get right because each thread can fail independently and there is no relationship between the threads.
TrafficResponse retrieveData() throws ExecutionException, InterruptedException {
    // CREATE INDEPENDENT TASKS VIA THE EXECUTORSERVICE
    Future<TrafficData> highwayTask =
        this.executorService.submit(this::retrieveHighwayData);
    Future<TrafficData> localTask =
        this.executorService.submit(this::retrieveLocalData);
    // WAIT FOR THE TASKS TO FINISH
    TrafficData highwayTraffic = highwayTask.get();
    TrafficData localTraffic = localTask.get();
    // RETURN COMBINED RESPONSE
    return new TrafficResponse(highwayTraffic, localTraffic);
}
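To make the problem concrete, here is a minimal, self-contained sketch that uses only the classic ExecutorService API. The class name UnstructuredFailureDemo, the task bodies, and the timings are hypothetical and exist only for illustration. It shows that when one task fails, nothing cancels its sibling, which keeps running to completion.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class UnstructuredFailureDemo {

    // Returns true if the slow task ran to completion although its sibling failed.
    public static boolean siblingKeepsRunning() throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        CountDownLatch slowTaskFinished = new CountDownLatch(1);
        try {
            // Hypothetical subtask that fails immediately.
            Callable<String> failingTask = () -> {
                throw new IllegalStateException("highway service down");
            };
            // Hypothetical subtask that takes a while and knows nothing about its sibling.
            Callable<String> slowTask = () -> {
                Thread.sleep(200);
                slowTaskFinished.countDown();
                return "local data";
            };

            Future<String> failing = executor.submit(failingTask);
            executor.submit(slowTask);

            try {
                failing.get(); // rethrows the failure wrapped in ExecutionException
            } catch (ExecutionException e) {
                // The caller gives up here, but nobody cancels the slow task.
            }

            // Wait (with a generous timeout) to observe whether the slow task still finished.
            return slowTaskFinished.await(2, TimeUnit.SECONDS);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("sibling finished anyway: " + siblingKeepsRunning());
    }
}
```

In this sketch the slow task is only stopped by the final shutdownNow() call, after it has already finished. A real caller that returned early on the failure would leave it running in the background.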
Usage
1. First, a StructuredTaskScope is created. The thread that creates it is its owner. It is always used with try-with-resources.
2. To create a StructuredTaskScope, use new StructuredTaskScope.ShutdownOnSuccess or new StructuredTaskScope.ShutdownOnFailure.
3. fork(Callable) is called. This is similar to ExecutorService.submit(Callable). By default, each call creates a new virtual thread. It returns a StructuredTaskScope.Subtask<T>, which is also a Supplier<T>.
4. join() is called. This call waits until all subtasks have finished, successfully or not, or until the scope is shut down with shutdown().
TrafficResponse retrieveData() throws ExecutionException, InterruptedException {
    // CREATE A NEW SCOPE
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        // FORK RELATED SUBTASKS
        Supplier<TrafficData> highwaySubtask = scope.fork(this::retrieveHighwayData);
        Supplier<TrafficData> localSubtask = scope.fork(this::retrieveLocalData);
        // JOIN ALL TASKS IN THE SCOPE
        scope.join()
             .throwIfFailed(); // AND PROPAGATE ERRORS
        // GET THE RESULTS
        return new TrafficResponse(highwaySubtask.get(),
                                   localSubtask.get());
    }
}
The Relationship Between StructuredTaskScope and ScopedValue
Example
We do it as follows. Here, a StructuredTaskScope created inside the ScopedValue binding, along with the subtasks it forks, can read the bound value.
private static final ScopedValue<String> HOMETOWN = ScopedValue.newInstance();
ScopedValue.runWhere(HOMETOWN, "Heidelberg", () -> {
    try (var scope = new StructuredTaskScope<String>()) {
        scope.fork(() -> doStuff());
        // ...
    }
});
String doStuff() {
    // Reads the value bound by the enclosing runWhere call
    String hometown = HOMETOWN.get();
    // => "Heidelberg"
    return hometown;
}
The StructuredTaskScope.ShutdownOnFailure Class
StructuredTaskScope.ShutdownOnFailure shuts down the scope on the first failed subtask. If you require all results (“invoke all”), this policy ensures that other subtasks get discarded if any of them fails.
Example
// Uses the JDK 19/20 incubator API, where fork() returns a Future
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    Future<User> mediumUser = scope.fork(() -> getMediumUser());
    Future<SubscriptionTier> subscriptionTier = scope.fork(() -> getSubscriptionTier());
    Future<UserInterests> userInterests = scope.fork(() -> getUserInterests());
    // Wait for all subtasks, or for the first failure
    scope.join();
    // Rethrow the first failure, wrapped in an IllegalArgumentException
    scope.throwIfFailed(IllegalArgumentException::new);
    return new Response(mediumUser.resultNow(),
                        subscriptionTier.resultNow(),
                        userInterests.resultNow());
}
The StructuredTaskScope.ShutdownOnSuccess Class
StructuredTaskScope.ShutdownOnSuccess captures the first successfully completed subtask result and shuts down the scope afterward. This will interrupt any unfinished threads and wake up the scope’s owner. Choose this policy if you only need the result of a singular subtask (“invoke any”).
Example
We do it as follows. The result of whichever subtask responds first is accepted.
public Weather readWeather()
        throws ExecutionException, InterruptedException, TimeoutException {
    try (var scope = new StructuredTaskScope.ShutdownOnSuccess<Weather>()) {
        // fork each subtask by passing a Callable
        scope.fork(Weather::readWeatherFromServerA);
        scope.fork(Weather::readWeatherFromServerB);
        scope.fork(Weather::readWeatherFromServerC);
        // now we block; blocking is cheap in virtual threads,
        // so no issue here
        scope.join();
        // result() returns the first successful result
        Weather weather = scope.result();
        return weather;
    }
}
If we want to see which one responded first, we do it as follows.
public Weather readWeather() throws InterruptedException, ExecutionException {
    try (var scope = new StructuredTaskScope.ShutdownOnSuccess<Weather>()) {
        Future<Weather> futureA = scope.fork(Weather::readWeatherFromServerA);
        Future<Weather> futureB = scope.fork(Weather::readWeatherFromServerB);
        Future<Weather> futureC = scope.fork(Weather::readWeatherFromServerC);
        scope.join();
        // Inspect how each subtask ended
        System.out.println("futureA.state() = " + futureA.state());
        System.out.println("futureB.state() = " + futureB.state());
        System.out.println("futureC.state() = " + futureC.state());
        var weather = scope.result();
        return weather;
    }
}
The output is as follows.
WARNING: Using incubator modules: jdk.incubator.concurrent
futureA.state() = FAILED
futureB.state() = FAILED
futureC.state() = SUCCESS
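For comparison, the "invoke any" behaviour that ShutdownOnSuccess mirrors has long been available as ExecutorService.invokeAny. The sketch below uses only that stable API. The class name InvokeAnyDemo, the three server tasks, and their results are hypothetical, chosen to match the output above, where only server C succeeds.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class InvokeAnyDemo {

    // Returns the result of the first task that completes successfully.
    public static String firstSuccessful() throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            // Hypothetical stand-ins for the three weather servers.
            Callable<String> serverA = () -> {
                throw new IllegalStateException("server A unavailable");
            };
            Callable<String> serverB = () -> {
                throw new IllegalStateException("server B unavailable");
            };
            Callable<String> serverC = () -> "sunny (server C)";

            // invokeAny skips failed tasks and returns one successful result;
            // it throws ExecutionException only if every task fails.
            return executor.invokeAny(List.of(serverA, serverB, serverC));
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(firstSuccessful());
    }
}
```

Unlike a scope, invokeAny gives no per-task handle to inspect afterwards, so the state() diagnostics printed above have no direct equivalent in this sketch.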
Custom StructuredTaskScope
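Example
We do it as follows. Here, the subtasks that complete successfully are collected in a queue.
import java.util.Queue;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.StructuredTaskScope;
import java.util.stream.Stream;
public class OrderedSuccessfulScope<T> extends StructuredTaskScope<T> {
    // Thread-safe queue, because handleComplete runs on the subtask threads
    private final Queue<Subtask<? extends T>> subtasks = new LinkedTransferQueue<>();
    @Override
    protected void handleComplete(Subtask<? extends T> subtask) {
        // Ignore subtasks that failed
        if (subtask.state() != Subtask.State.SUCCESS) {
            return;
        }
        subtasks.add(subtask);
    }
    // ADDITIONAL FUNCTIONALITY PROVIDED BY THE CUSTOM SCOPE
    public Stream<Subtask<? extends T>> completedSuccessfully() {
        super.ensureOwnerAndJoined();
        return subtasks.stream();
    }
}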
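The super.ensureOwnerAndJoined() call checks that the current thread is the owner of the scope and that the owner has called join() after forking. If the caller is not the owner it throws a WrongThreadException, and if the scope has not been joined it throws an IllegalStateException.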