文章目录
- simple
- Server端
- NettyServer
- NettyServerHandler
- Client端
- NettyClient
- NettyClientHandler
- tcp(粘包和拆包)
- Server端
- NettyTcpServer
- NettyTcpServerHandler
- Client端
- NettyTcpClient
- NettyTcpClientHandler
- protocol
- codec
- CustomMessageDecoder
- CustomMessageEncoder
- server端
- ProtocolServer
- ProtocolServerHandler
- client端
- ProtocolClient
- ProtocolClientHandler
- http
- Server端
- HttpServer
- HttpServerHandler
- Client端
- HttpClient
- HttpClientHandler
- ws
- Server端
- WsServer
- WsServerHandler
- Client端
- WsClient
- WebSocketClientHandler
- protobuf
- Server端
- NettyServer
- NettyServerHandler
- Student.proto
- Client端
- NettyClient
- NettyClientHandler
simple
Server端
NettyServer
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
/**
 * Minimal string-based Netty server listening on port 8888.
 *
 * <p>Each accepted connection gets a pipeline of {@link StringDecoder},
 * {@link StringEncoder} and a {@code NettyServerHandler}.
 */
@Slf4j
public class NettyServer {

    /**
     * Starts the server and blocks until the listening channel is closed,
     * then shuts both event loop groups down.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // A single thread accepts connections; the worker group uses the no-arg default sizing.
        NioEventLoopGroup acceptors = new NioEventLoopGroup(1);
        NioEventLoopGroup workers = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(acceptors, workers);
            bootstrap.channel(NioServerSocketChannel.class);
            // Handler for the listening (parent) channel.
            bootstrap.handler(new LoggingHandler());
            // Handlers for every accepted (child) channel.
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                            .addLast(new StringDecoder())
                            .addLast(new StringEncoder())
                            .addLast(new NettyServerHandler());
                }
            });

            Channel serverChannel = bootstrap.bind(new InetSocketAddress(8888)).sync().channel();
            log.info("server start");

            // Park the main thread until the listening channel goes away.
            serverChannel.closeFuture().sync();
            log.info("server stop");
        } catch (Exception e) {
            log.info("server error", e);
        } finally {
            acceptors.shutdownGracefully();
            workers.shutdownGracefully();
        }
    }
}
NettyServerHandler
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
/**
 * Inbound handler of the simple string server.
 *
 * <p>Logs every channel lifecycle callback and answers each received message
 * with an acknowledgement string. None of the callbacks forward the event to
 * a later handler, so this handler is meant to sit last in the pipeline.
 */
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * Logs the received message and writes an acknowledgement back to the peer.
     *
     * @param ctx handler context used for the reply
     * @param msg the inbound message as delivered by the preceding handlers
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("服务端接收到客户端数据:{}", msg);
        ctx.writeAndFlush("服务端收到客户端的数据: " + msg);
    }

    /** Logs any user-defined event that reaches this handler. */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        log.info("【NettyServerHandler->userEventTriggered】: {}", evt);
    }

    /** Logs the failure with its stack trace and closes the connection. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("exceptionCaught异常:", cause);
        ctx.close();
    }

    /** Logs that this handler was added to a pipeline. */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        log.info("handlerAdded:{}", ctx.channel().remoteAddress());
    }

    /** Logs that this handler was removed from its pipeline. */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        log.info("handlerRemoved:{}", ctx.channel().remoteAddress());
    }

    /** Logs that the channel was registered. */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        log.info("channelRegistered:{}", ctx.channel().remoteAddress());
    }

    /** Logs that the channel was unregistered. */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        log.info("channelUnregistered:{}", ctx.channel().remoteAddress());
    }

    /** Logs that a client connection became active. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("客户端连接:{}", ctx.channel().remoteAddress());
    }

    /** Logs that a client connection became inactive. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("客户端断开连接:{}", ctx.channel().remoteAddress());
    }
}
Client端
NettyClient
/**
 * Interactive console client for the simple string server on 127.0.0.1:8888.
 *
 * <p>Every non-empty line typed on standard input is sent to the server;
 * typing {@code exit} (or reaching end of input) closes the connection.
 */
@Slf4j
public class NettyClient {

    /**
     * Connects, runs the read-and-send loop, waits for the channel to close
     * and finally shuts the event loop group down so the JVM can exit.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup(1);
        try {
            Bootstrap bootstrap = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new NettyClientHandler());
                        }
                    });
            ChannelFuture connectFuture = bootstrap.connect("127.0.0.1", 8888);
            Channel channel = connectFuture.sync().channel();
            System.out.println("客户端连接成功");

            Scanner sc = new Scanner(System.in);
            while (true) {
                System.out.println("请输入内容: ");
                // End of input (e.g. piped stdin) is treated like "exit" instead of
                // letting nextLine() throw NoSuchElementException.
                if (!sc.hasNextLine()) {
                    channel.close();
                    break;
                }
                String line = sc.nextLine();
                if (line.isEmpty()) {
                    continue;
                }
                if ("exit".equals(line)) {
                    channel.close();
                    break;
                }
                channel.writeAndFlush(line);
            }
            channel.closeFuture().sync();
            System.out.println("客户端关闭");
        } catch (Exception e) {
            log.error("客户端发生异常: ", e);
        } finally {
            // Without this the non-daemon event loop thread keeps the JVM alive.
            group.shutdownGracefully();
        }
    }
}
NettyClientHandler
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
/**
 * Inbound handler of the simple string client: a pure tracing handler that
 * logs each callback it receives. No callback forwards its event to a later
 * handler, and only {@link #exceptionCaught} has a side effect (closing the
 * connection).
 */
@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    // ---- handler / channel lifecycle -------------------------------------

