通过nioSocketChannel.pipeline()的addLast添加入站处理器,如果有多个必须显示的唤醒下一个入站处理器,否则执行链中间会断掉。
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
log.debug(nioSocketChannel.toString());
//添加各种处理器
nioSocketChannel.pipeline().addLast(new LoggingHandler());
nioSocketChannel.pipeline().addLast(new StringDecoder());//字符串解析器
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("msg 1 : {}",msg);
//这里未调用唤醒下一个处理器
}
});
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("msg 1 : {}",msg);
super.channelRead(ctx, msg);
}
});
}
执行结果:

解决方法:

super.channelRead(ctx, msg);源码:
该函数调用的是:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
进一步跟踪调用:
public ChannelHandlerContext fireChannelRead(final Object msg) {
//找到下一个handler并执行
invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
return this;
}
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
//新启动一个线程,执行新的处理器
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
附加:入站处理器调用出站后,分别有:
ChannelHandlerContext.writeAndFlush:从当前处理器往前找
nioSocketChannel.writeAndFlush:从最后一个处理依次往前找。

上图执行顺序:
入站1-》入站2-》入站2调用出站-》当前往前找-》出站1
NioSocketChannel 会从后往前找出站处理器,
ChannelHandlerContext:会从当前处理器往前找出战处理器。

nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("msg 1 : " + msg);
//唤醒下一个handler,同时可以将消息进一步处理
super.channelRead(ctx, msg + "我已经被处理过");
}
});
nioSocketChannel.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("调用出战处理器 1");
System.out.println(msg.toString());
super.write(ctx, msg, promise);
}
});
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//得到消息,是第一个handler处理过后的内容
System.out.println("msg 2 : " + msg);
//ctx.writeAndFlush(ctx.alloc().buffer().writeBytes("出出出出出".getBytes()));
//nioSocketChannel 会从最后一个依次向前找出站处理器
nioSocketChannel.writeAndFlush(ctx.alloc().buffer().writeBytes("出出出出出".getBytes()));
}
});
nioSocketChannel.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("调用出战处理器 2");
System.out.println(msg.toString());
super.write(ctx, msg, promise);
}
});
nioSocketChannel.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("调用出战处理器 3");
System.out.println(msg.toString());
super.write(ctx, msg, promise);
}
});
}
nioSocketChannel 依次从最后往前调用出站处理器,执行顺序为: 3 2 1





![[Java·算法·中等]LeetCode34. 在排序数组中查找元素的第一个和最后一个位置](https://img-blog.csdnimg.cn/9aab9b0988694ad68f5fde01f819e3ca.png)














