并发学习(八) AbstractQueuedSynchronizer 源码分析(独占/共享)

简介

AbstractQueuedSynchronizer (AQS) 抽象队列同步器。AQS 是很多同步器的基础框架,比如 ReentrantLock、CountDownLatch 和 Semaphore 等都是基于 AQS 实现的。

原理

AQS是通过维护一个FIFO队列来管理多线程的。在公平竞争的情况下,无法获取同步状态的线程将会被封装成一个结点,置于队列尾部。入队的线程将会通过自旋的方式获取同步状态,若在有限次的尝试后,仍未获取成功,线程则会被阻塞住。大致示意图如下:

aqs.png

当头结点释放了同步状态后,如果后继结点为阻塞状态,头结点会去唤醒后继结点的线程,后继结点获取同步状态后,会移除原头结点。

重要方法介绍

首先要知道,AQS主要做了以下三件事情:

  • 同步状态的管理

  • 线程的阻塞和唤醒

  • 同步队列的维护

下面三个protected final方法是AQS中用来访问/修改同步状态的方法:

  • int getState(): 获取同步状态

  • void setState(): 设置同步状态

  • boolean compareAndSetState(int expect, int update):基于CAS,原子设置当前状态

以下的方法需要工具类(如CountDownLatch,ReentrantLock)来复写

方法 说明
boolean tryAcquire(int arg) 独占式获取同步状态
boolean tryRelease(int arg) 独占式释放同步状态
int tryAcquireShared(int arg) 共享式获取同步状态
boolean tryReleaseShared(int arg) 共享式释放同步状态
boolean isHeldExclusively() 检测当前线程是否获取独占锁

以下的方法是一组模板方法,同步组件可直接调用:

方法 说明
void acquire(int arg) 独占式获取同步状态,该方法将会调用 tryAcquire 尝试获取同步状态。获取成功则返回,获取失败,线程进入同步队列等待。
void acquireInterruptibly(int arg) 响应中断版的 acquire
boolean tryAcquireNanos(int arg,long nanos) 超时+响应中断版的 acquire
void acquireShared(int arg) 共享式获取同步状态,同一时刻可能会有多个线程获得同步状态。比如读写锁的读锁就是就是调用这个方法获取同步状态的。
void acquireSharedInterruptibly(int arg) 响应中断版的 acquireShared
boolean tryAcquireSharedNanos(int arg,long nanos) 超时+响应中断版的 acquireShared
boolean release(int arg) 独占式释放同步状态
boolean releaseShared(int arg) 共享式释放同步状态

源码分析

在线程获取同步状态失败的时候,会将线程封装成一个结点,置于队列尾部,结点除了有线程以外,还有前驱、后继、状态等信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
static final class Node {

/** 共享类型结点,标记结点在共享模式下等待 */
static final Node SHARED = new Node();

/** 独占类型结点,标记结点在独占模式下等待 */
static final Node EXCLUSIVE = null;

/** 等待状态 - 取消 */
static final int CANCELLED = 1;

/**
* 等待状态 - 通知。某个结点是处于该状态,当该结点释放同步状态后,
* 会通知后继结点线程,使之可以恢复运行
*/
static final int SIGNAL = -1;

/** 等待状态 - 条件等待。表明结点等待在 Condition 上 */
static final int CONDITION = -2;

/**
* 等待状态 - 传播。表示无条件向后传播唤醒动作
*/
static final int PROPAGATE = -3;

/**
* 等待状态,取值如下:
* CANCELLED ---- 1,
* 0--------------0,
* SIGNAL ------ -1,
* CONDITION --- -2,
* PROPAGATE --- -3,
*
* 初始情况下,waitStatus = 0
*/
volatile int waitStatus;

/**
* 前驱结点
*/
volatile Node prev;

/**
* 后继结点
*/
volatile Node next;

/**
* 结点对应线程
*/
volatile Thread thread;

/**
* 下一个等待结点,用在 ConditionObject 中
*/
Node nextWaiter;

/**
* 判断结点是否是共享结点
*/
final boolean isShared() {
return nextWaiter == SHARED;
}

/**
* 获取前驱结点
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}

Node() { // Used to establish initial head or SHARED marker
}

/** addWaiter 方法会调用该构造方法 */
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}

/** Condition 中会用到此构造方法 */
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}

独占模式

acquire

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

