LinkedBlockingQueue源码
LinkedBlockingQueue介绍
【1】LinkedBlockingQueue是一个基于链表实现的阻塞队列,默认情况下,该阻塞队列的大小为Integer.MAX_VALUE,由于这个数值特别大,所以 LinkedBlockingQueue 也被称作无界队列,代表它几乎没有界限,队列可以随着元素的添加而动态增长,但是如果没有剩余内存,则队列将抛出OOM错误。所以为了避免队列过大造成机器负载或者内存爆满的情况出现,我们在自定义线程池使用到LinkedBlockingQueue队列时会根据业务需求定义合适的队列容量值capacity。
【2】LinkedBlockingQueue内部由单链表实现,只能从head取元素,从tail添加元素。LinkedBlockingQueue采用两把锁的锁分离技术实现入队出队互不阻塞,分别是写锁putLock和读锁takeLock。添加元素和获取元素都有独立的锁,也就是说LinkedBlockingQueue是读写分离的,读写操作可以并行执行。
LinkedBlockingQueue类的一些关键特性和方法
- 无界队列:如果没有指定容量,队列的容量默认为 Integer.MAX_VALUE。
- 线程安全:通过内部锁机制确保了线程安全。
- FIFO 顺序:元素按照先进先出的方式进行排列。
- 条件对象:使用 ReentrantLock 和 Condition 对象来控制队列的入队和出队操作。
- 动态创建节点:每次插入操作时,除非这会使队列超出其容量,否则都会动态创建新的链表节点。
- 序列化:该类是可序列化的,这意味着它可以被写入到一个输出流中,并从输入流中恢复。
- 迭代器:提供了弱一致性迭代器,用于按顺序访问队列中的元素。
- Spliterator:提供了一个 Spliterator,支持在并行操作中使用 Stream API。
LinkedBlockingQueue使用
//创建有界队列,指定队列的大小为100
BlockingQueue<Runnable> boundedQueue = new LinkedBlockingQueue<>(100);
//无界队列
BlockingQueue<Runnable> unboundedQueue = new LinkedBlockingQueue<>();
LinkedBlockingQueue源码介绍
1.属性值
// 表示队列的容量上限,如果不指定容量值,则为Integer.MAX_VALUE,无限大
private final int capacity;
// 当前队列的元素数量 使用AtomicInteger来实现线程安全的计数器
private final AtomicInteger count = new AtomicInteger();
// 链表的头节点,其item属性总是null,用于标识队列的开始
transient Node<E> head;
// 链表的尾节点,总是指向最后一个节点,其next属性为null
private transient Node<E> last;
// 读锁 控制从队列中取出元素操作
private final ReentrantLock takeLock = new ReentrantLock();
// notEmpty:takeLock的条件对象
// 当拿到takeLock锁并且notEmpty条件对象条件成立也就是队列至少有一条可取数据的时候,才会从队列取出一个元素。
// 拿到takeLock锁,但是队列无元素时,线程会调用notEmpty条件对象的wait方法,这将导致线程释放锁并进入WAITING状态,等待其他线程的唤醒。(注意,offer()方法是非阻塞的,从队列中获取不到数据会返回false)
private final Condition notEmpty = takeLock.newCondition();
// 写锁 控制往队列中添加元素操作
private final ReentrantLock putLock = new ReentrantLock();
// notFull条件
// 当队列满了时,put锁会会阻塞在notFull上,等待其它线程唤醒
// 当拿到putLock锁并且notFull条件对象条件成立也就是队列未满(通过AutomicInteger记录的队列状态来判断当前队列元素数量是否大于队列容量阈值,小于则表示未满),才会往队列中添加元素。
private final Condition notFull = putLock.newCondition();
// 典型的单链表结构
// Node类是一个泛型类,可以存储任何类型的元素(由类型参数E指定)。
static class Node<E> {
    E item;  // 存储节点中的数据项
    Node<E> next;  // 单链表结构 指向链表中的下一个节点,有三种可能的值:
    			    // 1.真实的后继节点。
				   // 2.this Node,这通常意味着后继是head.next,这可能是在初始化时或者在某些特殊情况下使用。
				   // 3.null,表示没有后继节点,即当前节点是链表的最后一个节点。
    Node(E x) { item = x; } // 有参构造函数
}
2.构造器
public LinkedBlockingQueue() {
    // 如果没传容量,就使用最大int值初始化其容量
    this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    // 初始化head和last指针为空值节点
    last = head = new Node<E>(null);
}
// TaskQueue队列才会使用到这个有参够咱
// 这是构造器的声明,它接受一个Collection类型的参数c,该参数中的元素类型是E的子类型。
public LinkedBlockingQueue(Collection<? extends E> c) {
    // 调用LinkedBlockingQueue的另一个有参构造器,将队列的容量设置为Integer.MAX_VALUE,这意味着队列可以无限大。
    this(Integer.MAX_VALUE);
    // 创建一个ReentrantLock对象putLock,它用于控制对队列的写入操作。
    final ReentrantLock putLock = this.putLock;
    putLock.lock(); // 获取putLock的锁,以确保在向队列添加元素时,其他线程不能同时修改队列。
    try {
        // 初始化一个计数器n,用于记录添加到队列中的元素数量。
        int n = 0;
        for (E e : c) {
            // 如果集合中的某个元素为null,则抛出NullPointerException。
            if (e == null)
                throw new NullPointerException();
            // 如果队列已满(即添加的元素数量达到队列容量),则抛出IllegalStateException。
            if (n == capacity)
                throw new IllegalStateException("Queue full");
            // 将元素e封装成Node对象,并使用enqueue方法将其添加到队列中。
            enqueue(new Node<E>(e));
            // 每次成功添加一个元素后,增加计数器n。
            ++n;
        }
        // 更新队列中的元素数量。
        count.set(n);
    } finally {
        putLock.unlock();
    }
}
3.入队put方法-阻塞
    /**
     * 队列添加元素
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        // 检查传入的元素是否为 null,如果是,则抛出 NullPointerException。
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        // 声明了一个整型变量 c 并初始化为 -1,这通常用作一个标志,表示操作尚未成功完成。
        int c = -1;
        // 创建了一个新的节点 node 来存储传入的元素 e。
        Node<E> node = new Node<E>(e);
        // 获取用于控制对队列进行入队操作的 ReentrantLock。
        final ReentrantLock putLock = this.putLock;
        // 获取用于记录队列中元素数量的 
        final AtomicInteger count = this.count;
        // 获取 putLock 锁,如果线程在获取锁的过程中被中断,则会抛出 InterruptedException。
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            // 如果队列已满(当前计数等于容量),则调用 notFull 条件对象的 await 方法,使当前线程等待,直到被另一个线程唤醒。
            while (count.get() == capacity) {
                notFull.await();
            }
            // 调用 enqueue 方法将新节点添加到队列尾部。
            enqueue(node);
            // 原子地读取当前的计数并将其增一,c 变量存储了增一前的计数。
            c = count.getAndIncrement();
            // 如果队列中还可以添加更多元素(当前计数加一小于容量),则使用 notFull 条件对象的 signal 方法唤醒等待的线程。
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            // 释放 putLock 锁。
            putLock.unlock();
        }
        // 如果入队前队列为空(计数为0),调用 signalNotEmpty 方法,以通知可能等待的线程队列现在不为空。
        if (c == 0)
            signalNotEmpty();
    }
4.入队offer方法-非阻塞
    /**
      * 队列添加元素
      * 这段代码是LinkedBlockingQueue类中的offer方法的实现。offer方法用于向队列尾部添加一个元素,如果队	  * 列未满,则添加成功并返回true,否则返回false。与add方法不同,offer方法在无法添加元素时不会抛出异	  * 常,而是返回一个布尔值来告知操作结果。
      *
      * @throws NullPointerException 元素为空抛出空指针异常
      */
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        // 队列当前元素计数器
        final AtomicInteger count = this.count;
        // 队列已满,则返回false
        if (count.get() == capacity)
            return false;
        // 队列当前元素计数器初始化为-1
        int c = -1;
        // Node节点封装元素,包含节点数据和指向下一节点的指针
        Node<E> node = new Node<E>(e);
        // 写锁获取
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            // 如果队列当前元素计数小于队列最大容量,则执行元素入队操作
            if (count.get() < capacity) {
                // 入队
                enqueue(node);
                // 这个原子操作表示,c=count(原值),count(新值)=count+1。注意:这个c的值是count+1之前的值,所以才会用  if (c + 1 < capacity)判断
                c = count.getAndIncrement();
                // 如果队列未满,唤醒等待队列不满的线程(如果有的话)来往队列添加任务。
                if (c + 1 < capacity)
                    // signal底层:notFull条件对象实现了AQS接口,通知线程的底层是AQS,唤醒的方法是						 LockSupport.unpark(node.thread);
                    // 这个通知不需要重新获取putLock锁,因为使用的是ReentrantLock->重入锁
                    notFull.signal();
            }
        } finally {
            // 释放锁
            putLock.unlock();
        }
        // 如果队列当前元素为空,说明队列已经添加了元素了,唤醒等待队列非空的线程(如果有的话)来往队列获取任务。
        // notEmpty条件对象实现了AQS接口,通知线程的底层是AQS,唤醒的方法是						 		   LockSupport.unpark(node.thread);
        // 这个通知需要获取takeLock锁,获取到锁才会通知。
        if (c == 0)
            signalNotEmpty();
        // 任务添加成功返回true
        return c >= 0;
    }
    /**
     * 队列添加元素,并且支持单位时间内添加成功
     *
     * @return {@code true} if successful, or {@code false} if
     *         the specified waiting time elapses before space is available
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        // 如果传入的元素为null,则抛出NullPointerException。
        if (e == null) throw new NullPointerException();
        // 将超时时间转化为纳秒
        long nanos = unit.toNanos(timeout);
        // 初始化一个计数器c,用于记录添加操作前的队列元素数量。
        int c = -1;
        // 获取用于控制队列尾部添加操作的锁。
        final ReentrantLock putLock = this.putLock;
        // 获取队列中的元素计数器。
        final AtomicInteger count = this.count;
        // 注意,获取写锁,这里是支持中断锁:如果当前线程在尝试获取锁的过程中被中断,抛出				  			InterruptedException
        putLock.lockInterruptibly();
        try {
            // 如果队列当前元素数量等于队列容量最大值,则进入while循环
            while (count.get() == capacity) {
                // 如果队列已满,但是超时时间还没结束,则一直循环,直到有效时间过期,返回false
                if (nanos <= 0)
                    return false;
                // 更新剩余的超时时间,等待直到队列有空间或超时时间结束。
                // notFull是一个Condition对象,调用它的awaitNanos方法将导致当前线程释放锁并等待,直到被唤醒或超时
                nanos = notFull.awaitNanos(nanos);
            }
            // 如果队列有空间,将新节点添加到队列尾部
            enqueue(new Node<E>(e));
            // 原子性地获取当前的元素数量并将其增加1
            c = count.getAndIncrement();
            // 如果队列未满,唤醒一个等待队列不满的线程
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            // 释放锁
            putLock.unlock();
        }
        // 如果队列当前元素为空,说明队列已经添加了元素了,唤醒等待队列非空的线程(如果有的话)来往队列获取任务。
        if (c == 0)
            signalNotEmpty();
        return true;
    }
5.出队take方法-阻塞
// 从队列中取出一个元素。这个方法会抛出 InterruptedException,如果线程在等待队列非空时被中断。
// 阻塞获取
public E take() throws InterruptedException {
    // 声明了元素变量 x,稍后将用于存储从队列中取出的元素。    
    E x;
    // 声明了一个整型变量 c,用于当前记录队列中的元素数量。
        int c = -1;
    // 获取用于记录队列中元素数量的 AtomicInteger 引用。
        final AtomicInteger count = this.count;
    // 获取用于控制对队列进行出队操作的 ReentrantLock 引用。
        final ReentrantLock takeLock = this.takeLock;
    // 调用 lockInterruptibly 方法获取锁,如果当前线程在获取锁的过程中被中断,会抛出 InterruptedException。
        takeLock.lockInterruptibly();
        try {
            // 如果队列为空(计数为0),则调用 notEmpty 条件对象的 await 方法,使当前线程等待,直到被另一个线程唤醒。
            while (count.get() == 0) {
                notEmpty.await();
            }
            // 调用 dequeue 方法从队列头部移除并返回一个元素,存储在变量 x 中。
            x = dequeue();
            // 原子地读取当前的计数并将其减一,c 变量存储了减一前的计数。
            c = count.getAndDecrement();
            // 如果队列中还有至少一个元素(c > 1),则使用 notEmpty 条件对象的 signal 方法唤醒等待的线程。
            if (c > 1)
                notEmpty.signal();
        } finally {
            // 释放 takeLock 锁。
            takeLock.unlock();
        }
    // 如果取出元素后的计数等于队列的容量,调用 signalNotFull 方法,以通知可能等待的线程队列现在不是满的。
        if (c == capacity)
            signalNotFull();
        return x;
    }
6.出队poll方法-非阻塞
// 从队列获取元素,返回类型是E,E是队列中元素的类型,获取不到直接返回null
public E poll() {
    // 创建了一个AtomicInteger对象count,用于记录队列中的元素数量。
        final AtomicInteger count = this.count;
    // 检查队列是否为空,如果是,则返回null
        if (count.get() == 0)
            return null;
    // 声明了一个元素变量x,用于存储从队列中取出的元素。
        E x = null;
    // 声明了一个整型变量c,用于稍后记录取出元素前队列中的元素数量。
        int c = -1;
    // 声明了一个ReentrantLock对象takeLock,用于同步线程对队列的操作。
        final ReentrantLock takeLock = this.takeLock;
    // 获取takeLock的锁,以确保线程安全。
        takeLock.lock();
        try {
            // 再次检查队列是否非空。
            if (count.get() > 0) {
                // 如果队列非空,从队列中移除并返回一个元素。
                x = dequeue();
                // 原子的读取当前的计数并将其减一,c变量存储了减一前的计数。
                c = count.getAndDecrement();
                // 如果队列中还有至少一个元素,发送一个信号通知可能等待的线程队列非空。
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
    // 如果取出元素后c的计数等于队列的容量,发送一个信号通知可能等待的线程队列现在不是满的了。
        if (c == capacity)
            signalNotFull();
    // 返回从队列中取出的元素。
        return x;
    }
   // 有效时间内获取队列元素,获取不到直接返回null
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        // 声明了元素变量 x,并初始化为 null。
        E x = null;
        // 声明了一个整型变量 c,并初始化为 -1,用作状态标志。
        int c = -1;
        // 将超时时间转换为纳秒数。
        long nanos = unit.toNanos(timeout);
        // 获取用于记录队列中元素数量的 AtomicInteger。
        final AtomicInteger count = this.count;
        // 获取用于控制对队列进行出队操作的 ReentrantLock。
        final ReentrantLock takeLock = this.takeLock;
        // 获取 takeLock 锁,如果线程在获取锁的过程中被中断,则会抛出 InterruptedException。
        takeLock.lockInterruptibly();
        try {
            // 如果队列为空,则循环等待,直到队列非空或超时。
            while (count.get() == 0) {
                // 如果超时时间已经过去,则立即返回 null。
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            // 如果队列非空,则调用 dequeue 方法移除并返回队列头部的元素。
            x = dequeue();
            // 原子地读取当前的计数并将其减一,c 变量存储了减一前的计数。
            c = count.getAndDecrement();
            // 如果队列中还有至少一个元素,则使用 notEmpty 条件对象的 signal 方法唤醒等待的线程。
            if (c > 1)
                notEmpty.signal();
        } finally {
            // 释放 takeLock 锁。
            takeLock.unlock();
        }
        // 如果当前队列元素计数等于队列的容量,证明队列至少有一个空位,调用 signalNotFull 方法,以通知可能等待的线程队列现在不是满的。
        if (c == capacity)
            signalNotFull();
        return x;
    }
队列的出队公共方法
// 队列移除元素    
private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        // 获取头节点,头节点的item为null,next指向第一个节点,也就是指向最先入队的节点
        Node<E> h = head;
    	// 获取第一个节点
        Node<E> first = h.next;
    	// h.next表示h下一个节点指向,现在指向h,也就是指向本身,证明没有其他对象引用,方便JVM垃圾收集器回收
        h.next = h; // help GC
    	// 移动头节点next指针:将 head 引用移动到 first,即队列的第一个元素,因为原来的头节点已经被移除了。那么,first的next节点就是第一个元素了。
        head = first;
    	// 获取第一个节点的数据
        E x = first.item;
    	// 将 first 节点中的元素置为 null,这样做可以减少对对象的引用,有助于垃圾收集。
         // 头节点head的item也置为空,只维护next指针,指向就是第一个元素,也就是没移除第一个元素之前的第二个元素或者null
        first.item = null;
        // 返回第一个节点数据
        return x;
    }
队列的入队公共方法
// 队列添加元素    
private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }
LinkBlockingQueue单向列表结构
 
在 LinkedBlockingQueue 中,队列的头(head)和尾(tail)是两个非常重要的概念:
- 头节点(Head):队列的头部,即队列中的第一个元素。在 LinkedBlockingQueue 中,头节点是队列中最老的元素,也就是最先进入队列的元素。当我们从队列中取元素时,总是从头节点开始取,头节点的next指针指向的元素就是我们的第一个元素,头节点的item永远为空。
- 尾节点(Tail):队列的尾部,即队列中的最后一个元素,尾节点的item就是我们的最后一个元素,尾节点的next永远为空。在 LinkedBlockingQueue 中,尾节点是队列中最新的元素,也就是最后进入队列的元素。当我们向队列中添加元素时,总是从尾节点开始添加。
LinkedBlockingQueue 中的头节点和尾节点是通过两个指针 head 和 tail 来维护的。当队列不为空时,head 总是指向队列的第一个元素,tail 总是指向队列的最后一个元素。
LinkedBlockingQueue总结
【1】无界阻塞队列,可以指定容量,默认为 Integer.MAX_VALUE,先进先出,存取互不干扰
【2】数据结构:链表(可以指定容量,默认为 Integer.MAX_VALUE,内部类Node存储元素)
【3】锁分离:读写锁分离,存取互不干扰,存取操作的是不同的Node对象【这是最大的亮点】
【4】阻塞对象(notEmpty【出队:队列count=0,无元素可取时,阻塞在该对象上】,notFull【入队:队列count=capacity,放不进元素时,阻塞在该对象上】)
【5】入队,从队尾入队,由last指针记录。
【6】出队,从队首出队,由head指针记录。
【7】线程池中采用LinkedBlockingQueue而不采用ArrayBlockingQueue的原因便是因为锁分离带来了性能的提升,大大提高队列的吞吐量。
小结:
LinkedBlockingQueue使用锁和条件对象来控制对链表的并发访问,保证线程安全。



