    /** Logs that this handler was added to a pipeline. */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        log.info( "【NettyClientHandler->handlerAdded】: {}", ctx);
    }

    /** Logs that the channel was registered. */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        log.info( "【NettyClientHandler->channelRegistered】: {}", ctx);
    }

    /** Logs that the channel became active. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info( "【NettyClientHandler->channelActive】: {}", ctx);
    }

    /** Logs that the channel became inactive. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info( "【NettyClientHandler->channelInactive】: {}", ctx);
    }

    /** Logs that the channel was unregistered. */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        log.info( "【NettyClientHandler->channelUnregistered】: {}", ctx);
    }

    /** Logs that this handler was removed from its pipeline. */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        log.info( "【NettyClientHandler->handlerRemoved】: {}", ctx);
    }

    // ---- data / events ---------------------------------------------------

    /** Logs a message received from the server. */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info( "【NettyClientHandler->channelRead】: {}", msg);
    }

    /** Logs a user-defined event. */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        log.info( "【NettyClientHandler->userEventTriggered】: {}", evt);
    }

    /** Logs the failure message (message only, no stack trace) and closes the connection. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.info( "异常: {}", cause.getMessage());
        ctx.close();
    }
}
tcp(粘包和拆包)
Server端
NettyTcpServer
/**
 * Raw-bytes TCP server on 127.0.0.1:9090 used to demonstrate sticky and
 * split packets: the pipeline contains no frame decoder, only
 * {@code NettyTcpServerHandler}.
 */
@Slf4j
public class NettyTcpServer {

    /**
     * Starts the server and blocks until the listening channel closes.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // One acceptor thread and one worker thread.
        NioEventLoopGroup acceptors = new NioEventLoopGroup(1);
        NioEventLoopGroup workers = new NioEventLoopGroup(1);
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(acceptors, workers);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new NettyTcpServerHandler());
                }
            });

            Channel serverChannel = bootstrap.bind("127.0.0.1", 9090).sync().channel();
            log.info("server start");
            serverChannel.closeFuture().sync();
        } catch (Exception e) {
            log.info("server error", e);
        } finally {
            acceptors.shutdownGracefully();
            workers.shutdownGracefully();
        }
    }
}
NettyTcpServerHandler
/**
 * Server-side handler of the sticky/split packet demo.
 *
 * <p>Logs the size and UTF-8 text of every chunk of bytes it receives and
 * replies with a random UUID. Because no frame decoder precedes it, one
 * {@code channelRead} call corresponds to one read from the socket, not to
 * one logical message.
 */
@Slf4j
public class NettyTcpServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * Consumes the received bytes, logs them and answers with a UUID.
     *
     * @param ctx handler context used for the reply
     * @param msg the inbound chunk; expected to be a {@link ByteBuf}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        try {
            byte[] bytes = new byte[byteBuf.readableBytes()];
            byteBuf.readBytes(bytes);
            String content = new String(bytes, StandardCharsets.UTF_8);
            log.info("服务端接收到的数据字节长度为:{}, 内容为: {}", bytes.length, content);
        } finally {
            // This handler is the final consumer of the inbound buffer and does not
            // pass it on, so it must release it to avoid leaking the buffer.
            byteBuf.release();
        }
        ByteBuf buf = Unpooled.copiedBuffer(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8));
        ctx.writeAndFlush(buf);
    }

    /** Logs the failure message and closes the connection. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.info("异常: {}", cause.getMessage());
        ctx.close();
    }
}
Client端
NettyTcpClient
/**
 * Client of the sticky/split packet demo. Connects to 127.0.0.1:9090 with a
 * pipeline that contains only {@code NettyTcpClientHandler}; the handler
 * sends the demo payload once the channel becomes active.
 */
@Slf4j
public class NettyTcpClient {

