CountDownLatch介绍

starlin 719 2019-08-11

简介

CountDownLatch是一个同步工具类,在完成一组正在其他线程中执行的操作之前,它允许一个或者多个线程一直等待。

与CyclicBarrier区别:

  1. CountDownLatch允许一个或者N个线程等待其他线程完成执行;而CyclicBarrier允许N个线程相互等待。
  2. CountDownLatch计算器无法重置;而CyclicBarrier计数器允许重置后使用,因此它被称为是循环的barrier。

实现

函数列表

// 构造方法,构造一个指定计数初始化的CountDownLatch
CountDownLatch(int count)

// 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断。
public void await()

// 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间。
public boolean await(long timeout, TimeUnit unit)

// 递减锁存器的计数,如果计数到达零,则释放所有等待的线程。
public void countDown()

// 返回当前对象的count值
public long getCount() {

内部类Sync()

Sync()为CountDownLatch的一个内部类,而Sync继承AQS,针对AQS的这里就不做详细介绍了

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}

await()方法

该方法主要作用是使当前线程在锁存器倒数至零之前一直等待,除非线程中断,定义如下:

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

可以看到其内部使用AQS的acquireSharedInterruptibly(int arg)方法

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

其中tryAcquireShared方法在Sync()内部类中进行了重写:

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

getState()获取同步状态,其值等于计数器的值 ,若getState()返回的不是0,则tryAcquireShared返回的就是-1,调用doAcquireSharedInterruptibly(arg)方法,该方法为一个自旋方法会尝试一直去获取同步状态,源码如下:

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

countDown()方法

countDown() 方法递减锁存器的计数,如果计数到达零,则释放所有等待的线程

public void countDown() {
    sync.releaseShared(1);
}

内部调用AQS的releaseShared(int arg)方法来释放共享锁同步状态

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

上面代码中的tryReleaseShared()在Sync()内部类中进行了重写

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        // 获取锁状态
        int c = getState();
        // 释放锁成功
        if (c == 0)
            return false;
        int nextc = c-1;
        // //更新锁状态(计数器)
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

演示

下面用运动员跑步来举例说明,代码如下:

public class CountDownLatchTest1 {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        // 3个主线程
        final CountDownLatch main = new CountDownLatch(3);
        // 1个子线程
        final CountDownLatch sub = new CountDownLatch(1);

        for (int i = 0; i < 3; i++) {
            final int c = i;
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(1000);
                        System.out.println("线程:" + Thread.currentThread().getName() + "已到达起跑线");
                        // 每次主线程减1,当主线程减到0时,主线程就被唤醒
                        main.countDown();
                        // 所有子线程到达之后必须等待
                        sub.await();
                        System.out.println("线程"+Thread.currentThread().getName()+"开始跑步---》到达终点!");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        executorService.shutdown();

        // 所有子线程到达后,主线程发起跑令
        try {
            main.await();
            Thread.sleep(1000);
            System.out.println("线程" + Thread.currentThread().getName() + "准备就绪");
            System.out.println("线程" + Thread.currentThread().getName() + "发起跑令");
            // 子线程减到1,被唤醒继续执行
            sub.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

运行结果:

线程:pool-1-thread-1已到达起跑线
线程:pool-1-thread-2已到达起跑线
线程:pool-1-thread-3已到达起跑线
线程main准备就绪
线程main发起跑令
线程pool-1-thread-1开始跑步---》到达终点!
线程pool-1-thread-2开始跑步---》到达终点!
线程pool-1-thread-3开始跑步---》到达终点!

总结

CountDownLatch内部通过共享锁实现,在创建CountDownLatch实例的时候,需要传递一个int参数,该参数为计数器初始值。
每调用一次countDown()方法,计数器减1,计数器大于0 时,await()方法会阻塞程序继续执行
当计数减至0时触发特定的事件。利用这种特性,可以让主线程等待子线程的结束

end,感谢阅读


# java并发