ForkJoinPool is the core of Java's fork/join framework, which follows a divide-and-conquer approach to completing tasks.
Fork – Recursively break a single task into smaller subtasks until each one is no larger than a threshold.
Join – Recursively combine the results of the smaller subtasks to produce the output.
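The two steps can be sketched with explicit fork() and join() calls. This is a minimal illustration rather than part of the example further down: the class name RangeSumTask, its THRESHOLD of 4 and the helper sum() are assumptions made up for this sketch, and it uses a RecursiveTask (which returns a value) so that the join step has something to combine.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical task: sums values[from, to) by divide and conquer.
class RangeSumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 4; // assumed cut-off, for illustration only
    private final long[] values;
    private final int from; // inclusive
    private final int to;   // exclusive

    RangeSumTask(long[] values, int from, int to) {
        this.values = values;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            // Small enough: do the work directly.
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += values[i];
            }
            return sum;
        }
        int mid = (from + to) >>> 1;
        RangeSumTask left = new RangeSumTask(values, from, mid);
        RangeSumTask right = new RangeSumTask(values, mid, to);
        left.fork();                     // Fork: schedule the left half asynchronously
        long rightSum = right.compute(); // handle the right half in the current thread
        return left.join() + rightSum;   // Join: wait for the left half and combine
    }

    // Convenience helper (hypothetical) that runs the task on the common pool.
    static long sum(long[] values) {
        return ForkJoinPool.commonPool().invoke(new RangeSumTask(values, 0, values.length));
    }
}
```

Calling RangeSumTask.sum(new long[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}) returns 55.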
Create the pool and set its parallelism level, which is the target number of worker threads.
ForkJoinPool forkJoinPool = new ForkJoinPool(3);
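As a small sketch of what the constructor argument does, the snippet below creates a pool, reads the value back with getParallelism(), and shuts the pool down again. The class PoolSetupDemo and its helper parallelismOf() are hypothetical names used only here.

```java
import java.util.concurrent.ForkJoinPool;

class PoolSetupDemo {
    // Hypothetical helper: creates a pool and reports its parallelism level.
    static int parallelismOf(int requested) {
        ForkJoinPool pool = new ForkJoinPool(requested);
        try {
            return pool.getParallelism();
        } finally {
            pool.shutdown(); // release the pool once it is no longer needed
        }
    }

    public static void main(String[] args) {
        System.out.println(parallelismOf(3)); // prints 3
        // The shared common pool sizes itself from the machine, so this value varies.
        System.out.println(ForkJoinPool.commonPool().getParallelism());
    }
}
```

The no-argument constructor, new ForkJoinPool(), uses the number of available processors as its parallelism level instead of a fixed value.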
The RecursiveAction class is extended to recursively split the input and hand the pieces to the ForkJoinPool. RecursiveAction is a ForkJoinTask that returns no result; its compute() method is called when the pool executes the task.
The statement below forks the given subtasks, so compute() is called on each of them and they can split further in turn. The call returns only after all of the subtasks have completed, which is the join step.
ForkJoinTask.invokeAll(subArrays);
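invokeAll also has a two-argument overload, which is convenient when a task always splits in two. The sketch below uses it to double every element of an int array in place. The class DoubleInPlaceAction, its THRESHOLD of 2 and the helper doubled() are assumptions for illustration and do not appear in the example further down.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical action: doubles data[from, to) in place.
class DoubleInPlaceAction extends RecursiveAction {
    private static final int THRESHOLD = 2; // assumed cut-off, for illustration only
    private final int[] data;
    private final int from; // inclusive
    private final int to;   // exclusive

    DoubleInPlaceAction(int[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected void compute() {
        if (to - from <= THRESHOLD) {
            for (int i = from; i < to; i++) {
                data[i] *= 2;
            }
            return;
        }
        int mid = (from + to) >>> 1;
        // Forks both halves and returns only when both have completed.
        invokeAll(new DoubleInPlaceAction(data, from, mid),
                  new DoubleInPlaceAction(data, mid, to));
    }

    // Convenience helper (hypothetical): works on a copy and returns it.
    static int[] doubled(int[] input) {
        int[] copy = input.clone();
        ForkJoinPool.commonPool().invoke(new DoubleInPlaceAction(copy, 0, copy.length));
        return copy;
    }
}
```

Because the two halves cover disjoint index ranges, the subtasks never write to the same element and need no locking.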
The invoke() method runs the task and blocks until it has completed, so no manual join is required. It returns the task's result, which is null for a RecursiveAction because an action produces no value.
forkJoinPool.invoke(myRecursiveAction);
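To make the "no manual join" point concrete, the sketch below contrasts invoke() with submit(), which returns immediately and leaves the join to the caller. It uses a RecursiveTask so that there is a result to return. MaxTask and InvokeVersusSubmitDemo are hypothetical names, and the task assumes a non-empty array.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

// Hypothetical task: finds the maximum of data[from, to); the range must be non-empty.
class MaxTask extends RecursiveTask<Integer> {
    private final int[] data;
    private final int from; // inclusive
    private final int to;   // exclusive

    MaxTask(int[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Integer compute() {
        if (to - from <= 2) {
            int max = data[from];
            for (int i = from + 1; i < to; i++) {
                max = Math.max(max, data[i]);
            }
            return max;
        }
        int mid = (from + to) >>> 1;
        MaxTask left = new MaxTask(data, from, mid);
        MaxTask right = new MaxTask(data, mid, to);
        invokeAll(left, right); // both halves are complete when this returns
        return Math.max(left.join(), right.join());
    }
}

class InvokeVersusSubmitDemo {
    // invoke(): blocks and hands back the result directly.
    static int maxWithInvoke(ForkJoinPool pool, int[] data) {
        return pool.invoke(new MaxTask(data, 0, data.length));
    }

    // submit(): returns at once; the caller joins when it needs the result.
    static int maxWithSubmit(ForkJoinPool pool, int[] data) {
        ForkJoinTask<Integer> pending = pool.submit(new MaxTask(data, 0, data.length));
        return pending.join();
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(3);
        int[] data = {3, 9, 1, 7, 4};
        System.out.println(maxWithInvoke(pool, data)); // prints 9
        System.out.println(maxWithSubmit(pool, data)); // prints 9
        pool.shutdown();
    }
}
```

submit() is the better fit when the calling thread has other work to do before it needs the result; invoke() keeps the call site simpler when it does not.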
Code:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;

public class Example {
    static int rows = 20;
    static int cols = 20;
    static int[][] xyValues = new int[rows][cols];

    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool(3);
        MyRecursiveAction myRecursiveAction = new MyRecursiveAction(0, xyValues.length);

        // invoke() blocks until the whole task tree has completed,
        // so no Thread.sleep() is needed before reading the results.
        forkJoinPool.invoke(myRecursiveAction);
        forkJoinPool.shutdown();

        System.out.println("Result:");
        for (int i = 0; i < rows; i++) {
            for (int j = 0; j < cols; j++) {
                System.out.print(Example.xyValues[i][j] + ",");
            }
            System.out.println();
        }
        System.out.println("END");
    }
}

// Package-private so that both classes can live in Example.java;
// a source file may declare only one public top-level class.
class MyRecursiveAction extends RecursiveAction {
    // startSize is the first row of the range (inclusive) and
    // arraySize is the end of the range (exclusive).
    private final int startSize;
    private final int arraySize;
    static final int THRESHOLD = 2;

    public MyRecursiveAction(int startLength, int arrayLength) {
        this.startSize = startLength;
        this.arraySize = arrayLength;
    }

    @Override
    protected void compute() {
        if (arraySize - startSize > THRESHOLD) {
            // Range is still too large: split it in half and fork both halves.
            System.out.println("Divide the array to execute : startIndex: " + startSize + ", Length: " + this.arraySize);
            List<MyRecursiveAction> subArrays = new ArrayList<MyRecursiveAction>();
            int middleSize = (startSize + arraySize) / 2;
            MyRecursiveAction firstHalfAction = new MyRecursiveAction(startSize, middleSize);
            MyRecursiveAction secHalfAction = new MyRecursiveAction(middleSize, arraySize);
            subArrays.add(firstHalfAction);
            subArrays.add(secHalfAction);
            // Returns once both halves have completed.
            ForkJoinTask.invokeAll(subArrays);
        } else {
            // Range is small enough: process it directly.
            processJob(startSize, arraySize);
        }
    }

    // Sets every cell in rows [startSize, arraySize) to 1.
    protected void processJob(int startSize, int arraySize) {
        System.out.println("Processing the array to execute : startIndex: " + startSize + ", Length: " + this.arraySize + " - " + Thread.currentThread().getName());
        for (int i = startSize; i < arraySize; i++) {
            for (int j = 0; j < Example.cols; j++) {
                Example.xyValues[i][j] = 1;
            }
        }
    }
}
Result:
Divide the array to execute : startIndex: 0, Length: 20
Divide the array to execute : startIndex: 0, Length: 10
Divide the array to execute : startIndex: 10, Length: 20
Divide the array to execute : startIndex: 10, Length: 15
Processing the array to execute : startIndex: 10, Length: 12 - ForkJoinPool-1-worker-2
Divide the array to execute : startIndex: 12, Length: 15
Divide the array to execute : startIndex: 0, Length: 5
Processing the array to execute : startIndex: 0, Length: 2 - ForkJoinPool-1-worker-1
Divide the array to execute : startIndex: 2, Length: 5
Processing the array to execute : startIndex: 2, Length: 3 - ForkJoinPool-1-worker-1
Processing the array to execute : startIndex: 3, Length: 5 - ForkJoinPool-1-worker-1
Processing the array to execute : startIndex: 12, Length: 13 - ForkJoinPool-1-worker-2
Processing the array to execute : startIndex: 13, Length: 15 - ForkJoinPool-1-worker-2
Divide the array to execute : startIndex: 15, Length: 20
Processing the array to execute : startIndex: 15, Length: 17 - ForkJoinPool-1-worker-2
Divide the array to execute : startIndex: 17, Length: 20
Divide the array to execute : startIndex: 5, Length: 10
Processing the array to execute : startIndex: 5, Length: 7 - ForkJoinPool-1-worker-3
Divide the array to execute : startIndex: 7, Length: 10
Processing the array to execute : startIndex: 7, Length: 8 - ForkJoinPool-1-worker-3
Processing the array to execute : startIndex: 8, Length: 10 - ForkJoinPool-1-worker-3
Processing the array to execute : startIndex: 18, Length: 20 - ForkJoinPool-1-worker-3
Processing the array to execute : startIndex: 17, Length: 18 - ForkJoinPool-1-worker-2
Result:
1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
END