logo头像

生而无畏,战至终章

CountDownLatch介绍

简介

CountDownLatch是一个同步工具类,在完成一组正在其他线程中执行的操作之前,它允许一个或者多个线程一直等待。

与CyclicBarrier区别:

  1. CountDownLatch允许一个或者N个线程等待其他线程完成执行;而CyclicBarrier允许N个线程相互等待。
  2. CountDownLatch计算器无法重置;而CyclicBarrier计数器允许重置后使用,因此它被称为是循环的barrier。

实现

函数列表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 构造方法,构造一个指定计数初始化的CountDownLatch
CountDownLatch(int count)

// 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断。
public void await()

// 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间。
public boolean await(long timeout, TimeUnit unit)

// 递减锁存器的计数,如果计数到达零,则释放所有等待的线程。
public void countDown()

// 返回当前对象的count值
public long getCount() {

内部类Sync()

Sync()为CountDownLatch的一个内部类,而Sync继承AQS,针对AQS的这里就不做详细介绍了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;

Sync(int count) {
setState(count);
}

int getCount() {
return getState();
}

protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}

await()方法

该方法主要作用是使当前线程在锁存器倒数至零之前一直等待,除非线程中断,定义如下:

1
2
3
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

可以看到其内部使用AQS的acquireSharedInterruptibly(int arg)方法

1
2
3
4
5
6
7
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

其中tryAcquireShared方法在Sync()内部类中进行了重写:

1
2
3
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

getState()获取同步状态,其值等于计数器的值 ,若getState()返回的不是0,则tryAcquireShared返回的就是-1,调用doAcquireSharedInterruptibly(arg)方法,该方法为一个自旋方法会尝试一直去获取同步状态,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

countDown()方法

countDown() 方法递减锁存器的计数,如果计数到达零,则释放所有等待的线程

1
2
3
public void countDown() {
sync.releaseShared(1);
}

内部调用AQS的releaseShared(int arg)方法来释放共享锁同步状态

1
2
3
4
5
6
7
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

上面代码中的tryReleaseShared()在Sync()内部类中进行了重写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
// 获取锁状态
int c = getState();
// 释放锁成功
if (c == 0)
return false;
int nextc = c-1;
// //更新锁状态(计数器)
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}

演示

下面用运动员跑步来举例说明,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class CountDownLatchTest1 {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
// 3个主线程
final CountDownLatch main = new CountDownLatch(3);
// 1个子线程
final CountDownLatch sub = new CountDownLatch(1);

for (int i = 0; i < 3; i++) {
final int c = i;
executorService.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println("线程:" + Thread.currentThread().getName() + "已到达起跑线");
// 每次主线程减1,当主线程减到0时,主线程就被唤醒
main.countDown();
// 所有子线程到达之后必须等待
sub.await();
System.out.println("线程"+Thread.currentThread().getName()+"开始跑步---》到达终点!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
executorService.shutdown();

// 所有子线程到达后,主线程发起跑令
try {
main.await();
Thread.sleep(1000);
System.out.println("线程" + Thread.currentThread().getName() + "准备就绪");
System.out.println("线程" + Thread.currentThread().getName() + "发起跑令");
// 子线程减到1,被唤醒继续执行
sub.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

运行结果:

1
2
3
4
5
6
7
8
线程:pool-1-thread-1已到达起跑线
线程:pool-1-thread-2已到达起跑线
线程:pool-1-thread-3已到达起跑线
线程main准备就绪
线程main发起跑令
线程pool-1-thread-1开始跑步---》到达终点!
线程pool-1-thread-2开始跑步---》到达终点!
线程pool-1-thread-3开始跑步---》到达终点!

总结

CountDownLatch内部通过共享锁实现,在创建CountDownLatch实例的时候,需要传递一个int参数,该参数为计数器初始值。
每调用一次countDown()方法,计数器减1,计数器大于0 时,await()方法会阻塞程序继续执行
当计数减至0时触发特定的事件。利用这种特性,可以让主线程等待子线程的结束

end,感谢阅读