acquire会调用子类复写的 tryAcquire 方法进行尝试获取同步状态

  • 获取成功:不执行任何代码直接返回
  • 获取失败:将线程封装为node,置于尾部通过自旋尝试获取同步状态。在每次尝试的过程中进行相关判断,判断是否还要继续尝试,如果返回不用继续尝试,该线程将会被 LockSupport.park 方法阻塞住。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 尝试以快速方式将节点添加到队列尾部
Node pred = tail;
if (pred != null) {
node.prev = pred;
//通过CAS将结点插入尾部
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}

/*
* 快速插入节点失败
* 出现 pred == null 或者 CAS失败的情况
* 调用 enq 方法,不停的尝试插入节点
*/
enq(node);
return node;
}

private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
/*
* 尾部为空,说明是初始情况,此时设置头结点
*/
if (compareAndSetHead(new Node()))
tail = head;
} else {
/*
* 将节点插入队列尾部。这里是先将新节点的前驱设为尾节点,之后在尝试将新节点设为尾节
* 点,最后再将原尾节点的后继节点指向新的尾节点。除了这种方式,我们还先设置尾节点,
* 之后再设置前驱和后继,即:
*
* if (compareAndSetTail(t, node)) {
* node.prev = t;
* t.next = node;
* }
*
* 但如果是这样做,会导致一个问题,即短时内,队列结构会遭到破坏。考虑这种情况,
* 某个线程在调用 compareAndSetTail(t, node)成功后,该线程被 CPU 切换了。此时
* 设置前驱和后继的代码还没带的及执行,但尾节点指针却设置成功,导致队列结构短时内会
* 出现如下情况:
*
* +------+ prev +-----+ +-----+
* head | | <---- | | | | tail
* | | ----> | | | |
* +------+ next +-----+ +-----+
*
* tail 节点完全脱离了队列,这样导致一些队列遍历代码出错。如果先设置
* 前驱,在设置尾节点。及时线程被切换,队列结构短时可能如下:
*
* +------+ prev +-----+ prev +-----+
* head | | <---- | | <---- | | tail
* | | ----> | | | |
* +------+ next +-----+ +-----+
*
* 这样并不会影响从后向前遍历,不会导致遍历逻辑出错。
*
* 参考:
* https://www.cnblogs.com/micrari/p/6937995.html
*/
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

/**
* 在每次尝试的过程中进行相关判断,判断是否还要继续尝试,如果返回不用继续尝试(shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()为真)
* 该线程将会被 LockSupport.park 方法阻塞住。
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//获取前驱结点
final Node p = node.predecessor();
// 如果前驱结点是头结点,再次尝试获取同步状态,因为前驱为头结点,说明前驱已经获得了同步状态
// 在前驱结点释放同步状态后,tryAcquire(arg)也返回true
// 此时将node设置为头结点,并且前驱出队
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//如果获取同步状态失败,则要判断是否继续尝试获取(是否应该阻塞自己)
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 在抛出异常的情况下会执行cancelAcquire(node)
if (failed)
cancelAcquire(node);
}
}


private void setHead(Node node) {
// 仅有一个线程可以成功获取同步状态,所以这里不需要进行同步控制
head = node;
node.thread = null;
node.prev = null;
}

private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
/*
* 前驱节点等待状态为 SIGNAL,表示当前线程应该被阻塞。
* 线程阻塞后,会在前驱节点释放同步状态后被前驱节点线程唤醒
*/
if (ws == Node.SIGNAL)
return true;
/*
* 前驱节点等待状态为 CANCELLED,则以前驱节点为起点向前遍历,
* 向前遍历移除其他等待状态为 CANCELLED 的节点。
*/
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* 等待状态为 0 或 PROPAGATE,设置前驱节点等待状态为 SIGNAL,
* 并再次尝试获取同步状态。
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

private void cancelAcquire(Node node) {
// 忽略结点为空的情况
if (node == null)
return;

node.thread = null;

// 跳过状态为CANCELED的结点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;

// 记录pred节点的后继为predNext,后续CAS会用到。
Node predNext = pred.next;

//当前结点设置为CANCELED
node.waitStatus = Node.CANCELLED;


// If we are the tail, remove ourselves.
/*
* 如果当前节点是tail,通过CAS将前驱结点设为tail。
* prev 的 next 引用置空,断开与后继节点的联系,完成清理工作。
*/
if (node == tail && compareAndSetTail(node, pred)) {
/*
* 执行到这里,表明 pred 节点被成功设为了尾节点,这里通过 CAS 将 pred 节点的后继节点
* 设为 null。注意这里的 CAS 即使失败了,也没关系。失败了,表明 pred 的后继节点更新
* 了。pred 此时已经是尾节点了,若后继节点被更新,则是有新节点入队了。这种情况下,CAS
* 会失败,但失败不会影响同步队列的结构。
*/
compareAndSetNext(pred, predNext, null);
} else {
// 如果node还有后继节点,这种情况要做的事情是把pred和后继非取消节点拼起来。
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
/*
* 如果node的后继节点next非取消状态的话,则用CAS尝试把pred的后继置为node的后继节点
* 这里if条件为false或者CAS失败都没关系,这说明可能有多个线程在取消,总归会有一个能成功的。
*/
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
/*
* 唤醒后继节点对应的线程。这里简单讲一下为什么要唤醒后继线程,考虑下面一种情况:
* head node1 node2 tail
* ws=0 ws=1 ws=-1 ws=0
* +------+ prev +-----+ prev +-----+ prev +-----+
* | | <---- | | <---- | | <---- | |
* | | ----> | | ----> | | ----> | |
* +------+ next +-----+ next +-----+ next +-----+
*
* 头结点初始状态为 0,node1、node2 和 tail 节点依次入队。node1 自旋过程中调用
* tryAcquire 出现异常,进入 cancelAcquire。head 节点此时等待状态仍然是 0,它
* 会认为后继节点还在运行中,所以它在释放同步状态后,不会去唤醒后继等待状态为非取消的
* 节点 node2。如果 node1 再不唤醒 node2 的线程,该线程面临无法被唤醒的情况。此
* 时,整个同步队列就回全部阻塞住。
*/
unparkSuccessor(node);
}


/*
* 取消节点的next之所以设置为自己本身而不是null,
* 是为了方便AQS中Condition部分的isOnSyncQueue方法,
* 判断一个原先属于条件队列的节点是否转移到了同步队列。
*
* 因为同步队列中会用到节点的next域,取消节点的next也有值的话,
* 可以断言next域有值的节点一定在同步队列上。
*
* 在JVM层面就是该结点与GC ROOT依然是可达状态的,所以将其与同步队列断开
* 在GC层面,和设置为null具有相同的效果。
*/
node.next = node; // help GC
}
}


