JAVA多线程之ArrayBlockingQueue看Con

今天所要介绍的Condition,大家平时业务开发中可能少会用,一般会在框架或中间件比较常见,本文以JAVA中ArrayBlockingQueue中的Condition的使用以及实现原理。

ArrayBlockingQueue中Condition的属性定义和构造函数

final ReentrantLock lock;

/** Condition for waiting takes */
//等待在非空
private final Condition notEmpty;

/** Condition for waiting puts */
//等待队列没有满
private final Condition notFull;
复制代码

ArrayBlockingQueue构造函数,两个Condition都是从ReentrantLock的newConditon方法构造的
ConditionObject对象,

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}
复制代码

通过看ReentrantLock的newCondition方法可以看到实际Condition实现类就是ConditionObject,这就是今天要分析的重点对象.

final ConditionObject newCondition() {
    return new ConditionObject();
}
复制代码

ConditionObject类结构

public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;
    /** First node of condition queue. */\
    //头节点
    private transient ConditionNode firstWaiter;
    /** Last node of condition queue. */
    //尾节点
    private transient ConditionNode lastWaiter;

    /**
     * Creates a new {@code ConditionObject} instance.
     */
    public ConditionObject() { }
复制代码

ArrayBlockingQueue的notfull和notEmpty的使用如下
1 notFull是放入队列前,如果队列满了,则执行notFull.await,让线程等待, 在元素入队后,通过调用notEmpty.signal唤醒消息者线程.
2.notEmpty是消息队列时,如果队列为空,则执行notEmpty.await让消费者线程等待,
元素消费者消息完后,执行notFull.signal唤醒等待的生产者线程去往队列放元素.

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal();
}

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}
复制代码

下面我们着重分析下ConditionObject中await和signal是如何实现的。

ConditionObject中await实现

主要步骤:
1.创建一个新建等待节点加入条件等待队列
2.释放所有锁(即重置state变量为0)
3.判断node是否在CLH双端等待队列中,如果不在则直接LockSupport.park阻塞线程,r然后检查是否中断
4.如果CLH的等待队列中,则通过CLH队列中获取锁,
5.如果node的nextWaiter不为空,则表示有新的Condtion等待节点,则Conditon的单向队列中移除取消状态的节点
6.判断是否需要中断或者抛出异常

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    //加入条件等待队列尾部
    Node node = addConditionWaiter();
    //全部释放锁
    long savedState = fullyRelease(node);
    int interruptMode = 0;
    //是否在条件等待队列中
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    //获取等待队列
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    //判断当前node下一个等待不为空,说明已经是在等待队列中,
    //则清除取消状态的等待节点
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
       //说明是处于中断
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
复制代码

如果最后的等待Node状态不是Node.Condition,则需要将取消的节点从等待队列中去掉,
然后创建一个Node,参数中线程是当前线程,Node的状态是Node.Condition,加入到队列
的尾部。

private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}
复制代码

完成释放锁的操作,从这里可以看出,执行await操作是会释放锁的,,
首先获取state的状态值,然后重置为0,成为无锁状态.返回有锁之前的state
的值, 如果失败,则重置Node的waitStatus状态为CANCELLED状态

final long fullyRelease(Node node) {
    boolean failed = true;
    try {
        long savedState = getState();
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}
复制代码

等待队列中的节点去争抢锁,这个之前在ReentrantLock中分析过,这里就不再赘述,

final boolean acquireQueued(final Node node, long arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            //获取新建node的前驱
            final Node p = node.predecessor();
            //如果前驱是head,并且tryAcquire或
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
复制代码

ConditionObject中signal实现

ConditionObect的signal去唤醒firstWaiter(即Condition等待队列中第一个节点)

public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}
复制代码

doSignal是从firstWaiter开始循环,唤醒第一个没有被取消的等待节点

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
复制代码

transferForSignal则是将等待的node的waitStatus重置为0,然后从Condition队列中转移加入CLH的
双端等待队列。

final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */
    //加入CLH等待队列
    Node p = enq(node);
    int ws = p.waitStatus;
    //如果等待状态是取消状态,或者waitStatus设置为Signal失败,则唤醒node的线程去争抢锁,
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}
复制代码

下图是表示从Condition的等待队列中,会转移到CLH的等待对队列中.
image.png

总结
今天介绍的ConditonObject主要是条件等待队列,可以创建多个条件等待队列,
ReentrantLock是只有CLH的双向队列,都是在双向队列上进行的等待,相比之下,
Condition控制更加灵活,控制粒度更细。