1. 简介
  Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注。
   Disruptor区别于Kafka、RabbitMQ等消息队列,它是一个高性能的线程间异步通信的框架,即在同一个JVM进程中的多线程间消息传递。和ArrayBlockingQueue比较类似,但是它是一个有界无锁的高并发队列,如果项目中使用ArrayBlockingQueue在多线程之间传递消息,可以考虑是用Disruptor来代替。
   Github:https://github.com/LMAX-Exchange/disruptor
   文档:https://lmax-exchange.github.io/disruptor/
   资料及测试结论:https://lmax-exchange.github.io/disruptor/disruptor.html#_overview
2. 官方测试
- 吞吐量性能测试
参考ArrayBlockingQueue(简称ABQ)在不同机器、不同消费模式(下文详细介绍)环境下的性能对比,详情浏览官方测试报告:https://lmax-exchange.github.io/disruptor/disruptor.html#_throughput_performance_testing
| Nehalem 2.8Ghz – Windows 7 SP1 64-bit | Sandy Bridge 2.2Ghz – Linux 2.6.38 64-bit | |||
|---|---|---|---|---|
| ABQ | Disruptor | ABQ | Disruptor | |
| Unicast: 1P – 1C | 5,339,256 | 25,998,336 | 4,057,453 | 22,381,378 | 
| Pipeline: 1P – 3C | 2,128,918 | 16,806,157 | 2,006,903 | 15,857,913 | 
| Sequencer: 3P – 1C | 5,539,531 | 13,403,268 | 2,056,118 | 14,540,519 | 
| Multicast: 1P – 3C | 1,077,384 | 9,377,871 | 260,733 | 10,860,121 | 
| Diamond: 1P – 3C | 2,113,941 | 16,143,613 | 2,082,725 | 15,295,197 | 
- 延迟性能测试
参考ArrayBlockingQueue(简称ABQ)在相同机器、相同同消费模式环境下的对比,详情浏览官方测试报告:https://lmax-exchange.github.io/disruptor/disruptor.html#_latency_performance_testing
| Array Blocking Queue (ns) | Disruptor (ns) | |
|---|---|---|
| Min Latency | 145 | 29 | 
| Mean Latency | 32,757 | 52 | 
| 99% observations less than | 2,097,152 | 128 | 
| 99.99% observations less than | 4,194,304 | 8,192 | 
| Max Latency | 5,069,086 | 175,567 | 
3. 相关博客
【面试专栏】Java 阻塞队列【面试专栏】Java常用并发工具类【面试专栏】JAVA锁机制【面试专栏】JAVA CAS(Conmpare And Swap)原理【面试专栏】Java并发编程:volatile关键字
4. Java阻塞队列存在的弊端
常用的Java阻塞队列:
| 名称 | 是否有界 | 是否加锁 | 数据结构 | 队列类型 | 
|---|---|---|---|---|
| ArrayBlockingQueue | 有界 | 加锁 | 数组 | 阻塞 | 
| LinkedBlockingQueue | 可选有界 | 加锁 | 链表 | 阻塞 | 
| PriorityBlockingQueue | 无界 | 加锁 | 数组 | 阻塞 | 
| DelayQueue | 无界 | 加锁 | 数组 | 阻塞 | 
| LinkedTransferQueue | 无界 | 无锁 | 链表 | 阻塞 | 
| LinkedBlockingDeque | 可选有界 | 有锁 | 链表 | 阻塞 | 
在使用中,为了防止生产过快而消费不及时导致的内存溢出,或垃圾回收频繁导致的性能问题,一般会使用有界且数据结构为数组的阻塞队列,即:ArrayBlockingQueue。但是,ArrayBlockingQueue使用加锁的方式保证线程安全,在低延迟的场景中表现悲观,且存在伪共享的问题,因此结果不尽人意。
5. 伪共享
5.1 CPU内部存储结构
  现在的CPU都是多个CPU核心,如下图。为了提高访问效率,都有缓存行。每个核中都有L1 Cache和L2 Cache,L3 Cache则在多核之间共享。CPU在执行运算时,首先从L1 Cache查找数据,找不到则以一次从L2 Cache、L3 Cache查找,如果还是没有,则会去内存中查找,路径越长,耗时越长,性能越低。当数据被修改后,通过主线通知其他CPU将读取的数据标记为失效状态,下次访问时从内存重新读取数据到Cache。
   对于计算机的存储设备而言,除了CPU之外,外部还有内存和磁盘。如下图,存储容量越来越大,成本越来越大,但访问速度却越来越慢。
