物联网接入层技术剖析(四):当epoll遇见MQTT
Netty与高性能网络服务、Linux高并发网络编程实战、从epoll到Netty物联网接入层技术剖析、深入理解I/O多路复用、服务端网络编程进阶指南Netty与物联网当epoll遇见MQTT0 写在前面这个系列写了三篇从 select 到 epoll从内核源码到 Java NIO一路挖下来。但说实话在实际的物联网平台开发中我们很少直接和这些底层 API 打交道。站在它们之上的是 Netty——一个把网络编程的复杂性封装得恰到好处的框架。这篇文章是系列的最后一篇我想把视角拉回到实战层面。聊聊 Netty 是怎么使用 epoll 的在物联网设备接入场景中如何设计一个基于 Netty MQTT 的接入层以及我在项目中积累的一些性能调优经验。1 Netty对epoll的封装Netty 在 Linux 平台上提供了两种 Selector 实现NioEventLoopGroup基于 JDK 的 Selector和EpollEventLoopGroup基于 Netty 自己的 native epoll 封装。为什么要自己封装一套不用 JDK 的呢原因是 JDK 的 Selector 实现虽然底层也是 epoll但中间隔了太多层抽象有些 epoll 的高级特性用不上。Netty 的EpollEventLoopGroup直接通过 JNI 调用 epoll 的系统调用少了中间环节性能更好而且能用上 epoll 特有的功能。比如 ET边沿触发模式。JDK 的 Selector 只支持 LT 模式而 Netty 的 EpollEventLoop 可以配置为 ET 模式EventLoopGroupgroupnewEpollEventLoopGroup(EpollEventLoopGroup.DEFAULT_EVENT_LOOP_THREADS,newDefaultSelectStrategyFactory(),EpollEventLoopGroup.DEFAULT_MAX_PENDING_TASKS,RejectedExecutionHandlers.reject());ServerBootstrapbnewServerBootstrap();b.group(group).channel(EpollServerSocketChannel.class).childHandler(newChannelInitializerSocketChannel(){OverrideprotectedvoidinitChannel(SocketChannelch){// ...}});在物联网平台中如果设备并发量很大使用EpollEventLoopGroupEpollServerSocketChannel可以获得比NioEventLoopGroup更好的性能。实测下来在万级并发连接的场景下吞吐量能提升 10%-20% 左右。不过要注意EpollEventLoopGroup 只能在 Linux 上用。如果你的开发环境是 macOS 或 Windows需要做平台判断开发时用 NioEventLoopGroup生产环境用 EpollEventLoopGroup。Netty 提供了Epoll.isAvailable()方法来做这个判断。2 物联网设备接入层的架构一个典型的物联网平台设备接入层大概长这样设备 → TCP/MQTT → Netty Server → 协议解码 → 业务处理 → 消息队列Netty 在这个架构中承担的是网络通信 协议解析的部分。它负责接收设备的连接、管理连接的生命周期、解码设备上报的协议数据然后把解析后的消息交给上层业务处理。在 Netty 中这个流程是通过 ChannelPipeline 来组织的。Pipeline 是一个处理链每个环节是一个 ChannelHandler。数据从设备端进来依次经过每个 Handler 的处理最终变成业务可以消费的消息对象。3 一个MQTT接入的Pipeline示例以 MQTT 协议为例一个典型的 ChannelPipeline 大概是这样的protectedvoidinitChannel(SocketChannelch){ChannelPipelinepipelinech.pipeline();// 空闲检测60秒没有数据就断开pipeline.addLast(newIdleStateHandler(60,0,0,TimeUnit.SECONDS));// MQTT 编解码器pipeline.addLast(mqtt-decoder,newMqttDecoder());pipeline.addLast(mqtt-encoder,MqttEncoder.INSTANCE);// MQTT 消息处理pipeline.addLast(mqtt-handler,newMqttMessageHandler());// 异常处理pipeline.addLast(exception-handler,newExceptionHandler());}每个 Handler 的职责很清晰IdleStateHandlerNetty 内置的空闲检测 Handler。物联网设备经常会出现假在线的情况——TCP 连接还在但设备已经离线了比如断电、断网但没有发送 DISCONNECT 消息。通过心跳超时检测可以及时清理这些僵尸连接释放资源。MqttDecoder / MqttEncoderMQTT 协议的编解码器。MQTT 是一个二进制协议固定头、可变头、Payload 的格式都有严格定义。Decoder 负责把字节流解析成 MQTT 消息对象Encoder 负责把消息对象编码成字节流发出去。MqttMessageHandler业务 Handler处理解码后的 MQTT 消息。比如处理 CONNECT设备认证、PUBLISH数据上报、SUBSCRIBE订阅主题等。ExceptionHandler统一处理 Pipeline 中的异常避免异常导致连接非正常关闭。4 连接管理物联网平台的命脉在物联网平台中连接管理是最核心也最容易出问题的地方。设备标识与 Session 绑定。每个设备连接上来之后你需要把设备的唯一标识比如设备 ID、Client ID和 Netty 的 Channel 关联起来。通常的做法是用一个 ConcurrentHashMap 来维护这个映射关系ConcurrentHashMapString,ChanneldeviceChannelsnewConcurrentHashMap();// 设备连接成功时deviceChannels.put(deviceId,channel);// 需要给设备下发消息时ChannelchanneldeviceChannels.get(deviceId);if(channel!nullchannel.isActive()){channel.writeAndFlush(message);}连接断开的清理。设备断开连接时不管是正常断开还是异常断开必须及时从映射表中移除否则会内存泄漏。Netty 提供了channelInactive()回调在这里做清理工作OverridepublicvoidchannelInactive(ChannelHandlerContextctx){StringdeviceIdgetDeviceId(ctx.channel());if(deviceId!null){deviceChannels.remove(deviceId);// 通知业务层设备离线deviceOffline(deviceId);}}优雅停机。服务重启时不能直接 kill 进程需要先通知所有设备我要下线了给它们时间重新连接到其他节点。Netty 的EventLoopGroup.shutdownGracefully()可以做到这一点。5 性能调优的几点经验在物联网平台中把 Netty 调到比较理想的性能我总结了几条经验。合理设置 EventLoop 线程数。默认值是 CPU 核心数 * 2。对于物联网场景如果设备数据量不大但连接数很多可以适当减少线程数比如 CPU 核心数因为大部分线程其实都在epoll_wait上睡觉多了反而浪费。如果数据量也很大保持默认值或适当增加。调整 SO_BACKLOG。这是 ServerSocket 的连接队列大小。当瞬时有大量设备同时连接时比如断电恢复后设备集中上线默认的 backlog 可能不够导致连接被拒绝。建议设大一些比如 1024 或 2048。b.option(ChannelOption.SO_BACKLOG,2048);开启 TCP_NODELAY。MQTT 的报文通常很小几十到几百字节。默认情况下Nagle 算法会把小包攒成大包再发这会增加延迟。对于物联网场景实时性往往比吞吐量更重要建议关闭 Nagleb.childOption(ChannelOption.TCP_NODELAY,true);ByteBuffer 池化。Netty 默认使用池化的 ByteBufPooledByteBufAllocator这比每次都创建新的 ByteBuffer 效率高很多。在高并发场景下这个优化能显著减少 GC 压力。确保你没有不小心切换成了非池化的 Allocator。水位线设置。Netty 的 Channel 有 write buffer 的水位线概念。当 write buffer 的字节数超过高水位线时channel.isWritable()会返回 false此时应该暂停写入等 buffer 消耗到低水位线以下再恢复。这个机制可以防止内存被 write buffer 撑爆。b.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK,64*1024);b.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK,32*1024);监控 epoll_wait 的耗时。如果发现 CPU 使用率异常高可以用jstack看一下 EventLoop 线程在干什么。正常情况下EventLoop 线程大部分时间应该阻塞在epoll_wait上。如果发现它在频繁地处理某个 Channel可能是那个 Channel 有大量的数据要读写拖慢了其他 Channel 的处理。6 从epoll到业务完整的数据链路最后让我们把整个系列的知识串成一条完整的数据链路看看一个设备上报的数据是如何从网卡到达业务层的1. 设备通过 TCP 发送 MQTT PUBLISH 报文 2. 网卡收到数据触发硬件中断 3. 内核中断处理程序把数据放入 socket 接收缓冲区 4. socket 的等待队列被唤醒触发 ep_poll_callback 5. ep_poll_callback 把对应的 epitem 挂入 epoll 的就绪链表 rdllist 6. 唤醒在 epoll_wait 上阻塞的 JVM 线程 7. Netty 的 EventLoop 线程从 Selector 中拿到就绪的 Channel 8. 数据经过 PipelineIdleStateHandler → MqttDecoder → MqttMessageHandler 9. MqttDecoder 把字节流解析成 MqttPublishMessage 对象 10. MqttMessageHandler 提取 payload交给业务层处理这十个步骤跨越了硬件中断、内核态、用户态、JVM、Netty 框架、业务逻辑六个层次。理解了这条链路你就理解了物联网平台网络通信的全貌。当然实际生产环境中还有更多的细节需要处理SSL/TLS 加密、设备认证、消息 QoS 保证、集群水平扩展等等。但万变不离其宗底层的网络通信模型就是我们在前面三篇文章中讨论的那些东西。7 写在最后这个系列从 select 的局限性讲起经过 epoll 的原理剖析到 Java NIO Selector 的使用最后落脚在 Netty 和物联网实战。整个过程其实就是在回答一个问题当大量设备同时连接到你的平台时系统是如何高效地处理这些连接的答案的核心就是 epoll 的事件驱动模型。它让一个线程可以轻松管理成千上万个连接让系统资源的消耗与活跃连接数成正比而不是与总连接数成正比。这个特性对于物联网平台来说几乎是不可或缺的。希望这个系列对你有所帮助。如果有什么问题或者想讨论的欢迎留言交流。8 参考资料Java NIO Selector - BaeldungLinux下的I/O复用与epoll详解epoll - 维基百科一文读懂Linux epoll实现原理深入理解 Linux 的 epoll 机制
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2637195.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!