在Java中,ForkJoin
框架是并行编程的一个重要工具,它主要用于处理可以分解为多个子任务的复杂任务。ForkJoin
框架的核心是ForkJoinPool
,它是一个线程池,专门用于执行ForkJoinTask
任务。通过将大任务分解为多个小任务,并在多个线程中并行执行这些小任务,ForkJoin
框架可以显著提高程序的执行效率。
1.核心组件
• ForkJoinPool
• 这是ForkJoin
框架的核心线程池,用于管理和调度ForkJoinTask
任务。
• 它使用工作窃取算法(work-stealing algorithm),允许空闲的线程从其他线程的任务队列中窃取任务来执行,从而提高线程的利用率。
• 示例代码:
ForkJoinPool pool = new ForkJoinPool();
• ForkJoinTask
• 这是ForkJoin
框架中任务的抽象基类,所有自定义的任务都需要继承这个类。
• 通常使用RecursiveTask
(有返回值的任务)或RecursiveAction
(无返回值的任务)这两个子类来实现具体任务。
• 示例代码:
public class MyTask extends RecursiveTask<Integer> {
private int threshold;
private int start;
private int end;
public MyTask(int threshold, int start, int end) {
this.threshold = threshold;
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
if (end - start <= threshold) {
for (int i = start; i < end; i++) {
sum += i;
}
} else {
int middle = (start + end) / 2;
MyTask leftTask = new MyTask(threshold, start, middle);
MyTask rightTask = new MyTask(threshold, middle, end);
leftTask.fork(); // 异步执行子任务
rightTask.fork(); // 异步执行子任务
sum = leftTask.join() + rightTask.join(); // 等待子任务完成并获取结果
}
return sum;
}
}
2.工作窃取算法
• 工作窃取算法是一种高效的线程调度算法,用于解决线程空闲时的负载均衡问题。
• 每个线程都有自己的双端队列(deque),用于存储任务。
• 当一个线程的任务队列为空时,它可以从其他线程的任务队列中“窃取”任务来执行。
• 这种算法可以有效减少线程的空闲时间,提高线程的利用率。
3.使用示例
• 下面是一个完整的使用ForkJoin
框架计算数组和的示例:
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class ForkJoinExample {
public static void main(String[] args) {
int[] array = new int[1000000];
for (int i = 0; i < array.length; i++) {
array[i] = i;
}
ForkJoinPool pool = new ForkJoinPool();
SumTask task = new SumTask(array, 0, array.length);
int sum = pool.invoke(task);
System.out.println("Sum: " + sum);
}
}
class SumTask extends RecursiveTask<Integer> {
private static final int THRESHOLD = 1000;
private int[] array;
private int start;
private int end;
public SumTask(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
if (end - start <= THRESHOLD) {
int sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
return sum;
} else {
int middle = (start + end) / 2;
SumTask leftTask = new SumTask(array, start, middle);
SumTask rightTask = new SumTask(array, middle, end);
leftTask.fork();
rightTask.fork();
return leftTask.join() + rightTask.join();
}
}
}
4.优势
• 并行处理:通过将任务分解为多个子任务,并在多个线程中并行执行,可以显著提高程序的执行效率。
• 负载均衡:工作窃取算法可以有效解决线程空闲时的负载均衡问题,提高线程的利用率。
• 易于使用:ForkJoin
框架提供了简单易用的API,使得并行任务的实现变得非常方便。
5.注意事项
• 任务分解粒度:任务分解的粒度需要适中。如果任务分解得太细,会导致线程的调度开销过大;如果任务分解得太粗,又无法充分利用多核CPU的优势。
• 线程池大小:合理配置ForkJoinPool
的线程池大小,通常建议设置为CPU核心数的两倍左右。
• 线程安全:虽然ForkJoin
框架本身是线程安全的,但在任务执行过程中,如果需要访问共享资源,仍然需要注意线程安全问题。
通过合理使用ForkJoin
框架,可以有效提高Java程序的并行处理能力,从而提升程序的性能。
public class ForkJoinPoolTest extends RecursiveTask<Long> {
private static final int THRESHOLD = 500;
long[] array;
int start;
int end;
ForkJoinPoolTest(long[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
public static void main(String[] args) {
long[] arrays = new long[1000];
long expecteSum = 0;
for (int i = 0; i < arrays.length; i++) {
arrays[i] = i;
expecteSum = expecteSum + arrays[i];
}
System.out.println(expecteSum);
ForkJoinPoolTest forkJoinPoolTest = new ForkJoinPoolTest(arrays, 0, arrays.length);
long startTime = System.currentTimeMillis();
Long result = ForkJoinPool.commonPool().invoke(forkJoinPoolTest);
long endTime = System.currentTimeMillis();
System.out.println("fork/join sum:" + result + ",耗时:" + (endTime - startTime));
}
@Override
protected Long compute() {
//如果任务太小
if (end - start <= THRESHOLD) {
System.out.println("start=" + start + ",end=" + end);
long sum = 0;
for (int i = start; i < end; i++) {
sum = sum + this.array[i];
try {
TimeUnit.MILLISECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return sum;
}
int middle = (end + start) / 2;
ForkJoinPoolTest joinPoolTest1 = new ForkJoinPoolTest(this.array, start, middle);
ForkJoinPoolTest joinPoolTest2 = new ForkJoinPoolTest(this.array, middle, end);
invokeAll(joinPoolTest1, joinPoolTest2);
Long result1 = joinPoolTest1.join();
Long result2 = joinPoolTest2.join();
long result = result1 + result2;
System.out.println("result = " + (result1 + result2));
return result;
}
}