
 
文章目录
- simple
 - Server端
 - NettyServer
 - NettyServerHandler
 
- Client端
 - NettyClient
 - NettyClientHandler
 
- tcp(粘包和拆包)
 - Server端
 - NettyTcpServer
 - NettyTcpServerHandler
 
- Client端
 - NettyTcpClient
 - NettyTcpClientHandler
 
- protocol
 - codec
 - CustomMessageDecoder
 - CustomMessageEncoder
 
- server端
 - ProtocolServer
 - ProtocolServerHandler
 
- client端
 - ProtocolClient
 - ProtocolClientHandler
 
- http
 - Server端
 - HttpServer
 - HttpServerHandler
 
- Client端
 - HttpClient
 - HttpClientHandler
 
- ws
 - Server端
 - WsServer
 - WsServerHandler
 
- Client端
 - WsClient
 - WebSocketClientHandler
 
- protobuf
 - Server端
 - NettyServer
 - NettyServerHandler
 - Student.proto
 
- Client端
 - NettyClient
 - NettyClientHandler
 
simple
Server端
NettyServer
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
@Slf4j
public class NettyServer {
    public static void main(String[] args) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap()
                    .group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler())
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new NettyServerHandler());
                        }
                    });
            ChannelFuture bindFuture = serverBootstrap.bind(new InetSocketAddress(8888));
            Channel channel = bindFuture.sync().channel();
            log.info("server start");
            channel.closeFuture().sync();
            log.info("server stop");
        } catch (Exception e) {
            log.info("server error", e);
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
 
NettyServerHandler
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    public void channelRead(io.netty.channel.ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("服务端接收到客户端数据:{}", msg);
        ctx.writeAndFlush("服务端收到客户端的数据: " + msg);
    }
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        log.info("【NettyServerHandler->userEventTriggered】: {}", evt);
    }
    public void exceptionCaught(io.netty.channel.ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("exceptionCaught异常:", cause);
        ctx.close();
    }
    public void handlerAdded(io.netty.channel.ChannelHandlerContext ctx) throws Exception {
        log.info("handlerAdded:{}", ctx.channel().remoteAddress());
    }
    public void handlerRemoved(io.netty.channel.ChannelHandlerContext ctx) throws Exception {
        log.info("handlerRemoved:{}", ctx.channel().remoteAddress());
    }
    public void channelRegistered(io.netty.channel.ChannelHandlerContext ctx) throws Exception {
        log.info("channelRegistered:{}", ctx.channel().remoteAddress());
    }
    public void channelUnregistered(io.netty.channel.ChannelHandlerContext ctx) throws Exception {
        log.info("channelUnregistered:{}", ctx.channel().remoteAddress());
    }
    public void channelActive(io.netty.channel.ChannelHandlerContext ctx) throws Exception {
        log.info("客户端连接:{}", ctx.channel().remoteAddress());
    }
    public void channelInactive(io.netty.channel.ChannelHandlerContext ctx) throws Exception {
        log.info("客户端断开连接:{}", ctx.channel().remoteAddress());
    }
}
 
Client端
NettyClient
@Slf4j
public class NettyClient {
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup(1);
        try {
            Bootstrap bootstrap = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new NettyClientHandler());
                        }
                    });
            ChannelFuture connectFuture = bootstrap.connect("127.0.0.1", 8888);
            Channel channel = connectFuture.sync().channel();
            System.out.println("客户端连接成功");
            Scanner sc = new Scanner(System.in);
            while (true) {
                System.out.println("请输入内容: ");
                String line = sc.nextLine();
                if (line == null || line.isEmpty()) {
                    continue;
                } else if ("exit".equals(line)) {
                    channel.close();
                    break;
                }
                channel.writeAndFlush(line);
            }
            channel.closeFuture().sync();
            System.out.println("客户端关闭");
        } catch (Exception e) {
            log.error("客户端发生异常: ", e);
        }
    }
}
 
