forkjoinpool框架原理

声明 本文是看大神博客的笔记.之所以写是为了记忆. 好记性不如烂笔头 原文地址Ressmix

思想

FJ框架的基本思想就是将一个大任务分解fork成一系列的子任务. 当多个不同的子任务都执行完毕后,. 可以将他们各自的结果合并join成一个大结果集. 最终合并成大任务的结果.

工作窃取算法

通过思想我们可以看出, 需要一些线程来执行F出的任务. 每次都创建和销毁线程, 开销很大. 所以FK框架利用了线程池来调度任务. 既然调度必然有两个要素. 1 工作线程和任务队列. 一般的线程池只有一个任务队列. 但是对于FK来说. F出各个子任务其实是平行关系. 为了提高效率减少线程竞争. 应该将这些平行的任务放在不同的队列中去. 大任务分成子任务 子1 子2 子3. 必然会对应三个任务队列. 在创建三个工作线程于队列一一对应.

由于线程处理不同的任务. 执行效率一定不一致, 有的线程先执行完比. 有的还在执行中, 我们希望执行完毕的线程去帮忙. 这就需要他出别的任务队列中窃取任务来执行. 这就是所谓的工作窃取算法

核心组件

  • ForkJoinPool: ExcutorService的实现类. 负责工作线程的管理. 任务队列的维护 任务的调度流程
  • ForkJoinTask: Future接口的实现类, fork是核心方法. 用于分解任务并异步执行. 而join方法在任务结果计算完毕后才会运行. 用来合并或返回计算结果.
  • ForkJoinWokerThread: Thread的子类. 作为线程池中的工作线程Worker执行任务
  • WorkQueue: 任务队列 用于保存任务

ForkJoinPool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

*@param parallelism 并行级别 默认为CPU的核心数
*@param factory 工作线程工厂
*@param handler 异常处理器
*@param mode 调度模式 true FIFO_queue 队列模式 false 表示LIFO_queue 栈模式
*@param workerNamePrefix 工作线程的名称前缀
* ctl 和线程池中的是一样的 表示状态和线程数量的核心
*/
private (int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
int mode,
String workerNamePrefix) {
this.workerNamePrefix = workerNamePrefix;
this.factory = factory;
this.ueh = handler;
this.config = (parallelism & SMASK) | mode;
long np = (long)(-parallelism); // offset ctl counts
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}


