25 Şubat 2020 Salı

RecursiveTask Sınıfı - Sonuç Döner

Giriş
Şu satırı dahil ederiz
import java.util.concurrent.RecursiveTask;
İstenilen CHUNK büyüklüğüne erişinceye kadar task'lar fork edilir. İstenilen büyüklükteki listeyi işleyecek bir metod yazılır

Bazı Örnekler
- Bir dizideki en büyük sayıyı bulmak
- Bir dizideki belli bir önek ile başlayan string sayısını bulmak
- Bir dizideki belirli bir kelime sayısını bulmak

Örnek
Şöyle yaparız. Sayıları 100'lük parçalara böler ve her parçadaki toplamı döner.
class SumTask extends RecursiveTask<Long> {
  private static final int THRESHOLD = 100;
  private long[] numbers;
  private int start;
  private int end;

  public SumTask(long[] numbers, int start, int end) {
    this.numbers = numbers;
    this.start = start;
    this.end = end;
  }

  @Override
  protected Long compute() {
    int length = end - start;
    if (length <= THRESHOLD) {
      long sum = 0;
      for (int i = start; i < end; i++) {
        sum += numbers[i];
      }
      return sum;
    } else {
      int middle = start + length / 2;
      SumTask leftTask = new SumTask(numbers, start, middle);
      SumTask rightTask = new SumTask(numbers, middle, end);

      leftTask.fork();
      Long rightResult = rightTask.compute();
      Long leftResult = leftTask.join();
      return leftResult + rightResult;
    }
  }
}
Kullanmak için şöyle yaparız. 10 milyon tane sayısı toplar
public static void main(String[] args) {
  long[] numbers = new long[10_00_000];
  for (int i = 0; i < numbers.length; i++) {
    numbers[i] = i;
  }
  ForkJoinPool pool = new ForkJoinPool();
  SumTask task = new SumTask(numbers, 0, numbers.length);
  long result = pool.invoke(task);
  System.out.println("Sum: " + result);
  pool.shutdown();
}
Örnek
Bir müşteri listesine ait bilgileri çekmek isteyelim. Şöyle yaparız. Aslında gerçek kod biraz daha farklıydı ancak okunaklı değildi. Ben okunaklı olsun diye biraz değiştirdim
List<Long> customerIds = ...

FetchCustomerRecursiveTask fetchCustomerRecursiveTask = new FetchCustomerRecursiveTask(customerIds);

ForkJoinPool forkJoinCommonPool = ForkJoinPool.commonPool();

List<Customer> customerWithTotalAmountSpendList = forkJoinCommonPool .invoke(fetchCustomerRecursiveTask);
Ana task sınıfı şöyle
public class FetchCustomerRecursiveTask extends RecursiveTask<List<Customer>> { private static final int CHUNK_SIZE= 2; private List<Long> customerIds; public FetchCustomerRecursiveTask(List<Long> customerIds) { this.customerIds = customerIds; } @Override protected List<Customer> compute() { int listSize = customerIds.size(); if (listSize <= CHUNK_SIZE) { return processIdList(); //Process the list } int listMiddle = listSize / 2; //Split the list FetchCustomerRecursiveTask leftSide = new FetchCustomerRecursiveTask(customerIds.subList(0, listMiddle)); FetchCustomerRecursiveTask rightSide = new FetchCustomerRecursiveTask( customerIds.subList(listMiddle, listSize)); leftSide.fork(); //Fork rightSide.fork(); List<Customer> leftSideResult = leftSide.join(); //Extract result List<Customer> rightSideResult = rightSide.join(); return Stream.of(leftSideResult, rightSideResult) //Combine results .flatMap(Collection::stream) .collect(Collectors.toList()); } }
Eğer CHUNK büyüklüğü <= 2 ise çalışacak kod şöyle
private List<Customer> processIdList() { return customerIds.stream() .map(this::fetchCustomerWithTotalAmountSpend) .collect(Collectors.toList()); } private Customer fetchCustomerWithTotalAmountSpend(Long customerId) { ... }


Hiç yorum yok:

Yorum Gönder