juc并发包-lucks

AQS简介

AbstractQueuedSynchronizer抽象类(以下简称AQS)是整个java.util.concurrent包的核心。在JDK1.5时,Doug Lea引入了J.U.C包,该包中的大多数同步器都是基于AQS来构建的。AQS框架提供了一套通用的机制来管理同步状态(synchronization state)、阻塞/唤醒线程、管理等待队列。

我们所熟知的ReentrantLock、CountDownLatch、CyclicBarrier等同步器,其实都是通过内部类实现了AQS框架暴露的API,以此实现各类同步器功能。这些同步器的主要区别其实就是对同步状态(synchronization state)的定义不同。

AQS框架,分离了构建同步器时的一系列关注点,它的所有操作都围绕着资源——同步状态(synchronization state)来展开,并替用户解决了如下问题:

资源是可以被同时访问?还是在同一时间只能被一个线程访问?(共享/独占功能)

访问资源的线程如何进行并发管理?(等待队列)

如果线程等不及资源了,如何从等待队列退出?(超时/中断)

这其实是一种典型的模板方法设计模式:父类(AQS框架)定义好骨架和内部操作细节,具体规则由子类去实现。
AQS框架将剩下的一个问题留给用户:

什么是资源?如何定义资源是否可以被访问?

我们来看下几个常见的同步器对这一问题的定义:

同步器 资源的定义
ReentrantLock 资源表示独占锁。State为0表示锁可用;为1表示被占用;为N表示重入的次数
CountDownLatch 资源表示倒数计数器。State为0表示计数器归零,所有线程都可以访问资源;为N表示计数器未归零,所有线程都需要阻塞。
Semaphore 资源表示信号量或者令牌。State≤0表示没有令牌可用,所有线程都需要阻塞;大于0表示由令牌可用,线程每获取一个令牌,State减1,线程没释放一个令牌,State加1。
ReentrantReadWriteLock 资源表示共享的读锁和独占的写锁。state逻辑上被分成两个16位的unsigned short,

原理简述

AQS框架分离了构建同步器时的一系列关注点,它的所有操作都围绕着资源——同步状态(synchronization state)来展开因此,围绕着资源,衍生出三个基本问题:

同步状态(synchronization state)的管理

阻塞/唤醒线程的操作

线程等待队列的管理

同步状态

同步状态,其实就是资源。AQS使用单个int(32位)来保存同步状态,并暴露出getState、setState以及compareAndSetState操作来读取和更新这个状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

* 同步状态.
*/
private volatile int state;

protected final int () {
return state;
}

protected final void setState(int newState) {
state = newState;
}

* 以原子的方式更新同步状态.
* 利用Unsafe类实现
*/
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

线程的阻塞/唤醒

使用了LockSupport类

等待队列

等待队列,是AQS框架的核心,整个框架的关键其实就是如何在并发状态下管理被阻塞的线程。
等待队列是严格的FIFO队列,是Craig,Landin和Hagersten锁(CLH锁)的一种变种,采用双向链表实现,因此也叫CLH队列。

结点定义

CLH队列中的结点是对线程的包装,结点一共有两种类型:独占(EXCLUSIVE)和共享(SHARED)。
每种类型的结点都有一些状态,其中独占结点使用其中的CANCELLED(1)、SIGNAL(-1)、CONDITION(-2),共享结点使用其中的CANCELLED(1)、SIGNAL(-1)、PROPAGATE(-3)。

结点状态 描述
CANCELLED 1 取消。表示当前结点被中断或超时,需要移出队列
SIGNAL -1 发信号。表示后驱结点被阻塞了(当前结点在入队后、阻塞前,应确保将其prev结点类型改为SIGNAL,以便prev结点取消或释放时将当前结点唤醒。)
CONDITION -2 Condition专用。表示当前结点在Condition队列中,因为等待某个条件而被阻塞了
PROPAGATE -3 传播。适用于共享模式(比如连续的读操作结点可以依次进入临界区,设为PROPAGATE有助于实现这种迭代操作。)
INITIAL 0 默认。新结点会处于这种状态

AQS使用CLH队列实现线程的结构管理,而CLH结构正是用前一结点某一属性表示当前结点的状态,之所以这种做是因为在双向链表的结构下,这样更容易实现取消和超时功能。