    /**
     * Connects and then blocks until the channel is closed.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        NioEventLoopGroup loop = new NioEventLoopGroup(1);
        try {
            Bootstrap client = new Bootstrap();
            client.group(loop);
            client.channel(NioSocketChannel.class);
            client.handler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new NettyTcpClientHandler());
                }
            });

            InetSocketAddress serverAddress = new InetSocketAddress("127.0.0.1", 9090);
            Channel connection = client.connect(serverAddress).sync().channel();
            connection.closeFuture().sync();
        } catch (Exception e) {
            log.error("客户端发生异常: ", e);
        } finally {
            loop.shutdownGracefully();
        }
    }
}
NettyTcpClientHandler
/**
 * Client-side handler of the sticky/split packet demo.
 *
 * <p>On connect it sends the demo payload; every chunk received from the
 * server is logged as UTF-8 text.
 */
@Slf4j
public class NettyTcpClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * Logs the received bytes as UTF-8 text.
     *
     * @param ctx handler context (unused)
     * @param msg the inbound chunk; expected to be a {@link ByteBuf}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        try {
            log.info("客户端接收到数据:{}", byteBuf.toString(StandardCharsets.UTF_8));
        } finally {
            // Last consumer of the buffer: release it so it is not leaked.
            byteBuf.release();
        }
    }

    /**
     * Sends the demo payload as soon as the connection is established.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        /*
         Sticky packets:
         1. Sending 10 small buffers back to back: the server may receive them in 1 read,
            in 3 reads, in 4 reads ... the number is not deterministic.
            TCP is a byte stream, so consecutive writes can be merged on the receiving side.
         2. The server therefore has to find the message boundaries itself.
         3. Start several clients at once to observe the effect more easily.
         4. Difference to the "simple" example: here the messages are sent in a tight loop.
         */
        /*for (int i = 0; i < 10; i++) {
            ByteBuf byteBuf = Unpooled.copiedBuffer(("hello, server " + i).getBytes(StandardCharsets.UTF_8));
            ctx.writeAndFlush(byteBuf);
        }*/
        /*
         Split packets:
         1. One single large message is written (10,000 characters, which is 18,000 bytes
            in UTF-8); the server may receive it in 1 read or in several - not deterministic.
         2. If a real message is this long, the server needs several reads to get all of it.
         3. The total number of bytes received by the server matches what was sent.
         4. The server therefore has to reassemble the pieces into one complete message.
         5. Difference to the "simple" example: here a single message is very long.
         */
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 1000; i++) {
            sb.append("Netty拆包示例|");
        }
        byte[] payload = sb.toString().getBytes(StandardCharsets.UTF_8);
        ctx.writeAndFlush(Unpooled.copiedBuffer(payload));
        // Report the byte length (not the char count) so the number is directly
        // comparable with the byte counts logged by the server.
        log.info("客户端发送数据长度:{}", payload.length);
        /* Both problems share one root cause: TCP transfers a stream of bytes, so the
           receiver must be able to recognise message boundaries to recover whole messages. */
    }
}
protocol
codec
使用自定义协议,编解码器,识别消息边界,处理粘包和拆包问题
CustomMessageDecoder
public class CustomMessageDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.readableBytes() < 4) {
return;
}
in.markReaderIndex();
int len = in.readInt();
if (in.readableBytes() < len) {
in.resetReaderIndex();
return;
}
byte[] bytes = new byte[len];
in.readBytes(bytes);
out.add(CustomMessage.builder()
.len(len)
.content(bytes)
.build());
}
}
CustomMessageEncoder
/**
 * Encoder for the custom length-prefixed protocol: writes a 4-byte length
 * followed by the content bytes of a {@code CustomMessage}.
 */
public class CustomMessageEncoder extends MessageToByteEncoder<CustomMessage> {

    /**
     * Serialises one message into {@code out}.
     *
     * @param ctx handler context (unused)
     * @param msg message to encode
     * @param out buffer receiving the encoded frame
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, CustomMessage msg, ByteBuf out) {
        byte[] content = msg.getContent();
        // Derive the prefix from the bytes actually written: trusting the separately
        // set len field would corrupt the stream whenever it disagrees with the content.
        out.writeInt(content.length);
        out.writeBytes(content);
    }
}
server端
ProtocolServer
/**
 * Server of the custom-protocol demo, bound to 127.0.0.1:9090.
 *
 * <p>Child pipelines consist of {@code CustomMessageDecoder},
 * {@code CustomMessageEncoder} and {@code ProtocolServerHandler}.
 */
@Slf4j
public class ProtocolServer {

    /**
     * Starts the server, blocks until the listening channel closes and logs
     * the stop once the event loop shutdown has been requested.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        NioEventLoopGroup acceptors = new NioEventLoopGroup(1);
        NioEventLoopGroup workers = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(acceptors, workers);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) {
                    ch.pipeline()
                            .addLast(new CustomMessageDecoder())
                            .addLast(new CustomMessageEncoder())
                            .addLast(new ProtocolServerHandler());
                }
            });

            InetSocketAddress listenAddress = new InetSocketAddress("127.0.0.1", 9090);
            Channel serverChannel = bootstrap.bind(listenAddress).sync().channel();
            log.info("server start");
            serverChannel.closeFuture().sync();
        } catch (Exception e) {
            log.info("server error", e);
        } finally {
            acceptors.shutdownGracefully();
            workers.shutdownGracefully();
        }
        log.info("server stop");
    }
}
ProtocolServerHandler
/**
 * Business handler of the custom-protocol server: logs every decoded
 * {@code CustomMessage} and echoes it back to the sender.
 */
@Slf4j
public class ProtocolServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * Logs and echoes one decoded message.
     *
     * @param ctx handler context used for the echo
     * @param msg the decoded message; expected to be a {@code CustomMessage}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // Cast directly: anything else means the decoder in front is misconfigured.
        CustomMessage customMessage = (CustomMessage) msg;
        // Decode explicitly as UTF-8 (the charset the demo client encodes with) instead
        // of relying on the platform default charset.
        String text = new String(customMessage.getContent(), java.nio.charset.StandardCharsets.UTF_8);
        log.info("服务端收到消息: {}, {}", customMessage.getLen(), text);
        // Echo the message back (requires the matching encoder in the pipeline).
        ctx.writeAndFlush(customMessage);
    }

    /** Logs the failure (including its stack trace) and closes the connection. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Trailing Throwable argument makes SLF4J print the stack trace as well.
        log.info("ProtocolServerHandler异常: {}", cause.getMessage(), cause);
        ctx.close();
    }
}
client端
ProtocolClient
/**
 * Client of the custom-protocol demo. Connects to localhost:9090 with a
 * pipeline of {@code CustomMessageEncoder}, {@code CustomMessageDecoder}
 * and {@code ProtocolClientHandler}.
 */
@Slf4j
public class ProtocolClient {

