并发编程-并发工具类

在JDK的并发包中提供了几个非常有用的并发工具类。CountDownLatch、CyclicBarrier和Semaphore提供了并发流程控制手段,Exchanger提供了两个线程之间交换数据的手段,本文将配合应用场景介绍该如何使用这几个工具类。

1. CountDownLatch

CountDownLatch是JDK 5+里面闭锁的一个实现,他允许一个或多个线程等待其他线程完成各自的工作后再执行。

闭锁(Latch):一种同步方法,可以延迟线程的进度直到线程到达某个终点状态。

与CountDownLatch第一次交互是主线程等待其它的线程,主线程必须在启动其它线程后立即调用await方法,这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务。

其他的N个线程必须引用闭锁对象,因为他们需要通知CountDownLatch对象,他们已经完成了各自的任务,这种机制就是通过调用countDown()方法来完成的。每调用一次这个方法,在构造函数中初始化的count值就减1,所以当N个线程都调用了这个方法count的值等于0,然后主线程就能通过await方法,恢复自己的任务。

与Join的区别:调用join方法需要等待thread执行完毕才能继续向下执行,而CountDownLatch只需要检查计数器的值为零就可以继续向下执行,相比之下,CountDownLatch更加灵活一些,可以实现一些更加复杂的业务场景。

1.1 使用场景

  1. 开启多个线程分块下载一个大文件,每个线程只下载固定的一截,最后由另外一个线程来拼接所有的分段。
  2. 应用程序的主线程希望在负责启动框架服务的线程已经启动所有的框架服务之后再执行。
  3. 确保一个计算不会执行,直到所需要的资源被初始化。

1.2 主要方法

1
2
3
4
5
6
7
8
9
10
//初始化计数的次数,不能重置
public CountDownLatch(int count);
//调用此方法则计数减1
public void countDown();
//得到当前的计数
Public Long getCount();
//调用此方法会一直阻塞当前线程,直到计时器的值为0,除非线程被中断。
public void await() throws InterruptedException
//调用此方法会一直阻塞当前线程,直到计时器的值为0,除非线程被中断或者计数器超时,返回false代表计数器超时。
Public boolean await(long timeout, TimeUnit unit)

1.3 使用案例

  1. latch.countDown(); 建议放到finally语句里。
  2. 对这个计数器的操作都是原子操作,同时只能有一个线程去操作这个计数器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class CountDownLatchTest {
private final CountDownLatch latch = new CountDownLatch(3);
private final ReentrantLock lock = new ReentrantLock();
private int count;

public int getCount(){
return this.count;
}

public class RunnableTask implements Runnable{
@Override
public void run() {
try {
lock.lock();
count += 100;
}finally {
latch.countDown();
lock.unlock();
}
}
}
public static void main(String[] args) throws InterruptedException{
CountDownLatchTest demo = new CountDownLatchTest();
int i = 3;
while(i-- > 0){
new Thread(demo.new RunnableTask()).start();
}
demo.latch.await();
System.out.println(demo.getCount());
}
}

三个线程分别对count加100,等三个线程执行完后,主线程输出count的值。输出300

2. CyclicBarrier

字面意思是可以循环使用的屏障。他要做的事情是让一组线程到达一个同步点时被阻塞,直到最后一个线程到达同步点,才会打开屏障,所有线程继续运行。

默认的构造方法 CyclicBarrier(int parties) ,参数代表屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier已经到达屏障,然后被阻塞。

1.1 使用场景

可用于多线程计算数据,最后合并计算结果

1.2 主要方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//初始化
public CyclicBarrier(int parties)
//barrierAction表示被拦住的线程需要执行的任务
public CyclicBarrier(int parties, Runnable barrierAction)
//被拦住的线程调用次函数进入阻塞状态
public int await()
//被拦住的线程调用次函数进入阻塞状态,超时唤醒
public int await(long timeout, TimeUnit unit)
public void reset()
//返回需要被拦住的线程数量
public int getParties()
//查询此屏障是否处于断开状态
public boolean isBroken()
//返回已被拦住的线程数量
public int getNumberWaiting()

1.3 使用案例

初始化线程数为2,加上主线程调用await()3次,所以得出结论主线程调用不计入await次数之内。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class CyclicBarrierTest {
private static CyclicBarrier cb = new CyclicBarrier(2);
private static ReentrantLock lock = new ReentrantLock();
private static int count;
public static class RunnableTask implements Runnable{
@Override
public void run() {
try {
lock.lock();
count += 100;
cb.await();
}catch (Throwable e){

}
finally {
lock.unlock();
}
}
}
public static void main(String[] args) throws Exception{
for(int i = 0; i < 2; i++) {
new Thread(new RunnableTask()).start();
}
cb.await();
System.out.println(count);
}
}

输出200

1.4 与CountDownLatch的区别

  • CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置,可以使用多次,所以CyclicBarrier能够处理更为复杂的场景;
  • CyclicBarrier还提供了一些其他有用的方法,比如getNumberWaiting()方法可以获得CyclicBarrier阻塞的线程数量,isBroken()方法用来了解阻塞的线程是否被中断;
  • CountDownLatch允许一个或多个线程等待一组事件的产生,而CyclicBarrier用于等待其他线程运行到栅栏位置。

3. Semaphore

Semaphore是用来控制同事访问特定资源的线程数量,它通过协调各个线程以保证合理的使用公共资源。

3.1 使用场景

可用于做流量控制,特别是公用资源有限的场景,比如数据库连接。

4. Exchanger

Exchanger类可用于两个线程之间交换信息。可简单地将Exchanger对象理解为一个包含两个格子的容器,通过exchanger方法可以向两个格子中填充信息。当两个格子中的均被填充时,该对象会自动将两个格子的信息交换,然后返回给线程,从而实现两个线程的信息交换。

Exchanger类仅可用作两个线程的信息交换,当超过两个线程调用同一个exchanger对象时,得到的结果是不确定的,exchanger对象仅关心其包含的两个“格子”是否已被填充数据,当两个格子都填充数据完成时,该对象就认为线程之间已经配对成功,然后开始执行数据交换操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class ExchangerTest {
private static Exchanger<String> exgr = new Exchanger<>();
private static ExecutorService threadpool = Executors.newFixedThreadPool(3);

public static void main(String[] args){
threadpool.execute(() -> {
String a = "银行流水A";
try {
exgr.exchange(a);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
threadpool.execute(() -> {
String b = "银行流水B";
try {
String a = exgr.exchange(b);
System.out.println(a);
} catch (InterruptedException e){
e.printStackTrace();
}
});
threadpool.shutdown();
}
}