浅谈ThreadPoolExecutor源码

1、前言

相信各位童鞋在多线程开发过程中一定使用过线程池。什么是线程池呢?线程池是一个管理线程的概念,我们可以把线程的创建、销毁、调度都交给线程池维护。一方面能够极大的减少程序员的开发量;另一方面,线程池维护着一定数量的存活线程,能减少不断创建销毁线程浪费的资源。顺便说一下,“池”这个概念是个很通用的概念,基本上频繁使用的、创建或销毁很耗费资源或很复杂的事物都可以“池”化。比如:数据库连接池、线程池、JVM常量池等等。我们如果引申一下,各种缓存,算不算一个个的“池”呢?我觉得广义上,也可以算是池。

话扯远了,我们还是回到线程池上。我们应该如何创建一个线程池呢?Java中已经提供了创建线程池的一个类:Executor而我们创建时,一般使用它的子类:ThreadPoolExecutor。下面笔者就尝试浅谈一下ThreadPoolExecutor源码

2、重点属性

2.1运行状态与worker数量

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
复制代码

ctl这个属性很有趣,一个线程安全的Integer存储着两个值,最高3位放置线程池状态;剩下29位放置worker数量,所以worker最多可以有2^29-1个

private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
复制代码

可以看到这几个线程池状态最高位三个有值,其他位都是0。这几个状态都是什么意思?他们之间状态又是怎么转换的?
这里引用一张图:blog.csdn.net/qq_24384579…

20190621155050101.jpg

  • RUNNING:默认状态,一旦线程池创建,worker是0的时候,线程池就是这个状态。这个状态可以创建核心线程,可以向等待队列里加入任务
  • SHUTDOWN:调用shutdown接口的时候转换到这个状态。这个状态不接受新任务,但是能处理已经在池子里的任务。他是个柔性停止。
  • STOP:调用shutdownNow接口会转换到这个状态。这个时候不接受新任务,不处理池子里的任务,会把现有的线程中断。
  • TIDYING:当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。TIDYING是一个终止前置态,我理解应该是个短期态。当线程池变为TIDYING状态时,会执行钩子函数terminated(),执行完后会进入下一个状态。
  • TERMINATED:线程池彻底终止,终态了。

2.2等待队列与worker集合

/** 等待队列 */
private final BlockingQueue<Runnable> workQueue;
/** worker集合 */
private final HashSet<Worker> workers = new HashSet<Worker>();
复制代码

workQueue是等待队列,从线程池获取线程的时候不满足条件的时候,就往这里加,然后等待。那么有几种队列呢?

  • ArrayBlockingQueue 是一个有界阻塞队列,用Array实现,先到先得。
  • LinkedBlockingQueue 是一个链表结构的阻塞队列,可以手动设置队列大小,如果不设置默认为int类型的最大值,因此使用这个的时候需要注意,如果队列大小设置的过大,是有OOM异常风险的。由于他是链表结构,因此比ArrayBlockingQueue的队列吞吐量高一些。
  • DelayQueue 是延迟队列。是一个任务定时周期的延迟执行的队列。根据指定的执行时间从小到大排序,否则根据插入到队列的先后排序。newScheduledThreadPool线程池使用了这个队列。
  • PriorityBlockingQueue 是一个有优先级的队列,可以人为设置优先级,优先级最高的最先出队列。
  • SynchronousQueue 这个队列很特殊。他不存元素,一个线程想要插入队列,就必须等待另一个线程把自己的元素从队列里移除。

worker集合其实就是个池子,worker里维护着线程,需要运行就在这里获取。那么为什么用worker包装一下呢,详情见worker部分。

2.3锁与线程数

/** 可重入锁 */
private final ReentrantLock mainLock = new ReentrantLock();
/** 最大线程数 */
private int largestPoolSize;
/** 存活时间,非核心线程,超过这个时间没有用就被回收了 */
private volatile long keepAliveTime;
/** 核心线程数 */
private volatile int corePoolSize;
/** 最大线程数 */
private volatile int maximumPoolSize;
复制代码

其中mainLock是个可重入锁,用于锁住重要的代码,其中addWorker里面就锁住了workerslargestPoolSize其中workers因为不是线程安全的;largestPoolSize也需要改变所以必须加锁。進一步mainLock 为什么设计成可重入锁呢?我理解因为在增加worker或者tryTerminate的时候需要反复的lock,例如addWorkerFailed的时候就会出现addWorkerFailed里面lock一次,还没有释放锁就tryTerminate又lock一次。如果是非可重入的,那就锁死了。(诶嘿,面试题来了,ThreadPoolExecutor中究竟有几把锁?)

