声明 本文是看大神博客的笔记.之所以写是为了记忆. 好记性不如烂笔头 原文地址Ressmix
思想
FJ框架的基本思想就是将一个大任务分解fork成一系列的子任务. 当多个不同的子任务都执行完毕后,. 可以将他们各自的结果合并join成一个大结果集. 最终合并成大任务的结果.
工作窃取算法
通过思想我们可以看出, 需要一些线程来执行F出的任务. 每次都创建和销毁线程, 开销很大. 所以FK框架利用了线程池来调度任务. 既然调度必然有两个要素. 1 工作线程和任务队列. 一般的线程池只有一个任务队列. 但是对于FK来说. F出各个子任务其实是平行关系. 为了提高效率减少线程竞争. 应该将这些平行的任务放在不同的队列中去. 大任务分成子任务 子1 子2 子3. 必然会对应三个任务队列. 在创建三个工作线程于队列一一对应.
由于线程处理不同的任务. 执行效率一定不一致, 有的线程先执行完比. 有的还在执行中, 我们希望执行完毕的线程去帮忙. 这就需要他出别的任务队列中窃取任务来执行. 这就是所谓的工作窃取算法
核心组件
- ForkJoinPool: ExcutorService的实现类. 负责工作线程的管理. 任务队列的维护 任务的调度流程
- ForkJoinTask: Future接口的实现类, fork是核心方法. 用于分解任务并异步执行. 而join方法在任务结果计算完毕后才会运行. 用来合并或返回计算结果.
- ForkJoinWokerThread: Thread的子类. 作为线程池中的工作线程Worker执行任务
- WorkQueue: 任务队列 用于保存任务
ForkJoinPool
1 |
|
ForkJoinTask
抽象类 FK框架提供了两个实现类
RecursiveAction: 没有返回结果的FJ任务
RecursiveTask: 有返回结果的FJ任务.
ForkJoinWokerThread
在FJ框架中 每个工作线程worker都要自己的一个任务队列. 所以需要对标准的Thread做一些特性化处理. 例如ForkJoinWokerThread
1 |
|
pool.registerWorker(this); 将所属线程池信息与自己绑定的队列信息 通过这个方法注册.
WorkQueue
这是与其他线程池最大的区别. 在FJP内部. 维护着一个WorkQueue[]数组, 它会在外部首次提交任务时初始化.
WorkQueue作为FJP的内部类. 表示双端队列. 意味着既可以作为栈使用 也可以作为队列使用. FJP正是利用了这个特点. 当工作线程从自己的队列中获取任务时, 默认总是以栈的方式在栈顶获取任务. 当工作线程尝试获取其他任务队列的任务时. 则以队列的方式.
FJP的工作队列分两类:
- 有工作线程绑定的任务队列: 数组下标为奇数. 称为task queue 队列中的任务均由工作线程调用产生 由FutureTask.fork方法产生
- 无工作线程绑定的任务对列: 数组下标为偶数. 称之为submissions queue. 由execute/submit/invoke或者FutureTask.fork产生
线程池调度示例
假设现在通过FJPsubmit方法提交了一个FutureTask任务 参考使用示例
初始
初始状态下 线程池的队列为空 workQueues = null. 也没有工作线程:

外部提交FT任务
初始化任务队列数组WorkQueues[] 大小为2的每次. 然后在某个槽位(偶数位)初始化一个任务队列WorkQueue. 并插入任务:

是非工作线程通过外部方法提交的任务. 所以这个任务队列并没绑定工作线程.
创建工作线程
首次提交任务后. 没有工作线程. 所以会创建工作线程, 同时在奇数槽的位置创建一个和它绑定的任务队列.

窃取任务
FJWT_1会随机扫描workQueues中的队列, 直到找到一个可以窃取的队列—workQueues[2] 然后从队列的base段获取任务并执行. 并将base+1.

窃取到的任务时FT, FJWT_1最终会调用它的compute方法. 该方法会新建两个子类任务. 并执行它们的fork方法: 参考示例的方法.
由于是工作线程FJWT_1来调用FT的fork方法, 所以会将这两个子任务放入FJWT_1自身队列中:

然后FJWT_1会阻塞等到任务1和任务2的结果
1 |
long sum1 = subtask1.join(); // 现在这里等待 |
可以看出任务放在哪个队列是调用线程决定的. 如果调用线程是工作线程, 则必然有自己的队列task queue. 将任务放入自己的队列中. 如果调用线程是其他线程如主线程. 则创建没有工作线程绑定的任务对sumissions queue 然后存入
新的工作线程
FJWT_1调用两个子任务1和2的fork方法. 除了将它们放入自己的任务队列外, 还会导致新增一个工作线程FJWT_2.

FJWT_2运行后会像FJWT_1那样从其队列窃取任务. 从FJWT_1队列的base端窃取一个任务(直接执行. 并不会放入自己队列)

窃取完成后, FJWT_2会执行执行任务1. 又回到了FT子类的compute方法. 假设此时又fork出两个任务—-任务3和任务4. 则FJWT_2最终会在任务3的join方法上等待.

加色FJWT_2本其他某个工作线程唤醒. 任务3和任务4的join方法依次返回了结果, 任务1的结果也会返回. 于是FJWT_1也会被唤醒(他在任务1的join上等待), 然后FJWT_1会继续执行任务2的join方法. 如果任务2不再分解, 则最终任务1和任务2的合并结果, 计算结束.
自身队列的任务执行
FJWT_1和FJWT_2唤醒执行完窃取到的任务后, 还没有结束, 它们还会看看自身队列中有无任务可以执行.
1 |
final void runTask(ForkJoinTask<?> task) { |
runTask方法中 doExec()是执行窃取的任务. 而execLocalTasks()用于执行队列本身的任务
假设此时的线程池是下面的状态:

那么FJWT_1调用execLocalTasks()方法一次执行自己队列中的所有任务. 分两种情况:
- 异步模式(asyncMode = true) 如果在构造线程池时. 设置为true. 表示以异步模式执行工作线程自身队列中的任务. 此时会从base - > top遍历并并行所有任务
- 同步弄湿(asyncMode = false 默认值) 表示同步执行工作线程自身队列中的任务, 此时会从top - > base遍历并执行所有任务
任务入队总是从top端, 所以当以同步模式遍历时, 其相当于出栈操作 pop
如果异步, 相当于队列的出队曹汝 从base端poll元素
异步模式比较适用于不需要返回值的任务.
默认是同步模式, FJWT_1从栈顶开始执行并移除任务. 先执行任务2并移除, 在执行任务1






近期评论