1.引入netty依赖
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.92.Final</version>
</dependency>
</dependencies>
2.准备一个服务端ChatServer和一个服务端处理器ChatServerHandler和若干个客户端ChatClient(有几个用户设置几个)和一个客户端处理器ChatClientHandler
服务端
public class ChatServer {
public static void main(String[] args) {
// 新建两个事件循环组,bossGroup 用于监听客户端的连接请求,将连接请求发送给 workerGroup 用于处理客户端连接的数据读写
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workGroup);
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
ch.pipeline().addLast(new ChatServerHandler());
}
});
// 服务端绑定9000端口
serverBootstrap.bind(9000);
}
public void test(){
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workGroup);
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
nioSocketChannel.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
nioSocketChannel.pipeline().addLast(new ChatServerHandler());
}
});
serverBootstrap.bind(9001);
}
}
服务端处理器
public class ChatServerHandler extends ChannelInboundHandlerAdapter {
// 创建一个list集合存储连接上的所有客户端channel
private static List<Channel> channels = new ArrayList<>();
// 当有客户端连接上服务端,底层会调用此方法,执行此方法的逻辑
// 这里大概的处理逻辑是:先添加新客户端channel到集合中,然后循环遍历list集合
// 然后根据不同的channel发送不同的系统消息
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channels.add(channel);
channels.forEach(ch -> {
if (ch == channel) {
channel.writeAndFlush("恭喜您,上线成功");
} else {
ch.writeAndFlush("系统消息:[" + ch.remoteAddress() + "]客户端已上线");
}
});
System.out.println("客户端[" + channel.remoteAddress() + "]请求连接");
}
// 当有客户端连断开连接,底层会调用此方法,执行此方法的逻辑
// 这里大概的处理逻辑是:然后循环遍历list集合,找到已断开连接的channel并删除
// 向集合内的channel发送系统消息
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
Iterator<Channel> iterator = channels.iterator();
while (iterator.hasNext()) {
Channel ch = iterator.next();
if (ch == channel) {
iterator.remove();
}
ch.writeAndFlush("系统消息:[" + channel.remoteAddress() + "]客户端已下线");
}
System.out.println("客户端[" + channel.remoteAddress() + "]断开连接");
}
// 当有客户端发送消息到服务端,底层会调用此方法,执行此方法的逻辑
// 这里大概的处理逻辑是:循环遍历向集合中的channel(除发送消息的客户端)发送消息
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Channel channel = ctx.channel();
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date date = new Date(System.currentTimeMillis());
String now = formatter.format(date);
System.out.println(now+"收到用户[" + channel.remoteAddress() + "]发来的消息:" + msg.toString());
channels.forEach(ch -> {
if (ch != channel) {
ch.writeAndFlush("消息时间:" + now + " 用户[" + channel.remoteAddress() + "]说:" + msg.toString());
}
});
}
//异常触发
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
客户端
public class ChatClient {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup workGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workGroup);
bootstrap.channel(NioSocketChannel.class);
// 设置处理器
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// 将字符串编解码器及客户端处理器添加到pipeline中
ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
ch.pipeline().addLast(new ChatClientHandler());
}
});
// 连接服务端
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9000);
channelFuture.sync();
// 获取客户端输入的内容,并发送至服务端(因为添加了字符串编解码器,所以此处可以直接发送字符串)
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
channelFuture.channel().writeAndFlush(scanner.nextLine());
}
}
}
客户端处理器
public class ChatClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg.toString());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
如果需要多个客户端用户,将客户端多复制几个就可以了.
启动时先启动服务端再启动客户端




