public (int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode) {
this(checkParallelism(parallelism),
checkFactory(factory),
handler,
asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
"ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();
}

ForkJoinTask

抽象类 FK框架提供了两个实现类

RecursiveAction: 没有返回结果的FJ任务
RecursiveTask: 有返回结果的FJ任务.

ForkJoinWokerThread

在FJ框架中 每个工作线程worker都要自己的一个任务队列. 所以需要对标准的Thread做一些特性化处理. 例如ForkJoinWokerThread

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21


public class ForkJoinWorkerThread extends Thread {

final ForkJoinPool pool; // 该工作线程归属的线程池
final ForkJoinPool.WorkQueue workQueue; // 对应的任务队列


* Creates a ForkJoinWorkerThread operating in the given pool.
*
* @param pool the pool this thread works in
* @throws NullPointerException if pool is null
*/
protected ForkJoinWorkerThread(ForkJoinPool pool) {
// Use a placeholder until a useful name can be set in registerWorker
super("aForkJoinWorkerThread");
this.pool = pool;
this.workQueue = pool.registerWorker(this);
}

}

pool.registerWorker(this); 将所属线程池信息与自己绑定的队列信息 通过这个方法注册.

WorkQueue

这是与其他线程池最大的区别. 在FJP内部. 维护着一个WorkQueue[]数组, 它会在外部首次提交任务时初始化.

WorkQueue作为FJP的内部类. 表示双端队列. 意味着既可以作为栈使用 也可以作为队列使用. FJP正是利用了这个特点. 当工作线程从自己的队列中获取任务时, 默认总是以栈的方式在栈顶获取任务. 当工作线程尝试获取其他任务队列的任务时. 则以队列的方式.

FJP的工作队列分两类:

  • 有工作线程绑定的任务队列: 数组下标为奇数. 称为task queue 队列中的任务均由工作线程调用产生 由FutureTask.fork方法产生
  • 无工作线程绑定的任务对列: 数组下标为偶数. 称之为submissions queue. 由execute/submit/invoke或者FutureTask.fork产生

线程池调度示例

假设现在通过FJPsubmit方法提交了一个FutureTask任务 参考使用示例

初始

初始状态下 线程池的队列为空 workQueues = null. 也没有工作线程:

20191007122622.png

外部提交FT任务

初始化任务队列数组WorkQueues[] 大小为2的每次. 然后在某个槽位(偶数位)初始化一个任务队列WorkQueue. 并插入任务:

20191007122820.png

是非工作线程通过外部方法提交的任务. 所以这个任务队列并没绑定工作线程.

创建工作线程

首次提交任务后. 没有工作线程. 所以会创建工作线程, 同时在奇数槽的位置创建一个和它绑定的任务队列.

20191007123047.png

窃取任务

FJWT_1会随机扫描workQueues中的队列, 直到找到一个可以窃取的队列—workQueues[2] 然后从队列的base段获取任务并执行. 并将base+1.

20191007123250.png

窃取到的任务时FT, FJWT_1最终会调用它的compute方法. 该方法会新建两个子类任务. 并执行它们的fork方法: 参考示例的方法.

由于是工作线程FJWT_1来调用FT的fork方法, 所以会将这两个子任务放入FJWT_1自身队列中:

20191007123547.png

然后FJWT_1会阻塞等到任务1和任务2的结果

1
2
long sum1 = subtask1.join(); // 现在这里等待
long sum2 = subtask2.join();

可以看出任务放在哪个队列是调用线程决定的. 如果调用线程是工作线程, 则必然有自己的队列task queue. 将任务放入自己的队列中. 如果调用线程是其他线程如主线程. 则创建没有工作线程绑定的任务对sumissions queue 然后存入

新的工作线程

FJWT_1调用两个子任务1和2的fork方法. 除了将它们放入自己的任务队列外, 还会导致新增一个工作线程FJWT_2.

20191007124045.png

FJWT_2运行后会像FJWT_1那样从其队列窃取任务. 从FJWT_1队列的base端窃取一个任务(直接执行. 并不会放入自己队列)

20191007124529.png

窃取完成后, FJWT_2会执行执行任务1. 又回到了FT子类的compute方法. 假设此时又fork出两个任务—-任务3和任务4. 则FJWT_2最终会在任务3的join方法上等待.

20191007124726.png

加色FJWT_2本其他某个工作线程唤醒. 任务3和任务4的join方法依次返回了结果, 任务1的结果也会返回. 于是FJWT_1也会被唤醒(他在任务1的join上等待), 然后FJWT_1会继续执行任务2的join方法. 如果任务2不再分解, 则最终任务1和任务2的合并结果, 计算结束.

自身队列的任务执行

FJWT_1和FJWT_2唤醒执行完窃取到的任务后, 还没有结束, 它们还会看看自身队列中有无任务可以执行.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
final void runTask(ForkJoinTask<?> task) {
if (task != null) {
scanState &= ~SCANNING; // mark as busy
(currentSteal = task).doExec();
U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
execLocalTasks();
ForkJoinWorkerThread thread = owner;
if (++nsteals < 0) // collect on overflow
transferStealCount(pool);
scanState |= SCANNING;
if (thread != null)
thread.afterTopLevelExec();
}
}

runTask方法中 doExec()是执行窃取的任务. 而execLocalTasks()用于执行队列本身的任务

假设此时的线程池是下面的状态:

20191007125430.png

那么FJWT_1调用execLocalTasks()方法一次执行自己队列中的所有任务. 分两种情况:

  • 异步模式(asyncMode = true) 如果在构造线程池时. 设置为true. 表示以异步模式执行工作线程自身队列中的任务. 此时会从base - > top遍历并并行所有任务
  • 同步弄湿(asyncMode = false 默认值) 表示同步执行工作线程自身队列中的任务, 此时会从top - > base遍历并执行所有任务

任务入队总是从top端, 所以当以同步模式遍历时, 其相当于出栈操作 pop
如果异步, 相当于队列的出队曹汝 从base端poll元素
异步模式比较适用于不需要返回值的任务.

默认是同步模式, FJWT_1从栈顶开始执行并移除任务. 先执行任务2并移除, 在执行任务1

20191007131527.png
20191007131535.png

请参考Fork/Join框架分析