next指针:用于维护队列顺序,当临界区的资源被释放时,头结点通过next指针找到队首结点。
prev指针:用于在结点(线程)被取消时,让当前结点的前驱直接指向当前结点的后驱完成出队动作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
static final class Node {

// 共享模式结点
static final Node SHARED = new Node();

// 独占模式结点
static final Node EXCLUSIVE = null;

static final int CANCELLED = 1;

static final int SIGNAL = -1;

static final int CONDITION = -2;

static final int PROPAGATE = -3;


* INITAL: 0 - 默认,新结点会处于这种状态。
* CANCELLED: 1 - 取消,表示当前结点被中断或超时,需要移出队列;
* SIGNAL: -1- 发信号,表示后续结点被阻塞了;(当前结点在入队后、阻塞前,应确保将其prev结点类型改为SIGNAL,以便prev结点取消或释放时将当前结点唤醒。)
* CONDITION: -2- Condition专用,表示当前结点在Condition队列中,因为等待某个条件而被阻塞了;
* PROPAGATE: -3- 传播,适用于共享模式。(比如连续的读操作结点可以依次进入临界区,设为PROPAGATE有助于实现这种迭代操作。)
*
* waitStatus表示的是后续结点状态,这是因为AQS中使用CLH队列实现线程的结构管理,而CLH结构正是用前一结点某一属性表示当前结点的状态,这样更容易实现取消和超时功能。
*/
volatile int waitStatus;

// 前驱指针
volatile Node prev;

// 后驱指针
volatile Node next;

// 结点所包装的线程
volatile Thread thread;

// Condition队列使用,存储condition队列中的后继节点
Node nextWaiter;

Node() {
}

Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
}



* 添加节点到等待队列, 如果有必须就初始化Node.
* 自选CAS添加等待队列尾部. 直到成功
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}


* Creates and enqueues node for current thread and given mode.
* 创建node并入队, 设置当期线程和指定锁的类型 是独占还是共享
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 尝试快速设置 将node加入队列尾. 如果失败了就交个end方法. 自旋去
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

CAS操作

CAS,即CompareAndSet,在Java中CAS操作的实现都委托给一个名为UnSafe类,

方法名 修饰符 描述
compareAndSetState protected final CAS修改同步状态值
compareAndSetHead private final CAS修改等待队列的头指针
compareAndSetTail private final CAS修改等待队列的尾指针
compareAndSetWaitStatus private static final CAS修改结点的等待状态
compareAndSetNext private static final CAS修改结点的next指针

ReentrantLock的公平策略原理

假设现在有3个线程:ThreadA、ThreadB、ThreadC,一个公平的独占锁,3个线程会依次尝试去获取锁:ReentrantLock lock=new ReentrantLock(true);

线程操作如下

1
2
3
4
5
6
7
8
9
10
11
//ThreadA    lock

//ThreadB lock

//ThreadC lock

//ThreadA release

//ThreadB release

//ThreadC release

TA首先获得锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61

public void lock() {
sync.lock();
}

static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;

final void lock() {
acquire(1);
}
}

public final void acquire(int arg) { // arg =1
if (
!tryAcquire(arg) //尝试获取资源
&& acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}


*
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) { //acquires =1
final Thread current = Thread.currentThread(); //获得当期线程
int c = getState(); // 获得同步状态
if (c == 0) { // 0代表锁并未被占用
//如果等待队列中. 在当期线程前没有其他线程. 则用CAS获得锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
//更新成功. 将锁的占有线程设置为当期线程
setExclusiveOwnerThread(current);
return true;
}
}
//c!=0 说明锁被占用. 判断是否是重入的情况
else if (current == getExclusiveOwnerThread()) {
// state+1
int nextc = c + acquires;
// 重入次数过大. 溢出了
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
//设置状态
setState(nextc);
return true;
}
return false;
}

public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

ReentrantLock中 state的状态代表 0 表示锁可用. 1 锁占用 >1 表示锁被占用 值表示被重入的次数

TB开始获取锁

