并发编程-独占式AQS源码详解

1. 框架概述

AQS是AbstractQueuedSynchronizer的简称,抽象队列同步器,AQS定义了一套多线程访问共享资源的同步器框架,许多同步类的实现都依赖于它,比如常用的ReentrantLock/CountDownLatch/Semaphore

AQS维护了一个volatile int state 代表共享资源,一个FIFO线程等待队列用来记录争用资源而进入等待的线程,这里有一点需要强调,AQS同步队列中的线程是处于WAITING状态的,而竞争synchronized同步块的线程是处于BLOCKED状态的

线程获取AQS框架下的锁先是尝试CAS乐观锁去获取,获取不到才会转换为悲观锁,如线程获取ReentrantLock在CAS阶段是处于RUNNABLE状态的,获取失败进入等待队列才会转换成WAITING状态。

AQS定义了两种组员共享方式:Exclusive 和 Share

用户自定义同步器时只需要实现共享资源state的获取与释放方式,至于具体的线程等待队列的维护,AQS已经实现好了。自定义同步器需要实现的几个方法:

  • isHeldExclusively() 该线程是否正在独占资源,只有用到Condition才需要实现它
  • tryAcquire(int) 独占方式获取资源,获取成功返回ture
  • tryRelease(int) 独占方式释放资源,释放成功返回ture
  • tryAcquireShared(int) 共享方式获取资源,负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • tryReleaseShared(int) 共享方式释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程tryRelease()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会向上累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。

再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。

一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock

2. 源码详解

本节依照acquire-release、acquireShared-releaseShared的次序来讲解AQS的源码实现。

2.1 acquire(int)

该方法是在独占模式下获取独占资源的顶层入口,如果获取资源成功tryAcquire返回true,该函数直接返回,且整个过程忽略中断的影响;否则调用addWaiter将线程包装成Node对象进入阻塞队列,并不断acquireQueued获取资源。

这里使用模板方法设计模式:acquire定义在抽象类中实现,调用的tryAcquire方法抽象类并没有给出实现逻辑,而是交给子类去实现。

1
2
3
4
5
6
// AbstractQueuedSynchronizer
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

函数流程如下:

  1. tryAcquire() 尝试直接去获取资源,如果成功acquire()方法直接返回,表示获取资源成功;若失败,则需要进行一系列处理
  2. 首先,调用addWaiter() 将该线程加入等待队列的尾部,并标记为独占模式;
  3. 然后,调用acquireQueued() 尝试在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待资源的过程中被中断过,则返回true,否则返回false。
  4. 如果线程在等待过程中被中断过,它是不响应的(关于中断的介绍请参考文章线程中断),获取资源后通过selfInterrupt(),将该线程的中断标志置为true。
2.1.1 tryAcquire(int)

此方法尝试获取独占资源,如果成功返回true,否则返回false。

1
2
3
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

AQS中该方法没有具体的执行逻辑,这是因为这是AQS定义的一个方法模板,具体的实现需要自定义同步类自己完成,能不能重入,竞争资源时可不可以加塞,都需要子类自己设计。如果子类没有实现该方法,就会调用AQS的默认实现,如上直接抛出异常。

2.1.2 addWaiter(Node)

此方法作用是将当前线程加入到阻塞队列的队尾,并返回当前线程所在节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 尝试快速入队
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//快速入队失败,调用enq方法入队
enq(node);
return node;
}

此处先介绍一下NodeNode节点是对每一个竞争同步代码的线程的封装,主要包含了当前线程对象以及线程的状态。变量waitStatus表示当前Node节点的等待状态,共有4种取值CANCELLED、SIGNAL、CONDITION、PROPAGATE

  • CANCELLED : 值为1,表示当前节点处于结束状态,在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该Node节点
  • SIGNAL 值为-1,表示当前节点线程取消或者释放资源的时候,需要unpark其后继节点
  • CONDITION 值为-2,表示当前节点处于条件队列,在转变(状态被设为0)之前不会被当做同步队列节点
  • PROPAGATE 值为-3,与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态
  • 0 代表初始状态。
2.1.3 enq(Node)

此方法用于将node加入队尾。源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

如果你看过AtomicInteger.getAndIncrement()函数源码,那么相信你一眼便看出这段代码的精华。CAS自旋volatile变量,是一种很常用很经典的用法。

2.1.4 acquireQueued(Node, int)