private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
/*
* 通过 CAS 将等待状态设为 0,让后继节点线程多一次
* 尝试获取同步状态的机会
*/
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
//如果后继结点为CANCELED,则从tail向前遍历找到非CANCELED结点
/*
* 这里如果 s == null 处理,是不是表明 node 是尾节点?答案是不一定。新节点入队时,队列瞬时结构可能如下:
* node1 node2
* +------+ prev +-----+ prev +-----+
* head | | <---- | | <---- | | tail
* | | ----> | | | |
* +------+ next +-----+ +-----+
*
* node2 节点为新入队节点,此时 tail 已经指向了它,但 node1 后继引用还未设置。
* 这里 node1 就是 node 参数,s = node1.next = null,但此时 node1 并不是尾
* 节点。所以这里不能从前向后遍历同步队列,应该从后向前。
*/
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//唤醒node后继线程
LockSupport.unpark(s.thread);
}

到这里,AQS独占式获取同步状态就分析完了,参考资料的博主(田小波 和 MottoX)分析的真的很好,分析的很清楚,也很值得学习,能写出优质的文章很不容易。

下面对整个流程进行总结:

  1. 调用 tryAcquire 方法尝试获取同步状态
  2. 获取成功,直接返回
  3. 获取失败,将线程封装到节点中,并将节点入队
    入队节点在 acquireQueued 方法中自旋获取同步状态
    若节点的前驱节点是头节点,则再次调用 tryAcquire 尝试获取同步状态
    获取成功,当前节点将自己设为头节点并返回
    获取失败,可能再次尝试,也可能会被阻塞。这里简单认为会被阻塞。

584724-20170612211300368-774544064.png

release

释放同步状态分为两步:

  1. 调用tryRelease方法尝试释放同步状态
  2. 根据条件判断是否应该唤醒后继线程

下面看源代码

1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

此处 h != null && h.waitStatus != 0 分三种情况

  1. head == null

    head 还未初始化。初始情况下,head = null,当第一个节点入队后,head 会被初始为一个虚拟(dummy)节点。这里,如果还没节点入队就调用 release 释放同步状态,就会出现 h = null 的情况。

  2. head != null && waitStatus < 0

    状态为SINGAL、CONDITION、PROPAGATE此时被阻塞了,需要唤醒

  3. head != null && waitStatus == 0

    表明后继节点对应的线程仍在运行中,不需要唤醒