    /**
     * Connects and then blocks until the channel is closed.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        NioEventLoopGroup loop = new NioEventLoopGroup(1);
        try {
            Bootstrap client = new Bootstrap();
            client.group(loop);
            client.channel(NioSocketChannel.class);
            client.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) {
                    ch.pipeline()
                            .addLast(new CustomMessageEncoder())
                            .addLast(new CustomMessageDecoder())
                            .addLast(new ProtocolClientHandler());
                }
            });

            Channel connection = client.connect("localhost", 9090).sync().channel();
            connection.closeFuture().sync();
        } catch (Exception e) {
            log.info("client error", e);
        } finally {
            loop.shutdownGracefully();
        }
    }
}
ProtocolClientHandler
@Slf4j
public class ProtocolClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 这里直接转, 如果不能转的话, 就说明前面的解码有问题
CustomMessage customMessage = (CustomMessage) msg;
log.info("客户端收到消息: {}, {}", customMessage.getLen(), new String(customMessage.getContent()));
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
for (int i = 1; i <= 20; i++) {
byte[] bytes = ("hello, server " + i).getBytes(StandardCharsets.UTF_8);
CustomMessage message = CustomMessage.builder()
.content(bytes)
.len(bytes.length)
.build();
ctx.writeAndFlush(message);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("【ProtocolClientHandler->exceptionCaught】: {}", cause.getMessage());
}
}
http
Server端
HttpServer
/**
 * HTTP demo server listening on port 8080.
 *
 * <p>Child pipelines: logging, {@code HttpServerCodec}, a 10 MiB
 * {@code HttpObjectAggregator} and {@code HttpServerHandler}.
 */
@Slf4j
public class HttpServer {

    /**
     * Starts the server and blocks until the listening channel closes.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        NioEventLoopGroup acceptors = new NioEventLoopGroup(1);
        NioEventLoopGroup workers = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(acceptors, workers);
            bootstrap.channel(NioServerSocketChannel.class);
            // Logger attached to the listening channel.
            bootstrap.handler(new LoggingHandler("【服务端主】"));
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                            .addLast("loggingHandler", new LoggingHandler("【服务端从】"))
                            .addLast("httpServerCodec", new HttpServerCodec())
                            // Aggregates message parts into full requests, up to 10 MiB.
                            .addLast("aggregator", new HttpObjectAggregator(10 * 1024 * 1024))
                            .addLast("httpServerHandler", new HttpServerHandler());
                }
            });

            ChannelFuture bound = bootstrap.bind(new InetSocketAddress(8080));
            bound.sync();
            log.info("http服务器启动成功, 您可以访问: http://localhost:8080/test");
            bound.channel().closeFuture().sync();
        } catch (Exception e) {
            log.info("服务端发生异常: ", e);
        } finally {
            acceptors.shutdownGracefully();
            workers.shutdownGracefully();
        }
    }
}
HttpServerHandler
/**
 * Handler of the HTTP demo server.
 *
 * <p>Answers every {@link FullHttpRequest} with a plain-text body containing
 * the current server time and logs the outcome of the write. All lifecycle
 * callbacks are logged and not forwarded to later handlers.
 */
@Slf4j
public class HttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {

    /**
     * Logs the request line details and writes a {@code 200 OK} text response.
     *
     * @param ctx handler context used for the response
     * @param msg inbound HTTP object; only {@link FullHttpRequest} is answered
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
        log.info("【HttpServerHandler->处理】:{}", msg);
        if (msg instanceof FullHttpRequest) {
            FullHttpRequest fullHttpRequest = (FullHttpRequest) msg;
            String uri = fullHttpRequest.uri();
            log.info("【uri】:{}", uri);
            HttpMethod method = fullHttpRequest.method();
            log.info("【method】:{}", method);

            // Build the response.
            byte[] bytes = ("服务器收到时间" + LocalDateTime.now()).getBytes(StandardCharsets.UTF_8);
            DefaultFullHttpResponse fullHttpResponse = new DefaultFullHttpResponse(
                    HttpVersion.HTTP_1_1,
                    HttpResponseStatus.OK,
                    Unpooled.copiedBuffer(bytes)
            );
            fullHttpResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH, bytes.length);
            fullHttpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;charset=utf-8");

            // The promise is completed when the write finishes; the listener logs its state.
            ChannelPromise promise = ctx.newPromise();
            promise.addListener(new GenericFutureListener<Future<? super Void>>() {
                @Override
                public void operationComplete(Future<? super Void> future) throws Exception {
                    log.info("操作完成");
                    log.info("isDone: {}", future.isDone());
                    log.info("isSuccess: {}", future.isSuccess());
                    log.info("isCancelled: {}", future.isCancelled());
                    // Trailing Throwable (possibly null) lets SLF4J append the stack trace.
                    log.info("hasException: {}", future.cause() != null, future.cause());
                }
            });
            ctx.writeAndFlush(fullHttpResponse, promise);
            // Logged right after the write was issued; the listener above logs its completion.
            log.info("刚刚写完");
        }
    }

    /** Logs a user event; these are routine notifications, hence INFO rather than ERROR. */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        log.info("【HttpServerHandler->userEventTriggered】:{}", evt);
    }

    /** Logs the failure and closes the connection instead of leaving it half-broken. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("【HttpServerHandler->exceptionCaught】", cause);
        ctx.close();
    }

    /** Logs that the channel was registered. */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) {
        log.info("【HttpServerHandler->channelRegistered】");
    }

    /** Logs that the channel was unregistered. */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) {
        log.info("【HttpServerHandler->channelUnregistered】");
    }