先lock->sync.lock(1) -> 如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public final void acquire(int arg) { // arg =1
if (
//尝试获取锁 这里是失败的. 因为A已经获得锁了. c!=0 并且 current != getExclusiveOwnerThread()
!tryAcquire(arg) //尝试获取资源
//将当期线程包装成节点加入等待队列
&& acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 中断自己
selfInterrupt();
}

private Node addWaiter(Node mode) {
// 将线程包装成节点
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 视图快速将自己设置成尾节点. 先尝试一次, 如果成功就不用end() 失败. 自旋去 tail!=null 说明队列中有排队的node. 那么让当期node排在他后面. 把自己改为tail即可
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果tial==null 说明等待队列还没有初始化. 需要自己帮忙初始化等待队列. 第一个线程获得锁的时候改完state就完成了. 不会初始化队列
// 第二个加锁的线程, 发现队列中没有排队的. 但是cas改变状态又失败了. 说明自己是第二个加锁, 自己需要初始化队列. 初始化一个null node当head. 自己排在他的后面,
// 在aqs等待队列中, 对头head 代表当期锁的执行线程. 不代表排队. 第二个才算是排队的. 即tail != null的时候
enq(node);
return node;
}

private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

tail == null 说明aqs队列没有初始化, 那就必须初始化.

现在TB已经加入等待队列尾部了. 接下来会调用acquireQueued方法. ,这也是AQS中最重要的方法之一

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//从等待列表中获得node的前节点, 果然获取不到. 就要确保前置节点能唤醒自己的情况下. 自己进入阻塞状态,
// 正常情况下: 该方法会一直阻塞当期线程. 知道获得锁才返回. 但是如果执行过程中,抛异常(tryAcquire方法) 那么会将当期节点移除等待队列. 继续向上抛出异常.
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获得node的前一个节点p
final Node p = node.predecessor();
// 如果p是首节点, 然后去尝试获得锁. 因为TA在持有锁, 这里返回false
if (p == head && tryAcquire(arg)) {
// 将自己设置为head 表示自己占用锁
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 判断是否需要阻塞线程. 也就是判断TB是否要阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

在AQS中,等待队列中的线程都是阻塞的,当某个线程被唤醒时,只有该线程是首结点(线程)时,才有权去尝试获取锁。

上述方法中,将ThreadB包装成结点插入队尾后,先判断ThreadB是否是首结点(注意不是头结点,头结点是个dummy结点),发现确实是首结点(node.predecessor==head),于是调用tryAcquire尝试获取锁,但是获取失败了(此时ThreadA占有着锁),就要判断是否需要阻塞当前线程。

判断是否需要阻塞线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;//获得node的前节点的等待状态
if (ws == Node.SIGNAL) // 如果是SIGNAL(-1): 后置节点需要唤醒. 自己就可以放心阻塞了. 将来会唤醒我的.
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) { // 也就是CANCELLED 取消状态. 说明前置节点因为中断或者取消. 需要在等待队列中移除
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
* 知道遇到前置节点的状态小于等为止.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// 将前置节点的后置节点设置成自己.
pred.next = node;
} else { //如果既不是SIGNAL也不是CANCELED. 那么就将前置节点的waitStatus设置成SIGNAL.
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
//阻塞在这里. 并检查线程是否被中断.
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

TC开始获取锁

流程和TB获取锁是一样的. 1)尝试获取锁, 失败. 因为A在占用. 2)将自己包装成node.并加入等待队列尾部 3) 检查自己的前置节点是否是首节点 false. 4) 检查自己是否需要阻塞. 检查前置节点的等待状态发现是0(因为刚加入). 那就将其设置为SIGNAL(-1). 自己进入阻塞状态

TA释放锁

TA使用完了临界区资源. 需要释放锁了. 调用unlock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public void unlock() {
sync.release(1);
}
// arg =1
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
// h要值. 表示有线程在等待. h.waitStatus !=0 表示h不是新加入的节点. 在h!=null的情况. waitStatus==0 那么说明后置节点在唤醒中
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); //释放成功, 则唤醒首节点.
return true;
}
return false;
}

protected final boolean tryRelease(int releases) {
int c = getState() - releases; // 将当期状态-1
//获得锁的线程和释放锁的线程不一样. 抛异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// c ==0 表示锁处于空闲状态了
if (c == 0) {
free = true;
// 将锁的拥有者设置为null
setExclusiveOwnerThread(null);
}
// 将状态设置为 更新状态值
setState(c);
return free;
}

private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
获得首节点的状态
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0); // 将状态设置为0 表示后置节点即将被唤醒

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*
*/
Node s = node.next; //得到后置节点
//当后置节点为null或者s.waitStats=1 也就是取消状态时
if (s == null || s.waitStatus > 0) {
s = null;
//从尾部开始向前遍历. 直到找到一个waitStatus<=0的. 并将其唤醒
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//正常状态下 直接唤醒后置节点的线程
if (s != null)
LockSupport.unpark(s.thread);
}

TB被唤醒并继续执行

现在head把他的后置节点唤醒了. ThreadB会继续从以下位置开始执行,先返回一个中断标识,用于表示ThreadB在阻塞期间有没被中断过:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
    //继续从这里开始执行 .返回中断标识, 表示自己在阻塞期间没有被状态过.
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