共享模式

在共享模式下,同一时刻会有多个线程获取共享同步状态。共享模式是实现读写锁中的读锁、CountDownLatch 和 Semaphore 等同步组件的基础。

acquireShared

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

private void doAcquireShared(int arg) {

final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
//自旋获得同步状态
boolean interrupted = false;
for (;;) {
//获取前驱结点
final Node p = node.predecessor();
/*
* 前驱是头结点,其类型可能是 EXCLUSIVE,也可能是 SHARED.
* 如果是 EXCLUSIVE(独占式),线程无法获取共享同步状态。
* 如果是 SHARED(共享式),线程则可获取共享同步状态。
* 能不能获取共享同步状态要看 tryAcquireShared 具体的实现。比如多个线程竞争读写
* 锁的中的读锁时,均能成功获取读锁。但多个线程同时竞争信号量时,可能就会有一部分线
* 程因无法竞争到信号量资源而阻塞。
*
*/
if (p == head) {
//如果前驱结点是头结点,再次尝试获得同步状态
int r = tryAcquireShared(arg);
if (r >= 0) {
// 设置头结点,如果后继节点是共享类型,唤醒后继节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//此处在独占式已经分析过了
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
//设置头结点
setHead(node);
//如果后继结点是非取消的唤醒后继结点
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
/*
* 节点 s 如果是共享类型节点,则应该唤醒该节点
* s == null 的情况前面和之前unparkSuccessor方法类似。
* propagate在最后有解释
*/
if (s == null || s.isShared())
doReleaseShared();
}
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
//如果头结点是SIGNAL,则将状态设为0,唤醒后继结点
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
//如果结点状态是0,则将状态设为PROPAGATE,保证唤醒能够正常传播下去
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}

下面是共享模式获取同步状态的分步:

  1. 获取共享同步状态
  2. 若获取成功,直接返回结束
  3. 若获取失败,则生成节点,并入队
  4. 如果前驱为头结点,再次尝试获取共享同步状态
  5. 获取成功则将自己设为头结点,如果后继节点是共享类型的,则唤醒
  6. 若失败,将节点状态设为 SIGNAL,再次尝试。若再次失败,线程进入等待状态

releaseShared

共享模式释放同步状态主要调用了doReleaseShared,而doReleaseShared方法在之前获取同步状态已经分析过了,共享节点线程在获取同步状态和释放同步状态时都会调用 doReleaseShared,所以 doReleaseShared 是多线程竞争集中的地方。

1
2
3
4
5
6
7
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

拓展(PROPAGATE 状态存在的意义利用 PROPAGATE 传播唤醒动作)

在整段代码中,最让我不理解的地方是PROPAGATE(-3)状态,开始不知道有什么用,在参考的博客中给了很详细的解释,方便理解,在经过本人同意后,下面整段引用AbstractQueuedSynchronizer 原理分析 - 独占/共享模式 | 田小波的博客

利用 PROPAGATE 传播唤醒动作

PROPAGATE 状态是用来传播唤醒动作的,那么它是在哪里进行传播的呢?答案是在setHeadAndPropagate方法中,这里再来看看 setHeadAndPropagate 方法的实现:

1
2
3
4
5
6
7
8
9
10
11
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);

if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}

大家注意看 setHeadAndPropagate 方法中那个长长的判断语句,其中有一个条件是h.waitStatus < 0,当 h.waitStatus = SIGNAL(-1) 或 PROPAGATE(-3) 是,这个条件就会成立。那么 PROPAGATE 状态是在何时被设置的呢?答案是在doReleaseShared方法中,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {...}

// 如果 ws = 0,则将 h 状态设为 PROPAGATE
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
...
}
}

再回到 setHeadAndPropagate 的实现,该方法既然引入了h.waitStatus < 0这个条件,就意味着仅靠条件propagate > 0判断是否唤醒后继节点线程的机制是不充分的。至于为啥不充分,请继续往看下看。

引入 PROPAGATE 所解决的问题

PROPAGATE 的引入是为了解决一个 BUG – JDK-6801020,复现这个 BUG 的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import java.util.concurrent.Semaphore;

