并发编程-容器阻塞队列BlockingQueue

阻塞队列常用于生产者-消费者场景。

BQ有4套出队入队操作:

  • offer(e) & poll() 这套操作不会阻塞线程,队列满/空的时候返回特殊值 false/null
  • add(e) & remove() 该操作对offer(e) & pool()返回的特殊值抛出异常
  • put(e) & take() 阻塞方法,遇到队列满/空的时候会阻塞,直到收到通知可以继续执行
  • offer(e,time,unit) & poll(time,unit) 超时阻塞方法,超时返回 false/null

Jdk7中给出了7种BQ:

  • ArrayBlockingQueue 数组实现的有界阻塞队列, 此队列按照先进先出(FIFO)的原则对元素进行排序。
  • LinkedBlockingQueue 链表实现的有界阻塞队列, 此队列的默认和最大长度为Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序
  • PriorityBlockingQueue 支持优先级排序的无界阻塞队列, 默认情况下元素采取自然顺序升序排列。也可以自定义类实现 compareTo()方法来指定元素排序规则,或者初始化 PriorityBlockingQueue 时,指定构造参数 Comparator 来对元素进行排序。
  • DelayQueue 优先级队列实现的无界阻塞队列
  • SynchronousQueue 不存储元素的阻塞队列, 每一个 put 操作必须等待一个 take 操作,否则不能继续添加元素。
  • LinkenTransferQueue
  • LinkedBlockingDeque

本文将以LinkedBlockingQueue为例进行源码解读

1. Condition

任意的一个java对象,都拥有一组监视器方法(定义在Object类中),主要包括wait()、wait(long timeout)、notify()、notifyAll()方法,这些方法与sychronized关键字配合使用,可以实现等待/通知模式。Condition接口也通过类似Object的监视器方法,与Lock配合可以实现等待/通知模式。但是这两种方式在使用方式以及功能特性上还是有差别的:

  1. 每个Object监视器只有一个等待队列,而Condition接口可以支持多个等待队列
  2. 当前线程释放锁进入等待状态,Object监视器在等待过程中是不相应中断的,而Condition接口是可以的
  3. Object监视器不支持线程等待到将来的某个特定时间,Condition接口支持

1.1 Condition的原理

将在另一篇中解析AQS.ConditionObject类的源码

1.2 LBQ中的Condition

LBQ的入队和出队使用了两把重入锁,相应的也有两个条件队列notFull和notEmpty:

  • 当队列满的时候执行入队操作,入队线程会进入notFull等待,当有元素出队则通知入队线程–队列notFull,可以继续执行;
  • 当队列为空执行出队操作,出队线程会进入notEmpty等待,当有元素入队后则通知出队线程–队列notEmpty,可以继续执行。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

具体如何使用的,见下文LBQ源码分析

2. offer(e) & poll()

这套方法是在接口 Queue 中定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//以下代码摘自: java.util.concurrent.LinkedBlockingQueue
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
//满了直接返回失败
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
//c是更新之前的计数
if (c + 1 < capacity)
//更新之后还未满,唤醒一个入队线程
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
//更新之前是空的,更新完就不空了,唤醒一个阻塞的出队线程
signalNotEmpty();
return c >= 0;
}

offer(e)方法总结:

  1. 开始先检查参数是否为null,null则抛出NPE异常;
  2. 然后判断队列是否已经满了,满了直接返回false;
  3. 以上检查都通过,构造新节点,获取入队锁putLock
  4. 二重检查,判断队列是否未满,如果未满执行入队,计数器加1,如果计数器更新之后还小于capacity,则唤醒一个入队线程(如果有入队线程阻塞)
  5. 最后判断一下该线程入队前是否为空队列,如果之前是空的,入队完成就可以唤醒一个阻塞的出队线程。
  6. 最后入队成功返回true
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    public E poll() {
    final AtomicInteger count = this.count;
    if (count.get() == 0)
    return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
    if (count.get() > 0) {
    x = dequeue();
    c = count.getAndDecrement();
    if (c > 1)
    notEmpty.signal();
    }
    } finally {
    takeLock.unlock();
    }
    if (c == capacity)
    signalNotFull();
    return x;
    }

poll()方法总结:

  1. 首先检查队列是否空,若空直接返回null,不空继续执行;
  2. 获取出队锁takelock
  3. 二重检查,检查队列是否不空,不空执行出队,计数器减1,计数器更新之后还大于0(出队后队列还不空),唤醒一个出队线程(如果有阻塞的出队线程)
  4. 释放锁,然后判断此次出队前队列是否满的,若出队前满则此次出队结束就有余位了,唤醒一个阻塞入队线程执行

3. add(e) & remove()

