Java阻塞队列中的异类,SynchronousQueue底层实现原理剖析

news2025/7/19 9:00:41

上篇文章谈到BlockingQueue的使用场景,并重点分析了ArrayBlockingQueue的实现原理,了解到ArrayBlockingQueue底层是基于数组实现的阻塞队列。

但是BlockingQueue的实现类中,有一种阻塞队列比较特殊,就是SynchronousQueue(同步移交队列),队列长度为0。

作用就是一个线程往队列放数据的时候,必须等待另一个线程从队列中取走数据。同样,从队列中取数据的时候,必须等待另一个线程往队列中放数据。

这样特殊的队列,有什么应用场景呢?

1. SynchronousQueue用法

先看一个SynchronousQueue的简单用例:

/**
 * @author 一灯架构
 * @apiNote SynchronousQueue示例
 **/
public class SynchronousQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        // 1. 创建SynchronousQueue队列
        BlockingQueue<Integer> synchronousQueue = new SynchronousQueue<>();

        // 2. 启动一个线程,往队列中放3个元素
        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + " 入队列 1");
                synchronousQueue.put(1);
                Thread.sleep(1);
                System.out.println(Thread.currentThread().getName() + " 入队列 2");
                synchronousQueue.put(2);
                Thread.sleep(1);
                System.out.println(Thread.currentThread().getName() + " 入队列 3");
                synchronousQueue.put(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        // 3. 等待1000毫秒
        Thread.sleep(1000L);

        // 4. 再启动一个线程,从队列中取出3个元素
        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take());
                Thread.sleep(1);
                System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take());
                Thread.sleep(1);
                System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }

}
复制代码

输出结果:

Thread-0 入队列 1
Thread-1 出队列 1
Thread-0 入队列 2
Thread-1 出队列 2
Thread-0 入队列 3
Thread-1 出队列 3
复制代码

从输出结果中可以看到,第一个线程Thread-0往队列放入一个元素1后,就被阻塞了。直到第二个线程Thread-1从队列中取走元素1后,Thread-0才能继续放入第二个元素2。

由于SynchronousQueue是BlockingQueue的实现类,所以也实现类BlockingQueue中几组抽象方法:

为了满足不同的使用场景,BlockingQueue设计了很多的放数据和取数据的方法。

操作抛出异常返回特定值阻塞阻塞一段时间
放数据addofferputoffer(e, time, unit)
取数据removepolltakepoll(time, unit)
查看数据(不删除)element()peek()不支持不支持

这几组方法的不同之处就是:

  1. 当队列满了,再往队列中放数据,add方法抛异常,offer方法返回false,put方法会一直阻塞(直到有其他线程从队列中取走数据),offer(e, time, unit)方法阻塞指定时间然后返回false。
  2. 当队列是空,再从队列中取数据,remove方法抛异常,poll方法返回null,take方法会一直阻塞(直到有其他线程往队列中放数据),poll(time, unit)方法阻塞指定时间然后返回null。
  3. 当队列是空,再去队列中查看数据(并不删除数据),element方法抛异常,peek方法返回null。

工作中使用最多的就是offer、poll阻塞指定时间的方法。

2. SynchronousQueue应用场景

SynchronousQueue的特点:

队列长度是0,一个线程往队列放数据,必须等待另一个线程取走数据。同样,一个线程从队列中取数据,必须等待另一个线程往队列中放数据。

这种特殊的实现逻辑有什么应用场景呢?

我的理解就是,如果你希望你的任务需要被快速处理,就可以使用这种队列。

Java线程池中的newCachedThreadPool(带缓存的线程池)底层就是使用SynchronousQueue实现的。

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
            60L, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>());
}
复制代码

newCachedThreadPool线程池的核心线程数是0,最大线程数是Integer的最大值,线程存活时间是60秒。

如果你使用newCachedThreadPool线程池,你提交的任务会被更快速的处理,因为你每次提交任务,都会有一个空闲的线程等着处理任务。如果没有空闲的线程,也会立即创建一个线程处理你的任务。

你想想,这处理效率,杠杠滴!

当然也有弊端,如果你提交了太多的任务,导致创建了大量的线程,这些线程都在竞争CPU时间片,等待CPU调度,处理任务速度也会变慢,所以在使用过程中也要综合考虑。

3. SynchronousQueue源码解析

3.1 SynchronousQueue类属性

public class SynchronousQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {

    // 转换器,取数据和放数据的核心逻辑都在这个类里面
    private transient volatile Transferer<E> transferer;

    // 默认的构造方法(使用非公平队列)
    public SynchronousQueue() {
        this(false);
    }

