阻塞队列---BlockQueue
BlockingQueue是带阻塞功能的队列,继承了Queue接口,当执行入队操作时,如果队列满了,则阻塞调用者;当执行出队操作时,如果队列是空的,也阻塞调用者。

public interface BlockingQueue<E> extends Queue<E> {
	//非阻塞入队,成功返回true,失败会抛出异常
	boolean add(E e);
	//非阻塞入队,成功返回true,失败返回false
	boolean offer(E e);
	//阻塞入队,会响应中断
	void put(E e) throws InterruptedException;
	//入队,若没有可用空间则等待指定时间,会响应中断
	boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
	//阻塞出队,会响应中断
	E take() throws InterruptedException;
	//出队,若没有可用元素则等待指定时间,会响应中断
	E poll(long timeout, TimeUnit unit) throws InterruptedException;
	//返回剩余容量
	int remainingCapacity();
	//移除元素,移除成功返回true
	boolean remove(Object o);
	//是否包含元素,是返回true
	public boolean contains(Object o);
	//元素全部出队,添加到给定的集合中
	int drainTo(Collection<? super E> c);
	//元素最大出队maxElements个,添加到给定的集合中
	int drainTo(Collection<? super E> c, int maxElements);
}ArrayBlockingQueue
ArrayBlockingQueue 是一个用数组实现的循环队列,在构造函数中,会要求传入数组的容量,核心源码说明如下:
/** 存储元素的数组 */
final Object[] items;
/** 队头索引 */
int takeIndex;
/** 队尾索引 */
int putIndex;
/** 队列的元素个数 */
int count;
final ReentrantLock lock;
/** lock的非空条件变量 */
private final Condition notEmpty;
/** lock的非满条件变量 */
private final Condition notFull;
......
//阻塞式入队操作
public void put(E e) throws InterruptedException {
    //空指针检查
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    //可响应中断的加锁
    lock.lockInterruptibly();
    try {
        //如果队列满了,则在非满条件上等待
        while (count == items.length)
            notFull.await();
        //入队
        enqueue(e);
    } finally {
        lock.unlock();
    }
}
private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    //获取元素数组
    final Object[] items = this.items;
    //队尾位置放置x
    items[putIndex] = x;
    //如果队尾的下一个位置到达数组末尾,则置为0
    if (++putIndex == items.length)
        putIndex = 0;
    //元素个数加1
    count++;
    //唤醒在非空条件上等待的线程
    notEmpty.signal();
}
//阻塞式出队
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    //可中断的上锁
    lock.lockInterruptibly();
    try {
        //当队列为空时,在非空条件上等待
        while (count == 0)
            notEmpty.await();
        //出队
        return dequeue();
    } finally {
        lock.unlock();
    }
}
private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    //获取队头位置的元素
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    //队头位置置为null
    items[takeIndex] = null;
    //如果队头的下一个位置到达数组长度,则队头置为0
    if (++takeIndex == items.length)
        takeIndex = 0;
    //元素个数减1
    count--;
    //迭代器链表不是空,则出队后更新迭代器状态
    if (itrs != null)
        itrs.elementDequeued();
    //唤醒在非满条件等待的线程
    notFull.signal();
    return x;
}
LinkedBlockingQueue
LinkedBlockingQueue是用单向链表实现的阻塞队列。
核心源码如下
/**
 * 元素节点
 * @param <E> 元素类型
 */
static class Node<E> {
	E item;
	