这套方法也是在 Queue 中定义,add方法继承自Collection接口,内部调用了offer(e) & pool(),对队空或队满返回的特殊值做异常处理,队满执行入队操作抛 IllegalStateException 异常;队空做出队操作抛 NoSuchElementException 异常 。源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
以下代码摘自: java.util.AbstractQueue
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}

public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}

4. put(e) & take()

这是阻塞接口,定义在 BlockingQueue 接口中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//以下代码摘自: java.util.concurrent.LinkedBlockingQueue
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// 除非设置,否则保持计数器的值为-1表示失败
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
//这里使用while进行判断,是因为await的线程被唤醒时从await返回,需要再进行一次判断
//如果使用if的话就直接往下运行了,运行结果会不稳定。
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
//返回旧的计数然后计数+1
c = count.getAndIncrement();
//入队之后如果还有位置,给notFull队列发信号,唤醒put线程
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
//这个c是入队之前的计数,入队之前为空,入队后有元素了,所以要唤醒一个出队线程
if (c == 0)
signalNotEmpty();
}

put(e)方法总结:

  1. 检查参数为空抛NPE异常
  2. 使用参数构造新节点,获取入队锁putLock
  3. 当队满时,调用 notFull.await() 阻塞当前线程,注意此处使用while语句进行判断,原因后文分析。
  4. 队不满执行入队,计数器 +1
  5. 判断计数器更新后队是否未满,未满则唤醒阻塞的入队线程(如果存在的话)
  6. 解锁
  7. 判断此次入队前是否为空队列,如果是,此次入队完成后就不是了,所以要唤醒一个阻塞的出队线程。
  8. 无返回值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();//1
try {
while (count.get() == 0) {//2
notEmpty.await();
}
x = dequeue();//3
c = count.getAndDecrement();
if (c > 1)//4
notEmpty.signal();
} finally {
takeLock.unlock();//5
}
if (c == capacity)//6
signalNotFull();
return x;//7
}

take()方法总结:

  1. 获取出队锁takeLock
  2. 判断队列是否为空,为空就调用notEmpty.await()阻塞线程
  3. 不空就执行出队操作,计数器 -1
  4. 如果出队后队列仍然不空,唤醒一个阻塞的出队线程(如果存在的话)
  5. 解锁
  6. 若此次出队之前队列满,执行完本次出队就不满了,可以唤醒一个入队线程
  7. 返回出队的元素

5. offer(e,time,unit) & poll(time, unit)

超时阻塞方法,定义在 BlockingQueue 接口中,该组方法在put/take的基础上加上了超时返回的功能,出队超时返回null,入队超时返回false。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
以下代码摘自: java.util.concurrent.LinkedBlockingQueue
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);//1
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
if (nanos <= 0)//超时了
return false;
//没超时阻塞,nanos之后自动唤醒
nanos = notFull.awaitNanos(nanos);
//唤醒后返回到这里,继续while循环判断队列是否满,还是满就妥妥的超时了
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
if (nanos <= 0)//超时了
return null;
//没超时阻塞,nanos之后自动唤醒
nanos = notEmpty.awaitNanos(nanos);
//唤醒后返回到这,继续为了循环判断队列是否为空,还是为空妥妥的超时
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}

6. await之前的判断为什么用while

用put作为例子解释一下

1
2
3
4
5
6
7
8
9
10
11
12
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();//1
}
enqueue(node);
c = count.getAndIncrement(); //2
if (c + 1 < capacity)
notFull.signal();//3
} finally {
putLock.unlock();//4
}

  1. 假设A线程入队操作结束后(执行到2位置),队列还剩一个空位,那么程序会唤醒阻塞队列中的put线程(3位置)B线程
  2. B线程从await返回后需要竞争put锁(从等待池进入锁池),但这时候有个C线程也来竞争put锁并且成功,C执行入队之后队列已经满了
  3. C释放锁之后B获得锁,从await返回(位置1),如果这里使用 if 判断,1位置之后继续向下执行入队操作,显然会出错,因为最后一个空位让C线程用掉了
  4. 但是使用 while 判断,await返回之后,还在循环体内,继续循环判断队列是否满,发现满了,再次await。

所以使用while判断其实是在这里进行了一次 double check, 不管是使用await还是wait,都需要while进行判断,不然在多线程环境中就会出错。

7. 其他方法

  • peek() 返回头结点,队列空返回null
  • element() 调用peak(),peak()返回null则抛异常 NoSuchElementException
  • remove(o) 移除指定的元素,参数接受null,若没找到该元素返回false
  • contains(o) 判断是否包含指定元素,参数为空或不包含返回false
  • remainingCapacity() 返回剩余容量
  • size() 返回现有元素数量
  • clear() 原子性的清除所有元素
  • drainTo(c) 将队列中的元素放到集合c中,返回转换的元素个数