因为自旋的缘故. 会继续执行下面代码

1
2
3
4
5
6
7
8
9
10
final Node p = node.predecessor();
//唤醒线程的节点的前置节点是head, 尝试获取锁. 能成功的
//tryAcquire加锁逻辑和A的加锁逻辑一样 1) 获得当期线程 2)获得状态 3 状态==0 判断是否存在排队的前辈 设置状态为1, 将锁的拥有者设置为自己
if (p == head && tryAcquire(arg)) {
// 将直接的节点设置为Head
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}

TB和TC释放锁

TB释放锁的逻辑和TA释放锁的逻辑一致. 在醒来后.返回中断标识. 获取锁. 成功将自己设置为锁的使用者.并将自己设置为head.

TC释放锁, 发现自己没有后置节点了. 从队尾开始遍历.发现tail == 当前的自己. 什么也不做

ReentrantLock的非公平策略原理

公平锁和非公平锁获取逻辑主要差别:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
final void lock() {
// 1直接尝试获取锁. 不管等待队列
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//这里和公平锁相比缺少了是否有线程排在自己之前等待(hasQueuedPredecessors)判断.
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

这就导致了 大家一起竞争锁.

CountDownLatch分析AQS的共享功能

假设现在有3个线程,ThreadA、ThreadB、mainThread,CountDownLatch初始计数为1:
CountDownLatch switcher = new CountDownLatch(1);
线程的调用时序如下:

1
2
3
4
5
//ThreadA调用await()方法等待

//ThreadB调用await()方法等待

//主线程main调用countDown()放行

AQS共享功能的原理

创建CountDownLatch

CountDownLatch的创建没什么特殊,调用唯一的构造器,传入一个初始计数值,内部实例化一个AQS子类:

CountDownLatch switcher = new CountDownLatch(1);

1
2
3
4
5
6
7
8
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

Sync(int count) {
setState(count);
}

初始计数值count其实就是同步状态值,在CountDownLatch中,同步状态State表示CountDownLatch的计数器的初始大小。

ThreadA调用await()方法等待

CountDownLatch的await方法是响应中断的,该方法其实是调用了AQS的acquireSharedInterruptibly方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted()) //响应线程中断
throw new InterruptedException();
if (tryAcquireShared(arg) < 0) // 尝试获取锁. <0 就是失败 ,>0 成功, 0 无锁状态. 这里当然是失败的 以为我们初始化的状态count =1.
doAcquireSharedInterruptibly(arg);
}


protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

之前说了在CountDownLatch中,同步状态State表示CountDownLatch的计数器的初始值,当State==0时,表示无锁状态,且一旦State变为0,就永远处于无锁状态了,此时所有线程在await上等待的线程都可以继续执行。
而在ReentrantLock中,State==0时,虽然也表示无锁状态,但是只有一个线程可以重置State的值。这就是共享锁的含义。

继续向下执行,ThreadA尝试获取锁失败后,会调用doAcquireSharedInterruptibly:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED); //将当期线程加入等待队列尾部. 和独占锁的流程一样, 区别在于mode是共享锁类型
boolean failed = true;
try {
for (;;) {
// node也就是当期节点的前置节点
final Node p = node.predecessor();
if (p == head) { // 当期节点的前置节点如果是head
int r = tryAcquireShared(arg); // 尝试获得共享锁 这里当然也是失败的
if (r >= 0) {
//成功 键
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 如果node的前置节点不是head节点. or 获取共享锁失败.
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//这个方法和独占锁一样, 都是判断自己是否要阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;// 前置节点的等待状态
if (ws == Node.SIGNAL) // SIGNAL的化 自己就可以安心等待被唤醒了
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) { // 如果是CANCELED. 进入循环向前查找. 知道找到一个<0的节点
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else { //都不是 也就是锁状态可能是 0 -2 -3
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 将前置节点设置为SIGNAL.
}
return false;
}
//将自己阻塞掉. 唤醒时会返回中断状态.
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

TA的加入等待队列并阻塞了.

ThreadB调用await()方法等待

流程和步骤2完全相同,调用后ThreadB也被加入到等待队列中:1)尝试获取共享锁. 失败. 2) 将TB包装成共享节点node 加入等待队列. 3) 查看当期节点的前置节点是否和head相等. 如果相等. 就再次尝试获取共享锁. 继续失败 4) 判断自己是否需要被阻塞. 查看前置节点的等待状态 如果==SIGNAL 自己可以安心等待被唤醒了 如果>0 也就是CANCELED. 就向前遍历链表. 直到找到一个小于等于0 并将节点的next设置称自己. 自己的prev设置成它. 如果 小于等于0. 那么cas的将前置节点的状态设置为SIGNAL. 5) 将自己阻塞.

