
Tuesday, March 14, 2023

What Is ForkJoinPool Work Stealing?

Introduction
The explanation is as follows.
If a thread is getting overwhelmed and its internal queue fills up, another thread, instead of picking up a task from the main queue, can “steal” a task from that thread's internal queue.
In the figure, thread B steals work from thread A's queue.
Each thread's queue in the figure is a ForkJoinPool.WorkQueue. The explanation is as follows.
ForkJoinPool is built on the following principles:

- Each thread has its own task queue.
- Each task queue is a circular array.
- Each thread uses push and pop to add or remove tasks to its own queue.
- ForkJoinPool uses the work-stealing algorithm to balance the workload on different threads. The pool maintains a global work queue that stores externally submitted tasks. Each worker thread will pop tasks from its own task queue. If there are no tasks in its own queue, it will try to randomly steal tasks from the shared work queues or other workers. If it fails to find tasks from both shared queues or other threads, it will go to sleep.
The ForkJoinPool.WorkQueue Class
It offers three basic methods. The explanation is as follows.
push: used by the worker thread to push a task to the top of its own work queue
pop: used by the worker thread to pop a task from the top of its own work queue
poll: used by another thread to steal a task from the bottom of the work queue of a different thread
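
The push/pop/poll split above can be illustrated with a toy model. This is only a sketch built on ConcurrentLinkedDeque; ToyWorkQueue and ToyWorkQueueDemo are hypothetical names, and the real ForkJoinPool.WorkQueue is an internal, array-based class that is not implemented this way.

```java
import java.util.concurrent.ConcurrentLinkedDeque;

// Simplified model of one worker's queue; NOT the JDK's ForkJoinPool.WorkQueue.
class ToyWorkQueue<T> {
  private final ConcurrentLinkedDeque<T> deque = new ConcurrentLinkedDeque<>();

  // Owner thread: add a task at the top of its own queue.
  void push(T task) {
    deque.addLast(task);
  }

  // Owner thread: take the most recently pushed task (LIFO); null if empty.
  T pop() {
    return deque.pollLast();
  }

  // Other threads: steal the oldest task from the bottom (FIFO); null if empty.
  T poll() {
    return deque.pollFirst();
  }
}

class ToyWorkQueueDemo {
  public static void main(String[] args) {
    ToyWorkQueue<String> queueOfA = new ToyWorkQueue<>();
    queueOfA.push("task-1");
    queueOfA.push("task-2");
    queueOfA.push("task-3");

    System.out.println("B steals: " + queueOfA.poll()); // B steals: task-1
    System.out.println("A pops: " + queueOfA.pop());    // A pops: task-3
  }
}
```

The owner and the thief work at opposite ends of the queue, so they rarely compete for the same task.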

Wednesday, June 12, 2019

The ForkJoinPool Class - The Core Class of the Fork/Join Framework

Introduction
We include the following line. The class was introduced in Java 7.
import java.util.concurrent.ForkJoinPool;
The explanation is as follows.
Unfortunately, ForkJoinPool does not work well in the face of Thread.sleep(), because it is designed for many short tasks that finish quickly, rather than tasks that block for a long time.

The important task classes are as follows
RecursiveAction : Does not return a result
RecursiveTask : Returns a result

How Does It Differ from ExecutorService?
The explanation is as follows. It is designed to split work into smaller pieces (smaller recursive tasks).
A ForkJoinPool works on the work-stealing principle and was added in Java 7. The ForkJoinPool is similar to the ExecutorService, with one difference: the ForkJoinPool splits work units into smaller tasks (the fork process), which are then submitted to the thread pool.

This is called the forking step and is a recursive process. The forking process continues until a limit is reached when it is impossible to split into further sub-tasks. All the sub-tasks are executed, and the main task waits until all the sub-tasks have finished their execution. The main task joins all the individual results and returns a final single result. This is the joining process, where the results are collated and a single data is built as an end result.
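
The fork and join steps described above can be sketched with a RecursiveTask that sums an array. This is a minimal illustration; the SumTask name and the THRESHOLD value are assumptions chosen for the example, not something the quoted text prescribes.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Sums a range of an array by splitting it until the chunks are small enough.
class SumTask extends RecursiveTask<Long> {
  private static final int THRESHOLD = 1_000; // arbitrary cutoff (assumption)
  private final long[] data;
  private final int from; // inclusive
  private final int to;   // exclusive

  SumTask(long[] data, int from, int to) {
    this.data = data;
    this.from = from;
    this.to = to;
  }

  @Override
  protected Long compute() {
    if (to - from <= THRESHOLD) {
      // Small enough: stop forking and compute directly.
      long sum = 0;
      for (int i = from; i < to; i++) {
        sum += data[i];
      }
      return sum;
    }
    int mid = (from + to) >>> 1;
    SumTask left = new SumTask(data, from, mid);
    SumTask right = new SumTask(data, mid, to);
    left.fork();                     // fork: left half may run on another worker
    long rightSum = right.compute(); // right half runs on the current thread
    return left.join() + rightSum;   // join: combine the partial results
  }

  static long sum(long[] data) {
    ForkJoinPool pool = new ForkJoinPool();
    try {
      return pool.invoke(new SumTask(data, 0, data.length));
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    long[] data = new long[10_000];
    for (int i = 0; i < data.length; i++) {
      data[i] = i + 1;
    }
    System.out.println(sum(data)); // 50005000
  }
}
```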
What Is Work Stealing?
I moved this to the post "What Is ForkJoinPool Work Stealing?"


Submitting a Callable or Runnable Task
The explanation is as follows. In other words, it is not very different from an executor.
Also be aware that ForkJoinPool not only allows the submission of ForkJoinTasks, it also allows the submission of Callable or Runnable tasks, so you can use ForkJoinPool in the same way that you could use the existing Executors.
The only difference would be that your task won’t split itself, but you could benefit from work stealing performance improvements if multiple tasks are submitted and there are some threads with less work than others.
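
As a small sketch of the point above, a ForkJoinPool can be handed plain Runnable and Callable tasks just like any other executor. PlainTasksDemo and the values used are made up for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.atomic.AtomicInteger;

class PlainTasksDemo {
  static int runPlainTasks() {
    ForkJoinPool pool = new ForkJoinPool(2);
    try {
      AtomicInteger counter = new AtomicInteger();

      // A Runnable produces no value; join() only waits for completion.
      Runnable increment = counter::incrementAndGet;
      ForkJoinTask<?> done = pool.submit(increment);
      done.join();

      // A Callable produces a value that join() returns.
      Callable<Integer> compute = () -> counter.get() + 41;
      ForkJoinTask<Integer> answer = pool.submit(compute);
      return answer.join();
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(runPlainTasks()); // 42
  }
}
```

Neither task splits itself here, so the only possible benefit is the work stealing between workers that the quote mentions.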
Submitting a ForkJoinTask - The Work Must Be Divisible
The advantage here is that if the work is divisible, it can be executed by many threads.

The execute(), invoke(), and submit() methods can be used.
All of them accept a ForkJoinTask object as a parameter. ForkJoinTask is an abstract class; see the posts on RecursiveAction and RecursiveTask, which extend it. The explanation is as follows
There are two ways to submit a task to a ForkJoinPool:

RecursiveAction - A task which does not return any value. It does some work (e.g. copying a file from disk to a remote location and then exit). It may still need to break up its work into smaller chunks, which can be executed by independent threads or CPUs. A RecursiveAction can be implemented by sub-classing it.

RecursiveTask - A task that returns a result to the ForkJoinPool. It may split its work up into smaller tasks and merge the result of the smaller tasks into one result. The splitting of work into sub-tasks and merging may take place at several levels.
constructor
We construct it like this.
ForkJoinPool pool = new ForkJoinPool();  
constructor - int
We do it like this.
ForkJoinPool pool = new ForkJoinPool(3);
constructor - int parallelism, ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode
(To do: give an example.)
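
One possible example for the four-argument constructor, as a minimal sketch. The thread naming scheme, the handler's behavior, the parallelism of 4 and the class name CustomPoolDemo are all assumptions made for illustration; only the constructor signature comes from the heading above.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.atomic.AtomicInteger;

class CustomPoolDemo {
  static ForkJoinPool newCustomPool() {
    AtomicInteger counter = new AtomicInteger();

    // Reuse the default factory and only rename the threads it creates.
    ForkJoinPool.ForkJoinWorkerThreadFactory factory = pool -> {
      ForkJoinWorkerThread worker =
          ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
      worker.setName("custom-worker-" + counter.incrementAndGet());
      return worker;
    };

    // Invoked when a worker thread dies because of an unrecoverable error.
    Thread.UncaughtExceptionHandler handler =
        (thread, error) -> System.err.println(thread.getName() + " failed: " + error);

    // asyncMode = true: forked tasks that are never joined run in FIFO order.
    return new ForkJoinPool(4, factory, handler, true);
  }

  static String workerName(ForkJoinPool pool) {
    Callable<String> whoAmI = () -> Thread.currentThread().getName();
    return pool.submit(whoAmI).join();
  }

  public static void main(String[] args) {
    ForkJoinPool pool = newCustomPool();
    System.out.println(workerName(pool));
    pool.shutdown();
  }
}
```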

commonPool - A Ready-Made ForkJoinPool
Since Java 8, a common thread pool shared across the JVM has been in use. The explanation is as follows.
ForkJoinPool#commonPool() is a static thread pool that is lazily initialized when it is actually needed. Two major concepts use the commonPool inside the JDK: CompletableFuture and Parallel Streams. There is one small difference between those two features: with CompletableFuture you are able to specify your own thread pool and not use the threads from the commonPool; with Parallel Streams you cannot.
By default the pool's parallelism is the number of processors minus 1. To change this value, we do the following
-Djava.util.concurrent.ForkJoinPool.common.parallelism=1000
Example
We do it like this
ForkJoinPool pool = ForkJoinPool.commonPool();  
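
As a hedged sketch of the CompletableFuture point above, the following prints the common pool's parallelism and then runs a CompletableFuture on a separate pool instead. CommonPoolDemo and the pool size of 2 are arbitrary choices for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;

class CommonPoolDemo {
  // Runs a supplier on our own pool, so no commonPool thread is used for it.
  static String threadNameOnOwnPool() {
    ForkJoinPool ownPool = new ForkJoinPool(2);
    try {
      return CompletableFuture
          .supplyAsync(() -> Thread.currentThread().getName(), ownPool)
          .join();
    } finally {
      ownPool.shutdown();
    }
  }

  public static void main(String[] args) {
    // The value depends on the machine and on the system property above.
    System.out.println("common parallelism: "
        + ForkJoinPool.commonPool().getParallelism());
    System.out.println("own pool thread: " + threadNameOnOwnPool());
  }
}
```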
The execute method
Its signatures are as follows.
public void execute(Runnable task)  
public void execute(ForkJoinTask<?> task)  
The invoke method
Its signature is as follows. It blocks until the task completes and returns a result, just like a Callable.
public <T> T invoke(ForkJoinTask<T> task)  
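
The difference between execute and invoke can be sketched as follows. Square and ExecuteVsInvokeDemo are hypothetical names used only for this illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class ExecuteVsInvokeDemo {
  // Tiny task used only for illustration.
  static class Square extends RecursiveTask<Integer> {
    private final int n;

    Square(int n) {
      this.n = n;
    }

    @Override
    protected Integer compute() {
      return n * n;
    }
  }

  static int viaExecute(ForkJoinPool pool, int n) {
    Square task = new Square(n);
    pool.execute(task); // returns immediately and gives nothing back
    return task.join(); // the result is read from the task itself
  }

  static int viaInvoke(ForkJoinPool pool, int n) {
    return pool.invoke(new Square(n)); // blocks and returns the result
  }

  public static void main(String[] args) {
    ForkJoinPool pool = new ForkJoinPool(2);
    System.out.println(viaExecute(pool, 3)); // 9
    System.out.println(viaInvoke(pool, 7));  // 49
    pool.shutdown();
  }
}
```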
The managedBlock method
The explanation is as follows
With managed blocking, the thread tells the thread pool that it may be blocked before it calls the potentially blocking method, and also informs the pool when the blocking method is finished. The thread pool then knows that there is a risk of a starvation deadlock, and may spawn additional threads if all of its threads are currently in some blocking operation and there are still other tasks to execute. 
Example
We do it like this
class BlockingGetUnicorns implements ForkJoinPool.ManagedBlocker {
  List<String> unicorns; // result read by the caller; assumed to be set inside block()

  @Override
  public boolean block() {
    ...
  }
  @Override
  public boolean isReleasable() { return false; } // never skip the call to block()
}

CompletableFuture<List<String>> fetchUnicorns =
  CompletableFuture.supplyAsync(() -> {
    BlockingGetUnicorns getThem = new BlockingGetUnicorns();
    try {
      // Tell the pool this thread may block so it can add a spare worker
      ForkJoinPool.managedBlock(getThem);
    } catch (InterruptedException ex) {
      throw new AssertionError(ex);
    }
    return getThem.unicorns;
  });
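
Since the body of block() is elided above, here is a separate, self-contained sketch of the same idea. BlockingSupplier, callBlocking and slowLookup are hypothetical names, and the 100 ms sleep merely stands in for a real blocking call.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;

// Wraps a blocking call so a ForkJoinPool can compensate while it blocks.
class BlockingSupplier<T> implements ForkJoinPool.ManagedBlocker {
  private final Supplier<T> blockingCall;
  private T result;
  private boolean done;

  BlockingSupplier(Supplier<T> blockingCall) {
    this.blockingCall = blockingCall;
  }

  @Override
  public boolean block() {
    result = blockingCall.get();
    done = true;
    return true; // true means no further blocking is necessary
  }

  @Override
  public boolean isReleasable() {
    return done; // lets managedBlock skip block() once the value is there
  }

  static <R> R callBlocking(Supplier<R> blockingCall) {
    BlockingSupplier<R> blocker = new BlockingSupplier<>(blockingCall);
    try {
      ForkJoinPool.managedBlock(blocker);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while blocking", e);
    }
    return blocker.result;
  }
}

class ManagedBlockDemo {
  // Stand-in for a slow, blocking lookup (hypothetical).
  static String slowLookup() {
    try {
      Thread.sleep(100);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return "unicorn";
  }

  public static void main(String[] args) {
    ForkJoinPool pool = new ForkJoinPool(1);
    Callable<String> task =
        () -> BlockingSupplier.callBlocking(ManagedBlockDemo::slowLookup);
    System.out.println(pool.submit(task).join()); // unicorn
    pool.shutdown();
  }
}
```

Returning the completion flag from isReleasable() is a design choice of this sketch: it keeps managedBlock from calling block() again after the value has been fetched.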
The shutdown method - Stopping the Pool
We do it like this.
pool.shutdown();  
pool.awaitTermination(1, TimeUnit.MINUTES);  
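
A slightly fuller sketch of the same shutdown sequence, handling the timeout and the interruption. The helper name shutdownAndWait and the fallback to shutdownNow() are assumptions of this example rather than something the post prescribes.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

class ShutdownDemo {
  // Returns true if the pool terminated within the timeout.
  static boolean shutdownAndWait(ForkJoinPool pool, long timeout, TimeUnit unit) {
    pool.shutdown(); // no new tasks are accepted; submitted ones still run
    try {
      if (pool.awaitTermination(timeout, unit)) {
        return true;
      }
      pool.shutdownNow(); // timed out: try to cancel whatever is left
      return false;
    } catch (InterruptedException e) {
      pool.shutdownNow();
      Thread.currentThread().interrupt(); // preserve the interrupt status
      return false;
    }
  }

  public static void main(String[] args) {
    ForkJoinPool pool = new ForkJoinPool(2);
    Runnable work = () -> System.out.println("working");
    pool.execute(work);
    System.out.println("terminated: " + shutdownAndWait(pool, 1, TimeUnit.MINUTES));
  }
}
```

Note that calling shutdown() on ForkJoinPool.commonPool() has no effect on its execution state, so this pattern only applies to pools you create yourself.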
The submit method - Runnable, Callable, RecursiveAction, or RecursiveTask
Its signatures are as follows. It returns a ForkJoinTask, which implements Future.
public ForkJoinTask<?> submit(Runnable task)  
  
public <T> ForkJoinTask<T> submit(Runnable task, T result)  
  
public <T> ForkJoinTask<T> submit(Callable<T> task)  
  
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)  
Example - RecursiveTask
We do it like this.
pool.submit(new RecursiveTaskA()); 
Example - Callable
We do it like this. The lambda returns a value, so it is submitted as a Callable.
ForkJoinPool pool = new ForkJoinPool(3);
ForkJoinTask<F.Promise<Void>> result = pool
  .submit(() -> {
    ...
    return F.Promise.<Void>pure(null);
});

Wednesday, November 15, 2017

The RecursiveAction Class - Does Not Return a Result

Introduction
Its skeleton is as follows. It splits the given work into smaller pieces and creates new RecursiveAction instances for them.
public class Tasker extends RecursiveAction {
  private final List<String> myList;

  public Tasker(List<String> list) {
    this.myList = list;
  }

  @Override
  protected void compute() {
    ...
  }
}
This class does not return a result, so it is somewhat like a Runnable. For example, it can be used to copy a file from one place to another.

The compute method
Example
We do it like this. Here the work simply continues by being split into left and right. No explicit join() call is needed, because invokeAll() returns only after both halves have completed.
class Task extends RecursiveAction {
  private List<Integer> list;

  public Task(List<Integer> list) {
    this.list = list;
  }

  @Override
  protected void compute() {
    if (list.size() >= 10) {
      int size = (list.size() / 2)+1;
      List<Integer> leftPartition = list.subList(0, size);
      List<Integer> rightPartition = list.subList(size, list.size());
      invokeAll(new Task(leftPartition),new Task(rightPartition));
    } else {
      print();
    }
  }

  private void print() {
    for (int i = 0; i < list.size(); i++)
      System.out.println("Thread:" + Thread.currentThread() + ",Value:" + list.get(i));
  }
}
Example
We do it like this. Here left is forked so that it can run on another thread, while the current thread computes right itself.
@Override
protected void compute() {
  if(myList.size()==1){
    ...
  }
  else{
    List<String> temp = new ArrayList<>();
    temp.add (myList.get( myList.size()-1 )  );
    myList.remove( myList.size()-1 );

    Tasker left = new Tasker(myList);
    Tasker right = new Tasker(temp);

    left.fork();
    right.compute();
    left.join();
  }
}
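
Because the fragment above leaves its base case elided, here is a separate, self-contained sketch of the same fork / compute / join pattern. DoubleInPlace and its THRESHOLD are hypothetical; the action doubles every element of an int array in place.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

class DoubleInPlace extends RecursiveAction {
  private static final int THRESHOLD = 4; // arbitrary cutoff (assumption)
  private final int[] values;
  private final int from; // inclusive
  private final int to;   // exclusive

  DoubleInPlace(int[] values, int from, int to) {
    this.values = values;
    this.from = from;
    this.to = to;
  }

  @Override
  protected void compute() {
    if (to - from <= THRESHOLD) {
      for (int i = from; i < to; i++) {
        values[i] *= 2;
      }
      return;
    }
    int mid = (from + to) >>> 1;
    DoubleInPlace left = new DoubleInPlace(values, from, mid);
    DoubleInPlace right = new DoubleInPlace(values, mid, to);
    left.fork();     // left half may be picked up by another worker
    right.compute(); // right half is done by the current thread
    left.join();     // wait until the left half is finished
  }

  // Works on a copy so the caller's array is left untouched.
  static int[] doubled(int[] input) {
    int[] copy = input.clone();
    ForkJoinPool.commonPool().invoke(new DoubleInPlace(copy, 0, copy.length));
    return copy;
  }

  public static void main(String[] args) {
    int[] result = doubled(new int[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10});
    System.out.println(Arrays.toString(result));
    // [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
  }
}
```

The two halves touch disjoint index ranges, so no synchronization on the array is needed.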
Example
Suppose we have the following code. There is actually nothing special about it: it is as if we started a new Thread and handed it a list of Runnables. Each Runnable is run in order.
public class TaskExecutor extends RecursiveAction {
  private List<Runnable> tasks;
  private int start;
  private int end;
  private AtomicInteger nextTaskIndex;

  public TaskExecutor(List<Runnable> tasks, int start, int end, 
    AtomicInteger nextTaskIndex) {
    this.tasks = tasks;
    this.start = start;
    this.end = end;
    this.nextTaskIndex = nextTaskIndex;
  }

  @Override
  protected void compute() {
    int taskIndex = nextTaskIndex.getAndIncrement();
    while (taskIndex < end) {
      tasks.get(taskIndex).run();
      System.out.println("Task " + taskIndex + " has completed on thread "
        + Thread.currentThread().getName());
      taskIndex = nextTaskIndex.getAndIncrement();
    }
  }
}
To use it, we do the following
public static void main(String[] args) {
  // Create a list of tasks to execute
  List<Runnable> tasks = ...

  // Create a Fork/Join pool with the default number of threads
  ForkJoinPool pool = new ForkJoinPool();

  // Create a task to execute the list of tasks in order
  AtomicInteger nextTaskIndex = new AtomicInteger(0);
  TaskExecutor task = new TaskExecutor(tasks, 0, tasks.size(), nextTaskIndex);

  // Submit the task to the pool and wait for it to complete
  pool.invoke(task);
}

Example
We do it like this. Here the actions do not call fork() again; instead, each one calls submit() on the already defined ForkJoinPool object.
ForkJoinPool commonPool = new ForkJoinPool(2000);

commonPool.submit(new UserFlowRecursiveAction(IntStream.rangeClosed(1, 10)
  .boxed()
  .collect(Collectors.toList())));

// Stop Condition
commonPool.shutdown();
commonPool.awaitTermination(60, TimeUnit.SECONDS);

public class UserFlowRecursiveAction extends RecursiveAction {

  private final List<Integer> workload;

  public UserFlowRecursiveAction(List<Integer> workload) {
    this.workload = workload;
  }

  @Override
  protected void compute() {
    if (workload.size() > 1) {
      commonPool.submit(new UserFlowRecursiveAction(workload.subList(1, workload.size())));
    }

    int user = workload.get(0);

    ForkJoinTask<String> taskA = commonPool.submit(() -> ...);
    ForkJoinTask<String> taskB = commonPool.submit(() -> ...);

    IntStream.rangeClosed(1, 3)
      .forEach(i -> commonPool.submit(() -> foo(taskA.join(), taskB.join())));
  }
}