并发学习(九) AbstractQueuedSynchronizer 源码分析(Condition 实现原理)

Condition 接口

任意一个Java对象,都拥有一组监视器方法(定义在java.lang.Object上),主要包括wait()、wait(long timeout)、notify()以及notifyAll()方法,这些方法与synchronized同步关键字配合,可以实现等待/通知队模式,COndition 接口也提供了类似Object的监视器方法,与Lock配合可以实现等待/通知模式,但是这两者在使用方式以及功能特性上还是有差别的。

对比项 Object Monitor Methods Condition
前置条件 获取对象的锁 调用Lock.lock()获取锁 调用Lock.newCondition()获取Condition对象
调用方式 直接调用 如object.wait() 直接调用 如 condition.await()
等待队列个数 一个 多个
当前线程释放锁并进入等待状态 支持 支持
当前线程释放锁并进入等待状态,在等待状态中不响应中断 不支持 支持
当前线程释放锁并进入超时等待状态 支持 支持
当前线程释放锁并进入等待状态到将来的某个时间 不支持 支持
唤醒等待队列中的一个线程 支持 支持
唤醒等待队列中的全部线程 支持 支持

Condition定义了等待/通知两种类型的方法,当前线程调用这些方法时,需要提前获取到Condition对象关联的锁。Condition对象是由Lock对象创建出来的,换句话说,Condition是依赖Lock对象的。

Condition的实现分析

ConditionObject是同步器AQS的内部类,因为Condition的操作需要获取相关联的锁,所以作为同步器的内部类也比较合理。每个Condition都包含着一个队列(以下称为等待队列),该队列是Condition对象实现等待/通知功能的关键

以下是Condition与AQS的关系

2-1533098227-1.jpeg

等待队列

等待队列是一个FIFO的队列,在队列中的每个结点都包含了一个线程引用,该线程是在Condition对象上等待的线程,如果一个线程调用了Condition.await()方法,那么该线程将会释放锁,构造成结点加入等待队列并进入等待状态。并且值得一提的是,该结点Node复用了AQS中结点的定义。

一个Condition包含了一个等待队列,Condition拥有首结点(firstWaiter)和为结点(lastWaiter)。当前线程调用Condition.wait()方法,将会以当前线程构造结点,并将结点从尾部加入等待队列。
Condition用哦与首尾结点的引用,而新增结点只需要将原有的尾结点nextWaiter指向它,并更新尾结点即可。上述结点引用更新的过程并没有使用CAS保证,因为在调用await()方法的线程必定是获取了锁的线程,也就是说过程是由锁来保证线程安全的。

当有线程在获取独占锁的情况下调用signal或singalAll方法时,队列中的等待线程将会被唤醒,重新竞争锁。另外,需要说明的是,一个锁对象可同时创建多个 ConditionObject 对象,这意味着多个竞争同一独占锁的线程可在不同的条件队列中进行等待。在唤醒时,可唤醒指定条件队列中的线程。

源码分析