	Node<E> next;
	Node(E x) { item = x; }
}
/** 队列容量,构造函数没有传入时默认是0x7fffffff */
private final int capacity;
/** 当前元素数量,因为出队,入队用的是两把锁,所以需要原子类保证线程安全 */
private final AtomicInteger count = new AtomicInteger();
//队头指针
transient Node<E> head;
//队尾指针
private transient Node<E> last;
/** 出队锁 */
private final ReentrantLock takeLock = new ReentrantLock();
/** 非空条件变量 */
private final Condition notEmpty = takeLock.newCondition();
/** 入队锁 */
private final ReentrantLock putLock = new ReentrantLock();
/** 非满条件变量 */
private final Condition notFull = putLock.newCondition();
//阻塞式入队
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        //队满则在非满条件上等待
        while (count.get() == capacity) {
            notFull.await();
        }
        //入队
        enqueue(node);
        //元素个数加1
        c = count.getAndIncrement();
        //通知其他进行put操作的线程
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    //如果队列本来是空的,此次操作入队成功,则通知在非空条件等待的线程
    if (c == 0)
        signalNotEmpty();
}
private void signalNotEmpty() {
    //获取出队锁才能操作非空条件变量
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}
private void enqueue(Node<E> node) {
    last = last.next = node;
}
//阻塞式出队
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        //队列为空时,在非空条件上等待
        while (count.get() == 0) {
            notEmpty.await();
        }
        //出队
        x = dequeue();
        //元素个数减1
        c = count.getAndDecrement();
        //通知其他进行take操作的线程
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    //如果队列本来是满的,此次操作出队成功,则通知在非满条件等待的线程
    if (c == capacity)
        signalNotFull();
    return x;
}
private void signalNotFull() {
    //获取出队锁才能操作非满条件变量
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}
private E dequeue() {
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}PriorityBlockingQueue
PriorityBlockingQueue叫做优先队列,是按照元素的优先级从小到大出队的。所以PriorityQueue中的2个元素之间可以比较大小,实现Comparable接口。
核心源码如下
/**
 * 优先队列表示为一个平衡的二叉小根堆:队列[n]的两个子队列是队列[2*n+1]和队列[2*(n+1)]。
 * 优先队列根据比较器排序,或者如果比较器为空,则根据元素的自然顺序排序:对于堆中的每个
 * 节点n和n的每个后代d,有n <= d。假设队列非空,最小值的元素在queue[0]中
 */
private transient Object[] queue;
/**
 * 队列元素的个数
 */
private transient int size;
/**
 * 比较器,如果为空则使用元素的自然排序
 */
private transient Comparator<? super E> comparator;
private final ReentrantLock lock;
/**
 * 非空的条件变量
 */