那么corePoolSize,maximumPoolSize,workQueue是怎么打配合呢?一开始嘛,线程池是空的,后来有个任务提交过来了,这个时候就需要建核心线程。核心线程创建到了corePoolSize之后,再来就往workQueue里面加,让任务等着。如果workQueue也满了,那么就开始创建非核心线程,直到达到maximumPoolSize。再来,那就得按照拒绝策略拒绝任务了。

3、内部类—worker

private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }
    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}
复制代码

Worker是个很有意思的内部类,他承担着执行外部提交的任务的工作(实现的run方法)。再继续看Worker,他继承了AbstractQueuedSynchronizer ,他居然是个锁!再进一步看tryLock,结合juejin.cn/post/703887…
看,他还是个不可重入的锁。那为什么把Worker设计成一个锁呢。可以看看注释:

We implement a simple non-reentrant mutual exclusion lock rather than use ReentrantLock because we do not want worker tasks to be able to reacquire the lock when they invoke pool control methods like setCorePoolSize. Additionally, to suppress interrupts until the thread actually starts running tasks, we initialize lock state to a negative value, and clear it upon start (in runWorker).

咱简单粗暴翻译一下,就是下面这段话

我们实现了一个简单的不可重入互斥锁,而不是使用可重入锁,因为我们不希望工作任务在调用诸如setCorePoolSize之类的池控制方法时能够重新获取锁。此外,为了在线程实际开始运行任务之前抑制中断,我们将锁定状态初始化为负值,并在启动时清除它(在runWorker中)。

那好了,看哪里加锁那里尝试获取锁就可以了。可以看到lock只有一处调用在runWorker方法,在运行任务的时候给加锁;那什么时候尝试获取锁呢,interruptIdleWorkers,在尝试中断的时候获取锁。结合起来就是,运行的时候是不能够中断线程的,那么在运行前就给他加锁;在中断前就需要尝试获取锁,获取不到就决不能中断。(现在能回答了吧?ThreadPoolExecutor中究竟有几把锁?)

与此同时,Worker还是线程执行的委托类,所有的runable都是委托给worker执行的。我觉得可以理解为,线程池里所有“活着的”线程,都是worker持有的线程。这些线程可能是核心线程,也可能是非核心的。

4、重点方法

execute方法:
我们执行的最多的方法,具体的逻辑其实就是上面提到的,那几个参数怎么打配合

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
 
    int c = ctl.get();
    /** 如果是现在小于核心线程呢,就加入核心数 */
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    /** 能走到这里,要么是大于核心线程数,要么是没加worker成功,就尝试加入队列 */
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
	/** 这里二次检查一次,因为上面的isRunning和offer是耗时的,达到这一步可能状态已经发生变化了 */
        if (! isRunning(recheck) && remove(command))
        /** 如果不是运行中,然后移除command,如果移除成功了,就拒绝 */
            reject(command);
        else if (workerCountOf(recheck) == 0) 
            addWorker(null, false);
    }
    else if (!addWorker(command, false)) 
    /** 到这里一定是需要加入非核心线程了,因为队列一定是满了,所以先无脑加一次addWorker,如果还加入失败就得拒绝了 */
        reject(command);
}
复制代码

