构建高性能内存队列:Disruptor

news2025/7/7 1:18:50

1、 背景

Java中有哪些队列

ArrayBlockingQueue 使用ReentrantLock
LinkedBlockingQueue 使用ReentrantLock
ConcurrentLinkedQueue 使用CAS
等等
我们清楚使用锁的性能比较低,尽量使用无锁设计。接下来就我们来认识下Disruptor。

2、Disruptor简单使用

github地址:https://github.com/LMAX-Exchange/disruptor/wiki/Performance-Results

先简单介绍下:

Disruptor它是一个开源的并发框架,并获得2011 Duke’s程序框架创新奖【Oracle】,能够在无锁的情况下实现网络的Queue并发操作。英国外汇交易公司LMAX开发的一个高性能队列,号称单线程能支撑每秒600万订单~
日志框架Log4j2 异步模式采用了Disruptor来处理
局限呢,他就是个内存队列,也就是说无法支撑分布式场景。

3、简单使用

导入依赖

<dependency>
  <groupId>com.lmax</groupId>
  <artifactId>disruptor</artifactId>
  <version>3.4.2</version>
</dependency>

数据传输对象

@Data
public class EventData {
    private Long value;
}

消费者

public class EventConsumer implements WorkHandler<EventData> {

    /**
     * 消费回调
     * @param eventData
     * @throws Exception
     */
    @Override
    public void onEvent(EventData eventData) throws Exception {
        Thread.sleep(5000);
        System.out.println(Thread.currentThread() + ", eventData:" + eventData.getValue());
    }
}

生产者

public class EventProducer {

    private final RingBuffer<EventData> ringBuffer;

    public EventProducer(RingBuffer<EventData> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void sendData(Long v){
        // cas展位
        long next = ringBuffer.next();
        try {
            EventData eventData = ringBuffer.get(next);
            eventData.setValue(v);
        } finally {
            // 通知等待的消费者
            System.out.println("EventProducer send success, sequence:"+next);
            ringBuffer.publish(next);
        }
    }
}

测试类

public class DisruptorTest {

