AQS是个啥?了解后,你也能轻易写出一个Java的锁!

1.AQS介绍

当我们想要看一个技术源码时,我们第一步是先搞清楚它的功能是什么,它能解决什么问题,带着需求看源码,才会更容易理解它的设计思想。

AQS: java.util.concurrent.locks.AbstractQueuedSynchronizer
Java标准库的一个抽象类。

它主要提供的功能有 获取、释放资源的功能。
在获取资源时,如果获取不到,直接将该线程放到等待队列中,并且将改线程休眠。
释放资源时,会唤醒等待队列的一个线程,去获取资源。

它将并发场景下资源控制,线程控制,队列控制等技术问题,封装在AQS内部。使用者可以简单的去继承AQS,并将获取、释放资源的业务逻辑,放到AQS提供的对应抽象方法tryAcquire(int arg)、tryRelease(int arg)中。

PS:Java标准库的ReentrantLock,CountDownLatch,Semaphore等,都是利用AQS的能力,轻易实现的。
当你对AQS这两个抽象方法的功能含义理解后,你也能轻易写出一个Java的锁。

2.AQS源码解析

1.获取资源 acquire(long arg)

    public final void acquire(long arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
复制代码

可以看到该方法先调用抽象方法tryAcquire看能不能获取到资源.(该抽象方法由使用方按照自己的逻辑去实现)如果没有获取到资源。进而调用acquireQueued方法。
该方法的返回值,表示在执行acquireQueued的方法获取到资源后,是否有状态中断,如果有中断状态,执行selfInterrupt()响应。

acquireQueued方法内调用parkAndCheckInterrupt()方法,在park休眠代码后调用Thread.interrupted()查看当前的中断状态,并且该方法会清除当前线程的中断中台值,因此在selfInterrupt()中,会再次设置中断状态。

addWaiter()方法将该线程节点放到队尾。

题外话:Java线程的interrupt()方法,仅仅是将线程的一个中断标志位。
如果想要响应中断,需要自己监听对应的值。

Thread获取有interrupted()静态方法和isInterrupted()方法。前者会清除中断标记,后者不会。
复制代码

下面是比较关键的acquireQueued可以看到刚才加到队尾的节点,执行该方法。

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                //如果是头节点,说明这个线程前面已经没有
                //其他线程在等待了,并且获取到资源。该节点直接运行
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //获取资源失败,线程需要被休眠,使用park方法
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
复制代码

这么代码在执行parkAndCheckInterrupt()后,线程就被休眠了。
当执行release释放资源后,会唤醒队列头部节点的线程,然后继续执行for循环,子啊尝试获取资源成功后,就真正的占有资源。然后线程执行业务逻辑去了。自此获取资源结束。

2.释放资源release(int arg)

   public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
复制代码

方法也很简单,调用抽象方法tryRelease,看看业务实现类是否允许释放。
允许释放时,获取队列的头结点,调用unparkSuccessor进行释放后面结点中持有的线程

    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
复制代码

可以看到LockSupport.unpark node中的线程。在获取资源那park阻塞的线程便可继续执行了。

代码回到上面acquireQueued中,可以看到for循环里面继续执行,获取资源。这样

3.资源控制

AQS的state字段作为申请或者释放资源的承载字段,并且都是cas操作,且是线程可见的,所以在多线程并发操作下,也会保持state数据的正确性。

  protected final boolean compareAndSetState(int expect, int update) {
     // See below for intrinsics setup to support this
     return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
 }
复制代码

这是native的方法,cas是比较高效的。

3.AQS使用

3.1自己实现一个ReentrantLock

3.1.1 需求

通过上面两个核心方法介绍后,我们对AQS的功能也有了足够的了解。我这里有个需求,要实现一个锁,就让你利用AQS,那如何实现呢?

3.1.2 设计

我们设计一个类,继承AQS,来实现tryAcquire实现获取锁
tryRelease释放锁的逻辑。获取锁时,直接调用acquire,释放锁调用release即可。我们可认为state表示当前已经获取到锁的次数(其实这里主要是可重入锁,如果不可重入的,state就只有0,1两个值,1表示有线程获取锁,0表示没有线程获取到锁),因为考虑到可重入,一个线程可以加锁多次,所以state才可以突破1.

tryAcquire的逻辑大概是,看当前锁是否被占用,没占用则获取,并且设置state=1,返回true;如果被占用,则看当前锁持有的线程是否是自身,如果是自身,则state+1,然后返回true。如果不是自身,直接返回false。

tryRelease:检测持有锁的线程是否当前线程,如果是,对state-1,如果state=0,则可释放锁返回true,将持有锁线程设置为null。

有了上面的技术设计,现在实现一个锁就很简单了,下面看代码吧。

3.1.3 实现

public class MyReentrantLock implements Lock {

    private volatile AQSI aqsi;

    public MyReentrantLock() {
        aqsi = new AQSI();
    }
    final private class AQSI extends AbstractQueuedLongSynchronizer {
        protected void lock() {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
            } else {
                acquire(1);
            }
        }

        @Override
        protected boolean tryAcquire(long arg) {

            Thread current = Thread.currentThread();
            long state = getState();
            if (state == 0) {
                if (compareAndSetState(0, arg)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            } else {
                if (getExclusiveOwnerThread() == current) {
                    long nextState = state + arg;
                 
                    setState(nextState);
                    return true;
                }

            }
            return false;
        }

        @Override
        protected boolean tryRelease(long arg) {

            long nextState = getState() - arg;
            if (getExclusiveOwnerThread() != Thread.currentThread()) {
                throw new RuntimeException("当前加锁的线程不是该线程");
            }
            //可以释放
            boolean canRelease = false;
            if (nextState == 0) {
                setExclusiveOwnerThread(null);
                canRelease = true;
            }

            setState(nextState);
            return canRelease;
        }
    }


    @Override
    public void lock() {
        aqsi.lock();
    }

    @Override
    public void unlock() {
        aqsi.release(1);
    }
    
    //...还有其他接口,这里省略

}
复制代码

我们创建一个类MyReentrantLock去实现Lock接口。重点是lock和unlock接口。
其实重点都是内部类AQSI里,可以看到上面实现AQS两个抽象方法。
很简单吧,我们就实现了非公平的可重入锁。

3.1.4 一个小问题

这里给读者提个问题。tryRelease方法中,判断可以释放后, setState(nextState)为啥没有使用cas,直接set会不会有线程安全问题呀!

回答:其实不会啦,即使有很多线程去unlock释放锁,当时上面已经判断了,只允许持有锁的线程去释放,因此执行setState的方法时,只有一个线程去执行,不会有并发问题。

大家可以去ReentrantLock的源码,也是这么实现的。真的很敬佩jdk的作者,代码的边界case和简洁性都处理到极致。

3.2 CountDownLatch

CountDownLatch也是基于AQS实现的,读者可以按照上面的需求、设计的步奏,自己试着实现一次,在跟源码对比,这样一定印象深刻。