文章目录
- 1、CountDownLatch介绍
 - 1.1、功能介绍
 - 1.2、demo
 - 1.3、问题
 
- 2、前置知识
 - 2.1、AQS整体结构
 - 2.1.1、整体结构
 - 2.1.2、state属性
 - 2.1.3、head和tail属性
 
- 3、CountDownLatchAPI源码解析
 - 3.1、countDown方法
 - 3.1.1、Sync类
 - 3.1.2、releaseShared方法
 - 3.1.3、tryReleaseShared方法
 
- 3.2、await方法
 - 3.2.1、tryAcquireShared方法
 - 3.2.2、doAcquireSharedInterruptibly方法
 - 3.2.2.1、addWaiter方法。
 - 3.2.2.2、setHeadAndPropagate方法
 - 3.2.2.3、shouldParkAfterFailedAcquire方法
 - 3.2.2.4、parkAndCheckInterrupt方法
 - 3.2.2.5、cancelAcquire方法
 
- 3.3、doReleaseShared方法
 
- 4、总结
 - 5、参考资料
 
1、CountDownLatch介绍
1.1、功能介绍
CountDownLatch,是一个同步器,常见的使用场景是:
 我们在主线程中需要执行一些异步任务,但是在全部的异步任务都执行完成前,主线程需要阻塞等待,等待异步任务全部执行完成后,主线程才能响应客户端。
1.2、demo
基于以上场景,搞了一个demo。
 下面的这个demo。总体上分为了三个小步骤,代表了CountDownLatch的三个核心API
 1)、初始化一个CountDownLatch。构造器中传入了10,代表这个CountDownLatch需要执行10次countDown方法。
 2)、然后在一个循环中提交异步任务,总共提交10个异步任务,每个异步任务执行一次countDown方法。
 3)、循环执行完成后,执行CountDownLatch.await方法
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            5,
            10,
            60L,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(100),
            new ThreadPoolExecutor.CallerRunsPolicy());
    @Test
    public void testDel(){
    	//初始化一个CountDownLatch
        CountDownLatch countDownLatch = new CountDownLatch(10);
        for (int i = 0; i < 10; i++) {
        	//给线程池提交异步任务,总共提交10个
            threadPoolExecutor.execute(() -> {
                try {
                    System.out.println("执行一个异步任务");
                } catch (Exception e) {
                    e.printStackTrace();
                }finally {
                    //执行countDownLatch.countDown(),该方法一定要写在finally中,确保无论什么情况,都能执行到
                    countDownLatch.countDown();
                }
            });
        }
        try {
            System.out.println("主线程等待");
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
 
1.3、问题
使用起来很简单,也就是三步。我有3个问题
 1)、countDown方法执行后,在做什么操作?
 2)、await方法执行后,countDown方法全部执行完成前,流程阻塞在了await处。CountDownLatch怎么实现的阻塞呢?
 3)、countDown方法全部执行完成后,await方法被唤醒,继续执行下面的业务逻辑,怎么唤醒的呢?
 这里多说一句。看源码,最好能带着问题看,这样看起来,不会太枯燥,因为你要搞明白一些问题。
 说回文章。我们下面就围绕我的这3个问题来展开。如果你还有其他问题,看懂我的3个问题后,可以自己再研究一下源码
2、前置知识
CountDownLatch是借助AQS实现的,大家应该都听说过AQS。其实不光CountDownLatch,像我们常使用的ReenTrantLock、Semaphore等同步器类也都是借助AQS实现的。所以在介绍CountDownLatch前,需要先介绍AQS。
 AQS全称是:AbstractQueuedSynchronizer
 先从类名上剖析一下这个类。
 Abstract:抽象的。说明内部有一些方法需要子类实现
 Queued:队列。内部实现依赖了队列
 Synchronizer:同步器。这个类是用来设计同步器的