    public static void main(String[] args) {
        // 2的n次方
        int bufferSize = 8;

        Disruptor<EventData> disruptor = new Disruptor<EventData>(
                () -> new EventData(), // 事件工厂
                bufferSize,            // 环形数组大小
                Executors.defaultThreadFactory(),       // 线程池工厂
                ProducerType.MULTI,    // 支持多事件发布者
                new BlockingWaitStrategy());    // 等待策略

        // 设置消费者
        disruptor.handleEventsWithWorkerPool(
                new EventConsumer(),
                new EventConsumer(),
                new EventConsumer(),
                new EventConsumer());

        disruptor.start();

        RingBuffer<EventData> ringBuffer = disruptor.getRingBuffer();
        EventProducer eventProducer = new EventProducer(ringBuffer);
        long i  = 0;
        for(;;){
            i++;
            eventProducer.sendData(i);
            try {
                Thread.sleep(1500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }
}

4、核心组件

基于上面简单例子来看确实很简单,Disruptor帮我们封装好了生产消费模型的实现,接下来我们来看下他是基于哪些核心组件来支撑起一个高性能无锁队列呢?

  • RingBuffer:环形数组
    底层使用数组entries,在初始化时填充数组,避免不断新建对象带来的开销。后续只会对entries做更新操作
    在这里插入图片描述

  • Sequencer: 核心管家

    • 定义生产同步的实现:SingleProducerSequencer单生产、MultiProducerSequencer多生产
    • 当前写的进度Sequence cursor
    • 所有消费者进度的数组Sequence[] gatingSequences
    • MultiProducerSequencer可用区availableBuffer【利用空间换取查询效率】
  • Sequence: 本身就是一个序号器用来标识处理进度,也可以当做是一个atomicInteger; 还有另外一个特点,为了解决伪共享问题而引入的:缓存行填充。这个在后面介绍。

  • workProcessor: 处理Event的循环,在循环中获取Disruptor的事件,然后把事件分配给各个handler

  • EventHandler: 负责业务逻辑的handler,自己实现。

  • WaitStrategy: 消费者 如何等待 事件的策略,定义了如下策略

    • leepingWaitStrategy:自旋 + yield + sleep
    • BlockingWaitStrategy:加锁,适合CPU资源紧张(不需要切换线程),系统吞吐量无要求的
    • YieldingWaitStrategy:自旋 + yield + 自旋
    • BusySpinWaitStrategy:自旋,减少线程之前切换
    • PhasedBackoffWaitStrategy:自旋 + yield + 自定义策略

5、带着问题来解析代码?

1、多生产者如何保证消息生产不会相互覆盖。【如何达到互斥效果】

在这里插入图片描述
每个线程获取不同的一段数组空间,然后通过CAS判断这段空间是否已经分配出去。

接下来我们看下多生产类MultiProducerSequencer中next方法【获取生产序号】

// 消费者上一次消费的最小序号 // 后续第二点会讲到
private final Sequence gatingSequenceCache = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
// 当前进度的序号
protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
// 所有消费者的序号 //后续第二点会讲到
protected volatile Sequence[] gatingSequences = new Sequence[0];

 public long next(int n)
    {
        if (n < 1)
        {
            throw new IllegalArgumentException("n must be > 0");
        }
        long current;
        long next;
        do
        {
            // 当前进度的序号,Sequence的value具有可见性,保证多线程间线程之间能感知到可申请的最新值
            current = cursor.get();
            // 要申请的序号空间:最大序列号
            next = current + n;
  
            long wrapPoint = next - bufferSize;
            // 消费者最小序列号
            long cachedGatingSequence = gatingSequenceCache.get();
            // 不允许生产进度比消费进度大于bufferSize,如果大于bufferSize,就阻塞等待
            if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
            {
                long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
                // 说明大于1圈,并没有多余空间可以申请
                if (wrapPoint > gatingSequence)
                {
                    LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
                    continue;
                }
                // 更新最小值到Sequence的value中
                gatingSequenceCache.set(gatingSequence);
            }
            // CAS成功后更新当前Sequence的value
            else if (cursor.compareAndSet(current, next))
            {
                break;
            }
        }
        while (true);
        return next;
    }

2、生产者向序号器申请写的序号,如序号正在被消费,Sequencer是如何知道哪些序号是可以被写入的呢?【未消费则被覆盖如何处理】

从gatingSequences中取得最小的序号,生产者最多能写到这个序号的后一位。通俗来讲就是申请的序号不能大于最小消费者序号一圈【申请到最大序列号-buffersize 要小于/等于 最小消费的序列号】的时候, 才能申请到当前写的序号
在这里插入图片描述

public final EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<T>... workHandlers)
{
    return createWorkerPool(new Sequence[0], workHandlers);
}


EventHandlerGroup<T> createWorkerPool(
    final Sequence[] barrierSequences, final WorkHandler<? super T>[] workHandlers)
{
    final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences);
    final WorkerPool<T> workerPool = new WorkerPool<>(ringBuffer, sequenceBarrier, exceptionHandler, workHandlers);


    consumerRepository.add(workerPool, sequenceBarrier);

    final Sequence[] workerSequences = workerPool.getWorkerSequences();

    updateGatingSequencesForNextInChain(barrierSequences, workerSequences);

    return new EventHandlerGroup<>(this, consumerRepository, workerSequences);
}