5.2 缓存行
  CPU从内存中加载数据时,并不是一个字节一个字节的加载,而是一块一块的的加载数据,这样的一块称为:缓存行(Cache Line),即缓存行是CPU读取数据的最小单位。
   CPU的缓存行一般为32 ~ 128字节,常见的CPU缓存行为64字节。
# 查询CPU的Cache Line大小
cat /sys/devices/system/cpu/cpu0/cache/index0/coherency_line_size  对于数组而言,CPU每次会加载数据中多个数据到Cache中。所以,如果按照物理内存地址分布的顺序去访问数据,Cache命中率最高,从而减少从内存加载数据的次数,提高性能。
   但对于单个变量而言,会存在Cache伪共享的问题。假设定义衣蛾Long类型的变量A,占用8个字节,则CPU每次从内存中读取数剧是,会连同连续地址内的其余7个数据一并加载到Cache中。
5.3 伪共享
  多个CPU缓存遵循MESI协议。
   假设,定义两个变量A、B,线程1绑定Core1,读取变量A,线程2绑定Core2,读取变量B。
   Core1和Core2分别读取变量A、B到Cache中,但变量A、B在同一Cache Line,因此,Core1和Core2都会把数据A、B加载到Cache中。
   线程1通过Core1修改变量A。首先通过主线发送消息给Core2将存放变量A的Cache Line标记为失效状态,然后将Core1中的Cache Line的状态标为已修改,并修改数据。
   线程2修改变量B时,发现Cache Line的状态为失效,并且Core1中的Cache Line为已修改状态,则先把Core1中的变量A、B写回内存,然后从内存中重新读取变量A、B。然后通过主线发送消息给Core1将存放变量B的Cache Line标记为失效状态,然后将Core2中的Cache Line的状态标为已修改,并修改数据。
   如果线程1、2频繁交替修改变量A、B,则会重复以上步骤,导致Cache没有意义。虽然变量A、B之间没有任何关系,但属于同一Cache Line。
   这种多个线程同时读写同一个Cache Line的不同变量而导致CPU Cache失效的现象称为伪共享(False Sharing)。
5.4 ArrayBlockingQueue中的伪共享
  查看ArrayBlockingQueue源码,有三个核心变量:
# 出队下标
/** items index for next take, poll, peek or remove */
int takeIndex;
# 入队下标
/** items index for next put, offer, or add */
int putIndex;
# 队列中元素数量
/** Number of elements in the queue */
int count;  这三个变量很容易放到同一Cache Line,这也是影响ArrayBlockingQueue性能的重要原因之一。
6. Disruptor如何避免伪共享
  ArrayBlockingQueue因为加锁和伪共享的问题影响性能,那Disruptor是如何避免这两个问题来提供性能的呢?主要表现在以下几个方面:
- 采用环形数组结构
- 采用CAS无锁方式
- 添加额外的信息避免伪共享
6.1 环形数组结构
  环形数组(RingBuffer)结构是Disruptor的核心。官网对环形数据结构的剖析:Dissecting the Disruptor: What’s so special about a ring buffer?
 优势:
- 数组结构:当CPU Cache加载数据时,相邻的数据也会被加载到Cache中,避免CPU频繁从内存中获取数据。
- 避免GC:实质还是一个普通的数组,当数据填满队列时(2^n - 1)时,再次添加数据回覆盖之前的数据。
- 位运算:数组大小必须为2的n次方,通过位运算提高效率。
6.2 CAS无锁方式
        Disruptor与传统的队列不同,分为队首指针和队尾指针,而是只有一个游标器Sequencer,它可以保证生产的消息不回覆盖还未消费的消息。Sequencer分为两个实现类:SingleProducerSequencer和MultiProducerSequencer,即单生产者和多生产者。当单个生产者时,生产者每次从RingBuffer中获取下一个可以生产的位置,然后存放数据;消费者先获取最大的可消费的位置,再读取数据进行消费。当多个生产者时,每个生产者下通过CAS竞争获取可以生产的位置,然后存放数据;每个消费者都需要先获取最大可消费的下标,然后读取数据进行消费。
6.3 添加额外信息
        RingBuffer的下标是一个volatile变量,即不用加锁就能保证多线程安全,同时每个long类型的下标还会附带7个long的额外变量,避免伪共享的问题。