2.1、AQS整体结构
2.1.1、整体结构
AQS抽象类的属性及主要方法,如下图(原图链接)
 
 带颜色的都是方法,不带颜色的都是属性。
 从上到下,难度由浅入深。像CountDownLatch相关的API就是和AQS的API层进行交互。从图上也能看出来,AQS内容很多,为了避免篇幅过大,我们只介绍和CountDownLatch有关系的。其他方法或属性,感兴趣可以自己研究下
AQS中和CountDownLatch有关系的属性如下:
 private volatile int state;
 private transient volatile Node head;
 private transient volatile Node tail;
2.1.2、state属性
state属性是AQS中的许可或者说资源,不管是CountDownLatch,还是ReentrantLock或者Semaphore,本质上都是在操作这个state属性。我们以CountDownLatch举例。
 初始化时,new CountDownLatch(10),就是将state的值设置为了10。看一下源码
//CountDownLatch构造器
public CountDownLatch(int count) {
    if (count < 0){
    	throw new IllegalArgumentException("count < 0");
    }
    this.sync = new Sync(count);
}
//Sync构造器
Sync(int count) {
    setState(count);
}
//setState方法
protected final void setState(int newState) {
    state = newState;
}
 
countDown方法,最终会对state值减一
 await方法,监控state值的变化,当变成0时,阻塞结束
 这两个方法的源码,下面会详细的解析,这里就不细说了。
2.1.3、head和tail属性
看命名,就知道这两个属性是链表上的头节点和尾节点,初始值为null。数据类型是Node。说明,AQS内部存在一条Node链表
 看一下Node的属性
static final class Node {
    ......
    //共享模式的Node
    static final Node SHARED = new Node();
    
    //独占模式的Node
    static final Node EXCLUSIVE = null;
    //节点的等待状态,初始值是0
    volatile int waitStatus;
    //前置节点
    volatile Node prev;
    //后置节点
    volatile Node next;
    //节点代表的线程。每一个需要阻塞的线程,都会被封装成一个Node
    volatile Thread thread;
    ......
}
 
那Node链表又长什么样子呢?
 
 就长这个样子
 其中的waitStatus值很重要,关于Node链表,AQS基本就是围绕Node节点的waitStatus在做文章。除了初始化值0之外,还有另外4个取值
//节点处于取消状态。这个值是大于0的,另外的3个值都是小于0的。记一下这个点,源码中会经常遇到
static final int CANCELLED =  1;
//节点处于被唤醒状态。
static final int SIGNAL = -1;
//节点处于条件状态,这个CONDITION一般在CONDITION队列中使用。关于CountDownLatch,我们主要是讨论Sync队列
static final int CONDITION = -2;
//对于共享模式的Node来说,这个状态代表了传播
static final int PROPAGATE = -3;
 
AQS重要属性说完了。下面开始介绍CountDownLatch的重要API源码实现
3、CountDownLatchAPI源码解析
3.1、countDown方法
public void countDown() {
    sync.releaseShared(1);
}
 
这里调用了Sync类的releaseShard方法,接下来,看一下Sync类的实现。
3.1.1、Sync类
Sync类定义如下:
 可以看到Sync类继承了AQS,所以Sync类是连接CountDownLatch和AQS的桥梁。
private static final class Sync extends AbstractQueuedSynchronizer {
    ......
    //这个构造方法,我们在上面已经见过了,new CountDownLatch(10),设置CountDownLatch的许可数量时,就会使用到这个构造器。
    Sync(int count) {
        setState(count);
    }
    //获取state
    int getCount() {
        return getState();
    }
    //判断AQS中state的值状态。这是重写的AQS的方法
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }
    ......
}
 
3.1.2、releaseShared方法
在Sync类中,我们并没有看到releaseShared方法,说明releaseShared是父类AQS的方法。
 我们进入AQS看一下releaseShard方法的逻辑
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
 