    private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences)
{
    if (processorSequences.length > 0)
    {
        // 消费者启动后就会将所有消费者存放入AbstractSequencer中gatingSequences
        ringBuffer.addGatingSequences(processorSequences);
        for (final Sequence barrierSequence : barrierSequences)
        {
            ringBuffer.removeGatingSequence(barrierSequence);
        }
        consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
    }
}

3、在多生产者情况下,生产者是申请到一段可写入的序号,然后再写入这些序号中,那么消费者是如何感知哪些序号是可以被消费的呢?【借问提1图说明】

这个前提是多生产者情况下,第一点我们说过每个线程获取不同的一段数组空间,那么现在单单通过序号已经不够用了,MultiProducerSequencer使用了int 数组 【availableBuffer】来标识当前序号是否可用。当生产者成功生产事件后会将availableBuffer中当前序列号置为1标识可以读取。

如此消费者可以读取的的最大序号就是我们availableBuffer中第一个不可用序号-1。
在这里插入图片描述
初始化availableBuffer流程

public MultiProducerSequencer(int bufferSize, final WaitStrategy waitStrategy)
{
    super(bufferSize, waitStrategy);
    // 初始化可用数组
    availableBuffer = new int[bufferSize];
    indexMask = bufferSize - 1;
    indexShift = Util.log2(bufferSize);
    initialiseAvailableBuffer();
}
// 初始化默认availableBuffer为-1
private void initialiseAvailableBuffer()
{
    for (int i = availableBuffer.length - 1; i != 0; i--)
    {
        setAvailableBufferValue(i, -1);
    }

    setAvailableBufferValue(0, -1);
}

// 生产者成功生产事件将可用区数组置为1
public void publish(final long sequence)
{
    setAvailable(sequence);
    waitStrategy.signalAllWhenBlocking();
}

private void setAvailableBufferValue(int index, int flag)
{
    long bufferAddress = (index * SCALE) + BASE;
    UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
}

消费者消费流程

WorkProcessor类中消费run方法
public void run()
    {
        boolean processedSequence = true;
        long cachedAvailableSequence = Long.MIN_VALUE;
        long nextSequence = sequence.get();
        T event = null;
        while (true)
        {
            try
            {
                // 先通过cas获取消费事件的占有权
                if (processedSequence)
                {
                    processedSequence = false;
                    do
                    {
                        nextSequence = workSequence.get() + 1L;
                        sequence.set(nextSequence - 1L);
                    }
                    while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
                }
                // 数据就绪,可以消费
                if (cachedAvailableSequence >= nextSequence)
                {
                    event = ringBuffer.get(nextSequence);
                    // 触发回调函数
                    workHandler.onEvent(event);
                    processedSequence = true;
                }
                else
                {
                    // 获取可以被读取的下标
                    cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
                }
            }
        // ....省略
        }

        notifyShutdown();

        running.set(false);
    }
    
    
    public long waitFor(final long sequence)
        throws AlertException, InterruptedException, TimeoutException
    {
        checkAlert();
        // 这个值获取的current write 下标,可以认为全局消费下标。此处与每一段的write1和write2下标区分开
        long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);

        if (availableSequence < sequence)
        {
            return availableSequence;
        }
        // 通过availableBuffer筛选出第一个不可用序号 -1
        return sequencer.getHighestPublishedSequence(sequence, availableSequence);
    }
    
    public long getHighestPublishedSequence(long lowerBound, long availableSequence)
    {
        // 从current read下标开始, 循环至 current write,如果碰到availableBuffer 为-1 直接返回
        for (long sequence = lowerBound; sequence <= availableSequence; sequence++)
        {
            if (!isAvailable(sequence))
            {
                return sequence - 1;
            }
        }

        return availableSequence;
    }

5、什么是伪共享问题呢?

为了提高CPU的速度,Cpu有高速缓存Cache,该缓存最小单位为缓存行CacheLine,他是从主内存复制的Cache的最小单位,通常是64字节。一个Java的long类型是8字节,因此在一个缓存行中可以存8个long类型的变量。如果你访问一个long数组,当数组中的一个值被加载到缓存中,它会额外加载另外7个。因此你能非常快地遍历这个数组。

伪共享问题是指,当多个线程共享某份数据时,线程1可能拉到线程2的数据在其cache line中,此时线程1修改数据,线程2取其数据时就要重新从内存中拉取,两个线程互相影响,导致数据虽然在cache line中,每次却要去内存中拉取。

Disruptor是如何解决的呢?

在value前后统一都加入7个Long类型进行填充,线程拉取时,不论如何都会占满整个缓存
在这里插入图片描述

6、回顾总结:Disuptor为何能称之为高性能的无锁队列框架呢?

  • 缓存行填充,避免缓存频繁失效。【java8中也引入@sun.misc.Contended注解来避免伪共享】
  • 无锁竞争:通过CAS 【二阶段提交】
  • 环形数组:数据都是覆盖,避免GC
  • 底层更多的使用位运算来提升效率

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/109231.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Web3中文|可判115年监禁的FTX创始人SBF即将被引渡到美国