NettyClientHandler
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info( "【NettyClientHandler->channelRead】: {}", msg);
    }
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        log.info( "【NettyClientHandler->userEventTriggered】: {}", evt);
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.info( "异常: {}", cause.getMessage());
        ctx.close();
    }
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        log.info( "【NettyClientHandler->handlerAdded】: {}", ctx);
    }
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        log.info( "【NettyClientHandler->handlerRemoved】: {}", ctx);
    }
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        log.info( "【NettyClientHandler->channelRegistered】: {}", ctx);
    }
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        log.info( "【NettyClientHandler->channelUnregistered】: {}", ctx);
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info( "【NettyClientHandler->channelActive】: {}", ctx);
    }
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info( "【NettyClientHandler->channelInactive】: {}", ctx);
    }
}
 
tcp(粘包和拆包)
Server端
NettyTcpServer
@Slf4j
public class NettyTcpServer {
    public static void main(String[] args) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup(1);
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap()
                    .group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new NettyTcpServerHandler());
                        }
                    });
            ChannelFuture bindFuture = serverBootstrap.bind("127.0.0.1", 9090);
            Channel channel = bindFuture.sync().channel();
            log.info("server start");
            channel.closeFuture().sync();
        } catch (Exception e) {
            log.info("server error", e);
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
 
NettyTcpServerHandler
@Slf4j
public class NettyTcpServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        byte[] bytes = new byte[byteBuf.readableBytes()];
        byteBuf.readBytes(bytes);
        String content = new String(bytes, StandardCharsets.UTF_8);
        log.info("服务端接收到的数据字节长度为:{}, 内容为: {}", bytes.length, content);
        ByteBuf buf = Unpooled.copiedBuffer(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8));
        ctx.writeAndFlush(buf);
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.info("异常: {}", cause.getMessage());
        ctx.close();
    }
}
 
Client端
NettyTcpClient
@Slf4j
public class NettyTcpClient {
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup(1);
        try {
            Bootstrap bootstrap = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new NettyTcpClientHandler());
                        }
                    });
            ChannelFuture connectFuture = bootstrap.connect(new InetSocketAddress("127.0.0.1", 9090));
            Channel channel = connectFuture.sync().channel();
            channel.closeFuture().sync();
        } catch (Exception e) {
            log.error("客户端发生异常: ", e);
        } finally {
            group.shutdownGracefully();
        }
    }
}
 
NettyTcpClientHandler
@Slf4j
public class NettyTcpClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        log.info("客户端接收到数据:{}", byteBuf.toString(StandardCharsets.UTF_8));
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        /*
         粘包:
         1. 这里连续发送10次byteBuf,发现服务端有可能1次就全部接收了,
            也有可能3次接受了,也有可能4次接收了,这是不确定的,
            这也就意味着基于底层NIO的tcp的数据传输 是基于流式传输的,会出现粘包的问题。
         2. 因此服务端必须 自行处理粘包问题,区分消息边界
         3. 这里测试的时候,可以多启动几个客户端来观察
         4. 这里示例的粘包示例与上面simple的区别在于:这里是在短时间内连续发送
         */
        /*for (int i = 0; i < 10; i++) {
            ByteBuf byteBuf = Unpooled.copiedBuffer(("hello, server " + i).getBytes(StandardCharsets.UTF_8));
            ctx.writeAndFlush(byteBuf);
        }*/
        /*
        拆包:
        1. 这里1次发送了1个10000字节长的数据,而服务端分多次收到,有可能是2次,有可能是1次, 这是不确定的,
        2. 假设真实数据包就有这么长,那么服务端可能需要分多次才能接收到完整的数据包,
        3. 同时,我们发现总的数据长度服务端都接收到了,这说明底层NIO的tcp的数据传输 是可靠的
        4. 1条比较长的消息,服务端分多次才能收到,所以服务端需要解决拆包的问题,将多次接收到的消息转为1条完整的消息
        5. 这里示例的拆包示例与上面simple的区别在于:这里1次发送的消息数据很长
        */
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 1000; i++) {
            sb.append("Netty拆包示例|");
        }
        ctx.writeAndFlush(Unpooled.copiedBuffer(sb.toString().getBytes(StandardCharsets.UTF_8)));
        log.info("客户端发送数据长度:{}", sb.toString().length());
		/* 拆包 与 粘包 的核心问题就是 tcp是流式传输的,tcp可以保证数据可靠传输,但需要对方在接收时需要能区分出消息边界,从而获取1条完整的消息 */
    }
}
 