public class TestSemaphore {

private static Semaphore sem = new Semaphore(0);

private static class Thread1 extends Thread {
@Override
public void run() {
sem.acquireUninterruptibly();
}
}

private static class Thread2 extends Thread {
@Override
public void run() {
sem.release();
}
}

public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10000000; i++) {
Thread t1 = new Thread1();
Thread t2 = new Thread1();
Thread t3 = new Thread2();
Thread t4 = new Thread2();
t1.start();
t2.start();
t3.start();
t4.start();
t1.join();
t2.join();
t3.join();
t4.join();
System.out.println(i);
}
}
}

根据 BUG 的描述消息可知 JDK 6u11,6u17 两个版本受到影响。那么,接下来再来看看引起这个 BUG 的代码 – JDK 6u17 中 setHeadAndPropagate 和 releaseShared 两个方法源码,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private void setHeadAndPropagate(Node node, int propagate) {
setHead(node);
if (propagate > 0 && node.waitStatus != 0) {
/*
* Don't bother fully figuring out successor. If it
* looks null, call unparkSuccessor anyway to be safe.
*/
Node s = node.next;
if (s == null || s.isShared())
unparkSuccessor(node);
}
}

// 和 release 方法的源码基本一样
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

下面来简单说明 TestSemaphore 这个类的逻辑。这个类持有一个数值为 0 的信号量对象,并创建了4个线程,线程 t1 和 t2 用于获取信号量,t3 和 t4 则是调用 release() 方法释放信号量。在一般情况下,TestSemaphore 这个类的代码都可以正常执行。但当有极端情况出现时,可能会导致同步队列挂掉。这里演绎一下这个极端情况,考虑某次循环时,队列结构如下:
15251540602100.jpg

  1. 时刻1:线程 t3 调用 unparkSuccessor 方法,head 节点状态由 SIGNAL(-1) 变为0,并唤醒线程 t1。此时信号量数值为1。
  2. 时刻2:线程 t1 恢复运行,t1 调用 Semaphore.NonfairSync 的 tryAcquireShared,返回0。然后线程 t1 被切换,暂停运行。
  3. 时刻3:线程 t4 调用 releaseShared 方法,因 head 的状态为0,所以 t4 不会调用 unparkSuccessor 方法。
  4. 时刻4:线程 t1 恢复运行,t1 成功获取信号量,调用 setHeadAndPropagate。但因为 propagate = 0,线程 t1 无法调用 unparkSuccessor 唤醒线程 t2,t2 面临无线程唤醒的情况。因为 t2 无法退出等待状态,所以 t2.join 会阻塞主线程,导致程序挂住。

下面再来看一下修复 BUG 后的代码,根据 BUG 详情页显示,该 BUG 在 JDK 1.7 中被修复。这里找一个 JDK 7 较早版本(JDK 7u10)的代码看一下,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);

if (propagate > 0 || h == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}

public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}

在按照上面的代码演绎一下逻辑,如下:

  1. 时刻1:线程 t3 调用 unparkSuccessor 方法,head 节点状态由 SIGNAL(-1) 变为0,并唤醒线程t1。此时信号量数值为1。
  2. 时刻2:线程 t1 恢复运行,t1 调用 Semaphore.NonfairSync 的 tryAcquireShared,返回0。然后线程 t1 被切换,暂停运行。
  3. 时刻3:线程 t4 调用 releaseShared 方法,检测到h.waitStatus = 0,t4 将头节点等待状态由0设为PROPAGATE(-3)。
  4. 时刻4:线程 t1 恢复运行,t1 成功获取信号量,调用 setHeadAndPropagate。因 propagate = 0,propagate > 0 条件不满足。而 h.waitStatus = PROPAGATE(-3),所以条件h.waitStatus < 0成立。进而,线程 t1 可以唤醒线程 t2,完成唤醒动作的传播。

到这里关于状态 PROPAGATE 的内容就讲完了。最后,简单总结一下本章开头提的两个问题。

问题一:PROPAGATE 状态用在哪里,以及怎样向后传播唤醒动作的?
答:PROPAGATE 状态用在 setHeadAndPropagate。当头节点状态被设为 PROPAGATE 后,后继节点成为新的头结点后。若 propagate > 0 条件不成立,则根据条件h.waitStatus < 0成立与否,来决定是否唤醒后继节点,即向后传播唤醒动作。

问题二:引入 PROPAGATE 状态是为了解决什么问题?
答:引入 PROPAGATE 状态是为了解决并发释放信号量所导致部分请求信号量的线程无法被唤醒的问题。

参考资料