    /** Logs that this handler was added to a pipeline. */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        log.info("【HttpServerHandler->handlerAdded】");
    }

    /** Logs that this handler was removed from its pipeline. */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        log.info("【HttpServerHandler->handlerRemoved】");
    }

    /** Logs that the channel became active. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        log.info("【HttpServerHandler->channelActive】");
    }

    /** Logs that the channel became inactive. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        log.info("【HttpServerHandler->channelInactive】");
    }
}
Client端
HttpClient
/**
 * HTTP demo client: connects to 127.0.0.1:8080, sends a single GET request
 * and waits until the connection is closed (the response handler closes it).
 */
@Slf4j
public class HttpClient {

    /**
     * Runs the request/response round trip.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup(1);
        try {
            Bootstrap bootstrap = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast("loggingHandler", new LoggingHandler(LogLevel.DEBUG));
                            pipeline.addLast("httpClientCodec", new HttpClientCodec());
                            // Give the aggregator a real name instead of the empty string.
                            pipeline.addLast("aggregator", new HttpObjectAggregator(10 * 1024));
                            pipeline.addLast("httpClientHandler", new HttpClientHandler());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080);
            channelFuture.sync();
            Channel channel = channelFuture.channel();
            sendGetRequest(channel);
            // Wait for the channel to be closed.
            channel.closeFuture().sync();
        } catch (Exception e) {
            log.info("客户端发生异常: ", e);
        } finally {
            // Known issue: with Netty 4.1.20.Final the client did not terminate after this
            // call; switching to 4.1.76.Final fixed it.
            group.shutdownGracefully();
            log.info("关闭group-finally");
        }
        log.info("客户端执行完毕");
    }

    /**
     * Builds and sends {@code GET /test} for the demo server.
     *
     * @param channel connected channel to write the request to
     * @throws URISyntaxException if the hard-coded test URL cannot be parsed
     */
    private static void sendGetRequest(Channel channel) throws URISyntaxException {
        String url = "http://localhost:8080/test"; // test URL
        URI uri = new URI(url);

        // Host header must carry the port when it is given explicitly in the URL.
        String host = uri.getPort() == -1 ? uri.getHost() : uri.getHost() + ":" + uri.getPort();

        // An empty path is not a valid request target; fall back to "/".
        String rawPath = uri.getRawPath();
        if (rawPath == null || rawPath.isEmpty()) {
            rawPath = "/";
        }
        String path = rawPath + (uri.getRawQuery() == null ? "" : "?" + uri.getRawQuery());

        // Build the HTTP request.
        FullHttpRequest request = new DefaultFullHttpRequest(
                HttpVersion.HTTP_1_1,
                HttpMethod.GET,
                path,
                Unpooled.EMPTY_BUFFER
        );
        // No Accept-Encoding: gzip here - the pipeline has no decompressor, so a
        // compressed body would be handed to the handler as raw gzip bytes.
        request.headers()
                .set(HttpHeaderNames.HOST, host)
                .set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);

        // Send the request.
        channel.writeAndFlush(request);
        log.info("Request sent: {}", request);
    }
}
HttpClientHandler
/**
 * Response handler of the HTTP demo client: prints the aggregated response
 * and then closes the connection.
 */
@Slf4j
public class HttpClientHandler extends SimpleChannelInboundHandler<FullHttpResponse> {

    /**
     * Logs headers and UTF-8 body of the response, then closes the channel.
     *
     * @param ctx      handler context
     * @param response the fully aggregated HTTP response
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse response) {
        // Dump the response.
        String headerDump = response.headers().toString();
        log.info("处理响应, 响应头: {}", headerDump);
        String bodyText = response.content().toString(CharsetUtil.UTF_8);
        log.info("处理响应, 响应体: {}", bodyText);

        // One request per connection: close once the answer has been read.
        ctx.channel().close();
        log.info("关闭连接");
    }

    /** Logs the failure message and closes the connection. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        String reason = cause.getMessage();
        log.info( "异常: {}", reason);
        ctx.close();
    }
}
ws
Server端
WsServer
/**
 * WebSocket chat demo server bound to 127.0.0.1:9090.
 *
 * <p>Child pipelines: {@code HttpServerCodec}, a 10 MiB
 * {@code HttpObjectAggregator}, a {@code WebSocketServerProtocolHandler}
 * for paths starting with {@code /ws}, and {@code WsServerHandler}.
 */
@Slf4j
public class WsServer {

    /**
     * Starts the server and blocks until the listening channel closes.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        NioEventLoopGroup acceptors = new NioEventLoopGroup(1);
        NioEventLoopGroup workers = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(acceptors, workers);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.handler(new LoggingHandler());
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline p = ch.pipeline();
                    p.addLast("httpServerCodec", new HttpServerCodec());
                    p.addLast("aggregator", new HttpObjectAggregator(10 * 1024 * 1024));

                    // checkStartsWith(true): any request path beginning with "/ws" is upgraded.
                    WebSocketServerProtocolConfig wsConfig = WebSocketServerProtocolConfig.newBuilder()
                            .websocketPath("/ws")
                            .checkStartsWith(true)
                            .build();
                    p.addLast("wsProtocolHandler", new WebSocketServerProtocolHandler(wsConfig));
                    p.addLast("wsServerHandler", new WsServerHandler());
                }
            });

            ChannelFuture bound = bootstrap.bind("127.0.0.1", 9090);
            bound.sync();
            log.info("ws服务启动成功");
            bound.channel().closeFuture().sync();
        } catch (Exception e) {
            log.error("服务端发生异常: ", e);
        } finally {
            acceptors.shutdownGracefully();
            workers.shutdownGracefully();
        }
        log.info("ws服务关闭");
    }
}
WsServerHandler
/**
 * Chat-room handler of the WebSocket demo server.
 *
 * <p>After the WebSocket handshake the {@code token} query parameter of the
 * request URI identifies the user. Channels are kept in a static registry
 * keyed by token; a second login with the same token replaces (and closes)
 * the earlier channel. Text frames are broadcast to every other registered
 * channel.
 */
@Slf4j
public class WsServerHandler extends SimpleChannelInboundHandler<WebSocketFrame> {