protocol
codec
使用自定义协议,编解码器,识别消息边界,处理粘包和拆包问题
CustomMessageDecoder
public class CustomMessageDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (in.readableBytes() < 4) {
            return;
        }
        in.markReaderIndex();
        int len = in.readInt();
        if (in.readableBytes() < len) {
            in.resetReaderIndex();
            return;
        }
        byte[] bytes = new byte[len];
        in.readBytes(bytes);
        out.add(CustomMessage.builder()
                .len(len)
                .content(bytes)
                .build());
    }
}
 
CustomMessageEncoder
public class CustomMessageEncoder extends MessageToByteEncoder<CustomMessage> {
    @Override
    protected void encode(ChannelHandlerContext ctx, CustomMessage msg, ByteBuf out) {
        out.writeInt(msg.getLen());
        out.writeBytes(msg.getContent());
    }
}
 
server端
ProtocolServer
@Slf4j
public class ProtocolServer {
    public static void main(String[] args) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap()
                    .group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new CustomMessageDecoder());
                            pipeline.addLast(new CustomMessageEncoder());
                            pipeline.addLast(new ProtocolServerHandler());
                        }
                    });
            ChannelFuture bindFuture = serverBootstrap.bind(new InetSocketAddress("127.0.0.1", 9090));
            Channel channel = bindFuture.sync().channel();
            log.info("server start");
            channel.closeFuture().sync();
        } catch (Exception e) {
            log.info("server error", e);
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
        log.info("server stop");
    }
}
 
ProtocolServerHandler
@Slf4j
public class ProtocolServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 这里直接转, 如果不能转的话, 就说明前面的解码有问题
        CustomMessage customMessage = (CustomMessage) msg;
        log.info("服务端收到消息: {}, {}", customMessage.getLen(), new String(customMessage.getContent()));
        // 将消息回过去(需要加上对应的编码器)
        ctx.writeAndFlush(customMessage);
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.info("ProtocolServerHandler异常: {}", cause.getMessage());
        ctx.close();
    }
}
 
client端
ProtocolClient
@Slf4j
public class ProtocolClient {
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup(1);
        try {
            Bootstrap bootstrap = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new CustomMessageEncoder());
                            pipeline.addLast(new CustomMessageDecoder());
                            pipeline.addLast(new ProtocolClientHandler());
                        }
                    });
            ChannelFuture connectFuture = bootstrap.connect("localhost", 9090);
            Channel channel = connectFuture.sync().channel();
            channel.closeFuture().sync();
        } catch (Exception e) {
            log.info("client error", e);
        } finally {
            group.shutdownGracefully();
        }
    }
}
 
ProtocolClientHandler
@Slf4j
public class ProtocolClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // 这里直接转, 如果不能转的话, 就说明前面的解码有问题
        CustomMessage customMessage = (CustomMessage) msg;
        log.info("客户端收到消息: {}, {}", customMessage.getLen(), new String(customMessage.getContent()));
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        for (int i = 1; i <= 20; i++) {
            byte[] bytes = ("hello, server " + i).getBytes(StandardCharsets.UTF_8);
            CustomMessage message = CustomMessage.builder()
                    .content(bytes)
                    .len(bytes.length)
                    .build();
            ctx.writeAndFlush(message);
        }
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("【ProtocolClientHandler->exceptionCaught】: {}", cause.getMessage());
    }
}
 
http
Server端
HttpServer
@Slf4j
public class HttpServer {
    public static void main(String[] args) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap()
                    .group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler("【服务端主】"))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast("loggingHandler", new LoggingHandler("【服务端从】"));
                            pipeline.addLast("httpServerCodec", new HttpServerCodec());
                            pipeline.addLast("aggregator", new HttpObjectAggregator(10 * 1024 * 1024));
                            pipeline.addLast("httpServerHandler", new HttpServerHandler());
                        }
                    });
            ChannelFuture channelFuture = serverBootstrap.bind(new InetSocketAddress(8080));
            channelFuture.sync();
            log.info("http服务器启动成功, 您可以访问: http://localhost:8080/test");
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            log.info("服务端发生异常: ", e);
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
 