private final Condition notEmpty;
public void put(E e) {
    offer(e); // never need to block
}
public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] array;
    //元素个数超出数组长度,进行扩容
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
    try {
        Comparator<? super E> cmp = comparator;
        if (cmp == null)//没有比较器,使用元素自带的比较功能
            siftUpComparable(n, e, array);//入堆
        else//有比较器使用比较器排序
            siftUpUsingComparator(n, e, array, cmp);
        //元素个数加1
        size = n + 1;
        //唤醒在非空条件等待的线程
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;
}
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {
        //如果出队为空元素,说明没有元素,则进行等待;否则返回出队元素
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}
private E dequeue() {
    int n = size - 1;
    if (n < 0)
        return null;
    else {
        Object[] array = queue;
        //二叉小根堆的堆顶即为要出队的元素
        E result = (E) array[0];
        E x = (E) array[n];
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        //调整堆的结构
        if (cmp == null)
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
        size = n;
        return result;
    }
}构造方法如果不传入大小,内部会默认数组长度为11,优先队列只有notEmpty条件,没有notFull条件,当元素个数超出数组长度时,执行扩容操作。
DelayQueue
即延迟队列,也就是一个按延迟时间从小到大出队的PriorityQueue。延迟时间,就是“未来将要执行的时间”-“当前时间”。放入DelayQueue中的元素,必须实现Delayed接口。
Delayed接口:
1.如果getDelay的返回值<=0,则说明该元素到期,需要从队列中拿出来执行。
2.该接口继承了Comparable 接口,需要实现Comparable接口的方法。
核心源码如下所示
private final transient ReentrantLock lock = new ReentrantLock();
//普通优先队列
private final PriorityQueue<E> q = new PriorityQueue<E>();
//第一个等待延迟队列队头元素的线程
private Thread leader = null;
private final Condition available = lock.newCondition();
//入队
public void put(E e) {
    offer(e);
}
public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //入队,放入二叉小根堆
        q.offer(e);
        //如果放入的元素在队头(堆顶),说明是延迟时间最小,则通知其他等待线程
        //否则说明延迟时间不是最小,不做操作
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}
//出队
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            //从优先队列取出堆顶元素(延迟时间最小的)
            E first = q.peek();
            //如果是空,说明队列为空,需要阻塞
            if (first == null)
                available.await();
            else {
                //获取延迟时间,如果<=0直接出队
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return q.poll();
                //否则说明延迟时间未到,先释放队头元素的引用,然后查看是否有其他
                //线程在等待队头元素,如果有,则阻塞
                first = null; // don't retain ref while waiting
                if (leader != null)
                    available.await();
                else {//否则当前线程是第一个获取队头元素的线程,等待有限的时间
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        //如果当前线程已获取了队头元素,则唤醒其他线程
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}关于延迟队列的take()函数,要注意以下两点:
(1)在队列为空 和 堆顶元素的延迟时间没到 这两种情况下,会阻塞。
(2)Thread leader变量记录了等待堆顶元素的第1个线程,如果当前线程是第一个等待队头元素的线程,则使用condition.awaitNanos()等待一个有限的时间;当发现还有其他线程也在等待堆顶元素(leader!=NULL)时,才需要使用condition.await()无限期等待。
SynchronousQueue

SynchronousQueue是一种特殊的BlockingQueue,它本身没有容量。先调put()/take(),线程会阻塞;直到另外一个线程调用了take()/put(),两个线程才同时解锁。
和锁一样,也有公平和非公平模式。如果是公平模式,则用TransferQueue类实现;如果是非公平模式,则用TransferStack类实现。
private transient volatile Transferer<E> transferer;
//......
public SynchronousQueue(boolean fair) {
    transferer = fair ? new SynchronousQueue.TransferQueue<E>() : new SynchronousQueue.TransferStack<E>();
}put和take方法如下所示,核心都是调用Transferer类的transfer方法。transfer有两种模式,分别是生产模式和消费模式,对应put和take。
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    //第一个参数不为空时为生产模式(put),为空时为消费模式(take)
    if (transferer.transfer(e, false, 0) == null) {
        Thread.interrupted();
        throw new InterruptedException();
    }
}
public E take() throws InterruptedException {
    //第一个参数不为空时为生产模式(put),为空时为消费模式(take)
    E e = transferer.transfer(null, false, 0);
    if (e != null)
        return e;
    Thread.interrupted();
    throw new InterruptedException();
}公平模式和非公平模式
假设3个线程分别调用了put(..),3个线程会进入阻塞状态,直到其他线程调用3次take(),和3个put(..)一一配对才解除阻塞。
如果是公平模式(队列模式),则第1个调用put(..)的线程1会在队列头部,第1个到来的take()线程和它进行配对,遵循先到先配对的原则;
如果是非公平模式(栈模式),则第3个调用put(..)的线程3会在栈顶,第1个到来的take()线程和它进行配对,遵循的是后到先配对的原则。
公平模式的实现
TransferQueue是以单向链表实现。
static final class TransferQueue<E> extends Transferer<E> {
	/**
	 * 队列的节点,单向链表
	 */
	static final class QNode {
		//队列下个节点
		volatile QNode next;
		//如果是put item不为null,如果是take,item为null
		volatile Object item;
		//put/take对应的阻塞线程
		volatile Thread waiter;
		//put操作为true,take为false
		final boolean isData;
		QNode(Object item, boolean isData) {
			this.item = item;
			this.isData = isData;
		}
		//......省略其他方法
	}
	//队头
	transient volatile QNode head;
	//队尾
	transient volatile QNode tail;
	transient volatile QNode cleanMe;
	TransferQueue() {
		//初始化一个空节点,队头队尾都指向它
		QNode h = new QNode(null, false);
		head = h;
		tail = h;
	}
	E transfer(E e, boolean timed, long nanos) {
		//这里有两种模式,分别是生产模式对应put,消费模式对应take
		//两种模式不能同时存在于队列中,它们会一旦相遇就会配对出队
		QNode s = null;
		//当前线程的模式
		boolean isData = (e != null);
		for (;;) {
			QNode t = tail;
			QNode h = head;
			if (t == null || h == null)         // 队头队尾有一个为null则说明未初始化,重新循环
				continue;
			if (h == t || t.isData == isData) { // 队列为空或当前线程的模式和队尾元素为同一种模式
				QNode tn = t.next;
				if (t != tail)                  // 不一致读,则重新循环
					continue;
				if (tn != null) {               //tn不是空,说明当前队尾后面还有元素
					advanceTail(t, tn);         //CAS操作,将队尾指针由t的地址换成tn的地址(后移队尾指针)
					continue;                   //继续重新循环
				}
				if (timed && nanos <= 0)        // take和put不会进入此分支
					return null;
				if (s == null)                  //传入的元素新建一个QNode s
					s = new QNode(e, isData);
				if (!t.casNext(null, s))        //CAS操作 将s放到t的next属性,也就是加入队尾,如果失败则重新循环
					continue;
				advanceTail(t, s);              //CAS操作 后移队尾指针
				Object x = awaitFulfill(s, e, timed, nanos); //进入阻塞状态,直到s配对成功
				if (x == s) {
					clean(t, s);
					return null;
				}
				if (!s.isOffList()) {           // 唤醒之后,s还在队列中且为第一个元素
					advanceHead(t, s);          // 后移队头指针
					if (x != null)
						s.item = s;
					s.waiter = null;
				}
				return (x != null) ? (E)x : e;
			} else {                            // 配对模式
				QNode m = h.next;               // 取队列中第一个元素
				if (t != tail || m == null || h != head)
					continue;                   // 不一致读重新for循环
				Object x = m.item;
				if (isData == (x != null) ||    // m已经配对过
						x == m ||
						!m.casItem(x, e)) {         // 尝试配对失败
					advanceHead(h, m);          // 队头直接出队
					continue;
				}
				advanceHead(h, m);              // 配对成功,出队
				LockSupport.unpark(m.waiter);   //唤醒队列中与第一个元素对应的线程
				return (x != null) ? (E)x : e;
			}
		}
	}
}非公平模式实现
TransferStack也是一个单向链表,链表中的节点有三种状态,REQUEST对应take节点,DATA对应put节点,二者配对之后,会生成一个FULFILLING节点,入栈,然后FULLING节点和被配对的节点一起出栈。
static final class TransferStack<E> extends SynchronousQueue.Transferer<E> {
	/* 节点的模式 */
	/** 消费模式 */
	static final int REQUEST    = 0;
	/** 生产模式 */
	static final int DATA       = 1;
	/** 正在配对模式 */
	static final int FULFILLING = 2;
	/** 链表的节点 */
	static final class SNode {
		volatile SNode next;        // 栈的下个节点
		volatile SNode match;       // 配对的节点
		volatile Thread waiter;     // 对应的阻塞线程
		Object item;                // data; or null for REQUESTs
		int mode;                   //三种模式
		SNode(Object item) {
			this.item = item;
		}
		//......省略其他方法
	}
	/** 栈顶指针 */
	volatile SNode head;
	static SNode snode(SNode s, Object e, SNode next, int mode) {
		if (s == null) s = new SNode(e);
		s.mode = mode;
		s.next = next;
		return s;
	}
	@SuppressWarnings("unchecked")
	E transfer(E e, boolean timed, long nanos) {
		SNode s = null;
		int mode = (e == null) ? REQUEST : DATA;
		for (;;) {
			SNode h = head;
			if (h == null || h.mode == mode) {  // 栈为空或同种模式
				if (timed && nanos <= 0) {      // take和put不会进入这个分支
					if (h != null && h.isCancelled())
						casHead(h, h.next);
					else
						return null;
				} else if (casHead(h, s = snode(s, e, h, mode))) {//新元素创建节点入栈
					SNode m = awaitFulfill(s, timed, nanos);      //阻塞等待,直到s配对成功
					if (m == s) {
						clean(s);
						return null;
					}
					if ((h = head) != null && h.next == s)
						casHead(h, s.next);
					return (E) ((mode == REQUEST) ? m.item : s.item);
				}
			} else if (!isFulfilling(h.mode)) { // 不是同一种模式,待配对
				if (h.isCancelled())
					casHead(h, h.next);
				else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {//生成fulfilling节点,入栈
					for (;;) { // 循环,直到匹配成功或者等待线程=null
						SNode m = s.next;       // m和s进行配对
						if (m == null) {        // 栈中只剩s元素了
							casHead(s, null);   // 弹出fulfilling节点
							s = null;           // s引用置空以便下次使用
							break;              // 继续外层循环
						}
						SNode mn = m.next;
						if (m.tryMatch(s)) {    //m和s尝试配对
							casHead(s, mn);     // s和m一起出栈,栈顶指针指向mn
							return (E) ((mode == REQUEST) ? m.item : s.item);
						} else                  // 配对失败
							s.casNext(m, mn);   // CAS操作,s.next字段由m指向mn,继续循环匹配直到匹配成功
					}
				}
			} else {                            // 已经配对过了,出栈
				SNode m = h.next;               // m和h进行配对
				if (m == null)                  // 栈中只剩h元素
					casHead(h, null);           // head=null,弹出h节点
				else {
					SNode mn = m.next;
					if (m.tryMatch(h))          // 配对成功
						casHead(h, mn);         // h和m一起出栈,栈顶指针指向mn
					else                        // 配对失败
						h.casNext(m, mn);       // CAS操作,h.next字段由m指向mn
				}
			}
		}
	}
}




![[R语言]RMarkdown: 入门与操作](https://img-blog.csdnimg.cn/img_convert/82ecf9265cd55102f0df503aa254219d.png)