巴哈马总检察长办公室在宣布逮捕FTX前CEO Sam Bankman-Fried时&#xff0c;指出他很可能应美国要求被引渡。 一个多星期后&#xff0c;美国广播公司新闻报道称 &#xff0c;SBF于12月20日签署了引渡文件。 另据彭博社12月20日的一份报告称&#xff0c;该交易所创始人SBF于12月…

Centos7安装配置Minio

Background 官方下载地址&#xff1a;https://github.com/minio/minio/releases 这里给出本次使用的一个版本&#xff1a;minio-2021-05-11T23:27:41Z&#xff0c;提取码&#xff1a;king 1、下载minio文件夹 其他的版本的相关命令可能发生变化&#xff0c;这里只是针对我提供…

基于GIS的生态安全格局构建之生态阻力面的建立

GIS前沿 一、数据来源介绍 &#xff08;一&#xff09;土地利用数据 土地利用数据来自国土资源三次调查数据&#xff08;2018年&#xff09;&#xff0c;根据研究需要对其进行分析处理。 &#xff08;二&#xff09;生态安全等级数据 利用对从生态属性和生态干扰两方面选择的…

Junit5 + YAML 轻松实现参数化和数据驱动,让 App 自动化测试更高效(一)

登录&#xff1a;不同的用户名&#xff0c;不同的密码&#xff0c;不同的组合都需要做登录场景的测试&#xff0c;正常的排列组合下可能会产生多个用例 搜索&#xff1a;不同的搜索条件产生不同的搜索结果&#xff0c;搜索也是常见的测试项&#xff0c;单个搜索参数或者多种搜…

Java---正则表达式

目录 一、正则表达式的介绍 二、正则表达式的基本语法 &#xff08;1&#xff09;字符类 &#xff08;2&#xff09;预定义符 &#xff08;3&#xff09;数量词 三、正则表达式的具体实例 &#xff08;1&#xff09;判断电话号码是否符合规则 &#xff08;2&#xff09;…

git push踩坑记录【看注意事项】

记录一次git push的踩坑过程&#xff08;详细在注意事项里&#xff0c;列出了具体的解决办法&#xff09;。 push远程仓库命令 使用命令 git init git add . git commit -m "提交说明写在这里" git remote add origin gitgithub.com:xxx/surgical-robot.git git p…

4、常用类和对象

文章目录4、常用类和对象4.1 Object4.2 数组4.3 二维数组4.4 二维数组 九层妖塔4.5 冒泡排序4.6 选择排序4.7 二分法查找4.8 字符串4.9 字符串拼接4.10 字符串比较4.11 字符串截断4.12 字符串替换4.13 字符串大小写转换4.14 字符串查询4.15 StringBuilder4.16 包装类4.17 日期类…

windows环境下python和gdal绑定方法

作者:朱金灿 来源:clever101的专栏 为什么大多数人学不会人工智能编程?>>> 编译和安装gdal 此篇介绍的方法并不需要用到pip工具,可依据自己编译的gdal库来灵活绑定。 安装gdal主要是设置两个环境变量:一是gdal的动态库路径加入到path环境变量下,如下图: 二是…

vim的常规操作

Linux系统内置vi文本编辑器&#xff0c;vim是vi的增强版 vi和vim的三种模式 正常模式&#xff1a;默认模式&#xff0c;可以使用“上下左右”键来移动光标&#xff0c;也可以用删除、复制、粘体等功能插入模式&#xff1a;按i,I,o,O,a,A,r,R等任何一个字母进入命令行模式&…

django logging的StreamHandler的一个小用法

首先先了解下&#xff0c;logging的大致结构&#xff0c;它有一个内置处理器&#xff0c;还有一个django提供的内置记录器。基本上&#xff0c;日志模块就是由这俩组成的&#xff0c;他俩的关系&#xff0c;有点水渠理论的意思。就是说&#xff0c;处理器&#xff0c;和记录器&…

MCU-51:独立按键控制LED灯的动作