HttpServerHandler
@Slf4j
public class HttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
        log.info("【HttpServerHandler->处理】:{}", msg);
        if (msg instanceof FullHttpRequest) {
            FullHttpRequest fullHttpRequest = (FullHttpRequest) msg;
            String uri = fullHttpRequest.uri();
            log.info("【uri】:{}", uri);
            HttpMethod method = fullHttpRequest.method();
            log.info("【method】:{}", method);
            // 响应回去
            byte[] bytes = ("服务器收到时间" + LocalDateTime.now()).getBytes(StandardCharsets.UTF_8);
            DefaultFullHttpResponse fullHttpResponse = new DefaultFullHttpResponse(
                    HttpVersion.HTTP_1_1,
                    HttpResponseStatus.OK,
                    Unpooled.copiedBuffer(bytes)
            );
            fullHttpResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH, bytes.length);
            fullHttpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;charset=utf-8");
            ChannelPromise promise = ctx.newPromise();
            promise.addListener(new GenericFutureListener<Future<? super Void>>() {
                @Override
                public void operationComplete(Future<? super Void> future) throws Exception {
                    log.info("操作完成");
                    log.info("isDone: {}", future.isDone());
                    log.info("isSuccess: {}", future.isSuccess());
                    log.info("isCancelled: {}", future.isCancelled());
                    log.info("hasException: {}", future.cause() != null, future.cause());
                }
            });
            ctx.writeAndFlush(fullHttpResponse, promise);
            log.info("刚刚写完");
        }
    }
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        log.error("【HttpServerHandler->userEventTriggered】:{}", evt);
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
         log.error("【HttpServerHandler->exceptionCaught】", cause);
    }
    public void channelRegistered(ChannelHandlerContext ctx) {
        log.info("【HttpServerHandler->channelRegistered】");
    }
    public void channelUnregistered(ChannelHandlerContext ctx) {
        log.info("【HttpServerHandler->channelUnregistered】");
    }
    public void handlerAdded(ChannelHandlerContext ctx) {
        log.info("【HttpServerHandler->handlerAdded】");
    }
    public void handlerRemoved(ChannelHandlerContext ctx) {
        log.info("【HttpServerHandler->handlerRemoved】");
    }
     public void channelActive(ChannelHandlerContext ctx) {
        log.info("【HttpServerHandler->channelActive】");
    }
    public void channelInactive(ChannelHandlerContext ctx) {
        log.info("【HttpServerHandler->channelInactive】");
    }
}
 
Client端
HttpClient
@Slf4j
public class HttpClient {
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup(1);
        try {
            Bootstrap bootstrap = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast("loggingHandler", new LoggingHandler(LogLevel.DEBUG));
                            pipeline.addLast("httpClientCodec", new HttpClientCodec());
                            pipeline.addLast("", new HttpObjectAggregator(10 * 1024));
                            pipeline.addLast("httpClientHandler", new HttpClientHandler());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080);
            channelFuture.sync();
            Channel channel = channelFuture.channel();
            sendGetRequest(channel);
            // 等待通道关闭
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            log.info("客户端发生异常: ", e);
        } finally {
            // 遇到问题, 调用此方法后客户端没有正常关闭, 将netty版本4.1.20.FINAL切换到4.1.76.FINAL即可
            group.shutdownGracefully();
            log.info("关闭group-finally");
        }
        log.info("客户端执行完毕");
    }
    private static void sendGetRequest(Channel channel) throws URISyntaxException {
        String url = "http://localhost:8080/test"; // 测试URL
        URI uri = new URI(url);
        String host = uri.getHost();
        String path = uri.getRawPath() + (uri.getRawQuery() == null ? "" : "?" + uri.getRawQuery());
        // 构建HTTP请求
        FullHttpRequest request = new DefaultFullHttpRequest(
                HttpVersion.HTTP_1_1,
                HttpMethod.GET,
                path,
                Unpooled.EMPTY_BUFFER
        );
        request.headers()
                .set(HttpHeaderNames.HOST, host)
                .set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE)
                .set(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.GZIP);
        // 发送请求
        ChannelFuture channelFuture = channel.writeAndFlush(request);
        log.info("Request sent: " + request);
    }
}
 
HttpClientHandler
@Slf4j
public class HttpClientHandler extends SimpleChannelInboundHandler<FullHttpResponse> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse response) {
        // 处理响应
        log.info("处理响应, 响应头: {}", response.headers().toString());
        log.info("处理响应, 响应体: {}", response.content().toString(CharsetUtil.UTF_8));
        // 关闭连接
        ctx.channel().close();
        log.info("关闭连接");
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.info( "异常: {}", cause.getMessage());
        ctx.close();
    }
}
 