先执行了tryReleaseShared方法,如果返回true,再执行doReleaseShared方法。先看tryReleaseShared方法
3.1.3、tryReleaseShared方法
protected boolean tryReleaseShared(int arg) {
     throw new UnsupportedOperationException();
}
 
原方法是抛出异常,说明这个方法需要子类重新实现,这就又回到了CountDownLatch类的Sync类中,Sync类中实现了tryReleaseShared方法,从方法名中能看出,这是共享模式释放。啥是共享模式呢?这里简单提一句,共享模式的对立面是独占模式。类似ReentrantLock,加锁操作就是独占模式,同一时刻,只能有一个线程加锁成功。共享模式,就是多个线程可以同时执行,对于CountDownLatch来说,多个线程可以同时执行countDown操作,所以countDown操作是共享模式。内部相应的方法命名时就会添加shared标识。比如此处的tryReleaseShared方法
 看一下tryReleaseShared的实现
protected boolean tryReleaseShared(int releases) {
    	//开启死循环
        for (;;) {
        	//获取state数量
            int c = getState();
            if (c == 0)
            	//返回失败
                return false;
            //如果state数量不是0,state减一
            int nextc = c-1;
            //CAS设置新的state值
            if (compareAndSetState(c, nextc))
            	//如果state执行减一操作后值为0,说明异步线程全部执行countDown完成,此时可以唤醒主线程
                return nextc == 0;
        }
    }
 
这个方法最终就是对state执行减一操作。
 tryReleaseShared执行完成,如果返回结果是true,会回到releaseShared方法中,然后就会执行doReleaseShared方法。
 这里停一下,我们先不看doReleaseShared方法实现。先回到我们的CountDownLatch使用场景上。我们知道CountDownLatch.countDown一般是子线程执行,子线程全部执行countDown完成前,主线程是阻塞在countDownLatch.await处的,等待CountDownLatch.countDown全部执行完成,然后才会继续执行。
 所以在执行doReleaseShared方法前,一般会先执行countDownLatch.await方法。所以,我们先看countDownLatch.await的实现逻辑,然后再看doReleaseShared方法逻辑。
3.2、await方法
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
 
await方法抛出了InterruptedException,说明这个方法是能响应线程中断的。在await方法内部执行了Sync类的acquireSharedInterruptibly方法
 这个方法,Sync类中并没有,所以是父类AQS中的。我们到AQS中看一下这个方法的逻辑
public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        //检查当前线程是否被中断。如果被中断,抛出中断异常
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
 
先是判断了线程是否被中断,如果被中断,会直接抛出中断异常
 如果线程没有被中断,然后会执行tryAcquireShared。我们看一下tryAcquireShared的方法实现
3.2.1、tryAcquireShared方法
tryAcquireShared实现如下:
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}
 
可以看到,AQS中的tryAcquireShared方法逻辑是直接抛出了UnsupportedOperationException异常(不支持的操作),说明该方法是需要子类重写的。我们回到CountDownLatch的Sync类中看下Sync类重写后的方法实现
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}
 
可以看到,就是判断state的值。如果getState == 0的话,就返回1,此时代表CountDownLatch.countDown已经全部执行完成。如果getState != 0的话,就返回-1,代表CountDownLatch.countDown未全部执行完成。
 tryAcquireShared,如果返回-1的话,接着就会执行doAcquireSharedInterruptibly方法