class LhsPadding
{
    protected long p1, p2, p3, p4, p5, p6, p7;
}
class Value extends LhsPadding
{
    protected volatile long value;
}
class RhsPadding extends Value
{
    protected long p9, p10, p11, p12, p13, p14, p15;
}7. 示例代码
- 创建项目 
- 修改pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.c3stones</groupId>
    <artifactId>springboot-disruptor-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.4.4</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.24</version>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.36</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.36</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>- 创建消息模型类
 用于在Disruptor之间传递消息。
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
 * 消息
 *
 * @author CL
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Message {
    private Object data;
    @Override
    public String toString() {
        return data.toString();
    }
}- 创建生产者
import com.c3stones.model.Message;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import lombok.RequiredArgsConstructor;
/**
 * 生产者
 *
 * @author CL
 */
@RequiredArgsConstructor
public class Producer {
    private final Disruptor disruptor;
    /**
     * 发送数据
     *
     * @param data 数据
     */
    public void send(Object data) {
        RingBuffer<Message> ringBuffer = disruptor.getRingBuffer();
        // 获取可以生成的位置
        long next = ringBuffer.next();
        try {
            Message msg = ringBuffer.get(next);
            msg.setData(data);
        } finally {
            ringBuffer.publish(next);
        }
    }
}- 创建分组模式消费者
import com.c3stones.model.Message;
import com.lmax.disruptor.WorkHandler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
 * 分组消费
 *
 * @author CL
 */
@Slf4j
@RequiredArgsConstructor
public class GroupConsumer implements WorkHandler<Message> {
    /**
     * 消费着编号
     */
    private final Integer number;
    /**
     * 分组消费:每个生产者生产的数据只能被一个消费者消费
     *
     * @param message 消息
     */
    @Override
    public void onEvent(Message message) {
        log.info("Group Consumer number: {}, message: {}", number, message);
    }
}- 创建重复模式消费者
import com.c3stones.model.Message;
import com.lmax.disruptor.EventHandler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
 * 重复消费
 *
 * @author CL
 */
@Slf4j
@RequiredArgsConstructor
public class RepeatConsumer implements EventHandler<Message> {
    /**
     * 消费着编号
     */
    private final Integer number;
    /**
     * 重复消费:每个消费者重复消费生产者生产的数据
     *
     * @param message    消息
     * @param sequence   当前序列号
     * @param endOfBatch 批次结束标识(常用于将多个消费着的数据依次组合到最后一个消费者统一处理)
     */
    @Override
    public void onEvent(Message message, long sequence, boolean endOfBatch) {
        log.info("Repeat Consumer number: {}, message: {}, curr sequence: {}, is end: {}",
                number, message, sequence, endOfBatch);
    }
}- 创建通用模式消费者
 一般采用这种更加通用的模式。
import com.c3stones.model.Message;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
 * 消费者
 *
 * @author CL
 */
@Slf4j
@RequiredArgsConstructor
public class Consumer implements WorkHandler<Message>, EventHandler<Message> {
    private final String desc;
    /**
     * 重复消费:每个消费者重复消费生产者生产的数据
     *
     * @param message    消息
     * @param sequence   当前序列号
     * @param endOfBatch 批次结束标识(常用于将多个消费着的数据依次组合到最后一个消费者统一处理)
     */
    @Override
    public void onEvent(Message message, long sequence, boolean endOfBatch) {
        this.onEvent(message);
    }
    /**
     * 分组消费:每个生产者生产的数据只能被一个消费者消费
     *
     * @param message 消息
     */
    @Override
    public void onEvent(Message message) {
        log.info("Group Consumer describe: {}, message: {}", desc, message);
    }
}8. 单元测试
- 测试分组消费:每个生产者生产的数据只能被一个消费者消费
import com.c3stones.consumer.GroupConsumer;
import com.c3stones.model.Message;
import com.c3stones.producer.Producer;
import com.lmax.disruptor.dsl.Disruptor;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.concurrent.Executors;
/**
 * 分组消费 单元测试
 *
 * @author CL
 */