    /** Registry of authenticated channels, keyed by token. Shared by all handler instances. */
    private static final Map<String, Channel> CHANNELS = new ConcurrentHashMap<>();
    /** Channel attribute holding the token the channel logged in with. */
    private static final AttributeKey<String> ATTRIBUTE_KEY_TOKEN = AttributeKey.valueOf("token");
    /** Channel attribute set to {@code true} on a channel that was replaced by a newer login. */
    private static final AttributeKey<Boolean> ATTRIBUTE_KEY_REPEAT = AttributeKey.valueOf("repeat");

    /**
     * Broadcasts the text of a received text frame to all other channels.
     *
     * @param ctx            handler context
     * @param webSocketFrame received frame; non-text frames are only logged
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame webSocketFrame) throws Exception {
        log.info("【WsServerHandler->处理】:{}", webSocketFrame);
        if (webSocketFrame instanceof TextWebSocketFrame) {
            TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) webSocketFrame;
            log.info("【textWebSocketFrame.text()】:{}", textWebSocketFrame.text());
            sendAll(ctx.channel(), textWebSocketFrame.text());
        }
    }

    /**
     * Sends {@code text} to every registered channel except {@code channel}.
     *
     * @param channel the originating channel, excluded from the broadcast
     * @param text    message to send
     */
    private void sendAll(Channel channel, String text) {
        CHANNELS.forEach((token, ch) -> {
            if (channel != ch) {
                // A fresh frame per recipient: each write consumes its own frame.
                ch.writeAndFlush(new TextWebSocketFrame(text));
            }
        });
    }

    /**
     * Logs user events and authenticates the channel once the handshake completes.
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        log.info("【WsServerHandler->userEventTriggered】: {}", evt);
        if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
            WebSocketServerProtocolHandler.HandshakeComplete handshakeComplete = (WebSocketServerProtocolHandler.HandshakeComplete) evt;
            String requestUri = handshakeComplete.requestUri();
            String subprotocol = handshakeComplete.selectedSubprotocol();
            log.info("【requestUri】:{}", requestUri);
            log.info("【subprotocol】:{}", subprotocol);
            handleAuth(requestUri, ctx);
        }
    }

    /**
     * Registers the channel under the token found in the request URI, or closes
     * the channel when no token is present or the URI cannot be parsed.
     *
     * @param requestUri URI of the handshake request
     * @param ctx        context of the channel being authenticated
     */
    private void handleAuth(String requestUri, ChannelHandlerContext ctx) {
        try {
            Map<String, String> queryParams = getQueryParams(requestUri);
            String token = queryParams.get("token");
            log.info("【token】:{}", token);
            if (token == null) {
                ctx.close();
                log.info("token为空, 关闭channel");
            } else {
                ctx.channel().attr(ATTRIBUTE_KEY_TOKEN).set(token);
                Channel oldChannel = CHANNELS.put(token, ctx.channel());
                if (oldChannel != null) {
                    // Same token logged in again: flag the old channel so that its
                    // channelInactive does not announce a departure, then drop it.
                    oldChannel.attr(ATTRIBUTE_KEY_REPEAT).set(true);
                    oldChannel.close();
                } else {
                    sendAll(ctx.channel(), "欢迎" + token + "进入聊天室");
                }
            }
        } catch (Exception e) {
            // Do not swallow silently: record why the connection is being rejected.
            log.warn("failed to authenticate requestUri {}", requestUri, e);
            ctx.close();
        }
    }

    /**
     * Parses the query string of {@code requestUri} into a key/value map.
     * Parameters without a value map to the empty string.
     *
     * @param requestUri URI to parse
     * @return mutable map of query parameters; empty when there is no query
     * @throws URISyntaxException if {@code requestUri} is not a valid URI
     */
    private static Map<String, String> getQueryParams(String requestUri) throws URISyntaxException {
        URI uri = new URI(requestUri);
        String query = uri.getQuery();
        Map<String, String> queryParams = new HashMap<>();
        if (query != null) {
            String[] params = query.split("&");
            for (String param : params) {
                // Limit 2 keeps '=' characters that are part of the value.
                String[] keyValue = param.split("=", 2);
                String key = keyValue[0];
                String value = keyValue.length > 1 ? keyValue[1] : "";
                queryParams.put(key, value);
            }
        }
        return queryParams;
    }

    /** Logs the failure and closes the connection. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("【WsServerHandler->exceptionCaught】", cause);
        ctx.close();
    }

    /** Logs that this handler was added to a pipeline. */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        log.info("【WsServerHandler->handlerAdded】");
    }

    /** Logs that this handler was removed from its pipeline. */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        log.info("【WsServerHandler->handlerRemoved】");
    }

