这篇文章将会详细介绍CountDownLatch这个并发类,通过深入底层源代码讲解其具体实现。
/**
 * A synchronization aid that allows one or more threads to wait until
 * a set of operations being performed in other threads completes.
 */上面是CountDownLatch这个API的前两行注释,一句话已经说明了这个类的作用。
一种同步辅助工具,允许一个或多个线程等待其他线程正在执行的一组操作完成。
CountDownLatch是一个计数器,通过的构造方法指定初始计数器的值,调用CountDownLatch的countDown()方法让计数器的值减少1,当计数器的值变成0时,调用它的await()方法阻塞的当前线程会被唤醒,继续执行剩余的代码。
这个类的结构很简单,就不画类图了,直接贴出代码
package java.util.concurrent;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
public class CountDownLatch {
    /**
     * Synchronization control For CountDownLatch.
     * Uses AQS state to represent count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
        Sync(int count) {
            setState(count);
        }
        int getCount() {
            return getState();
        }
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
    private final Sync sync;
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
    public void countDown() {
        sync.releaseShared(1);
    }
    public long getCount() {
        return sync.getCount();
    }
    public String toString() {
        return super.toString() + "[Count = " + sync.getCount() + "]";
    }
}CountDownLatch是通过AQS的实现类Sync来实现这个计数器功能的,通过AQS的state属性保存计数器的值。
因为这里的计数器值state是共享的,所以重写了AQS的tryAcquireShared()和tryReleaseShared()方法。
为什么要重写这两个方法呢?
因为AQS里的几个重要的方法aquire()和release()会根据当前是独占模式还是共享模式去调用对应的tryAcquire()/tryAcquireShared()、tryRelease()/tryReleaseShared()方法。
接下来,关注CountDownLatch的两个重要的方法:
countDown()
让计数器的值减1
    public void countDown() {
        sync.releaseShared(1);
    }在这个方法里调用了releaseShared()方法,因为静态内部类Sync没有重写这个方法,所以调用的是超类AbstractQueuedSynchronizer的方法。
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }这个方法里先调用tryReleaseShared()方法,因为Sync重写了这个方法,所以调用的是Sync的方法。
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                // 获取state值,如果是0,直接返回false
                int c = getState();
                if (c == 0) {
                    return false;
                }
                // 通过Unsafe工具类的CAS方法尝试修改state的值为state - 1
                int nextc = c - 1;
                if (compareAndSetState(c, nextc)) {
                    // 修改完成之后,根据修改之前的state值是否为1返回true或false
                    return nextc == 0;
                }
            }
        }await()
阻塞当前线程,直到state值变成0才唤醒。
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }在方法内部调用了AQS的acquireSharedInterruptibly()方法,Interruptibly后缀的方法都可以被中断。
    public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }在这个方法中,先调用Thread类的静态方法interrupted()判断当前线程是否被中断,如果当前线程中断状态为true,则清除线程的中断标记并返回true。否则返回false。如果当前线程是被中断的状态,则抛出中断异常返回。关于interrupted()方法的详细介绍,请参考文章
interrupt()、interrupted()和isInterruptd()的区别 https://blog.csdn.net/heyl163_/article/details/132138422如果当前线程是正常的状态,调用tryAcquireShared()方法,因为Sync重写了这个方法,所以调用的是Sync的tryAcquireShared()方法。这个方法就是判断当前state属性值是否为0,如果不是0就返回-1,否则返回1
https://blog.csdn.net/heyl163_/article/details/132138422如果当前线程是正常的状态,调用tryAcquireShared()方法,因为Sync重写了这个方法,所以调用的是Sync的tryAcquireShared()方法。这个方法就是判断当前state属性值是否为0,如果不是0就返回-1,否则返回1
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }继续回到上面的代码,当state不为0时,会执行doAcquireSharedInterruptibly()方法,注意,这里的一个死循环for会让当前线程一直阻塞在这里,直到tryAcquireShared()获取到的返回值大于0,也就是1时才退出循环,而根据上面的方法,此时state值为0。
    private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }总结:这里的等待是通过无条件的for循环让当前线程一直等待,直到state的值变为0才退出循环返回。所以,在这里可以替代线程的join()方法使用,这也是CountDownLatch的主要用途。
好了,文章就分享到这里了,看完不要忘了点赞+收藏哦~



