public class GroupConsumerTest {
    private Disruptor<Message> disruptor;
    @Before
    public void init() {
        GroupConsumer a = new GroupConsumer(1);
        GroupConsumer b = new GroupConsumer(2);
        disruptor = new Disruptor(Message::new, 1024, Executors.defaultThreadFactory());
        disruptor.handleEventsWithWorkerPool(a, b);
        disruptor.start();
    }
    @After
    public void close() {
        disruptor.shutdown();
    }
    @Test
    public void test() {
        Producer producer = new Producer(disruptor);
        Arrays.asList("aaa", "bbb").forEach(data -> producer.send(data));
    }
}单元测试结果:
[pool-1-thread-1] INFO com.c3stones.consumer.GroupConsumer - Group Consumer number: 1, message: bbb
[pool-1-thread-2] INFO com.c3stones.consumer.GroupConsumer - Group Consumer number: 2, message: aaa- 测试重复消费:每个消费者重复消费生产者生产的数据
import com.c3stones.consumer.RepeatConsumer;
import com.c3stones.model.Message;
import com.c3stones.producer.Producer;
import com.lmax.disruptor.dsl.Disruptor;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.concurrent.Executors;
/**
 * 重复消费 单元测试
 *
 * @author CL
 */
public class RepeatConsumerTest {
    private Disruptor<Message> disruptor;
    @Before
    public void init() {
        RepeatConsumer a = new RepeatConsumer(1);
        RepeatConsumer b = new RepeatConsumer(2);
        disruptor = new Disruptor(Message::new, 1024, Executors.defaultThreadFactory());
        disruptor.handleEventsWith(a, b);
        disruptor.start();
    }
    @After
    public void close() {
        disruptor.shutdown();
    }
    @Test
    public void test() {
        Producer producer = new Producer(disruptor);
        Arrays.asList("aaa", "bbb").forEach(data -> producer.send(data));
    }
}单元测试结果:
[pool-1-thread-1] INFO com.c3stones.consumer.RepeatConsumer - Repeat Consumer number: 1, message: aaa, curr sequence: 0, is end: false
[pool-1-thread-2] INFO com.c3stones.consumer.RepeatConsumer - Repeat Consumer number: 2, message: aaa, curr sequence: 0, is end: false
[pool-1-thread-1] INFO com.c3stones.consumer.RepeatConsumer - Repeat Consumer number: 1, message: bbb, curr sequence: 1, is end: true
[pool-1-thread-2] INFO com.c3stones.consumer.RepeatConsumer - Repeat Consumer number: 2, message: bbb, curr sequence: 1, is end: true- 测试链路模式
/**
 * 测试链路模式
 * <p>
 * p => a -> b -> c
 * </p>
 */
@Test
public void testChain() throws InterruptedException {
    Consumer a = new Consumer("a");
    Consumer b = new Consumer("b");
    Consumer c = new Consumer("c");
    Disruptor<Message> disruptor = new Disruptor(Message::new, 1024, Executors.defaultThreadFactory());
    disruptor.handleEventsWith(a).then(b).then(c);
    disruptor.start();
    Producer producer = new Producer(disruptor);
    producer.send("Chain");
    Thread.sleep(1000);
    disruptor.shutdown();
}单元测试结果:
[pool-1-thread-1] INFO com.c3stones.consumer.Consumer - Group Consumer describe: a, message: Chain
[pool-1-thread-2] INFO com.c3stones.consumer.Consumer - Group Consumer describe: b, message: Chain
[pool-1-thread-3] INFO com.c3stones.consumer.Consumer - Group Consumer describe: c, message: Chain- 测试钻石模式
/**
     * 测试钻石模式
     * <p>
     *           a
     *        ↗     ↘
     *  p =>            c
     *        ↘     ↗
     *           b
     * </p>
     */
    @Test
    public void testDiamond() throws InterruptedException {
        Consumer a = new Consumer("a");
        Consumer b = new Consumer("b");
        Consumer c = new Consumer("c");
        Disruptor<Message> disruptor = new Disruptor(Message::new, 1024, Executors.defaultThreadFactory());
        disruptor.handleEventsWithWorkerPool(a, b).then(c);
        disruptor.start();
        Producer producer = new Producer(disruptor);
        producer.send("Diamond1");
        producer.send("Diamond2");
        Thread.sleep(1000);
        disruptor.shutdown();
    }单元测试结果:
[pool-1-thread-1] INFO com.c3stones.consumer.Consumer - Group Consumer describe: a, message: Diamond2
[pool-1-thread-2] INFO com.c3stones.consumer.Consumer - Group Consumer describe: b, message: Diamond1
[pool-1-thread-3] INFO com.c3stones.consumer.Consumer - Group Consumer describe: c, message: Diamond1
[pool-1-thread-3] INFO com.c3stones.consumer.Consumer - Group Consumer describe: c, message: Diamond29. 项目地址
spring-boot-disuptor-demo












![[Linux]进程替换](https://img-blog.csdnimg.cn/ea21bba3198e46aebbbe55ee313f6fca.png)