等待

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
//await是响应中断方法
public final void await() throws InterruptedException {
//如果线程中断,抛出中断异常
if (Thread.interrupted())
throw new InterruptedException();
//将线程封装成结点加入到队尾
Node node = addConditionWaiter();
/** 释放lock的锁 跟reentrantLock里的Lock方法是一样的逻辑,就是把头节点释放 并唤醒头节点的后续节点 第一个waitStatus不是cancelled的节点获取锁**/
int savedState = fullyRelease(node);
int interruptMode = 0;


// 如果当前队列不在同步队列中,说明刚刚被await, 还没有人调用signal方法,则直接将当前线程挂起
while (!isOnSyncQueue(node)) {
// 线程将在这里被挂起,停止运行
LockSupport.park(this);

/*
* Mode meaning to reinterrupt on exit from wait
* private static final int REINTERRUPT = 1;
* 中断在 node 转移到同步队列“期间”或“之后”发生,此时表明有线程正在调用 singal/singalAll 转移节点。在该种中断模式下,再次设置线程的中断状态。向后传递中断标志,由后续代码去处理中断。
*
* Mode meaning to throw InterruptedException on exit from wait
* private static final int THROW_IE = -1;
* 中断在 node 转移到同步队列“前”发生,需要当前线程自行将 node 转移到同步队列中,并在随后抛出 InterruptedException 异常。
*/
// 能执行到这里说明要么是signal方法被调用了,要么是线程被中断了
// 所以检查下线程被唤醒的原因,如果是因为中断被唤醒,则跳出while循环
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}

/*
* 被转移到同步队列的节点 node 将在 acquireQueued 方法中重新获取同步状态,注意这里
* 的这里的 savedState 是上面调用 fullyRelease 所返回的值,与此对应,可以把这里的
* acquireQueued 作用理解为 fullyAcquire(并不存在这个方法)。
*
* 如果上面的 while 循环没有产生中断,则 interruptMode = 0。但 acquireQueued 方法
* 可能会产生中断,产生中断时返回 true。这里仍将 interruptMode 设为 REINTERRUPT,
* 目的是继续向后传递中断,acquireQueued 不会处理中断。
*/
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;

/*
* 正常通过 singal/singalAll 转移节点到同步队列时,nextWaiter 引用会被置空。
* 若发生线程产生中断(THROW_IE)或 fullyRelease 方法出现错误等异常情况,
* 该引用则不会被置空
*/
if (node.nextWaiter != null) // 清除后续节点是cancelled的节点
unlinkCancelledWaiters();
if (interruptMode != 0)
/*
* 根据 interruptMode 觉得中断的处理方式:
* THROW_IE:抛出 InterruptedException 异常
* REINTERRUPT:重新设置线程中断标志
*/
reportInterruptAfterWait(interruptMode);
}

private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
/*
* 清理等待状态为 CANCELLED 的节点。fullyRelease 内部调用 release 发生异常或释放同步状
* 态失败时,节点的等待状态会被设置为 CANCELLED。所以这里要清理一下已取消的节点
*/
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//创建Condition结点并插入尾部
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}

//清除状态为 CANCELED 的结点
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
/*
* trail 为 null,表明 next 之前的节点等待状态均为 CANCELLED,此时更新
* firstWaiter 引用的指向。
* trail 不为 null,表明 next 之前有节点的等待状态为 CONDITION,这时将
* trail.nextWaiter 指向 next 节点。
*/
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}

/**
* 这个方法用于完全释放同步状态。这里解释一下完全释放的原因:为了避免死锁的产生,锁的实现上
* 一般应该支持重入功能。对应的场景就是一个线程在不释放锁的情况下可以多次调用同一把锁的
* lock 方法进行加锁,且不会加锁失败,如失败必然导致导致死锁。锁的实现类可通过 AQS 中的整型成员
* 变量 state 记录加锁次数,每次加锁,将 state++。每次 unlock 方法释放锁时,则将 state--,
* 直至 state = 0,线程完全释放锁。用这种方式即可实现了锁的重入功能。
*/
final int fullyRelease(Node node) {
boolean failed = true;
try {
// 获取同步状态数值
int savedState = getState();
// 调用 release 释放指定数量的同步状态
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
// 如果 relase 出现异常或释放同步状态失败,此处将 node 的等待状态设为 CANCELLED
if (failed)
node.waitStatus = Node.CANCELLED;
}
}

//判断结点是否在同步队列中
final boolean isOnSyncQueue(Node node) {
/*
* 在上一篇文章中能知道同步队列中的结点状态只有CANCELED、0、SIGNAL、PROPAGATE并没有涉及Condition
* node.prev 仅会在节点获取同步状态后,调用 setHead 方法将自己设为头结点时被置为
* null,所以只要节点在同步队列上,node.prev 一定不会为 null
*/
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;


/*
* 如果节点后继不为 null,则表明节点在同步队列上。因为条件队列使用的是 nextWaiter 指
* 向后继节点的,条件队列上节点的 next 指针均为 null。
*/
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*
* 仅以 node.next != null 条
* 件断定节点在同步队列是不充分的。节点在入队过程中,是先设置 node.prev,后设置
* node.next(上一篇文章解释了两次这个问题)。如果设置完 node.prev 后,线程被切换了,此时 node.next 仍然为
* null,但此时 node 确实已经在同步队列上了,所以这里还需要进行后续的判断。
*/
return findNodeFromTail(node);
}