ws
Server端
WsServer
@Slf4j
public class WsServer {
    public static void main(String[] args) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap()
                    .group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler())
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast("httpServerCodec", new HttpServerCodec());
                            pipeline.addLast("aggregator", new HttpObjectAggregator(10 * 1024 * 1024));
                            WebSocketServerProtocolConfig config = WebSocketServerProtocolConfig.newBuilder()
                                    .websocketPath("/ws")
                                    .checkStartsWith(true)
                                    .build();
                            pipeline.addLast("wsProtocolHandler", new WebSocketServerProtocolHandler(config));
                            pipeline.addLast("wsServerHandler", new WsServerHandler());
                        }
                    });
            ChannelFuture channelFuture = serverBootstrap.bind("127.0.0.1", 9090);
            channelFuture.sync();
            log.info("ws服务启动成功");
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            log.error("服务端发生异常: ", e);
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
        log.info("ws服务关闭");
    }
}
 
WsServerHandler
@Slf4j
public class WsServerHandler extends SimpleChannelInboundHandler<WebSocketFrame> {
    private static Map<String, Channel> CHANNELS = new ConcurrentHashMap<>();
    private static AttributeKey<String> ATTRIBUTE_KEY_TOKEN = AttributeKey.valueOf("token");
    private static AttributeKey<Boolean> ATTRIBUTE_KEY_REPEAT = AttributeKey.valueOf("repeat");
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame webSocketFrame) throws Exception {
        log.info("【WsServerHandler->处理】:{}", webSocketFrame);
        if (webSocketFrame instanceof TextWebSocketFrame) {
            TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) webSocketFrame;
            log.info("【textWebSocketFrame.text()】:{}", textWebSocketFrame.text());
            sendAll(ctx.channel(), textWebSocketFrame.text());
        }
    }
    private void sendAll(Channel channel, String text) {
        CHANNELS.forEach((token, ch) -> {
            if (channel != ch) {
                ch.writeAndFlush(new TextWebSocketFrame(text));
            }
        });
    }
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        log.info("【WsServerHandler->userEventTriggered】: {}", evt);
        if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
            WebSocketServerProtocolHandler.HandshakeComplete handshakeComplete = (WebSocketServerProtocolHandler.HandshakeComplete) evt;
            String requestUri = handshakeComplete.requestUri();
            String subprotocol = handshakeComplete.selectedSubprotocol();
            log.info("【requestUri】:{}", requestUri);
            log.info("【subprotocol】:{}", subprotocol);
            handleAuth(requestUri, ctx);
        }
    }
    private void handleAuth(String requestUri, ChannelHandlerContext ctx) {
        try {
            Map<String, String> queryParams = getQueryParams(requestUri);
            String token = queryParams.get("token");
            log.info("【token】:{}", token);
            if (token == null) {
                ctx.close();
                log.info("token为空, 关闭channel");
            } else {
                ctx.channel().attr(ATTRIBUTE_KEY_TOKEN).set(token);
                Channel oldChannel = CHANNELS.put(token, ctx.channel());
                if (oldChannel != null) {
                    oldChannel.attr(ATTRIBUTE_KEY_REPEAT).set(true);
                    oldChannel.close();
                } else {
                    sendAll(ctx.channel(), "欢迎" + token + "进入聊天室");
                }
            }
        } catch (Exception e) {
            ctx.close();
        }
    }
    private static Map<String, String> getQueryParams(String requestUri) throws URISyntaxException {
        URI uri = new URI(requestUri);
        String query = uri.getQuery();
        Map<String, String> queryParams = new HashMap<>();
        if (query != null) {
            String[] params = query.split("&");
            for (String param : params) {
                String[] keyValue = param.split("=");
                String key = keyValue[0];
                String value = keyValue.length > 1 ? keyValue[1] : "";
                queryParams.put(key, value);
            }
        }
        return queryParams;
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("【WsServerHandler->exceptionCaught】", cause);
    }
    public void handlerAdded(ChannelHandlerContext ctx) {
        log.info("【WsServerHandler->handlerAdded】");
    }
    public void handlerRemoved(ChannelHandlerContext ctx) {
        log.info("【WsServerHandler->handlerRemoved】");
    }
    public void channelRegistered(ChannelHandlerContext ctx) {
        log.info("【WsServerHandler->channelRegistered】");
    }
    public void channelUnregistered(ChannelHandlerContext ctx) {
        log.info("【WsServerHandler->channelUnregistered】");
    }
    public void channelActive(ChannelHandlerContext ctx) {
        log.info("【WsServerHandler->channelActive】");
    }
    public void channelInactive(ChannelHandlerContext ctx) {
        log.info("【WsServerHandler->channelInactive】");
        Channel channel = ctx.channel();
        Boolean isRepeat = channel.attr(ATTRIBUTE_KEY_REPEAT).get() != null
                && channel.attr(ATTRIBUTE_KEY_REPEAT).get();
        if (!isRepeat) {
            CHANNELS.computeIfPresent(ctx.attr(ATTRIBUTE_KEY_TOKEN).get(), (key, ch) -> {
                CHANNELS.remove(channel.attr(ATTRIBUTE_KEY_TOKEN));
                sendAll(channel, channel.attr(ATTRIBUTE_KEY_TOKEN).get() + "离开聊天室");
                return null;
            });
        }
    }
}
 