    /** Logs that the channel was registered. */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) {
        log.info("【WsServerHandler->channelRegistered】");
    }

    /** Logs that the channel was unregistered. */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) {
        log.info("【WsServerHandler->channelUnregistered】");
    }

    /** Logs that the channel became active. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        log.info("【WsServerHandler->channelActive】");
    }

    /**
     * Unregisters the channel and announces the departure, unless the channel
     * never authenticated or was replaced by a newer login with the same token.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        log.info("【WsServerHandler->channelInactive】");
        Channel channel = ctx.channel();

        String token = channel.attr(ATTRIBUTE_KEY_TOKEN).get();
        if (token == null) {
            // Closed before authentication: nothing registered, and ConcurrentHashMap
            // rejects null keys anyway.
            return;
        }
        boolean isRepeat = Boolean.TRUE.equals(channel.attr(ATTRIBUTE_KEY_REPEAT).get());
        if (isRepeat) {
            return;
        }
        // Conditional remove: only drop the entry if it still points at this channel.
        // Done as a plain map operation - the map must not be modified from inside one
        // of its own compute functions.
        if (CHANNELS.remove(token, channel)) {
            sendAll(channel, token + "离开聊天室");
        }
    }
}
Client端
WsClient
/**
 * Interactive console client for the WebSocket chat demo.
 *
 * <p>Connects to 127.0.0.1:9090, performs the WebSocket handshake against
 * {@code ws://localhost:9090/ws/1?token=abc} and then sends every non-empty
 * line typed on standard input as a text frame. {@code exit} (or end of
 * input) closes the connection.
 */
@Slf4j
public class WsClient {

    /**
     * Runs the client until the user exits or an error occurs.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup(1);
        try {
            // Released by the handler when the handshake completes, or by the
            // connect listener below when the TCP connect fails.
            CountDownLatch connectLatch = new CountDownLatch(1);
            Bootstrap bootstrap = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new HttpClientCodec());
                            pipeline.addLast(new HttpObjectAggregator(10 * 1024));
                            // Close frames are passed on to WebSocketClientHandler.
                            WebSocketClientProtocolConfig config = WebSocketClientProtocolConfig.newBuilder()
                                    .handleCloseFrames(false)
                                    .build();
                            WebSocketClientHandshaker webSocketClientHandshaker = WebSocketClientHandshakerFactory.newHandshaker(
                                    new URI("ws://localhost:9090/ws/1?token=abc"),
                                    WebSocketVersion.V13,
                                    null,
                                    true,
                                    new DefaultHttpHeaders());
                            pipeline.addLast(new WebSocketClientProtocolHandler(webSocketClientHandshaker, config));
                            pipeline.addLast(new WebSocketClientHandler(connectLatch));
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9090);
            Channel channel = channelFuture.channel();
            channelFuture.addListener((ChannelFutureListener) future -> {
                if (!future.isSuccess()) {
                    System.err.println("Connection failed: " + future.cause());
                    connectLatch.countDown(); // make sure main does not wait for the full timeout
                }
            });
            // Wait (bounded) for the handshake to complete.
            if (!connectLatch.await(10, TimeUnit.SECONDS)) {
                throw new RuntimeException("Connection timed out");
            }
            // The latch is also released on connect failure; do not enter the
            // input loop with a channel that never connected.
            if (!channelFuture.isSuccess()) {
                throw new IllegalStateException("Connection failed", channelFuture.cause());
            }

            Scanner sc = new Scanner(System.in);
            while (true) {
                System.out.print("请输入:");
                // End of input is treated like "exit".
                if (!sc.hasNextLine()) {
                    channel.close();
                    break;
                }
                String line = sc.nextLine();
                if (StringUtil.isNullOrEmpty(line)) {
                    continue;
                }
                if ("exit".equals(line)) {
                    channel.close();
                    break;
                } else {
                    // Send the line as a text frame.
                    WebSocketFrame frame = new TextWebSocketFrame(line);
                    channel.writeAndFlush(frame);
                }
            }
            channel.closeFuture().sync();
        } catch (Exception e) {
            log.info("客户端发生异常: ", e);
        } finally {
            group.shutdownGracefully();
        }
    }
}
WebSocketClientHandler
/**
 * Frame handler of the WebSocket demo client.
 *
 * <p>Prints received frames to standard output and releases the supplied
 * latch once the WebSocket handshake has completed.
 */
@Slf4j
public class WebSocketClientHandler extends SimpleChannelInboundHandler<WebSocketFrame> {

    /** Counted down when the handshake-complete event arrives. */
    private CountDownLatch connectLatch;

    /**
     * @param connectLatch latch to release after a successful handshake
     */
    public WebSocketClientHandler(CountDownLatch connectLatch) {
        this.connectLatch = connectLatch;
    }

    /**
     * Dispatches on the concrete frame type; frames of other types are ignored.
     *
     * @param ctx   handler context
     * @param frame received WebSocket frame
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
        if (frame instanceof TextWebSocketFrame) {
            TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
            System.out.println("Received: " + textFrame.text());
            return;
        }
        if (frame instanceof PingWebSocketFrame) {
            // Answer with a pong carrying the same payload; retain() because the
            // payload is handed to the outbound frame.
            PongWebSocketFrame pong = new PongWebSocketFrame(frame.content().retain());
            ctx.writeAndFlush(pong);
            System.out.println("Responded to ping");
            return;
        }
        if (frame instanceof CloseWebSocketFrame) {
            System.out.println("Received close frame");
            ctx.close();
            return;
        }
        if (frame instanceof BinaryWebSocketFrame) {
            int size = frame.content().readableBytes();
            System.out.println("Received binary data: " + size + " bytes");
        }
    }

    /**
     * Releases the connect latch on {@code HANDSHAKE_COMPLETE}; every other
     * event is ignored.
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt != WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE) {
            return;
        }
        System.out.println("WebSocket handshake complete event");
        // From here on the caller may start sending frames.
        connectLatch.countDown();
    }

    /** Prints the failure with its stack trace and closes the connection. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        System.err.println("WebSocket error: ");
        cause.printStackTrace();
        ctx.close();
    }
}
protobuf
Server端
NettyServer
/**
 * Protobuf demo server listening on port 8888.
 *
 * <p>Child pipelines: {@code ProtobufEncoder}, a {@code ProtobufDecoder} for
 * {@code StudentPOJO.Student}, and {@code NettyServerHandler}.
 */
@Slf4j
public class NettyServer {