主线程main调用countDown()放行

ThreadA和ThreadB调用了await()方法后都在等待了,现在主线程main开始调用countDown()方法,该方法调用后,ThreadA和ThreadB都会被唤醒,并继续往下执行,达到类似门栓的作用。

1
2
3
public void countDown() {
sync.releaseShared(1); //释放共享锁
}

该方法内部调用了AQS的releaseShared方法,先尝试一次释放锁,tryReleaseShared方法是一个钩子方法,由CountDownLatch实现,当同步State状态值首次变为0时,会返回true:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { //尝试一次释放锁
doReleaseShared();
return true;
}
return false;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
// 自旋的释放锁 知道count ==0
for (;;) {
int c = getState(); // 获取共享锁状态
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
* 释放锁之后 将等待队列中的线程唤醒
*/
for (;;) { // 唤醒下一个等待的节点. 自旋是为了保证state能够设置成功.
Node h = head; // 拿到首节点
if (h != null && h != tail) { //哨兵
int ws = h.waitStatus; // 获得首节点的等待状态
if (ws == Node.SIGNAL) { // ==SIGNAL CAS的将状态设置为0 直到成功
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h); //唤醒
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
//唤醒逻辑
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus; //获得当期节点的等待状态
if (ws < 0) // 小于0 将其设置为0
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next; // 获得后置节点
if (s == null || s.waitStatus > 0) { //如果是null或者 是CANCELED. 从尾部开始向前遍历. 直到找到下于等于0的节点
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null) //唤醒他
LockSupport.unpark(s.thread);
}
TA在阻塞除开始执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED); //将当期线程加入等待队列尾部. 和独占锁的流程一样, 区别在于mode是共享锁类型
boolean failed = true;
try {
for (;;) {
// node也就是当期节点的前置节点
final Node p = node.predecessor();
if (p == head) { // 当期节点的前置节点如果是head
int r = tryAcquireShared(arg); // 尝试获得共享锁 这里当然也是失败的
if (r >= 0) {
//成功 键
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 从这里被唤醒.
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException(); //如果阻塞期间被中断. 抛出中断异常作为相应中断
}
} finally {
if (failed)
cancelAcquire(node);
}
}

由于自旋的存在. 它将获取自己的前置节点和head比较, 如果等于. 就尝试获取共享锁. 会成功的. 以为我们已经将count自旋的减到0了. 接下来执行setHeadAndPropagate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
//将当期节点设置为head. 并尝试唤醒并释放后续节点
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);// 将自己设置为head
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway

*/
//判断是否要唤醒后置节点.
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
//唤醒后置节点. 也就是唤醒B
if (s == null || s.isShared())
doReleaseShared();
}
}
ThreadB从原阻塞处继续向下执行

TB也在同样的位置醒来. 1) 判断前置节点是否为head , 2) 尝试获得共享锁. 3)将自己设置为head. 并判断是否需要唤醒后置节点.

总结

AQS的独占锁(公平说). 例子还是TA TB TC顺序加锁,然后依次释放锁释放锁.

TA.lock -> sync.lock() -> FairSync.acquire(1) -> tryAcquire(1)
1) 获得当期线程 cut

2) 获得锁的状态 c -> c==0 说明锁没有被占用. -> 是否自己之前有人等待. 否 CAS的修改状态从0改到1 成功后将自己的线程cut设置为锁拥有线程 TA加锁完毕

这里还有一个细节 如果c!=0 优先判断锁的持有者线程和获取锁的线程是否相等. 这是可重入的逻辑. 如果是相等的 c+1 so 0 表示无人占用 1 表示锁被占用. N表示锁被重入的次数

TB.lock() 流程和TA加锁流程一致. 区别在于tryAcquire(1) 会失败 -> addWaiter()

1) 获得当期线程包装成node 尝试一次 将自己加入等待队列尾部 如果失败交给end()方法 自旋知道成功

