并发编程-共享式AQS源码详解

上篇文章详细的阐述了AQS在独占模式下的底层原理,本篇主要讲述共享式同步器的原理。

1. acquireShared(int)

此方式是共享模式下线程获取贡献资源的入口,他会获取指定量的资源,获取成功直接返回,失败则进入等待队列,知道获取到资源为止,整个过程忽略终端。下面看源码:

1
2
3
4
5
public final void acquireShared(int arg) {
//
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
1
2
3
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}

这里 tryAcquireShared 依然需要自定义同步器去实现,但是AQS已经将返回值的语义定义好了,重载该函数的时候执行逻辑要符合下列语义:
-返回负值表示获取失败

  • 返回0表示获取成功,但是没有剩余资源
  • 返回正数表示获取成功,还有剩余资源

tryAcquireShared获取失败则执行 doAcquireShared 方法,看下面源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private void doAcquireShared(int arg) {
//将线程以共享方式加入同步队列尾部
final Node node = addWaiter(Node.SHARED);
//获取失败吗,默认true(失败)
boolean failed = true;
try {
//记录等待过程是否被中断过
boolean interrupted = false;
for (;;) {
//拿到前驱节点
final Node p = node.predecessor();
if (p == head) {//如果前驱是头结点
//尝试获取
int r = tryAcquireShared(arg);
if (r >= 0) {
//自己获取资源的同时,如果还有剩余资源,唤醒后继节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)//补上中断标志
selfInterrupt();
failed = false;
return;
}
}
//前驱不是头结点,获取失败后寻找安全点
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

整个过程与acquireQueued()很相似,区别在于唤醒等待线程的条件不同。setHeadAndPropagate方法在setHead()的基础上多了一步,就是自己苏醒的同时,如果条件符合(比如还有剩余资源),还会去唤醒后继结点,看代码:

1
2
3
4
5
6
7
8
9
10
11
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; //与独占式不同原head并没有释放资源
setHead(node);

if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}

2. releaseShared()

上一小节已经把acquireShared()说完了,这一小节就来讲讲它的反操作releaseShared()吧。此方法是共享模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果成功释放且允许唤醒等待线程,它会唤醒等待队列里的其他线程来获取资源。下面是releaseShared()的源码:

1
2
3
4
5
6
7
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

此方法的流程也比较简单,一句话:释放掉资源后,唤醒后继。跟独占模式下的release()相似,但有一点稍微需要注意:独占模式下的tryRelease()在完全释放掉资源(state=0)后,才会返回true去唤醒其他线程,这主要是基于独占下可重入的考量;而共享模式下的releaseShared()则没有这种要求,共享模式实质就是控制一定量的线程并发执行,那么拥有资源的线程在释放掉部分资源时就可以唤醒后继等待结点。例如,资源总量是13,A(5)和B(7)分别获取到资源并发运行,C(4)来时只剩1个资源就需要等待。A在运行过程中释放掉2个资源量,然后tryReleaseShared(2)返回true唤醒C,C一看只有3个仍不够继续等待;随后B又释放2个,tryReleaseShared(2)返回true唤醒C,C一看有5个够自己用了,然后C就可以跟A和B一起运行。而ReentrantReadWriteLock读锁的tryReleaseShared()只有在完全释放掉资源(state=0)才返回true,所以自定义同步器可以根据需要决定tryReleaseShared()的返回值。

2.1 doReleaseShared()

此方法主要用于唤醒后继。下面是它的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}

3. Semaphore

一个具象化的例子:
停车场运作,假设停车场有10个车位,刚开始都是空的。如果同时来了11辆车,看守者只能允许10辆车进入,另一辆排队等候,当有车为空出来,等候车辆进入填满空车位。Semaphore就相当于停车场看守者。

和RentrantLock不同Semaphore没有实现Lock接口,获取资源有响应中断模式和忽略中断模式,中断模式获取资源:

1
2
3
4
5
6
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void acquire(int i) throws InterruptedException {
sync.acquireSharedInterruptibly(i);
}

释放资源统一使用:

1
2
3
4
5
6
public void release() {
sync.releaseShared(1);
}
public void release(int i) {
sync.releaseShared(i);
}

内部同步器sync重载的tryAcquireShared-tryRealseShared源码如下,代码逻辑简单易懂,实现自定义的同步器一般也只需要实现这几个方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
//非公平
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
//公平
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}