forkjoin详解 2.示例 3.深入

Fork/Join框架是Java7提供的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。使用工作窃取(work-stealing)算法,主要用于实现“分而治之”。

用起来的感觉像是归并算法

2.示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48


package demo.forkjoin;

import java.util.concurrent.RecursiveTask;

public class extends RecursiveTask<Long> {
/**
* 起始值
*/
private Long start;
/**
* 结束值
*/
private Long end;
/**
* 临界值
*/
public static final Long critical = 100_000L;

public (Long start, Long end) {
this.start = start;
this.end = end;
}


protected Long compute() {
Long length = end - start;
if (length <= critical) {
// 拆分完毕就相加
Long sum = 0L;
for (Long i = start; i <= end; i++) {
sum += i;
}
return sum;
}
// 没有拆分完毕就开始拆分
Long middle = (start + end) / 2;
// 拆分并压入线程队列
ForkJoinWork left = new ForkJoinWork(start, middle);
left.fork();
ForkJoinWork right = new ForkJoinWork(middle + 1, end);
right.fork();

//合并
return left.join() + right.join();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// ForkJoinWorkDemo.java

package demo.forkjoin;

import java.util.concurrent.ForkJoinPool;
import java.util.stream.LongStream;


public class ForkJoinWorkDemo {
private static final Long start = 0L;
private static final Long end = 10_000_000_000L;


public static void test() {
long t0 = System.currentTimeMillis();
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinWork task = new ForkJoinWork(start, end);
Long invoke = forkJoinPool.invoke(task);
long t1 = System.currentTimeMillis();
// invoke=-5340232216128654848 time: 76883
System.out.println("invoke=" + invoke + " time: " + (t1 - t0));
}

public static void test2() {
//普通实现
Long x = start;
Long y = end;
long l = System.currentTimeMillis();
for (Long i = 0L; i <= y; i++) {
x += i;
}
long l1 = System.currentTimeMillis();
// invoke = -5340232216128654848 time: 81475
System.out.println("invoke = " + x + " time: " + (l1 - l));
}

public static void test3() {
//Java 8 并行流的实现
long l = System.currentTimeMillis();
long reduce = LongStream.rangeClosed(start, end).parallel().reduce(0, Long::sum);
long l1 = System.currentTimeMillis();
// invoke = -5340232216128654848 time: 1727
System.out.println("invoke = " + reduce + " time: " + (l1 - l));
}

public static void main(String[] args) {
// test();
// test2();
test3();
}
}

三种方式经过比较:

  1. fork/join是基于分而治之的思想来实现
  2. fork/join能提高执行效率:更充分的使用CPU
  3. java8并行流做了优化,效果很明显

3.深入

后续再详细学习…