    // 有参构造方法,可以指定是否使用公平队列
    public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }

    // 转换器实现类
    abstract static class Transferer<E> {
        abstract E transfer(E e, boolean timed, long nanos);
    }

    // 基于栈实现的非公平队列
    static final class TransferStack<E> extends Transferer<E> {
    }

    // 基于队列实现的公平队列
    static final class TransferQueue<E> extends Transferer<E> {
    }

}
复制代码

可以看到SynchronousQueue默认的无参构造方法,内部使用的是基于栈实现的非公平队列,当然也可以调用有参构造方法,传参是true,使用基于队列实现的公平队列。

// 使用非公平队列(基于栈实现)
BlockingQueue<Integer> synchronousQueue = new SynchronousQueue<>();
// 使用公平队列(基于队列实现)
BlockingQueue<Integer> synchronousQueue = new SynchronousQueue<>(true);
复制代码

本次就常用的栈实现来剖析SynchronousQueue的底层实现原理。

3.2 栈底层结构

栈结构,是非公平的,遵循先进后出。

使用个case测试一下:

/**
 * @author 一灯架构
 * @apiNote SynchronousQueue示例
 **/
public class SynchronousQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        // 1. 创建SynchronousQueue队列
        SynchronousQueue<Integer> synchronousQueue = new SynchronousQueue<>();

        // 2. 启动一个线程,往队列中放1个元素
        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + " 入队列 0");
                synchronousQueue.put(0);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        // 3. 等待1000毫秒
        Thread.sleep(1000L);

        // 4. 启动一个线程,往队列中放1个元素
        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + " 入队列 1");
                synchronousQueue.put(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        // 5. 等待1000毫秒
        Thread.sleep(1000L);

        // 6. 再启动一个线程,从队列中取出1个元素
        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        // 7. 等待1000毫秒
        Thread.sleep(1000L);

        // 8. 再启动一个线程,从队列中取出1个元素
        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }

}
复制代码

输出结果:

Thread-0 入队列 0
Thread-1 入队列 1
Thread-2 出队列 1
Thread-3 出队列 0
复制代码

从输出结果中可以看出,符合栈结构先进后出的顺序。

3.3 栈节点源码

栈中的数据都是由一个个的节点组成的,先看一下节点类的源码:

// 节点
static final class SNode {
    // 节点值(取数据的时候,该字段为null)
    Object item;
    // 存取数据的线程
    volatile Thread waiter;
    // 节点模式
    int mode;
    // 匹配到的节点
    volatile SNode match;
    // 后继节点
    volatile SNode next;
}
复制代码
  • item

    节点值,只在存数据的时候用。取数据的时候,这个值是null。

  • waiter

    存取数据的线程,如果没有对应的接收线程,这个线程会被阻塞。

  • mode

    节点模式,共有3种类型:

    类型值类型描述类型的作用
    0REQUEST表示取数据
    1DATA表示存数据
    2FULFILLING表示正在等待执行(比如取数据的线程,等待其他线程放数据)

3.4 put/take流程

放数据和取数据的逻辑,在底层复用的是同一个方法,以put/take方法为例,另外两个放数据的方法,add和offer方法底层实现是一样的。

先看一下数据流转的过程,方便理解源码。

还是以上面的case为例:

  1. Thread0先往SynchronousQueue队列中放入元素0
  2. Thread1再往SynchronousQueue队列放入元素1
  3. Thread2从SynchronousQueue队列中取出一个元素

第一步:Thread0先往SynchronousQueue队列中放入元素0

把本次操作组装成SNode压入栈顶,item是元素0,waiter是当前线程Thread0,mode是1表示放入数据。

第二步:Thread1再往SynchronousQueue队列放入元素1

把本次操作组装成SNode压入栈顶,item是元素1,waiter是当前线程Thread1,mode是1表示放入数据,next是SNode0。

第三步:Thread2从SynchronousQueue队列中取出一个元素

这次的操作比较复杂,也是先把本次的操作包装成SNode压入栈顶。

item是null(取数据的时候,这个字段没有值),waiter是null(当前线程Thread2正在操作,所以不用赋值了),mode是2表示正在操作(即将跟后继节点进行匹配),next是SNode1。

然后,Thread2开始把栈顶的两个节点进行匹配,匹配成功后,就把SNode2赋值给SNode1的match属性,唤醒SNode1中的Thread1线程,然后弹出SNode2节点和SNode1节点。

3.5 put/take源码实现

先看一下put方法源码:

// 放数据
public void put(E e) throws InterruptedException {
    // 不允许放null元素
    if (e == null)
        throw new NullPointerException();
    // 调用转换器实现类,放元素
    if (transferer.transfer(e, false, 0) == null) {
        // 如果放数据失败,就中断当前线程,并抛出异常
        Thread.interrupted();
        throw new InterruptedException();
    }
}
复制代码

核心逻辑都在transfer方法中,代码很长,理清逻辑后,也很容易理解。

// 取数据和放数据操作,共用一个方法
E transfer(E e, boolean timed, long nanos) {
    SNode s = null;
    // e为空,说明是取数据,否则是放数据
    int mode = (e == null) ? REQUEST : DATA;

    for (; ; ) {
        SNode h = head;
        // 1. 如果栈顶节点为空,或者栈顶节点类型跟本次操作相同(都是取数据,或者都是放数据)
        if (h == null || h.mode == mode) {
            // 2. 判断节点是否已经超时
            if (timed && nanos <= 0) {
                // 3. 如果栈顶节点已经被取消,就删除栈顶节点
                if (h != null && h.isCancelled())
                    casHead(h, h.next);
                else
                    return null;
                // 4. 把本次操作包装成SNode,压入栈顶
            } else if (casHead(h, s = snode(s, e, h, mode))) {
                // 5. 挂起当前线程,等待被唤醒
                SNode m = awaitFulfill(s, timed, nanos);
                // 6. 如果这个节点已经被取消,就删除这个节点
                if (m == s) {
                    clean(s);
                    return null;
                }
                // 7. 把s.next设置成head
                if ((h = head) != null && h.next == s)
                    casHead(h, s.next);
                return (E) ((mode == REQUEST) ? m.item : s.item);
            }
            // 8. 如果栈顶节点类型跟本次操作不同,并且不是FULFILLING类型
        } else if (!isFulfilling(h.mode)) {
            // 9. 再次判断如果栈顶节点已经被取消,就删除栈顶节点
            if (h.isCancelled())
                casHead(h, h.next);
                // 10. 把本次操作包装成SNode(类型是FULFILLING),压入栈顶
            else if (casHead(h, s = snode(s, e, h, FULFILLING | mode))) {
                // 11. 使用死循环,直到匹配到对应的节点
                for (; ; ) {
                    // 12. 遍历下个节点
                    SNode m = s.next;
                    // 13. 如果节点是null,表示遍历到末尾,设置栈顶节点是null,结束。
                    if (m == null) {
                        casHead(s, null);
                        s = null;
                        break;
                    }
                    SNode mn = m.next;
                    // 14. 如果栈顶的后继节点跟栈顶节点匹配成功,就删除这两个节点,结束。
                    if (m.tryMatch(s)) {
                        casHead(s, mn);
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    } else
                        // 15. 如果没有匹配成功,就删除栈顶的后继节点,继续匹配
                        s.casNext(m, mn);
                }
            }
        } else {
            // 16. 如果栈顶节点类型跟本次操作不同,并且是FULFILLING类型,
            // 就再执行一遍上面第11步for循环中的逻辑(很少概率出现)
            SNode m = h.next;
            if (m == null)
                casHead(h, null);
            else {
                SNode mn = m.next;
                if (m.tryMatch(h))
                    casHead(h, mn);
                else
                    h.casNext(m, mn);
            }
        }
    }
}
复制代码

transfer方法逻辑也很简单,就是判断本次操作类型是否跟栈顶节点相同,如果相同,就把本次操作压入栈顶。否则就跟栈顶节点匹配,唤醒栈顶节点线程,弹出栈顶节点。

transfer方法中调用了awaitFulfill方法,作用是挂起当前线程。

// 等待被唤醒
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
    // 1. 计算超时时间
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();
    // 2. 计算自旋次数
    int spins = (shouldSpin(s) ?
            (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        if (w.isInterrupted())
            s.tryCancel();
        // 3. 如果已经匹配到其他节点,直接返回
        SNode m = s.match;
        if (m != null)
            return m;
        if (timed) {
            // 4. 超时时间递减
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                s.tryCancel();
                continue;
            }
        }
        // 5. 自旋次数减一
        if (spins > 0)
            spins = shouldSpin(s) ? (spins-1) : 0;
        else if (s.waiter == null)
            s.waiter = w;
        // 6. 开始挂起当前线程
        else if (!timed)
            LockSupport.park(this);
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos);
    }
}
复制代码

awaitFulfill方法的逻辑也很简单,就是挂起当前线程。

take方法底层使用的也是transfer方法:

// 取数据
public E take() throws InterruptedException {
    // // 调用转换器实现类,取数据
    E e = transferer.transfer(null, false, 0);
    if (e != null)
        return e;
    // 没取到,就中断当前线程
    Thread.interrupted();
    throw new InterruptedException();
}
复制代码

4. 总结

  1. SynchronousQueue是一种特殊的阻塞队列,队列长度是0,一个线程往队列放数据,必须等待另一个线程取走数据。同样,一个线程从队列中取数据,必须等待另一个线程往队列中放数据。
  2. SynchronousQueue底层是基于栈和队列两种数据结构实现的。
  3. Java线程池中的newCachedThreadPool(带缓存的线程池)底层就是使用SynchronousQueue实现的。
  4. 如果希望你的任务需要被快速处理,可以使用SynchronousQueue队列。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/17385.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

mysql的服启动以及用户登录

目录 1.mysql的启动 A.使用电脑图形化界面打开 B.使用命令行 2.数据库的登录 A.使用命令行加密码直接登录 B.使用命令行再后面输入密码实现登录 C.访问同一台电脑上的不同数据库 D.访问其他主机上的数据库 E.退出mysql 1.mysql的启动 A.使用电脑图形化界面打开 在电脑当中…

Spring MVC面试题

什么是Spring MVC&#xff1f;简单介绍下你对Spring MVC的理解&#xff1f; SpringMVC是一个基于Java的实现了MVC设计模式的请求驱动类型的轻量级Web框架&#xff0c;把复杂的web应用分成逻辑清晰的表示层、控制层、业务层&#xff08;服务层&#xff09;、持久层&#xff0c;…

谈谈从DAMA、DCMM和DGI三大数据治理框架详细了解数据战略规划的关键要素

当前,数据作为新的生产要素提到了关键位置,众多组织认为数据是重要的战略资产。可是,如何发挥数据要素的生产力,数据资产又如何为企业创造价值,确有些无从下手。那么,这就是数据战略要解决的问题。企业怎么看待数据资产、数据的价值如何定位,对数据价值的期望是什么,数…

javaweb 之 会话技术 Cookie Session 登录注册案例 验证码

会话跟踪技术 会话&#xff1a;用户打开浏览器&#xff0c;访问web服务器的资源&#xff0c;会话建立&#xff0c;直到有一方断开连接&#xff0c;会话结束。在一次会话中可以包含多次请求和响应 会话跟踪&#xff1a;一种维护浏览器状态的方法&#xff0c;服务器需要识别多次…

《面试系列篇》——11种常用的设计模式

目录 【一】前言 【二】单例模式 2.1概念 2.2 饿汉模式 2.3 懒汉模式 多线程版本&#xff1a; 【三】简单工厂模式 【四】工厂方法模式 【五】抽象工厂模式 【六】策略模式 【七】装饰模式 7.1 定义 7.2 使用示例 【八】代理模式 8.1 定义 8.2 使用的优势 8.3…

【机器学习】岭回归和LASSO回归详解以及相关计算实例-加利福尼亚的房价数据集、红酒数据集

文章目录一,岭回归和LASSO回归1.1 多重共线性1.2 岭回归接手1.3 线性回归的漏洞&#xff08;线性系数的矛盾解释&#xff09;1.4 Ridge Regression1.5 岭回归实例&#xff08;加利福尼亚的房价数据集&#xff09;1.6 MSE1.7 LASSO1.8 Ridge岭回归和Lasso套索回归的比较1.9 Lass…

H5基本开发2——(HTML常见标签)

常见标签说明 <!DOCTYPE html> <html lang"zh-CN"> <head> <meta charset"UTF-8"> <meta name"viewport" content"widthdevice-width, initial-scale1.0"> <meta http-equiv"X-UA-Compatible&q…

Elasticsearch安装

1.Linux下使用Elasticsearch 本教程所有操作均在CentOS 7.x环境下进行。 elasticsearch基于java进行开发&#xff0c;所以在运行elasticsearch之前需要安装jdk yum -y install java-1.8.0-openjdk 1.1.安装和配置 即将开始在centos 7 下安装Elasticsearch 1.1.1.新建一个用…

(四)QCustomPlot柱形图动态显示实例开发

系列文章目录 提示&#xff1a;这里是该系列文章的所有文章的目录 第一章&#xff1a; &#xff08;一&#xff09;QCustomPlot常见属性设置、多曲线绘制、动态曲线绘制、生成游标、矩形放大等功能实现 第二章&#xff1a; &#xff08;二&#xff09;QCustomPlot生成热力图/矩…

【数据结构与算法】递归全流程详细剖析 | 详解图的深度优先遍历