Client端
WsClient
@Slf4j
public class WsClient {
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup(1);
        try {
            CountDownLatch connectLatch = new CountDownLatch(1);
            Bootstrap bootstrap = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new HttpClientCodec());
                            pipeline.addLast(new HttpObjectAggregator(10 * 1024));
                            WebSocketClientProtocolConfig config = WebSocketClientProtocolConfig.newBuilder()
                                    .handleCloseFrames(false)
                                    .build();
                            WebSocketClientHandshaker webSocketClientHandshaker = WebSocketClientHandshakerFactory.newHandshaker(
                                    new URI("ws://localhost:9090/ws/1?token=abc"),
                                    WebSocketVersion.V13,
                                    null,
                                    true,
                                    new DefaultHttpHeaders());
                            pipeline.addLast(new WebSocketClientProtocolHandler(webSocketClientHandshaker, config));
                            pipeline.addLast(new WebSocketClientHandler(connectLatch));
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9090);
            Channel channel = channelFuture.channel();
            channelFuture.addListener((ChannelFutureListener) future -> {
                if (!future.isSuccess()) {
                    System.err.println("Connection failed: " + future.cause());
                    connectLatch.countDown(); // 确保不会死等
                }
            });
            // 等待连接完成(带超时)
            if (!connectLatch.await(10, TimeUnit.SECONDS)) {
                throw new RuntimeException("Connection timed out");
            }
            Scanner sc = new Scanner(System.in);
            while (true) {
                System.out.print("请输入:");
                String line = sc.nextLine();
                if (StringUtil.isNullOrEmpty(line)) {
                    continue;
                }
                if ("exit".equals(line)) {
                    channel.close();
                    break;
                } else {
                    // 发送消息
                    WebSocketFrame frame = new TextWebSocketFrame(line);
                    channelFuture.channel().writeAndFlush(frame);
                }
            }
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            log.info("客户端发生异常: ", e);
        } finally {
            group.shutdownGracefully();
        }
    }
}
 
WebSocketClientHandler
@Slf4j
public class WebSocketClientHandler extends SimpleChannelInboundHandler<WebSocketFrame> {
    private CountDownLatch connectLatch;
    public WebSocketClientHandler(CountDownLatch connectLatch) {
        this.connectLatch = connectLatch;
    }
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
        // 处理接收到的WebSocket帧
        if (frame instanceof TextWebSocketFrame) {
            String text = ((TextWebSocketFrame) frame).text();
            System.out.println("Received: " + text);
        } else if (frame instanceof PingWebSocketFrame) {
            // 响应Ping帧
            ctx.writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
            System.out.println("Responded to ping");
        } else if (frame instanceof CloseWebSocketFrame) {
            System.out.println("Received close frame");
            ctx.close();
        } else if (frame instanceof BinaryWebSocketFrame) {
            System.out.println("Received binary data: " + frame.content().readableBytes() + " bytes");
        }
    }
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        // 处理握手完成事件
        if (evt == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE) {
            System.out.println("WebSocket handshake complete event");
            // 握手完成后可以发送初始消息
            connectLatch.countDown();
        }
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        System.err.println("WebSocket error: ");
        cause.printStackTrace();
        ctx.close();
    }
}
 
