Giriş
İskeleti şöyle. Verilen işi daha küçük parçaya bölerek yeni bir RecursiveAction yaratır.
compute metodu
Örnek
İskeleti şöyle. Verilen işi daha küçük parçaya bölerek yeni bir RecursiveAction yaratır.
public class Tasker extends RecursiveAction {
private final List<String> myList;
public Tasker(List<String> list) {
this.myList = list;
}
@Override
protected void compute() {
...
}
}
Bu sınıf bir sonuç dönmez. Yani bir nevi Runnable gibidir. Örneğin dosyayı bir yerden bir yere kopyalamak için kullanılabilir.compute metodu
Örnek
Şöyle yaparız. Burada iş sadece left ve right olarak bölünerek devam ediyor. left'in veya right'in bitmesini beklemeye gerek yok.
class Task extends RecursiveAction {private List<Integer> list;public Task(List<Integer> list) {this.list = list;}@Overrideprotected void compute() {if (list.size() >= 10) {int size = (list.size() / 2)+1;List<Integer> leftPartition = list.subList(0, size);List<Integer> rightPartition = list.subList(size, list.size());invokeAll(new Task(leftPartition),new Task(rightPartition));} else {print();}}private void print() {for (int i = 0; i < list.size(); i++)System.out.println("Thread:" + Thread.currentThread() + ",Value:" + list.get(i));}}
Örnek
Şöyle yaparız. Burada left ayrı bir thread içinde çalışır, right'ı ben hesaplarım
Örnek
Örnek
@Override
protected void compute() {
if(myList.size()==1){
...
}
else{
List<String> temp = new ArrayList<>();
temp.add (myList.get( myList.size()-1 ) );
myList.remove( myList.size()-1 );
MyAction left = new Tasker(myList);
MyAction right = new Tasker(temp);
left.fork();
right.compute();
left.join();
}
}
Örnek
Elimizde şöyle bir kod olsun. Aslında bu kodun bir özelliği yok. Sanki yeni bir Thread başlatıp içine Runnable listesi vermiş gibiyiz. Her Runnable sırayla çalıştırılır.
public class TaskExecutor extends RecursiveAction {
private List<Runnable> tasks;
private int start;
private int end;
private AtomicInteger nextTaskIndex;
public TaskExecutor(List<Runnable> tasks, int start, int end,
AtomicInteger nextTaskIndex) {
this.tasks = tasks;
this.start = start;
this.end = end;
this.nextTaskIndex = nextTaskIndex;
}
@Override
protected void compute() {
int taskIndex = nextTaskIndex.getAndIncrement();
while (taskIndex < end) {
tasks.get(taskIndex).run();
System.out.println("Task " + taskIndex + " has completed on thread "
+ Thread.currentThread().getName());
taskIndex = nextTaskIndex.getAndIncrement();
}
}
}
public static void main(String[] args) {
// Create a list of tasks to execute
List<Runnable> tasks = ...
// Create a Fork/Join pool with the default number of threads
ForkJoinPool pool = new ForkJoinPool();
// Create a task to execute the list of tasks in order
AtomicInteger nextTaskIndex = new AtomicInteger(0);
TaskExecutor task = new TaskExecutor(tasks, 0, tasks.size(), nextTaskIndex);
// Submit the task to the pool and wait for it to complete
pool.invoke(task);
}
Şöyle yaparız. Burada her action bir daha fork() metodunu çağırmıyor. tanımlı ForkJoinPool nesnesine submit() çağrısı yapıyor
ForkJoinPool commonPool = new ForkJoinPool(2000);commonPool.submit(new UserFlowRecursiveAction(IntStream.rangeClosed(1, 10).boxed().collect(Collectors.toList())));// Stop ConditioncommonPool.shutdown();commonPool.awaitTermination(60, TimeUnit.SECONDS);public class UserFlowRecursiveAction extends RecursiveAction {private final List<Integer> workload;public UserFlowRecursiveAction(List<Integer> workload) {this.workload = workload;}@Overrideprotected void compute() {if (workload.size() > 1) {commonPool.submit(new UserFlowRecursiveAction(workload.subList(1, workload.size())));}int user = workload.get(0);ForkJoinTask<String> taskA = commonPool.submit(() -> ...);ForkJoinTask<String> taskB = commonPool.submit(() -> ...);IntStream.rangeClosed(1, 3.forEach(i -> commonPool.submit(() -> foo(taskA.join(), taskB.join())));}}
Hiç yorum yok:
Yorum Gönder