//从后往前(由于同步队列上的的节点 prev 引用不会为空 而 next可能为空)
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}


// 检测线程在等待期间是否发生了中断
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}


final boolean transferAfterCancelledWait(Node node) {
// 中断在节点被转移到同步队列前发生,此时自行将节点转移到同步队列上,并返回 true
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}

/*
* 如果上面的条件分支失败了,则表明已经有线程在调用 signal/signalAll 方法了,这两个
* 方法会先将节点等待状态由 CONDITION 设置为 0 后,再调用 enq 方法转移节点。下面判断节
* 点是否已经在同步队列上的原因是,signal/signalAll 方法可能仅设置了等待状态,还没
* 来得及转移节点就被切换走了。所以这里用自旋的方式判断 signal/signalAll 是否已经完
* 成了转移操作。这种情况表明了中断发生在节点被转移到同步队列期间。
*/
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}

/**
* 根据中断类型做出相应的处理:
* THROW_IE:抛出 InterruptedException 异常
* REINTERRUPT:重新设置中断标志,向后传递中断
*/
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}

/** 中断线程 */
static void selfInterrupt() {
Thread.currentThread().interrupt();
}

通知

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
//将等待队列头结点转移到同步队列
public final void signal() {
// 检查线程是否获取了独占锁,未获取独占锁调用 signal 方法会抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//获取头结点
Node first = firstWaiter;
if (first != null)
doSignal(first);
}

private void doSignal(Node first) {
do {
//将头结点指针指向first.nextWaiter 如果为null 说明队列已空
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
//原头结点与队列分离
first.nextWaiter = null;
/*
* 调用 transferForSignal 将节点转移到同步队列中,如果失败,且 firstWaiter
* 不为 null,则再次进行尝试。transferForSignal 成功了,while 循环就结束了。
*/
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}

//将等待队列头结点转移到同步队列
final boolean transferForSignal(Node node) {
/*
* 将该结点(头结点)状态设置为0,如果CAS失败,则转移失败
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;

/*
* 调用 enq 方法将 node 转移到同步队列中,并返回 node 的前驱节点 p
*/
Node p = enq(node);
// ws是前驱结点的状态
int ws = p.waitStatus;
/*
* 如果 ws > 0,则表明前驱节点处于取消状态,此时应唤醒 node 对应的线程去获取同步状态
* 如果 ws <= 0,这里通过 CAS 将节点 p 的等待设为 SIGNAL。
*
* 如果 ws > 0 (CANCELED) 此时立即唤醒node获取同步状态
* 如果 ws <= 0,则用CAS将前驱结点的状态设为SIGNAL,如果CAS失败则要唤醒node结点的线程,防止前面线程释放
* 同步状态后,node没有唤醒导致整个同步队列就回全部阻塞住
*/
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

//*************************** signalAll *******************************

public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}

private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
//将first结点分离
first.nextWaiter = null;
//转移到同步队列中
transferForSignal(first);
first = next;
} while (first != null);
}

关于await()(JDK1.8)中的缺陷

在JDK1.8中的await()中,如果两个线程同时进行await(),此时有可能会破坏等待队列的结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private Node addConditionWaiter() {
Node t = lastWaiter;
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);

// 错误发生的代码
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}

如果有2个线程t1,t2,对应的结点node1,node2,此时队列为空,两个线程同时到if (t == null),且都满足条件,此时firstWaiter指向了node1,而t2随之再次修改firstWaiter指向了node2,此时node1就不在等待队列中了,他会一直阻塞下去,因为在signal方法中,是通过将firstWaiter转移到同步队列实现的

参考资料