SpringBoot 集成 Netty
文章目录
SpringBoot 集成 Netty 背景描述 Netty与SpringBoot整合关注点 Netty组件 Bootstrap、ServerBootstrap Channel EventLoop、EventLoopGroup ChannelHandler ChannelPipeline ByteBuf
Pom依赖 Yml 配置 整合Netty步骤
背景描述
如果需要在SpringBoot开发的app中,提供Socket服务,那么Netty是不错的选择。
Netty与SpringBoot整合关注点
Netty跟Springboot生命周期保持一致,同生共死Netty能用上ioc中的BeanNetty能读取到全局的配置
Netty组件
Bootstrap、ServerBootstrap
帮助 Netty 使用者更加方便地组装和配置 Netty ,也可以更方便地启动 Netty 应用程序 Bootstrap 用于启动一个 Netty TCP 客户端,或者 UDP 的一端。ServerBootstrap 往往是用于启动一个 Netty 服务端。
Channel
Channel 是 Netty 网络操作抽象类,它除了包括基本的 I/O 操作,如 bind、connect、read、write 之外,还包括了 Netty 框架相关的一些功能,如获取该 Channel 的 EventLoop 。其实就是我们平常网络编程中经常使用的socket套接字对象
EventLoop、EventLoopGroup
EventLoop定义了Netty的核心对象,用于处理IO事件,多线程模型、并发一个EventLoopGroup包含一个或者多个EventLoop 一个EventLoop在它的生命周期内只和一个Thread绑定 所有有EventLoop处理的I/O事件都将在它专有的Thread上被处理 一个Channel在它的生命周期内只注册于一个EventLoop 一个EventLoop可能会被分配给一个或者多个Channel
ChannelHandler
ChannelHandler其实就是用于负责处理接收和发送数据的的业务逻辑,Netty中可以注册多个handler,以链式的方式进行处理,根据继承接口的不同,实现的顺序也不同。ChannelHandler主要用于对出站和入站数据进行处理,它有两个重要的子接口:
ChannelInboundHandler——处理入站数据ChannelOutboundHandler——处理出站数据
ChannelPipeline
ChannelPipeline是ChannelHandler的容器,通过ChannelPipeline可以将ChannelHandler组织成一个逻辑链,该逻辑链可以用来拦截流经Channel的入站和出站事件,当 Channel被创建时,它会被自动地分配到它的专属的 ChannelPipeline。
ByteBuf
ByteBuf就是字节缓冲区,用于高效处理输入输出。
Pom依赖
引入springboot starter web 和 netty
< ! -- SpringBoot 初始化依赖 -- >
< dependency>
< groupId> org. springframework. boot< / groupId>
< artifactId> spring- boot- starter- web< / artifactId>
< version> 2.3 .5 . RELEASE < / version>
< / dependency>
< ! -- https: / / mvnrepository. com/ artifact/ io. netty/ netty- all -- >
< dependency>
< groupId> io. netty< / groupId>
< artifactId> netty- all< / artifactId>
< version> 4.1 .85 . Final< / version>
< / dependency>
Yml 配置
server :
port : 2345
netty :
websocket :
port : 1024
ip : 0.0.0.0
max-frame-size : 10240
path : /channel
整合Netty步骤
服务端
使用 SpringBoot Runner 机制启动 Netty 服务。
@Component
@Order
public class NettyStartListener implements ApplicationRunner {
@Resource
private SocketServer socketServer;
@Override
public void run ( ApplicationArguments args) {
this . socketServer. start ( ) ;
}
}
@Component
public class SocketServer {
private static final Logger logger = LoggerFactory . getLogger ( SocketServer . class ) ;
private ServerBootstrap serverBootstrap;
@Autowired
private SocketInitializer socketInitializer;
@Value ( "${netty.websocket.port}" )
private int port;
public void start ( ) {
this . init ( ) ;
this . serverBootstrap. bind ( this . port) ;
logger. info ( "Netty started on port: {} (TCP) with boss thread {}" , this . port, 2 ) ;
}
private void init ( ) {
NioEventLoopGroup bossGroup = new NioEventLoopGroup ( 2 ) ;
NioEventLoopGroup workerGroup = new NioEventLoopGroup ( ) ;
this . serverBootstrap = new ServerBootstrap ( ) ;
this . serverBootstrap. group ( bossGroup, workerGroup) . channel ( NioServerSocketChannel . class ) . childHandler ( this . socketInitializer) ;
}
}
public class SocketHandler extends ChannelInboundHandlerAdapter {
private static final Logger log = LoggerFactory . getLogger ( SocketHandler . class ) ;
public static final ChannelGroup clients = new DefaultChannelGroup ( GlobalEventExecutor . INSTANCE ) ;
@Override
public void channelRead ( ChannelHandlerContext ctx, Object msg) {
byte [ ] data = ( byte [ ] ) msg;
log. info ( "收到消息: " + new String ( data) ) ;
for ( Channel client : clients) {
if ( ! client. equals ( ctx. channel ( ) ) ) {
client. writeAndFlush ( data) ;
}
}
}
@Override
public void handlerAdded ( ChannelHandlerContext ctx) {
log. info ( "新的客户端链接:" + ctx. channel ( ) . id ( ) . asShortText ( ) ) ;
clients. add ( ctx. channel ( ) ) ;
}
@Override
public void handlerRemoved ( ChannelHandlerContext ctx) {
clients. remove ( ctx. channel ( ) ) ;
}
@Override
public void exceptionCaught ( ChannelHandlerContext ctx, Throwable cause) {
cause. printStackTrace ( ) ;
ctx. channel ( ) . close ( ) ;
clients. remove ( ctx. channel ( ) ) ;
}
}
@Component
public class SocketInitializer extends ChannelInitializer < SocketChannel > {
@Override
protected void initChannel ( SocketChannel socketChannel) {
ChannelPipeline pipeline = socketChannel. pipeline ( ) ;
pipeline. addLast ( new ByteArrayDecoder ( ) ) ;
pipeline. addLast ( new ByteArrayEncoder ( ) ) ;
pipeline. addLast ( new SocketHandler ( ) ) ;
}
}
客户端
public class ChatClient {
public void start ( String name) throws IOException {
SocketChannel socketChannel = SocketChannel . open ( new InetSocketAddress ( "127.0.0.1" , 1024 ) ) ;
socketChannel. configureBlocking ( false ) ;
Selector selector = Selector . open ( ) ;
socketChannel. register ( selector, SelectionKey . OP_READ ) ;
new Thread ( new ClientThread ( selector) ) . start ( ) ;
Scanner scanner = new Scanner ( System . in) ;
while ( scanner. hasNextLine ( ) ) {
String message = scanner. next ( ) ;
if ( StringUtils . hasText ( message) ) {
socketChannel. write ( StandardCharsets . UTF_8 . encode ( name + ": " + message) ) ;
}
}
}
private class ClientThread implements Runnable {
private final Logger logger = LoggerFactory . getLogger ( ClientThread . class ) ;
private final Selector selector;
public ClientThread ( Selector selector) {
this . selector = selector;
}
@Override
public void run ( ) {
try {
while ( true ) {
int channels = selector. select ( ) ;
if ( channels == 0 ) {
continue ;
}
Set < SelectionKey > selectionKeySet = selector. selectedKeys ( ) ;
Iterator < SelectionKey > keyIterator = selectionKeySet. iterator ( ) ;
while ( keyIterator. hasNext ( ) ) {
SelectionKey selectionKey = keyIterator. next ( ) ;
keyIterator. remove ( ) ;
if ( selectionKey. isReadable ( ) ) {
handleRead ( selector, selectionKey) ;
}
}
}
} catch ( IOException e) {
logger. error ( e. getMessage ( ) , e) ;
}
}
}
private void handleRead ( Selector selector, SelectionKey selectionKey) throws IOException {
SocketChannel channel = ( SocketChannel ) selectionKey. channel ( ) ;
ByteBuffer byteBuffer = ByteBuffer . allocate ( 1024 ) ;
StringBuilder message = new StringBuilder ( ) ;
if ( channel. read ( byteBuffer) > 0 ) {
byteBuffer. flip ( ) ;
message. append ( StandardCharsets . UTF_8 . decode ( byteBuffer) ) ;
}
channel. register ( selector, SelectionKey . OP_READ ) ;
System . out. println ( message) ;
}
}
public static void main ( String [ ] args) throws IOException {
new ChatClient ( ) . start ( "张三" ) ;
}