通过tryAcquire()addWaiter(),该线程获取资源失败,已经被放入等待队列尾部了,下一步该干什么?进入等待状态休息,直到其他线程彻底释放资源后唤醒自己,自己再拿到资源,然后就可以去干自己想干的事了。这个函数非常关键,上源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
final boolean acquireQueued(final Node node, int arg) {
//获取资源失败了吗?
boolean failed = true;
try {
//标识等待过程中是否被中断过
boolean interrupted = false;
for (;;) {
//获得当前节点的前驱
final Node p = node.predecessor();
//如果前驱是head,那就有资格去尝试获取
if (p == head && tryAcquire(arg)) {
//获取资源成功,将自己设置成head
setHead(node);
//help GC,原头结点断开与队列的链接,等待被回收
p.next = null;
failed = false;//表示获取资源成功
return interrupted;
}
//先判断此次获取失败后可不可以 WAITTING,如果不能,继续重复循环
//执行park让线程进入WAITTING状态,并判断等待过程中有没有中断,发生过就改状态
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

那么怎么判断线程是不是应该执行park()呢?继续看下面代码,shouldParkAfterFailedAcquire方法主要用于检查状态,看看自己是否真的可以去休息了(进入waiting状态),万一排在队列前边的线程都取消了只是瞎站着,那就需要往前加塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//获取前驱节点的状态
int ws = pred.waitStatus;
//如果前驱节点状态是SIGNAL,说明前驱节点释放资源后会通知本节点,可以安全的执行park()
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
//如果前驱节点是取消状态CANCELLED,执行加塞操作,跳过所有取消节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// waitStatus must be 0 or PROPAGATE
// 把前驱节点的状态设置成SIGNAL,前驱节点执行完释放资源就会通知本节点
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
//返回false表示此次循环不能更改线程状态,返回到acquireQueued方法即系执行循环获取资源
return false;
}

整个流程用一句话概括,如果前驱结点的状态不是SIGNAL,那么自己就不能放心去休息,需要去找个安全的休息点,找到安全点后可以再尝试下看能不能获取资源,再次获取失败就可以放心进入WAITTING状态。

parkAndCheckInterrupt方法就是让线程执行park()进入WAITTINGZ状态,并返回该线程的中断标志

1
2
3
4
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

注意,Thread.interrupted()方法在获取线程中断标志的同时会将该标志复位为false

2.1.5 小结

源码再贴一遍:

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

获取独占资源流程如下:
独占式AQS获取资源流程

  1. 调用自定义同步器的tryAcquire()尝试直接去获取资源,如果成功则直接返回;
  2. 否则addWaiter()将该线程加入等待队列的尾部;
  3. acquireQueued()使线程在等待队列中休息,当前驱节点为head 会去尝试获取资源,获取到资源后将自己设置为head,获取失败寻找安全点等待。注意此处寻找到安全点后不会立即park(),而是在下一次循环尝试获取失败后才会执行park()。如果在整个等待过程中被中断过,则返回true,否则返回false。
  4. 如果线程在等待过程中被中断过,它是不响应的,并且中断标志被Thread.interrupted()重置为false了,所以获取资源后才再进行自我中断selfInterrupt(),将中断标志重置为true。

2.2 release(int)

release是独占模式下线程释放共享资源的顶层接口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。
源码:

1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);//唤醒后继节点
return true;
}
return false;
}

逻辑并不复杂。它调用tryRelease()来释放资源。有一点需要注意的是,它是根据tryRelease()的返回值来判断该线程是否已经完成释放掉资源了。所以自定义同步器在设计tryRelease()的时候要明确这一点

2.2.1 tryRelease(int)
1
2
3
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}

跟tryAcquire()一样,这个方法是需要独占模式的自定义同步器去实现的。正常来说,tryRelease()都会成功的,因为这是独占模式,该线程来释放资源,那么它肯定已经拿到独占资源了,直接减掉相应量的资源即可(state-=arg),也不需要考虑线程安全的问题。但要注意它的返回值,上面已经提到了,release()是根据tryRelease()的返回值来判断该线程是否已经完成释放掉资源了!所以自义定同步器在实现时,如果已经彻底释放资源(state=0),要返回true,否则返回false。

2.2.2 unparkSuccessor(Node)

此方法用于唤醒等待队列中下一个线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private void unparkSuccessor(Node node) {
//获取当前节点的状态
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);//置0
//获取对列中下一个需唤醒的线程节点
Node s = node.next;
//若后继节点已取消,找到最靠近head的有效节点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
//waitStatus<=0的都是有效节点,都可以唤醒
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);//唤醒
}

一句话概括,用用unpark()唤醒等待队列中最前边的那个有效线程。

疑问:head的后继节点取消的情况下,要寻找离head最近的有效节点,为什么要从tail开始往前找??

3. ReentrantLock

ReentrantLock自身没有继承AQS,但是它持有一个AQS的子类Sync的对象实例sync,Sync又派生了两个子类 FairSync 和 NonfairSync。ReentrantLock实例化时,无参的默认构造函数会使用NonfairSync对sync进行初始化;而接受一个布尔型变量的构造函数根据用户传入的参数决定使用公平锁还是非公平锁。

公平性是针对锁获取而言的,如果是公平锁,那么锁的获取顺序应该符合请求的绝对时间顺序,也就是FIFO,该原则保证公平的代价是进行大量的线程切换。非公平锁虽然可能造成线程饥饿,但是极少的线程切换保证了其更大的吞吐量,因此ReentrantLock默认实现非公平锁。

3.1 获取锁

下面代码是非公平锁和公平锁分别获取资源的操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
final boolean nonfairTryAcquire(int acquires) {
//获取当前线程对象
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {//如果资源空闲,CAS设置状态量
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果资源被占用,判断持有锁的线程是不是本线程,是的话重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

重入锁的意义就是持有锁的线程可以多次重复进入临界区,而不需要在同步队列中等待,每次进入状态量加1,进入几次就要释放几次,释放1次状态量减1,当状态量为0时,完全释放资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//注意与非公平锁的区别
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

比较以上两个获取资源的函数,发现唯一的区别在于公平锁在设置状态量之前多做了一次判断 !hasQueuedPredecessors(),该函数返回是否有线程排在当前线程前面,如果没有则可以获得锁。hasQueuedPredecessors源码如下

1
2
3
4
5
6
7
8
9
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
//队列中不止一个线程
//并且第二个线程节点为空或者第二个节点不是是自己
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

3.2 释放锁

释放操作没有公平与非公平之分,所以释放操作是在父类Sync中实现,下面看源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
//如果当前线程不是占用线程,抛异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//状态量等于0,才是真正释放
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

因为释放锁之前,当前线程还持有锁,其他线程无权访问,所以修改状态没有用CAS,直接使用setState

共享式同步器 请看下一篇 并发编程-共享式AQS源码详解