2) acquireQueued() 自旋方法 判断当期节点的前置节点是否是head. true 尝试获取锁 这里是失败的 TA还在持有锁. -> shouldParkAfterFailedAcquire() 判断自己是否要被阻塞. 拿到前置节点的waitStatus. 如果等于SIGNAL 说明自己可以安心被阻塞了. 等待被唤醒 如果等于CANCELED. 那么就从等待队列尾部开始遍历. 直到找到一个<=0的节点后.将自己添加大它的后面. 将他的状态设置为SIGNAL. -> parkAndCheckInterrupt() 将自己阻塞. 在这里等待唤醒 TB的加锁流程完成了

TC.lock流程和TB的一模一样. 也会自己阻塞掉

TA.unlock() -> sync.release(1) -> FairSync.release(1) -> tryRelease(1)

1) 获得当期线程cut 获得锁的状态 s s-1 判断解锁和加锁的线程是否一致. c==0 将线程的拥有者设置为null -> setState(c)

2) head!=null && head.waitStatus !=0 -> unparkSuccessor()唤醒后置节点 -> 获得自己的等待状态 如果<0 CAS的将其改为0 -> 获得自己的后置节点. 如果为null || next.ws >0 那么就从尾部开始遍历. 知道找到一个ws<=0的. 将其唤醒

-> parkAndCheckInterrupt() TB将在这个方法中醒来. 返回线程的中断状态. 表示自己在阻塞期间是否被中断过. 由于自旋的原因. 进入判断自己的前置节点是否为head 并尝试获得锁. 成功这里的加锁逻辑和TA的加锁逻辑一致. 成功后将自己设置为head. 完成.

TB.unlock和TC.unlock的逻辑和TA的逻辑完全一直.

响应中断的 就是唤醒时的处理逻辑不通. 响应中断的话排除一次.
响应超时 阻塞的时候设置超时时间. 如果在规定时间没有被唤醒. 就cancelAcquire(自己).这里有些逻辑. 1) 将自己持有的线程设置为null 从自己开始向前查找直到找到一个ws>0的节点. 将自己和它连接起来. 2) 将自己的ws设置为CANCELED 3)判断自己是否为tail && 将tail也就是自己CAS的设置为ws!=1的节点(n). true 将n的next 设置为null. false 可能存在并发设置tail的情况. 那么就直接将n的下一个节点设置为新的node. 并将n的ws设置为signal.

AQS独占锁(非公平锁). API对外暴露的API是一直的. 内部的不同就是

  1. lock() 非公平锁在加锁时 第一件事就是设置状态. 也就是去竞争. 公平锁是去尝试. 失败了加入等待队列等待被唤醒.
  2. tryAcquire() 非公平锁的实现第一步也是直接设置锁的状态为自己的. 公平锁要检查自己之前是否有人等待.

其他逻辑一致

AQS共享锁

new CountDownCatch(1) -> new Sync(count); -> setState(count);

TA.await() -> sync.acquireSharedInterruptibly(1) -> 此时如果线程被中断. 会抛出中断异常. 否则继续-> tryAcquireShared(arg=1) 尝试去获得共享锁. 判断的是state == 0, 所以0 表示无锁状态 其他值都表示已经有线程 这里是会失败的. 此时count也就是state =1. -> doAcquireSharedInterruptibly() -> 构建当前线程为node并设置共享属性 然后添加等待队列尾部. 和独占锁的逻辑是一致的 -> 自旋 获取当前节点n的前置节点p. 判断p是否为head. true 会尝试获得共享锁, 当然会失败. -> shouldParkAfterFailedAcquire() 判断自己是否要被阻塞.逻辑和独占锁一致. -> parkAndCheckInterrupt() 阻塞逻辑也是一致的. 线程TA在这里阻塞了

TB.await()和TA的逻辑页一致 阻塞

main线程执行countDown()-> sync.releaseShared(1); -> tryReleaseShared(1) 自旋 获取当前锁的状态 减去1 直到状态等于0 -> doReleaseShared() 自旋 获得head节点. 哨兵(h!=null && h!=tail) 获取head的等待状态 ws. 如果是signal CAS的将等待状态从-1 改成 0 直到成功. 成功后 -> unparkSuccessor() 唤醒后置节点的逻辑和独占锁一致. ws如果=0 并且将ws状态改为propagate -3, 传播. TA在parkAndCheckInterrupt()位置醒来. 因为自旋的缘故. 会继续检查自己的前置节点是否为head. 并且尝试获取共享锁. 这里就是获取成功了.因为state ==0. -> setHeadAndPropagate() -> 1 将自己设置为head 判断如何自己的后置节点不为null. 并且ws小于0 . 继续唤醒后置节点, TB就会被唤醒. 重复TA被唤醒的流程. 直到.next == null位置.

请参考多线程进阶AQS剖析