1.AQS介绍
当我们想要看一个技术源码时,我们第一步是先搞清楚它的功能是什么,它能解决什么问题,带着需求看源码,才会更容易理解它的设计思想。
AQS: java.util.concurrent.locks.AbstractQueuedSynchronizer
Java标准库的一个抽象类。
它主要提供的功能有 获取、释放资源的功能。
在获取资源时,如果获取不到,直接将该线程放到等待队列中,并且将改线程休眠。
释放资源时,会唤醒等待队列的一个线程,去获取资源。
它将并发场景下资源控制,线程控制,队列控制等技术问题,封装在AQS内部。使用者可以简单的去继承AQS,并将获取、释放资源的业务逻辑,放到AQS提供的对应抽象方法tryAcquire(int arg)、tryRelease(int arg)中。
PS:Java标准库的ReentrantLock,CountDownLatch,Semaphore等,都是利用AQS的能力,轻易实现的。
当你对AQS这两个抽象方法的功能含义理解后,你也能轻易写出一个Java的锁。
2.AQS源码解析
1.获取资源 acquire(long arg)
public final void acquire(long arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
复制代码
可以看到该方法先调用抽象方法tryAcquire看能不能获取到资源.(该抽象方法由使用方按照自己的逻辑去实现)如果没有获取到资源。进而调用acquireQueued方法。
该方法的返回值,表示在执行acquireQueued的方法获取到资源后,是否有状态中断,如果有中断状态,执行selfInterrupt()响应。
acquireQueued方法内调用parkAndCheckInterrupt()方法,在park休眠代码后调用Thread.interrupted()查看当前的中断状态,并且该方法会清除当前线程的中断中台值,因此在selfInterrupt()中,会再次设置中断状态。
addWaiter()方法将该线程节点放到队尾。
题外话:Java线程的interrupt()方法,仅仅是将线程的一个中断标志位。
如果想要响应中断,需要自己监听对应的值。
Thread获取有interrupted()静态方法和isInterrupted()方法。前者会清除中断标记,后者不会。
复制代码
下面是比较关键的acquireQueued可以看到刚才加到队尾的节点,执行该方法。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//如果是头节点,说明这个线程前面已经没有
//其他线程在等待了,并且获取到资源。该节点直接运行
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//获取资源失败,线程需要被休眠,使用park方法
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
复制代码
这么代码在执行parkAndCheckInterrupt()后,线程就被休眠了。
当执行release释放资源后,会唤醒队列头部节点的线程,然后继续执行for循环,子啊尝试获取资源成功后,就真正的占有资源。然后线程执行业务逻辑去了。自此获取资源结束。
2.释放资源release(int arg)
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
复制代码
方法也很简单,调用抽象方法tryRelease,看看业务实现类是否允许释放。
允许释放时,获取队列的头结点,调用unparkSuccessor进行释放后面结点中持有的线程
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
复制代码
可以看到LockSupport.unpark node中的线程。在获取资源那park阻塞的线程便可继续执行了。
代码回到上面acquireQueued中,可以看到for循环里面继续执行,获取资源。这样
3.资源控制
AQS的state字段作为申请或者释放资源的承载字段,并且都是cas操作,且是线程可见的,所以在多线程并发操作下,也会保持state数据的正确性。
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
复制代码
这是native的方法,cas是比较高效的。
3.AQS使用
3.1自己实现一个ReentrantLock
3.1.1 需求
通过上面两个核心方法介绍后,我们对AQS的功能也有了足够的了解。我这里有个需求,要实现一个锁,就让你利用AQS,那如何实现呢?
3.1.2 设计
我们设计一个类,继承AQS,来实现tryAcquire实现获取锁
tryRelease释放锁的逻辑。获取锁时,直接调用acquire,释放锁调用release即可。我们可认为state表示当前已经获取到锁的次数(其实这里主要是可重入锁,如果不可重入的,state就只有0,1两个值,1表示有线程获取锁,0表示没有线程获取到锁),因为考虑到可重入,一个线程可以加锁多次,所以state才可以突破1.
tryAcquire的逻辑大概是,看当前锁是否被占用,没占用则获取,并且设置state=1,返回true;如果被占用,则看当前锁持有的线程是否是自身,如果是自身,则state+1,然后返回true。如果不是自身,直接返回false。
tryRelease:检测持有锁的线程是否当前线程,如果是,对state-1,如果state=0,则可释放锁返回true,将持有锁线程设置为null。
有了上面的技术设计,现在实现一个锁就很简单了,下面看代码吧。
3.1.3 实现
public class MyReentrantLock implements Lock {
private volatile AQSI aqsi;
public MyReentrantLock() {
aqsi = new AQSI();
}
final private class AQSI extends AbstractQueuedLongSynchronizer {
protected void lock() {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
} else {
acquire(1);
}
}
@Override
protected boolean tryAcquire(long arg) {
Thread current = Thread.currentThread();
long state = getState();
if (state == 0) {
if (compareAndSetState(0, arg)) {
setExclusiveOwnerThread(current);
return true;
}
} else {
if (getExclusiveOwnerThread() == current) {
long nextState = state + arg;
setState(nextState);
return true;
}
}
return false;
}
@Override
protected boolean tryRelease(long arg) {
long nextState = getState() - arg;
if (getExclusiveOwnerThread() != Thread.currentThread()) {
throw new RuntimeException("当前加锁的线程不是该线程");
}
//可以释放
boolean canRelease = false;
if (nextState == 0) {
setExclusiveOwnerThread(null);
canRelease = true;
}
setState(nextState);
return canRelease;
}
}
@Override
public void lock() {
aqsi.lock();
}
@Override
public void unlock() {
aqsi.release(1);
}
//...还有其他接口,这里省略
}
复制代码
我们创建一个类MyReentrantLock去实现Lock接口。重点是lock和unlock接口。
其实重点都是内部类AQSI里,可以看到上面实现AQS两个抽象方法。
很简单吧,我们就实现了非公平的可重入锁。
3.1.4 一个小问题
这里给读者提个问题。tryRelease方法中,判断可以释放后, setState(nextState)为啥没有使用cas,直接set会不会有线程安全问题呀!
回答:其实不会啦,即使有很多线程去unlock释放锁,当时上面已经判断了,只允许持有锁的线程去释放,因此执行setState的方法时,只有一个线程去执行,不会有并发问题。
大家可以去ReentrantLock的源码,也是这么实现的。真的很敬佩jdk的作者,代码的边界case和简洁性都处理到极致。
3.2 CountDownLatch
CountDownLatch也是基于AQS实现的,读者可以按照上面的需求、设计的步奏,自己试着实现一次,在跟源码对比,这样一定印象深刻。
近期评论