目录一、独立按键二、独立按键控制LED亮灭二、消除按键抖动2.1 按键的抖动2.2 控制LED灯状态-消除按键抖动三、独立按键控制LED显示二进制四、独立按键控制LED灯移位一、独立按键 轻触按键&#xff1a;相当于是一种电子开关&#xff0c;按下时开关接通&#xff0c;松开时开关断…

微服务技术--认识微服务

技术栈&#xff1a; 认识微服务 服务架构演变 单体架构 将业务的功能集中在一个项目中开发&#xff0c;打成一个包部署优点&#xff1a; 架构简单部署成本低 缺点&#xff1a; 耦合度高 分布式架构 根据业务功能对系统进行拆分&#xff0c;每个业务模块作为独立项目开发&…

黑盒测试用例设计 - 场景法

原理 现在的软件几乎都是用时间触发来控制流程的。测试时&#xff0c;可以以生动的描述出触发时的情景&#xff0c;有利于设计测试用例&#xff0c;同时使测试用例更容易理解和执行。基本流&#xff1a;软件功能按照正确的事件流实现的一条正确流程。通常一个业务仅存在一个基…

[洛谷]P2234 [HNOI2002]营业额统计

[洛谷]P2234 [HNOI2002]营业额统计一、问题描述题目描述输入格式输出格式样例 #1样例输入 #1样例输出 #1提示二、问题分析1、算法标签2、思路分析三、代码实现一、问题描述 [洛谷]P2234 [HNOI2002]营业额统计 题目描述 Tiger 最近被公司升任为营业部经理&#xff0c;他上任后…

微服务系列 - Zookeeper下篇:源码解析

前言 关于zookeeper的入门到精通请阅读&#xff1a;微服务系列 - Zookeeper上篇 注&#xff1a;本内容仅用于个人学习笔记&#xff0c;如有侵扰&#xff0c;联系删除 参考文档&#xff1a;https://blog.csdn.net/mjb740074431/article/details/120173792 一、算法基础 Zook…

@Configuration注解

1.作用 Configuration注解的作用&#xff1a;声明一个类为配置类&#xff0c;用于取代bean.xml配置文件注册bean对象。 2.基础运用 Configuration注解最常见的搭配使用有两个&#xff1a;Bean和Scope Bean&#xff1a;等价于Spring中的bean标签用于注册bean对象的&#xff…

SpringBoot 过滤器、拦截器、监听器对比及使用场景

一、关系图理解 二、区别 1.过滤器 过滤器是在web应用启动的时候初始化一次, 在web应用停止的时候销毁 可以对请求的URL进行过滤, 对敏感词过滤 挡在拦截器的外层 实现的是 javax.servlet.Filter 接口&#xff0c;是 Servlet 规范的一部分 在请求进入容器后&#xff0c;但…

Java 线程的六种状态及其简易转换

1.Java中线程的状态分为六种 NEW&#xff1a;初始状态&#xff0c;线程被创建&#xff0c;但是还没有调用start()方法。 RUNNABLE&#xff1a;运行状态&#xff0c;Java线程将操作系统中的就绪和运行两种状态笼统地称作“运行中”。 BLOCKED&#xff1a;阻塞状态,表示线程阻塞于…

IJCAI-2022 多级发射方法的脉冲神经网络

原文链接&#xff1a;CSDN-脉冲神经网络&#xff08;SNN&#xff09;论文阅读&#xff08;四&#xff09;-----IJCAI-2022 多级发射方法的脉冲神经网络 Multi-Level Firing with Spiking DS-ResNet: Enabling Better and Deeper Directly-Trained Spiking Neural Networks目录说…

SAP UI5 Smart Table 和 Smart Filter Bar 的联合使用方法介绍试读版

本教程第 147 个步骤&#xff0c;我们介绍了 SAP UI5 Smart Table 控件的用法&#xff1a; SAP UI5 应用开发教程之一百四十七 - SAP UI5 SmartTable 控件的使用介绍 如下图所示&#xff1a; 本步骤我们在 Smart Table 本身的基础上再进一步&#xff0c;学习如何将 Smart Tab…