Introduction
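The import depends on the JDK version: StructuredTaskScope lives in the jdk.incubator.concurrent incubator module in JDK 19 and 20, and moved to java.util.concurrent as a preview API in JDK 21.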
JDK 19 introduces structured concurrency, an approach that aims to simplify multi-threaded programming through the structured concurrency API; it is not meant to replace java.util.concurrent. Structured concurrency treats multiple tasks running in different threads as a single unit of work, which simplifies error handling, improves reliability, and enhances observability. In other words, it aims to preserve the readability, maintainability, and observability of single-threaded code.
Previously, we would write it like this. This approach is hard to use because each thread can fail independently and there is no relationship between the threads.
TrafficResponse retrieveData() throws ExecutionException, InterruptedException {
    // CREATE INDEPENDENT TASKS VIA THE EXECUTORSERVICE
    Future<TrafficData> highwayTask = this.executorService.submit(this::retrieveHighwayData);
    Future<TrafficData> localTask = this.executorService.submit(this::retrieveLocalData);

    // WAIT FOR THE TASKS TO FINISH
    TrafficData highwayTraffic = highwayTask.get();
    TrafficData localTraffic = localTask.get();

    // RETURN COMBINED RESPONSE
    return new TrafficResponse(highwayTraffic, localTraffic);
}
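The difficulty described above can be reproduced with plain java.util.concurrent. The following is a minimal sketch and is not taken from the original post: the class name UnstructuredFailureDemo, the method name, and the task bodies are all hypothetical. It illustrates that when one task fails, its sibling submitted to the same ExecutorService is not cancelled and is still running when the caller observes the failure.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class UnstructuredFailureDemo {

    // Returns true if the sibling task is still unfinished at the moment
    // the caller learns that the other task has failed.
    static boolean siblingKeepsRunningAfterFailure() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        // Hypothetical stand-in for a slow remote call: blocks until released.
        CountDownLatch release = new CountDownLatch(1);

        Callable<String> failingTask = () -> {
            throw new IllegalStateException("highway service down");
        };
        Callable<String> slowTask = () -> {
            release.await();
            return "local traffic";
        };

        try {
            Future<String> failing = executor.submit(failingTask);
            Future<String> slow = executor.submit(slowTask);
            try {
                failing.get();
                return false; // not reached: the task always throws
            } catch (ExecutionException e) {
                // Nothing links the two tasks, so the slow one was not cancelled.
                return !slow.isDone();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        } finally {
            // Manual cleanup that the caller has to remember.
            release.countDown();
            executor.shutdownNow();
        }
    }
}
```

Calling UnstructuredFailureDemo.siblingKeepsRunningAfterFailure() shows the cleanup burden: the caller has to release or cancel the remaining task by hand.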
Usage
1. First, a StructuredTaskScope is created. The thread that creates it is its owner. It is always used with try-with-resources.
2. To create a StructuredTaskScope, use new StructuredTaskScope.ShutdownOnSuccess or new StructuredTaskScope.ShutdownOnFailure.
3. fork(Callable) is called. It is similar to ExecutorService.submit(Callable). Each call starts a new virtual thread and returns a StructuredTaskScope.Subtask<T>, which is also a Supplier<T>.
4. join() is called. It waits until all subtasks have completed, successfully or not, or until the scope is shut down with shutdown().
The same code can be written like this:
TrafficResponse retrieveData() throws ExecutionException, InterruptedException {
    // CREATE A NEW SCOPE
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        // FORK RELATED SUBTASKS
        Supplier<TrafficData> highwaySubtask = scope.fork(this::retrieveHighwayData);
        Supplier<TrafficData> localSubtask = scope.fork(this::retrieveLocalData);

        // JOIN ALL TASKS IN THE SCOPE
        scope.join()
             .throwIfFailed(); // AND PROPAGATE ERRORS

        // GET THE RESULTS
        return new TrafficResponse(highwaySubtask.get(), localSubtask.get());
    }
}
Relationship Between StructuredTaskScope and ScopedValue
Example
We do it like this. Here, the StructuredTaskScope created while the ScopedValue is bound, and the subtasks forked in it, can read the bound value.
private static final ScopedValue<String> HOMETOWN = ScopedValue.newInstance();

ScopedValue.runWhere(HOMETOWN, "Heidelberg", () -> {
    try (var scope = new StructuredTaskScope<String>()) {
        scope.fork(() -> doStuff());
        // ...
    }
});

String doStuff() {
    String name = HOMETOWN.get(); // => "Heidelberg"
    return name;
}
The StructuredTaskScope.ShutdownOnFailure Class
It is described as follows:
StructuredTaskScope.ShutdownOnFailure shuts down the scope on the first failed subtask. If you require all results (“invoke all”), this policy ensures that other subtasks get discarded if any of them fails.
Example
We do it like this. This example uses the JDK 19/20 incubator API, in which fork returns a Future:
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    Future<User> mediumUser = scope.fork(() -> getMediumUser());
    Future<SubscriptionTier> subscriptionTier = scope.fork(() -> getSubscriptionTier());
    Future<UserInterests> userInterests = scope.fork(() -> getUserInterests());

    scope.join();
    scope.throwIfFailed(IllegalArgumentException::new);

    return new Response(
        mediumUser.resultNow(),
        subscriptionTier.resultNow(),
        userInterests.resultNow());
}
The StructuredTaskScope.ShutdownOnSuccess Class
It is described as follows:
StructuredTaskScope.ShutdownOnSuccess captures the first successfully completed subtask result and shuts down the scope afterward. This will interrupt any unfinished threads and wake up the scope’s owner. Choose this policy if you only need the result of a singular subtask (“invoke any”).
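For comparison, the "invoke any" behavior mentioned above already exists in java.util.concurrent as ExecutorService.invokeAny. The following is a minimal sketch, assuming three hypothetical weather sources; the class name InvokeAnyDemo, the method name, and the task bodies are illustrative and not from the original post. One source never answers, one answers immediately, and one fails, so the caller receives the only successful result and the unfinished task is cancelled.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class InvokeAnyDemo {

    // Returns the result of the first task that completes successfully.
    static String firstSuccessfulAnswer() {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        // Never counted down: server A only stops when it is interrupted.
        CountDownLatch never = new CountDownLatch(1);

        Callable<String> serverA = () -> {
            never.await();
            return "cloudy (server A)";
        };
        Callable<String> serverB = () -> "sunny (server B)";
        Callable<String> serverC = () -> {
            throw new IllegalStateException("server C is down");
        };

        try {
            // invokeAny cancels the tasks that have not finished when it returns.
            return executor.invokeAny(List.of(serverA, serverB, serverC));
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }
}
```

ShutdownOnSuccess offers the same "first successful result wins" policy, but ties the subtasks to the lexical scope of the try-with-resources block.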
Example
We do it like this. The answer from whichever source responds first is accepted.
public Weather readWeather() throws ExecutionException, InterruptedException, TimeoutException {
    try (var scope = new StructuredTaskScope.ShutdownOnSuccess<Weather>()) {
        // need to call fork and pass runnables or callables
        scope.fork(Weather::readWeatherFromServerA);
        scope.fork(Weather::readWeatherFromServerB);
        scope.fork(Weather::readWeatherFromServerC);

        // now we block; blocking is cheap in virtual threads,
        // so no issue here
        scope.join();

        Weather weather = scope.result();
        return weather;
    }
}
If we want to see which one responded first, we do it like this:
public Weather readWeather() throws InterruptedException, ExecutionException {
    try (var scope = new StructuredTaskScope.ShutdownOnSuccess<Weather>()) {
        Future<Weather> futureA = scope.fork(Weather::readWeatherFromServerA);
        Future<Weather> futureB = scope.fork(Weather::readWeatherFromServerB);
        Future<Weather> futureC = scope.fork(Weather::readWeatherFromServerC);

        scope.join();

        System.out.println("futureA.state() = " + futureA.state());
        System.out.println("futureB.state() = " + futureB.state());
        System.out.println("futureC.state() = " + futureC.state());

        var weather = scope.result();
        return weather;
    }
}
The output looks something like this:
WARNING: Using incubator modules: jdk.incubator.concurrent
futureA.state() = FAILED
futureB.state() = FAILED
futureC.state() = SUCCESS
Custom StructuredTaskScope
Example
We do it like this. Here, the subtasks that complete successfully are collected in a queue.
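import java.util.Queue;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.StructuredTaskScope;
import java.util.stream.Stream;

public class OrderedSuccessfulScope<T> extends StructuredTaskScope<T> {

    // Thread-safe queue: handleComplete may be called from several subtask threads
    private final Queue<Subtask<? extends T>> subtasks = new LinkedTransferQueue<>();

    @Override
    protected void handleComplete(Subtask<? extends T> subtask) {
        // Keep only the subtasks that completed successfully
        if (subtask.state() != Subtask.State.SUCCESS) {
            return;
        }
        subtasks.add(subtask);
    }

    // ADDITIONAL FUNCTIONALITY PROVIDED BY THE CUSTOM SCOPE
    public Stream<Subtask<? extends T>> completedSuccessfully() {
        super.ensureOwnerAndJoined();
        return subtasks.stream();
    }
}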
It is described as follows:
The super.ensureOwnerAndJoined() call ensures that all forked tasks are joined, and the current thread is the owner of the scope, or it throws an Exception.