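Module encapsulation
shenyu-disruptor defines a set of general-purpose types, including DisruptorProvider, DisruptorProviderManage, DataEvent, QueueConsumerFactory, and DisruptorThreadFactory.
Together they form an initialization framework for the Disruptor.
DisruptorProviderManage initializes the Disruptor, and its parameters can be customized at initialization time. One of those parameters is the consumer factory, which initialization stores as a field of each QueueConsumer. QueueConsumer listens for messages; when one arrives, the QueueConsumerFactory creates a QueueConsumerExecutor to process it. The QueueConsumerExecutor receives the message and performs the actual work. The provider field of DisruptorProviderManage is the producer for the Disruptor created by this initialization, and messages are published through it.
In short, the module is a generic wrapper around the Disruptor that works with any data type. To use it from outside: extend QueueConsumerExecutor and put the business logic in its run method (QueueConsumer hands the executor to a thread pool); implement QueueConsumerFactory so that it creates your QueueConsumerExecutor; pass the factory to the DisruptorProviderManage constructor; then call startup on the DisruptorProviderManage object to initialize and start the Disruptor. Once it is running, obtain the provider from the DisruptorProviderManage object to publish messages and, when finished, to shut the Disruptor down.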
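The steps described above can be modeled with the JDK alone. The sketch below is not the shenyu-disruptor API: MiniConsumerExecutor, MiniConsumerFactory, and MiniProviderManage are hypothetical stand-ins for QueueConsumerExecutor, QueueConsumerFactory, and DisruptorProviderManage, and a plain thread pool takes the place of the ring buffer. It only illustrates the division of labor between factory, executor, and manager.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

// Hypothetical stand-in for QueueConsumerExecutor: a Runnable that carries one message.
abstract class MiniConsumerExecutor<T> implements Runnable {
    private T data;

    T getData() {
        return data;
    }

    void setData(final T data) {
        this.data = data;
    }
}

// Hypothetical stand-in for QueueConsumerFactory: builds one executor per message.
interface MiniConsumerFactory<T> {
    MiniConsumerExecutor<T> create();
}

// Hypothetical stand-in for DisruptorProviderManage and its provider.
// Assumption: a fixed thread pool replaces the Disruptor ring buffer.
final class MiniProviderManage<T> {
    private final MiniConsumerFactory<T> factory;
    private ExecutorService pool;

    MiniProviderManage(final MiniConsumerFactory<T> factory) {
        this.factory = factory;
    }

    void startup() {
        pool = Executors.newFixedThreadPool(2);
    }

    // Plays the role of provider.onData(data) followed by the consumer's onEvent.
    void onData(final T data) {
        MiniConsumerExecutor<T> executor = factory.create();
        executor.setData(data);
        pool.execute(executor);
    }

    // Stops accepting messages and waits for the ones already submitted.
    void shutdown() {
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

final class MiniDisruptorDemo {
    // Runs the whole flow and returns what the consumers saw, sorted for a stable result.
    static List<String> run() {
        List<String> seen = new CopyOnWriteArrayList<>();
        // The factory returns an executor whose run() holds the business logic.
        MiniConsumerFactory<String> factory = () -> new MiniConsumerExecutor<String>() {
            @Override
            public void run() {
                seen.add("handled:" + getData());
            }
        };
        // Pass the factory to the manager, start it, publish, then shut down.
        MiniProviderManage<String> manage = new MiniProviderManage<>(factory);
        manage.startup();
        manage.onData("a");
        manage.onData("b");
        manage.shutdown();
        return seen.stream().sorted().collect(Collectors.toList());
    }

    public static void main(final String[] args) {
        System.out.println(run()); // prints [handled:a, handled:b]
    }
}
```

The caller only ever writes the executor and the factory; threading, startup, and shutdown stay inside the manager, which is the point of the wrapper.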
Project startup
RegisterClientServerDisruptorPublisher#start registers the subscribers with the factory and starts DisruptorProviderManage.
    public void start(final Map<String, ShenyuClientRegisterService> shenyuClientRegisterService) {
        RegisterServerExecutorFactory factory = new RegisterServerExecutorFactory();
        factory.addSubscribers(new URIRegisterExecutorSubscriber(shenyuClientRegisterService));
        factory.addSubscribers(new MetadataExecutorSubscriber(shenyuClientRegisterService));
        factory.addSubscribers(new ApiDocExecutorSubscriber(shenyuClientRegisterService));
        providerManage = new DisruptorProviderManage<>(factory);
        providerManage.startup();
    }
 
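DisruptorProviderManage#startup(boolean) builds the Disruptor configuration: it creates the consumer thread pool, the event factory, and the consumers, starts the Disruptor, and wraps the ring buffer in a DisruptorProvider. In orderly mode only one consumer is created.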
    public void startup(final boolean isOrderly) {
        OrderlyExecutor executor = new OrderlyExecutor(isOrderly, consumerSize, consumerSize, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                DisruptorThreadFactory.create("shenyu_disruptor_consumer_", false), new ThreadPoolExecutor.AbortPolicy());
        int newConsumerSize = this.consumerSize;
        EventFactory<DataEvent<T>> eventFactory;
        if (isOrderly) {
            newConsumerSize = 1;
            eventFactory = new OrderlyDisruptorEventFactory<>();
        } else {
            eventFactory = new DisruptorEventFactory<>();
        }
        Disruptor<DataEvent<T>> disruptor = new Disruptor<>(eventFactory,
                size,
                DisruptorThreadFactory.create("shenyu_disruptor_provider_" + consumerFactory.fixName(), false),
                ProducerType.MULTI,
                new BlockingWaitStrategy());
        @SuppressWarnings("all")
        QueueConsumer<T>[] consumers = new QueueConsumer[newConsumerSize];
        for (int i = 0; i < newConsumerSize; i++) {
            consumers[i] = new QueueConsumer<>(executor, consumerFactory);
        }
        disruptor.handleEventsWithWorkerPool(consumers);
        disruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler());
        disruptor.start();
        RingBuffer<DataEvent<T>> ringBuffer = disruptor.getRingBuffer();
        provider = new DisruptorProvider<>(ringBuffer, disruptor, isOrderly);
    }
 
Publishing events
ShenyuClientRegisterEventPublisher#publishEvent publishes an event through the provider.
    public void publishEvent(final DataTypeParent data) {
        DisruptorProvider<DataTypeParent> provider = providerManage.getProvider();
        provider.onData(data);
    }
 
DisruptorProvider#onData publishes the data to the ringBuffer. It rejects the call if the provider is of the orderly type.
    public void onData(final T data) {
        if (isOrderly) {
            throw new IllegalArgumentException("The current provider is  of orderly type. Please use onOrderlyData() method.");
        }
        try {
            ringBuffer.publishEvent(translatorOneArg, data);
        } catch (Exception ex) {
            logger.error("ex", ex);
        }
    }
 
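QueueConsumer#onEvent handles the event: it asks the factory for a QueueConsumerExecutor, hands it the data, and submits it to the thread pool.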
    @Override
    public void onEvent(final DataEvent<T> t) {
        if (t != null) {
            ThreadPoolExecutor executor = orderly(t);
            QueueConsumerExecutor<T> queueConsumerExecutor = factory.create();
            queueConsumerExecutor.setData(t.getData());
            // help gc
            t.setData(null);
            executor.execute(queueConsumerExecutor);
        }
    }
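The orderly(t) call above chooses the thread pool for an event, but its body is not shown in this post. As a general illustration of what an orderly mode can guarantee, and explicitly not ShenYu's implementation, the sketch below routes every task with the same key to the same single-threaded executor, so tasks for one key run in submission order. The class name, the key names, and the lane count are all hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class OrderedRoutingDemo {
    // Submits five numbered tasks per key and returns the order each key observed.
    static Map<String, List<Integer>> run() {
        final int lanes = 4; // assumption: an arbitrary number of single-thread lanes
        ExecutorService[] executors = new ExecutorService[lanes];
        for (int i = 0; i < lanes; i++) {
            executors[i] = Executors.newSingleThreadExecutor();
        }
        Map<String, List<Integer>> seen = new ConcurrentHashMap<>();
        String[] keys = {"order-a", "order-b", "order-c"};
        for (int seq = 0; seq < 5; seq++) {
            for (String key : keys) {
                final int value = seq;
                // Same key -> same lane; a single-thread lane runs its tasks in FIFO order.
                ExecutorService lane = executors[Math.floorMod(key.hashCode(), lanes)];
                lane.execute(() ->
                        seen.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(value));
            }
        }
        // Wait for all lanes to drain before reading the result.
        for (ExecutorService lane : executors) {
            lane.shutdown();
            try {
                lane.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return seen;
    }

    public static void main(final String[] args) {
        // Each key maps to [0, 1, 2, 3, 4]; the order of the keys themselves may vary.
        System.out.println(run());
    }
}
```

Different keys may still interleave with each other; only the order within one key is preserved.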
 
The factory's create method builds the QueueConsumerExecutor: it takes all subscribers from getSubscribers() and indexes them by type.
        @Override
        public QueueConsumerExecutor<Collection<DataTypeParent>> create() {
            Map<DataType, ExecutorTypeSubscriber<DataTypeParent>> maps = getSubscribers()
                    .stream()
                    .map(e -> (ExecutorTypeSubscriber<DataTypeParent>) e)
                    .collect(Collectors.toMap(ExecutorTypeSubscriber::getType, Function.identity()));
            return new RegisterServerConsumerExecutor(maps);
        }
 
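Handling events
RegisterServerConsumerExecutor#run executes on a pool thread: it filters out invalid data, looks up the matching ExecutorSubscriber, and calls its executor method.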
    @Override
    public void run() {
        Collection<DataTypeParent> results = getData()
                .stream()
                .filter(this::isValidData)
                .collect(Collectors.toList());
        if (CollectionUtils.isEmpty(results)) {
            return;
        }
        selectExecutor(results).executor(results);
    }
    private ExecutorSubscriber<DataTypeParent> selectExecutor(final Collection<DataTypeParent> list) {
        final Optional<DataTypeParent> first = list.stream().findFirst();
        return subscribers.get(first.orElseThrow(() -> new RuntimeException("the data type is not found")).getType());
    }
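The type-based dispatch in create() and selectExecutor() can be tried in isolation. The following sketch uses only the JDK; Kind, TypedMessage, and KindSubscriber are hypothetical stand-ins for DataType, DataTypeParent, and ExecutorTypeSubscriber. Note that, as in the code above, only the first element of a batch decides the subscriber, so the approach assumes every batch holds a single type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical message kinds, standing in for DataType.
enum Kind { URI, META_DATA }

// Hypothetical message, standing in for DataTypeParent.
final class TypedMessage {
    private final Kind kind;
    private final String payload;

    TypedMessage(final Kind kind, final String payload) {
        this.kind = kind;
        this.payload = payload;
    }

    Kind getKind() {
        return kind;
    }

    String getPayload() {
        return payload;
    }
}

// Hypothetical subscriber, standing in for ExecutorTypeSubscriber.
interface KindSubscriber {
    Kind getKind();

    void executor(Collection<TypedMessage> batch);
}

final class DispatchDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        List<KindSubscriber> subscribers =
                Arrays.asList(subscriber(Kind.URI, log), subscriber(Kind.META_DATA, log));
        // Index subscribers by the kind they handle; toMap throws if two share a kind.
        Map<Kind, KindSubscriber> byKind = subscribers.stream()
                .collect(Collectors.toMap(KindSubscriber::getKind, Function.identity()));
        dispatch(byKind, Arrays.asList(
                new TypedMessage(Kind.META_DATA, "m1"), new TypedMessage(Kind.META_DATA, "m2")));
        dispatch(byKind, Arrays.asList(new TypedMessage(Kind.URI, "u1")));
        return log;
    }

    // The first element's kind selects the subscriber for the whole batch.
    static void dispatch(final Map<Kind, KindSubscriber> byKind,
                         final Collection<TypedMessage> batch) {
        TypedMessage first = batch.stream().findFirst()
                .orElseThrow(() -> new RuntimeException("the data type is not found"));
        byKind.get(first.getKind()).executor(batch);
    }

    // Builds a subscriber that records every payload it receives.
    private static KindSubscriber subscriber(final Kind kind, final List<String> log) {
        return new KindSubscriber() {
            @Override
            public Kind getKind() {
                return kind;
            }

            @Override
            public void executor(final Collection<TypedMessage> batch) {
                batch.forEach(m -> log.add(kind + ":" + m.getPayload()));
            }
        };
    }

    public static void main(final String[] args) {
        System.out.println(run()); // prints [META_DATA:m1, META_DATA:m2, URI:u1]
    }
}
```

If no subscriber is registered for a kind, byKind.get returns null and the call fails with a NullPointerException; the same applies to subscribers.get in the original selectExecutor.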
 