protobuf
Server端
NettyServer
@Slf4j
public class NettyServer {
    public static void main(String[] args) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap()
                    .group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler())
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new ProtobufEncoder());
                            pipeline.addLast(new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance()));
                            pipeline.addLast(new NettyServerHandler());
                        }
                    });
            ChannelFuture bindFuture = serverBootstrap.bind(new InetSocketAddress(8888));
            Channel channel = bindFuture.sync().channel();
            log.info("server start");
            channel.closeFuture().sync();
            log.info("server stop");
        } catch (Exception e) {
            log.info("server error", e);
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
 
NettyServerHandler
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("服务端接收到客户端数据:{}", msg);
        if (msg instanceof StudentPOJO.Student) {
            StudentPOJO.Student student = (StudentPOJO.Student) msg;
            log.info( "客户端发送的数据:{}, {}, {}", student, student.getId(), student.getName());
        }
    }
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        log.info("【NettyServerHandler->userEventTriggered】: {}", evt);
    }
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("exceptionCaught异常:", cause);
        ctx.close();
    }
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        log.info("handlerAdded:{}", ctx.channel().remoteAddress());
    }
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        log.info("handlerRemoved:{}", ctx.channel().remoteAddress());
    }
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        log.info("channelRegistered:{}", ctx.channel().remoteAddress());
    }
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        log.info("channelUnregistered:{}", ctx.channel().remoteAddress());
    }
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("channelActive:{}", ctx.channel().remoteAddress());
    }
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("channelInactive:{}", ctx.channel().remoteAddress());
    }
}
 
Student.proto
syntax = "proto3"; //版本
option java_outer_classname = "StudentPOJO";//生成的外部类名,同时也是文件名
//protobuf 使用message 管理数据
message Student { //会在 StudentPOJO 外部类生成一个内部类 Student, 他是真正发送的POJO对象
    int32 id = 1; // Student 类中有 一个属性 名字为 id 类型为int32(protobuf类型) 1表示属性序号,不是值
    string name = 2;
}
// 执行命令 protoc.exe --java_out=生成路径 Student.proto路径
 
Client端
NettyClient
@Slf4j
public class NettyClient {
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup(1);
        try {
            Bootstrap bootstrap = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new ProtobufEncoder());
                            pipeline.addLast(new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance()));
                            pipeline.addLast(new NettyClientHandler());
                        }
                    });
            ChannelFuture connectFuture = bootstrap.connect("127.0.0.1", 8888);
            Channel channel = connectFuture.sync().channel();
            log.info("客户端连接成功");
            channel.closeFuture().sync();
            log.info("客户端关闭");
        } catch (Exception e) {
            log.error("客户端发生异常: ", e);
        }
    }
}
 
NettyClientHandler
@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info( "【NettyClientHandler->channelRead】: {}", msg);
    }
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        log.info( "【NettyClientHandler->userEventTriggered】: {}", evt);
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.info( "异常: {}", cause.getMessage());
        ctx.close();
    }
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        log.info( "【NettyClientHandler->handlerAdded】: {}", ctx);
    }
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        log.info( "【NettyClientHandler->handlerRemoved】: {}", ctx);
    }
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        log.info( "【NettyClientHandler->channelRegistered】: {}", ctx);
    }
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        log.info( "【NettyClientHandler->channelUnregistered】: {}", ctx);
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info( "【NettyClientHandler->channelActive】: {}", ctx);
        StudentPOJO.Student student = StudentPOJO.Student.newBuilder().setId(1).setName("张三san").build();
        ctx.writeAndFlush(student);
    }
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info( "【NettyClientHandler->channelInactive】: {}", ctx);
    }
}
                ![[RoarCTF 2019]Easy Calc](https://i-blog.csdnimg.cn/direct/fc85943b4a2c484aa8bec23398b13206.png)
![[Windows]在Win上安装bash和zsh - 一个脚本搞定](https://i-blog.csdnimg.cn/direct/3dc68110aee647cd8b7321ea4fa99694.png)
