&#x1f680; 作者 &#xff1a;“大数据小禅” &#x1f680;文章简介&#xff1a;本篇文章属于数据结构与算法系列文章&#xff0c;这篇文章会对算法中的递归进行一个详细的介绍&#xff0c;不仅是概念&#xff0c;而是从运行过程中的每一步进行详细分析。并使用递归的方式来…

Vue3 - 使用 mitt.js 进行组件通信(兄弟关系)

前言 在 Vue2 中&#xff0c;两个同级&#xff08;兄弟关系&#xff09;的组件传递数据&#xff0c;都是利用一个叫 EventBus 事件总线&#xff0c;利用这根总线&#xff0c;可以让两个毫无关系的组件进行通信。 其实这个问题的本质核心解决方案&#xff0c;就一句话。通过第三…

开传奇需要什么技术要什么条件

开传奇需要什么技术要什么条件 对于这个问题&#xff0c;近期问的人比较多。这也是热爱传奇这个游戏的朋友会问到的一个问题&#xff0c;因为喜欢玩这个游戏&#xff0c;也想要自己去开一个经营一个 不管是电脑端还是手机端。但是对于一些新手确实不知道该如何开始操作从哪些方…

基于PHP+MySQL的校园二手旧书回收捐赠系统

校园内因为所教授的课程比较多,人数基数也比较大,所以每年都会产生大量的二手废旧图书,很多时候这些图书还可以进行二次利用,如果直接当废品进行回收很暴殄天物的,所以为了能够让这些校园内的二手图书得到更多的利用,我们开发了PHP校园二手旧书回收捐赠系统 本系统分为前台和后…

「Redis数据结构」压缩列表(ZipList)

「Redis数据结构」压缩列表&#xff08;ZipList&#xff09; 文章目录「Redis数据结构」压缩列表&#xff08;ZipList&#xff09;一、概述二、结构三、连锁更新问题四、压缩列表的缺陷五、小结参考ZipList 是一种特殊的“双端链表” &#xff0c;由一系列特殊编码的连续内存块…

基于协同过滤进行金融产品个性化推荐 代码+数据

1.案例简介 (1)方法概述: 本教程包含如下内容: 从原始的数据文件中加载数据,进行训练集和测试集的切分。 对数据分batch, 利用用户侧信息,和商品侧信息,进行双塔模型的训练 结果展示 (2)宏观流程图 (3)案例知识点 pytorch 视频CTR预估指标 博客point wise 以及 pair wise …

从0到1图文教你如何将spring boot项目部署到minikube中去

这里是weihubeats,觉得文章不错可以关注公众号小奏技术&#xff0c;文章首发。拒绝营销号&#xff0c;拒绝标题党 源码 本项目博文的源码已上传github&#xff0c;有需要自取 github: https://github.com/weihubeats/weihubeats_demos/tree/master/spring-boot-demos/spring-…

C#界面里Control.Enabled 属性的使用

C#界面里Control.Enabled 属性的使用 在开发中即使一个细节也可以决定成败。 前段时间刚好是毕业季,公司招聘了很多应界毕业生, 由于项目非常多,许多老手都已经分配到各个项目上去,还有一些小项目不得不让这些新生力军来接手。 这些初生牛犊不怕虎,天天加班,夜夜苦战…

idea打开之前的项目不能正常编译/idea中项目Compile output丢失问题

报错笔记 文章目录错误一&#xff1a;Cannot resolve class or package java错误二&#xff1a;Compile output丢失问题下面的报错发生情况都是打开之前写的JavaWeb文件时&#xff0c;各个文件中的代码爆红错误一&#xff1a;Cannot resolve class or package ‘java’ 在idea…

【安卓应用渗透】第一篇:安卓逆向回顾和梳理-2211

文章目录安卓开发课程回顾第一课&#xff1a;安卓项目文件结构&#xff08;逻辑视图&#xff09;第五课&#xff1a;文件读取和数据库存取文件存取SQLite数据库存取安卓开发&#xff1a;案前资料库安卓逆向课程回顾第一课&#xff1a;基础逆向工具&#xff08;Apktool, Keytool…

【分享】订阅金蝶云进销存集简云连接器同步销货数据至金蝶云进销存系统

方案场景 在企业规模不断壮大的过程中&#xff0c;企业都是在钉钉内提交OA审批&#xff0c;并通过人工的方式统计多种审批数据&#xff0c;然后手动导入到某系统内&#xff0c;如果中间有调整又需要从头再进行核对&#xff0c;繁琐的流程严重影响了业务拓展。基于该方式企业希…