线程池是平常工作中使用得较多的并发场景,今天用一个demo带大家看一下线程池原理,逐行看源码。
public class ThreadPoolDemo {
public static void main(String[] args) {
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5,10,0L,
TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(5),namedThreadFactory,new ThreadPoolExecutor.AbortPolicy());
for (int i=1;i<=50;i++){
AtomicInteger count = new AtomicInteger(i);
threadPoolExecutor.execute(() -> {
System.out.println(Thread.currentThread().getName()+"在做第" + count.getAndIncrement() + "个任务");
try{
Thread.sleep(3000L); // 模拟业务代码执行时间
}catch (Exception e){
e.printStackTrace();
}
});
}
}
}
复制代码
ThreadFactory可以为线程设置线程名与优先级等属性。为线程添加线程名可以通过日志快速定位到是哪个线程池出了问题,建议为每一个线程池设置不同的线程名方便定位。
demo中的线程池设置了核心线程数5个,总线程数10个,没有等待时间,阻塞队列5个。
执行结果:
让我们带着一些问题来看源码:
为什么先执行了第11-15的循环,后执行了第6-10次的循环
为什么执行了11-15次循环后,抛了异常
为什么执行了第10次循环后不再执行
参数解析
int corePoolSize : 核心线程数。默认情况下即使线程空闲也需要保留,可通过allowCoreThreadTimeOut参数修改为true,可实现核心线程的回收。
int maximumPoolSize:最大线程数。在线程池中允许存在的最大线程数。必须大于>=核心线程数。
long keepAliveTime:等待时间。当线程池中的线程的数量>核心线程数时,空间线程在等待时间后仍空间将会被释放。
TimeUnit unit:等待时间的单位。
BlockingQueue workQueue:等待队列。存放还未被执行的任务,Queue队列结构遵循先进先出原则。
ThreadFactory threadFactory:创建新线程的工厂,可以用来设置线程名,在日志中可根据线程名来快速定位问题。
RejectedExecutionHandler handler:拒绝策略。当任务超出线程池处理范围,定义如何抛弃任务。
execute方法
线程池执行ececute方法添加任务。
方法中的ctl代表的是线程池的状态与已有线程数量。ctl中前三位保存的是状态,其它位用来保存线程数量。
// Packing and unpacking ctl
// 线程池状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 线程数量
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
复制代码
public void execute(Runnable command) {
// command就是demo代码中execute方法后的lambda表达式
if (command == null)
throw new NullPointerException();
// 得到线程池的状态与数量信息
int c = ctl.get();
// 如果当前线程数量<核心线程数
if (workerCountOf(c) < corePoolSize) {
// 为command创建一个新线程执行,并返回
if (addWorker(command, true))
return;
// 新线程执行失败,更新当前线程状态
c = ctl.get();
}
// 当前线程为可运行状态,且 当前任务可加入队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// double check 当此时线程不在可运行状态,需要将任务从队列中删除,并执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果此时当前线程数为0,再新增一个空线程
else if (workerCountOf(recheck) == 0)
// 当task为空时,会尝试去队列中取任务
addWorker(null, false);
}
// 如果不能进入队列,尝试新开非核心线程。如果失败则执行拒绝
else if (!addWorker(command, false))
reject(command);
}
复制代码
通过分析源码,其实就能很清楚地看到,当线程池处理多个任务时的执行逻辑是这样的:
- 先使用核心线程
- 核心线程数满了之后,进入等待队列
- 队列满了后,使用非核心线程
- 最大线程数和队列都满了,执行拒绝策略
addWorker方法
上面的execute方法中可以看到线程池对于任务分配的执行逻辑,addWorker方法是执行添加线程的逻辑。
@param firstTask : 新线程首先应执行的任务
@param core : 为true则使用核心线程,为false使用非核心线程。
// runState is stored in the high-order bits
// 线程池状态
// 正常执行状态,可以接受新任务,也能处理阻塞队列中的任务
private static final int RUNNING = -1 << COUNT_BITS;
// 不接受新提交的任务,但是能处理阻塞队列中的任务
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 不接爱新任务,也不处理队列中的任务,中断正在处理任务的线程
private static final int STOP = 1 << COUNT_BITS;
// 如果全部的任务都已终止,也没有有效线程了
private static final int TIDYING = 2 << COUNT_BITS;
// 线程池停止
private static final int TERMINATED = 3 << COUNT_BITS;
复制代码
private boolean addWorker(Runnable firstTask, boolean core) {
retry: // 这是goto的写法,用于重试
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 校验满足以下条件,则代表添加线程失败。
// 当线程池状态不可运行时,如果有新任务进来,或者队列不为空时,就需要返回失败
// 一般返回失败后,都会清空队列且执行拒绝策略
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 当前线程数
int wc = workerCountOf(c);
// 根据core判断是使用核心线程还是非核心线程,判断当前线程数是否在范围内
// 如果超出范围,返回失败
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 这是一个cas操作,如果成功则线程数+1,且退出外层循环
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 如果现在的线程池状态与之前不一致,则返回retry; 开始重试
if (runStateOf(c) != rs)
continue retry;
// 如果上面的cas操作失败,在通过状态判断后,会再次进行内层循环
// else CAS failed due to workerCount change; retry inner loop
}
}
// 执行到这里时,就意味着cas已经成功,已经成功添加线程
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 为入参任务创建一个线程,线程通过ThreadFactory创建
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// 再次检查状态,如果状态为可运行或者状态为shutdown但是入参任务为空
// 是可以添加线程的
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 线程还没start就已经是alive状态了,是异常
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
// 记录最大线程数
if (s > largestPoolSize)
largestPoolSize = s;
// 线程已添加的标记位
workerAdded = true;
}
} finally {
// 解锁
mainLock.unlock();
}
// 线程已添加,启动线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 如果线程未能正常启动,需要移除线程,线程数-1,并尝试关闭线程池
if (! workerStarted)
addWorkerFailed(w);
}
// 返回线程是否成功启动的标志位
return workerStarted;
}
复制代码
runWorker方法
当有空闲线程时,就会执行当前方法,在这个方法中通过task.run()执行业务逻辑。在 while方法中执行完当前任务,还会进入下一次循环直到队列中也没有任务。线程还是那个线程,但是执行的任务不一样,所以是这个方法中的循环实现了线程复用。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 当任务为空时,就会去阻塞队列中取任务
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 线程池状态>=stop时,执行中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// beforeExecute和afterExecute中都是空代码
// 都是一个扩展点,可以自定义
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
// 下次循环,task为空,就会执行阻塞队列中的任务
task = null;
// 已完成数量+1
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 线程执行完,就需要将当前线程移除workers集合
// 如果是正常执行完,还会再开一个新的null线程,再次尝试处理任务
// 如果没有待执行的任务,会关闭线程池
processWorkerExit(w, completedAbruptly);
}
}
复制代码
getTask方法
private Runnable getTask() {
// 超时回收标志位
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
// 当前线程池状态
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 如果状态>=stop 或者 状态>=shutdown 且队列为空
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 说明此时线程池不能执行,或者不需要执行,所以cas操作执行线程-1
decrementWorkerCount();
return null;
}
// 当前线程数
int wc = workerCountOf(c);
// Are workers subject to culling?
// 超时回收标志位
// allowCoreThreadTimeOut默认为false,当为false时,核心线程即使空闲也不会被回收
// 为true时,核心线程将会在等待keepAliveTime后,仍空闲就会被回收
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 当线程不够用时,且满足释放条件时,就会cas尝试执行线程-1
// 并且会continue重新进入循环反复尝试
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
// 进入到这步时,说明有空闲线程可执行
try {
// 当空闲线程会被回收时,使用poll,其他情况使用take
// 这两个函数都是返回队列的头部并删除
// poll方法将会在等待时间内一直尝试获取队列头部,超时会返回null
// take方法由于不会回收,所以可以获取不到队列头就awwait
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
// 如果队列为空,下次循环可以尝试线程-1
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
复制代码
总结
到了这里,一开始提出的问题基本上就已经很清楚了。
为什么先执行了第11-15的循环,后执行了第6-10次的循环
核心线程数执行完后,6-10次任务进入了队列,11-15次任务先激活了非核心线程
为什么执行了11-15次循环后,抛了异常
报错内容其实是说线程池不够用执行了拒绝策略。因为核心线程数已用完,且队列也满了,就会执行拒绝策略。执行的execute()方法中最后一个分支。
为什么执行了第10次循环后不再执行
因为在抛出异常后,也没有catch进行处理,默认这个线程池就结束了,但是由于阻塞队列中还有未完成的任务,所以还会执行第6-10次任务。
近期评论