addWorker方法: 往worker的set里面加入worker

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    /** 因为里面有CAS,所以必须死循环 */
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        /** 如果线程处于非运行状态,并且 rs 不等于 SHUTDOWN 且 firstTask 不等于空且且
         *    workQueue 为空,直接返回 false
         *    1. 线程池已经 shutdown 后,还要添加新的任务,拒绝
         *    2. (第二个判断)SHUTDOWN 状态不接受新任务,但仍然会执行已经加入任务队列的任
         *    务,所以当进入 SHUTDOWN 状态,而传进来的任务为空,并且任务队列不为空的时候,是允许添加
         *   新线程的,如果把这个条件取反,就表示不允许添加 worker
	 */
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        for (;;) {
            int wc = workerCountOf(c);
            /** 如果超过了线程池最大容量,或者超过了核心或最大线程数
            (与新建的worker是核心还是非核心有关)则不允许增加worker了 */
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            /** 增加worker数量,一旦CAS增加成功,那么直接跳出去,不再进入循环 */
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            /** 需要重新校验一下线程池的当前状态,如果状态已经发生转换,
            * 需要跳出去再次进入循环校验一下是不是有必要新增 */
            if (runStateOf(c) != rs)
                continue retry;
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            /** 加锁保护workers的hashset并保护largestPoolSize */
            mainLock.lock();
            try {
                /** 在保持锁定的同时重新检查。在ThreadFactory出现故障或在获取锁之前关闭时退出。 */
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
		/** 这里t就是worker里面的线程,线程start之后就会调用worker的run方法,进而进入runWorker开始执行 */
                t.start();
                workerStarted = true;
            }
        }
    } finally {
	/** 如果worker没有启动起来,那么需要降低worker的count
        * 同时进入tryTerminate尝试减少空闲worker(没有启动嘛,当然要减少) 
        */
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
复制代码

tryTerminate方法: 尝试终止线程池,只有线程池是shutdown 且 worker和队列都是空,或 线程池是stop且worker是空的

final void tryTerminate() {
/** 死循环,必须把ctl.compareAndSet(c, ctlOf(TIDYING, 0))设置成功才可以,否则一直等下去 */
    for (;;) {
        int c = ctl.get();
	/** 如果线程池是运行中,或状态是TIDYING,或状态是已终止,
        * 或状态是shutdow但是队列还没空。这个时候一定不能终止,所以直接return
	* 队列如果为空,也许能终止,但是不为空,一定不能终止。*/
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
	/** 如果有worker,那么不能终止,但是需要中断一个空闲的worker,让信号量传递出去
	* 这句话是注释上写的,很令人费解。。。
	*/
        if (workerCountOf(c) != 0) { 
            interruptIdleWorkers(ONLY_ONE);
            /** 中断之后只中断了一个空闲worker,当然也可能一个空闲的都没有。
            * 那么需要退出,让外部方法再次进入tryTerminate,再次检查是不是所有worker都停止了 */
            return;
        }
	/** 获取全局锁 */
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {/** CAS设置线程池为待终止 */
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {/** 子类实现的终止方法 */
                    terminated();
                } finally {
                    /** 执行完子类后,设置线程池状态是终止 */
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
    }
}
复制代码

“但是需要中断一个空闲的worker,让信号量传递出去”。这句话很令人费解。。。。想要理解这句话,需要跟getTask方法结合起来看。getTask是从阻塞队列中获取任务的方法,在runWorker中执行。从下面代码看

Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
复制代码

当队列为空的情况下,会阻塞在那里。那么就明了了,worker有几种状态:1、正在运行;2、队列为空,正在阻塞或worker还没获取到task(worker空闲)。那么如果interruptIdleWorkers中断了一个空闲的worker,那么就会抛出interupt异常,解放getTask的阻塞。被外部捕获之后,会再次进入getTask,那么进入getTask的第一步就是检查是不是线程池是shutdown且队列为空。如果为空了,就返回一个null给runWoker方法,此时runWorker退出循环进入finnaly的processWorkerExit——worker退出动作。再回到tryTerminate里面,中断一个worker之后就return了,因为不知道中断之后是不是还有worker,需要调用tryTerminate再次检查是否可以中断,直到全部worker都终止然后再终止。

上文提到了runWorker和getTask,我们来看一下这两部分代码

runWorker方法: 执行worker方法,Worker类的run方法会调用这个方法

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
	/** 获取worker里面的task */
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
	/** 这个判断里面有两步,自己绑定的task不为空,则直接进入循环;
        *如果为空则通过getTask获取队列中的任务赋值给task,
	* 如果获取的到,则进入循环,获取不到则进入finnaly的worker终止方法 */
        while (task != null || (task = getTask()) != null) {
            /** 先锁定worker,这个worker已经运行中了,不许占用,不许中断 */
            w.lock();
            /** 如果池正在停止,请确保线程被中断;
            *如果没有,请确保线程没有中断。这个需要进行二次检查,
            *用来防止清除中断的时候执行shutdownNow */
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
		/** 子类实现的,执行前逻辑 */
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {/** 执行具体任务 */
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {/** 子类实现,执行后逻辑 */
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;/** 要么任务执行完了,要么抛异常了,反正这轮task执行完了,把task设置为空 */
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
	/** 退出后需要把这个worker给终止 */
        processWorkerExit(w, completedAbruptly);
    }
}
复制代码

getTask方法: 从阻塞队列里面获取一个任务

private Runnable getTask() {
    boolean timedOut = false; 
    /** 不断的循环,直到获取到任务或者不满足,直接返回null让runWorker退出worker */
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        /** 如果线程池准备终止,而且线程池是stop或等待队列为空 */
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            /** worker数量减一,并返回null,因为线程池已经准备终止了,需要让worker终止 */
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
	/** 检查一下,是不是需要减少一个worker,需要满足两个条件
	* 1、要么有非核心的worker,要么核心worker是超时的。
	* 2、满足了1还要看一下是不是可以减少一个worker,因为要么队列为空,可以直接减少,要么队列不为空得保留一个worker把队列消耗完
	*/
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
	/** 从队列中获取数据,要么就是没有等待时间的阻塞获取,要么是有等待时间的阻塞获取 */
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
复制代码