15 Kasım 2017 Çarşamba

RecursiveAction Sınıfı - Sonuç Dönmez

Giriş
İskeleti şöyle. Verilen işi daha küçük parçaya bölerek yeni bir RecursiveAction yaratır.
public class Tasker extends RecursiveAction {
  private final List<String> myList;

  public Tasker(List<String> list) {
    this.myList = list;
  }

  @Override
  protected void compute() {
    ...
  }
}
Bu sınıf bir sonuç dönmez. Yani bir nevi Runnable gibidir. Örneğin dosyayı bir yerden bir yere kopyalamak için kullanılabilir.

compute metodu
Örnek
Şöyle yaparız. Burada iş sadece left ve right olarak bölünerek devam ediyor. left'in veya right'in bitmesini beklemeye gerek yok.
class Task extends RecursiveAction {
  private List<Integer> list;

  public Task(List<Integer> list) {
    this.list = list;
  }

  @Override
  protected void compute() {
    if (list.size() >= 10) {
      int size = (list.size() / 2)+1;
      List<Integer> leftPartition = list.subList(0, size);
      List<Integer> rightPartition = list.subList(size, list.size());
      invokeAll(new Task(leftPartition),new Task(rightPartition));
    } else {
      print();
    }
  }

  private void print() {
    for (int i = 0; i < list.size(); i++)
      System.out.println("Thread:" + Thread.currentThread() + ",Value:" + list.get(i));
  }
}
Örnek
Şöyle yaparız. Burada left ayrı bir thread içinde çalışır, right'ı ben hesaplarım
@Override
protected void compute() {
  if(myList.size()==1){
    ...
  }
  else{
    List<String> temp = new ArrayList<>();
    temp.add (myList.get( myList.size()-1 )  );
    myList.remove( myList.size()-1 );

    MyAction left = new Tasker(myList);
    MyAction right = new Tasker(temp);

    left.fork();
    right.compute();
    left.join();
  }
}
Örnek
Elimizde şöyle bir kod olsun. Aslında bu kodun bir özelliği yok. Sanki yeni bir Thread başlatıp içine Runnable listesi vermiş gibiyiz. Her Runnable sırayla çalıştırılır.
public class TaskExecutor extends RecursiveAction {
  private List<Runnable> tasks;
  private int start;
  private int end;
  private AtomicInteger nextTaskIndex;

  public TaskExecutor(List<Runnable> tasks, int start, int end, 
    AtomicInteger nextTaskIndex) {
    this.tasks = tasks;
    this.start = start;
    this.end = end;
    this.nextTaskIndex = nextTaskIndex;
  }

  @Override
  protected void compute() {
     int taskIndex = nextTaskIndex.getAndIncrement();
     while (taskIndex < end) {
        tasks.get(taskIndex).run();
        System.out.println("Task " + taskIndex + " has completed on thread " 
          + Thread.currentThread().getName());
        taskIndex = nextTaskIndex.getAndIncrement();
      }
  }
}
Kullanmak için şöyle yaparız
public static void main(String[] args) {
  // Create a list of tasks to execute
  List<Runnable> tasks = ...

  // Create a Fork/Join pool with the default number of threads
  ForkJoinPool pool = new ForkJoinPool();

  // Create a task to execute the list of tasks in order
  AtomicInteger nextTaskIndex = new AtomicInteger(0);
  TaskExecutor task = new TaskExecutor(tasks, 0, tasks.size(), nextTaskIndex);

  // Submit the task to the pool and wait for it to complete
  pool.invoke(task);
}

Örnek
Örnek
Şöyle yaparız. Burada her action bir daha fork() metodunu çağırmıyor. tanımlı ForkJoinPool nesnesine submit() çağrısı yapıyor
ForkJoinPool commonPool = new ForkJoinPool(2000);

commonPool.submit(new UserFlowRecursiveAction(IntStream.rangeClosed(1, 10)
  .boxed()
  .collect(Collectors.toList())));

// Stop Condition
commonPool.shutdown();
commonPool.awaitTermination(60, TimeUnit.SECONDS);

public class UserFlowRecursiveAction extends RecursiveAction {

  private final List<Integer> workload;

  public UserFlowRecursiveAction(List<Integer> workload) {
    this.workload = workload;
  }

  @Override
  protected void compute() {
    if (workload.size() > 1) {
      commonPool.submit(new UserFlowRecursiveAction(workload.subList(1, workload.size())));
    }

    int user = workload.get(0);

    ForkJoinTask<String> taskA = commonPool.submit(() -> ...);
    ForkJoinTask<String> taskB = commonPool.submit(() -> ...);

    IntStream.rangeClosed(1, 3
      .forEach(i -> commonPool.submit(() -> foo(taskA.join(), taskB.join())));
  }
}

Hiç yorum yok:

Yorum Gönder