3.2.2、doAcquireSharedInterruptibly方法
这是CountDownLatch实现await方法的关键逻辑
private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        //将当前线程以共享模式加入Node链表中
        final Node node = addWaiter(Node.SHARED);
        //获取锁的结果,初始化值为true,代表获取锁失败
        boolean failed = true;
        try {
        	//开启一个死循环,这就实现了CountDownLatch.await阻塞的效果。
            for (;;) {
            	//获取当前节点的前置节点
                final Node p = node.predecessor();
                //如果前置节点是头节点,再次执行tryAcquireShared方法判断state的值
                if (p == head) {
                	//获取state的状态
                    int r = tryAcquireShared(arg);
                    //如果返回值r >= 0代表state是0,此时说明CountDownLatch.countDown已经执行完成,此时会结束死循环,主线程继续往下执行
                    if (r >= 0) {
                    	//将当前节点设置为头节点并且进行传播
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                //1、shouldParkAfterFailedAcquire判断是否需要执行线程阻塞。
                //2、parkAndCheckInterrupt是阻塞当前线程,避免一直获取不到锁,浪费cpu
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
            	//获取锁失败的话,执行cancelAcquire方法
                cancelAcquire(node);
        }
    }
 
doAcquireSharedInterruptibly方法中有很多逻辑,这个方法的主要作用是:
 开启死循环,线程不断地获取锁。如果线程满足阻塞条件的话,就对该线程执行阻塞操作,然后休眠等待被唤醒,避免浪费cpu。被唤醒后,继续获取锁。获取锁成功,就退出死循环。
 下面挨个分析其中的方法
3.2.2.1、addWaiter方法。
看这个方法名,也能知道这是添加一个等待者,也就是我们执行了CountDownLatch.await方法的主线程
private Node addWaiter(Node mode) {
		//创建一个Node,Node的线程是当前线程,节点模式是共享模式
        Node node = new Node(Thread.currentThread(), mode);
        
        //获取Node链表的尾指针
        Node pred = tail;
        //如果尾指针不为空,说明Node链表已经被初始化。将当前Node节点入队
        if (pred != null) {
        	//获取node节点的前向指针
            node.prev = pred;
            //将当前节点设置为尾节点
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //如果尾指针为空,说明Node链表未被初始化,需要先初始化Node链表,再将当前Node节点入队
        enq(node);
        return node;
    }
//enq方法逻辑
private Node enq(final Node node) {
		//开启死循环
        for (;;) {
        	//获取Node链表尾节点
            Node t = tail;
            //如果尾节点为空,说明Node链表未被初始化,需要初始化
            if (t == null) {
            	//初始化一个Node节点,利用CAS将其设置为头节点
                if (compareAndSetHead(new Node()))
                	//tail节点指向head节点
                    tail = head;
            } else {
            	//当前Node节点前向指针指向尾部节点
                node.prev = t;
                //将当前节点设置为尾节点
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
 
这里总结一下,addWaiter方法就是将执行CountDownLatch.await的主线程加入到等待链表中
3.2.2.2、setHeadAndPropagate方法
private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head;
        //将当前节点设置为头节点
        setHead(node);
        
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            //取当前节点的下一个节点,对于我们这个场景来说,只有一个节点,没有下一个节点,所以下一个节点是null
            Node s = node.next;
            //如果下一个节点是null,执行doReleaseShared方法
            if (s == null || s.isShared())
                doReleaseShared();
}
 
doReleaseShared方法的逻辑如下
private void doReleaseShared() {
		//开启一个死循环
        for (;;) {
        	//取头节点。头节点就是我们的节点
            Node h = head;
            //头结点不为空 && 头结点不等于尾节点。这说明Node链表中间有其他元素
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
 
在CountDownLatch的流程中,只有一个主线程,并且在setHeadAndPropagate方法中,先执行了setHead(node)方法,将当前节点设置为头节点,所以此时的头节点和尾节点是相同的。所以h != null && h != tail结果为假,doReleaseShared方法会直接返回
执行完setHeadAndPropagate方法,下面就该执行shouldParkAfterFailedAcquire和parkAndCheckInterrupt了。这两个方法,前一个是判断能否执行阻塞,后一个是执行线程阻塞操作的。
 我们看一下shouldParkAfterFailedAcquire的实现逻辑。
3.2.2.3、shouldParkAfterFailedAcquire方法
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        //如果前置节点状态是SIGNAL,此时返回true,说明需要阻塞当前节点
        if (ws == Node.SIGNAL)
            return true;
        //waitStatus > 0,说明当前节点的前置节点是取消状态
        if (ws > 0) {
            
            do {
            	//向前遍历,删除已取消节点
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            //找到不是取消节点的节点,将当前节点放入该节点后
            pred.next = node;
        } else {
            //将当前节点的waitStatus置为SIGNAL状态
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
 
3.2.2.4、parkAndCheckInterrupt方法
private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
 
逻辑里,调用了LockSupport.park方法,阻塞当前线程,然后返回了线程的中断状态
 为什么要返回中断状态呢?
 因为LockSupport.park方法是能响应线程中断的,但是响应完中断后,并不会抛出异常,那这个时候就会有一个问题。我们怎么知道park方法是怎么被唤醒的呢?
 是CountDownLatch.countDown执行完成后,正常被唤醒的。
 还是执行CountDownLatch.await方法的线程执行了中断唤醒的。
 区分不出来吧?所以此处要执行Thread.interrupted(),如果线程执行了中断,此时会Thread.interrupted()会返回true。以此就可以判断LockSupport.park的唤醒原因
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
   throw new InterruptedException();
 
如果线程发生了中断,此时就会抛出中断异常
3.2.2.5、cancelAcquire方法
抛出中断异常后,doAcquireSharedInterruptibly方法中的finally代码块就会开始执行。
 如果failed值为true的话,此时会执行cancelAcquire方法
if (failed)
   //获取锁失败的话,执行cancelAcquire方法
   cancelAcquire(node);
 
从方法名上看,cancelAcquire方法是取消获取的意思。意思就是取消获取锁,说明节点对应的thread不再竞争锁。在这个方法中,会将Node节点的waitStatus值置为CANCELLED。这个方法和我们要研究的CountDownLatch流程没有什么关系,就不看了
 以上,就是await方法的流程。
3.3、doReleaseShared方法
我们在介绍countDown方法的时候说过,有一个方法等介绍完await方法再进行介绍,还有印象吧?就是在介绍countDown方法逻辑时,tryReleaseShared方法执行后,应该执行doReleaseShared方法。再看一下countDown流程中的这段源码
public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
 
tryReleaseShared方法执行成功后,会执行doReleaseShared方法
 看一下doReleaseShared方法的实现
private void doReleaseShared() {
        //开启一个死循环
        for (;;) {
        	//获取头节点
            Node h = head;
            //头结点不等于空 && 头节点不等于尾节点
            if (h != null && h != tail) {
            	//获取头结点的waitStatus值
                int ws = h.waitStatus;
                //如果头节点状态是SIGNAL
                if (ws == Node.SIGNAL) {
                	//将头结点状态改成0,初始状态
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;
                    //唤醒后面节点
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;
            }
            if (h == head)
                break;
        }
    }
 
这个方法,对于CountDownLatch的流程来说,其实可以不用关注。为什么这样说呢?
 对于CountDownLatch来说,Node链表上其实只有一个节点。并且在执行
 setHeadAndPropagate方法时,将当前主线程代表的Node设置为了头节点,所以头尾是相同的。在执行h != null && h != tail判断条件时,就不满足判断条件了,所以在执行h == head判断条件时,结果就会为true,方法就直接返回了
4、总结
以上就是CountDownLatch核心API的源码解析。
 我画了一张图,我们从整体上看一下CountDownLatch的操作流程图
 
 CountDownLatch本身的API不多,也都很简单。但是因为CountDownLatch是借助AQS框架实现的,所以会涉及到一些AQS的逻辑,稍微复杂一点。
5、参考资料
1)、https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html。这是美团技术团队写的一篇文章,我最开始看不懂AQS源码的时候,搜到了这篇文章,连续读了3遍,然后对AQS的整体技术脉络有了一个大体的认识,后面再看CountDownLatch相关的代码就清楚多了。推荐大家也读一下。



