    /**
     * Starts the server and blocks until the listening channel closes.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        NioEventLoopGroup acceptors = new NioEventLoopGroup(1);
        NioEventLoopGroup workers = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(acceptors, workers);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.handler(new LoggingHandler());
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    // NOTE(review): no frame decoder/length prepender is installed in front of
                    // the protobuf codec, so message boundaries depend on how TCP delivers the
                    // bytes - confirm whether framing should be added on both peers.
                    ch.pipeline()
                            .addLast(new ProtobufEncoder())
                            .addLast(new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance()))
                            .addLast(new NettyServerHandler());
                }
            });

            Channel serverChannel = bootstrap.bind(new InetSocketAddress(8888)).sync().channel();
            log.info("server start");
            serverChannel.closeFuture().sync();
            log.info("server stop");
        } catch (Exception e) {
            log.info("server error", e);
        } finally {
            acceptors.shutdownGracefully();
            workers.shutdownGracefully();
        }
    }
}
NettyServerHandler
/**
 * Inbound handler of the protobuf demo server.
 *
 * <p>Logs every received message (with id and name when it is a
 * {@code StudentPOJO.Student}) and every channel lifecycle callback. No
 * callback forwards its event to a later handler.
 */
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * Logs the received message; {@code Student} messages are logged field by field.
     *
     * @param ctx handler context (unused)
     * @param msg the decoded inbound message
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("服务端接收到客户端数据:{}", msg);
        if (msg instanceof StudentPOJO.Student) {
            StudentPOJO.Student student = (StudentPOJO.Student) msg;
            log.info( "客户端发送的数据:{}, {}, {}", student, student.getId(), student.getName());
        }
    }

    /** Logs any user-defined event that reaches this handler. */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        log.info("【NettyServerHandler->userEventTriggered】: {}", evt);
    }

    /** Logs the failure with its stack trace and closes the connection. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("exceptionCaught异常:", cause);
        ctx.close();
    }

    /** Logs that this handler was added to a pipeline. */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        log.info("handlerAdded:{}", ctx.channel().remoteAddress());
    }

    /** Logs that this handler was removed from its pipeline. */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        log.info("handlerRemoved:{}", ctx.channel().remoteAddress());
    }

    /** Logs that the channel was registered. */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        log.info("channelRegistered:{}", ctx.channel().remoteAddress());
    }

    /** Logs that the channel was unregistered. */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        log.info("channelUnregistered:{}", ctx.channel().remoteAddress());
    }

    /** Logs that the channel became active. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("channelActive:{}", ctx.channel().remoteAddress());
    }

    /** Logs that the channel became inactive. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("channelInactive:{}", ctx.channel().remoteAddress());
    }
}
Student.proto
syntax = "proto3"; // protobuf language version
option java_outer_classname = "StudentPOJO";// name of the generated outer class, which is also the generated file name
// protobuf describes data with "message" definitions
message Student { // generated as inner class Student of the outer class StudentPOJO; this is the POJO that is actually sent
int32 id = 1; // field "id" of protobuf type int32; the 1 is the field number, not a value
string name = 2;
}
// Generate the Java code with: protoc.exe --java_out=<output dir> <path to Student.proto>
Client端
NettyClient
/**
 * Protobuf demo client: connects to 127.0.0.1:8888 with a pipeline of
 * {@code ProtobufEncoder}, a {@code ProtobufDecoder} for
 * {@code StudentPOJO.Student} and {@code NettyClientHandler}.
 */
@Slf4j
public class NettyClient {

    /**
     * Connects, waits for the channel to close and then shuts the event loop
     * group down so the JVM can exit.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup(1);
        try {
            Bootstrap bootstrap = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new ProtobufEncoder());
                            pipeline.addLast(new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance()));
                            pipeline.addLast(new NettyClientHandler());
                        }
                    });
            ChannelFuture connectFuture = bootstrap.connect("127.0.0.1", 8888);
            Channel channel = connectFuture.sync().channel();
            log.info("客户端连接成功");
            channel.closeFuture().sync();
            log.info("客户端关闭");
        } catch (Exception e) {
            log.error("客户端发生异常: ", e);
        } finally {
            // Without this the non-daemon event loop thread keeps the JVM alive
            // after the connection is gone (or after a failed connect).
            group.shutdownGracefully();
        }
    }
}
NettyClientHandler
@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info( "【NettyClientHandler->channelRead】: {}", msg);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
log.info( "【NettyClientHandler->userEventTriggered】: {}", evt);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.info( "异常: {}", cause.getMessage());
ctx.close();
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
log.info( "【NettyClientHandler->handlerAdded】: {}", ctx);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
log.info( "【NettyClientHandler->handlerRemoved】: {}", ctx);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
log.info( "【NettyClientHandler->channelRegistered】: {}", ctx);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
log.info( "【NettyClientHandler->channelUnregistered】: {}", ctx);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info( "【NettyClientHandler->channelActive】: {}", ctx);
StudentPOJO.Student student = StudentPOJO.Student.newBuilder().setId(1).setName("张三san").build();
ctx.writeAndFlush(student);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.info( "【NettyClientHandler->channelInactive】: {}", ctx);
}
}