Netty抽象出流水线(pipeline)这一层数据结构进行处理或拦截channel相关事件。
事件分为入站事件(inBound event)和出站事件(outBound event)的ChannelHandlers列表。ChannelPipeline使用先进的Intercepting Filter模式,使用户可以完全控制如何处理事件以及管道中的ChannelHandlers如何相互交互。类似与servlet的filter,亦或是Linux的管道命令,使用责任链模式。将不同的handlers组装成一个链表,进行依次调用。开发人员可以很方便的往链表添加或删除handler进行自己的业务逻辑操作。
 
channel事件分为入站和出栈事件两种类型,同样的handler分为ChannelInboundHandler和ChannelOutboundHandler两种类型处理对应事件。入站事件对应read事件,出栈事件对应write事件,读和写公用同一个pipeline,两个事件类型互不干涉。read事件从head开始往后找相关的inBound handlers依次进行处理,write事件从tail往前找相关的outBound handlers依次进行处理。
pipeline的创建
当channel创建时,就会创建一个pipeline与之进行绑定。在channel整个生命周期都会使用该pipeline进行事件处理,默认使用DefaultChannelPipeline类进行创建pipeline。
来看下DefaultChannelPipeline的构造方法:
protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);
    tail = new TailContext(this);
    head = new HeadContext(this);
    head.next = tail;
    tail.prev = head;
}
 
创建时候会初始化head和tail。我们在往pipeline添加的时候都是添加的handler,这里会包装成AbstractChannelHandlerContext类型添加到链表里。
添加handler
添加handler可以通过pipeline.addLast方法进行添加
 ChannelPipeline p = ...;
 p.addLast("1", new InboundHandlerA());
 p.addLast("2", new InboundHandlerB());
 p.addLast("3", new OutboundHandlerA());
 p.addLast("4", new OutboundHandlerB());
 p.addLast("5", new InboundOutboundHandlerX());
 
addLast的源码如下:
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
    //判断handler能否被添加多次
        checkMultiplicity(handler);
        //创建一个DefaultChannelHandlerContext实例,将handler包装起来
        newCtx = newContext(group, filterName(name, handler), handler);
        //添加到链表
        addLast0(newCtx);
        //判断channel是否绑定了eventLoop,没绑定调用callHandlerCallbackLater后面文章会讲到这里
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }
      
        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            callHandlerAddedInEventLoop(newCtx, executor);
            return this;
        }
    }
    //回调当前handler的handlerAdded方法
    callHandlerAdded0(newCtx);
    return this;
}
//这里就是将handler添加到tail的前面
    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }
 
从上面源码可以看出,添加的handler首先会被包装成一个HandlerContext,然后在将其放到链表tail的前面。添加handler其它的方法还有addFirst,addBefore,addAfter原理类似。可以根据业务逻辑进行handler顺序编排。
常见事件
入站读事件:ChannelInboundHandler
| 回调方法 | 事件说明 | 
|---|---|
| channelRegistered | channel绑定EventLoop | 
| channelUnregistered | channel取消EventLoop绑定 | 
| channelActive | channel启动准备完成 | 
| channelInactive | channel处于非活动状态,准备关闭 | 
| channelRead | 从对端读取到数据 | 
| channelReadComplete | 处理完所有读取到的数据 | 
出站写事件:ChannelOutboundHandler
| 回调方法 | 事件说明 | 
|---|---|
| bind | bind操作完成前回调,serverchannel事件 | 
| connect | connect操作完成前回调,clientchannel事件 | 
| disconnect | disconnect操作完成前回调,client端事件 | 
| close | close操作前回调,server端事件 | 
| read | read操作前回调 | 
| write | write操作前回调 | 
| flush | flush操作前回调 | 
数据读写源码
io.netty.channel.Channel.write方法会调用pipeline.write,其源码如下
public final ChannelFuture write(Object msg) {
    return tail.write(msg);
}
 
这里看到写数据从tail开始调用。所以这里如果有多个handler的话,要注意我们加入handler的顺序。
手动添加的handler都被包装成DefaultChannelHandlerContext,该类只重写了handler()方法获取当前handler,其它方法实现都在其父类AbstractChannelHandlerContext中。方法间传递的ctx变量就是该类。递归调用handler也是主要在该类中方法实现:
ctx.write:
private void write(Object msg, boolean flush, ChannelPromise promise) {
    //找出下一个Outbound类型MASK_WRITE类型的handler
    final AbstractChannelHandlerContext next = findContextOutbound(flush ?
            (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
       //是否flush,调用下一个handler的write
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
        if (!safeExecute(executor, task, promise, m, !flush)) {
            task.cancel();
        }
    }
}
 
这里以读方法为例看调用过程
读数据从pipeline.fireChannelRead方法开始
public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}
 
这里看到ctx从head开始往后找。然后调用ctx.invokeChannelRead
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}
 
invokeChannelRead方法其实是调起handler.channelRead方法。
private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            invokeExceptionCaught(t);
        }
    } else {
        fireChannelRead(msg);
    }
}
 
下一个handler一般在处理完后,会调用ctx.fireChannelRead完成下一个handler的调用。这样完成链式调用
public ChannelHandlerContext fireChannelRead(final Object msg) {
    invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
    return this;
}
 
参考:
 https://netty.io/4.0/api/io/netty/channel/ChannelPipeline.html
 https://www.alibabacloud.com/blog/essential-technologies-for-java-developers-io-and-netty_597367



















