文章目录
- 〇、代码逻辑
- 一、搭建Server
- 1.引入依赖
- 2.搭建一个简单的Server
 
- 二、搭建WebSocket建立连接
- 1.修改Server,增加一些支持
- 2.自定义一个WebSocketHandler
 
- 三、功能实现——用户注册上线
- 1.先定义一个工具类Result,用于封装服务端返回消息
- 2.封装客户端指令
- 3.完善WebSocketHandler
- 4.给Server添加一个存放用户映射关系的Map
- 5.定义ConnectionHandler,用于处理用户上线功能的逻辑
- 6.最后,运行项目来进行测试
 
- 四、功能实现——私聊
- 1.扩展Command类,增加封装后的消息协议
- 2.修改WebSocketHandler,增加对聊天功能的处理ChatHandler
- 3.定义ChatHandler用于处理聊天任务
- 4.运行项目进行测试
 
- 五、功能实现——群聊
- 1.给Server类添加一个channel组,实现系统默认群聊组
- 2.给CommandType增加一个加入群聊指令的枚举类型
- 3.在WebSocketHandler中增加加入群组功能的处理
- 4.定义JoinHandler,用于处理加入群聊的业务逻辑
- 5.在ChatHandler中增加发送群聊消息的代码
- 6.运行项目进行测试
 
项目源码: github
 websocket在线测试工具(在没有前端的情况下也可以与Server连接并进行通信): http://websocket-test.com/
在前两篇博客中我介绍了Java IO通信模型和Netty核心概念,在这篇博客中我会展示如何使用Netty开发一个仿WeChat通讯工具——SmartChat。
我们都知道,Netty是一个异步基于事件驱动的高性能网络通信框架,下面我们使用它来一步步搭建SmartChat。
〇、代码逻辑

一、搭建Server
首先,使用熟悉的IDE工具创建一个Java项目,我命名为了SmartChat。
1.引入依赖
<!--    netty    -->
<dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.85.Final</version>
</dependency>
<!--    lombok工具    -->
<dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.24</version>
            <scope>provided</scope>
</dependency>
<!--    json    -->
<dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>2.0.18</version>
</dependency>
2.搭建一个简单的Server
package tracy;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class Server {
    public static void start(){
        //主线程池
        EventLoopGroup bossPool=new NioEventLoopGroup();
        //副线程池
        EventLoopGroup workPool=new NioEventLoopGroup();
        //用于监听端口
        ServerBootstrap bootstrap=new ServerBootstrap();
        bootstrap.group(bossPool,workPool)//放入两个线程池
                .channel(NioServerSocketChannel.class)//指定channel
                .childHandler(new ChannelInitializer<SocketChannel>() {//初始化channel
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                    }
                });
        //监听端口
        ChannelFuture future=bootstrap.bind(8080);
    }
}
- 在项目启动类中调用Server类的start():
package tracy;
/**
 * 启动类
 */
public class Main {
    public static void main(String[] args) {
        System.out.println("SmartChat!!!");
        System.out.println("Hello and welcome!");
        Server.start();
    }
}
- 运行启动类:
  
二、搭建WebSocket建立连接
websocket用于在服务端和客户端之间建立连接。
1.修改Server,增加一些支持
package tracy;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
public class Server {
    public static void start () throws InterruptedException{
        //主线程池
        EventLoopGroup bossPool=new NioEventLoopGroup();
        //副线程池
        EventLoopGroup workPool=new NioEventLoopGroup();
        //用于监听端口
        ServerBootstrap bootstrap=new ServerBootstrap();
        bootstrap.group(bossPool,workPool)//放入两个线程池
                .channel(NioServerSocketChannel.class)//指定channel
                .childHandler(new ChannelInitializer<SocketChannel>() {//初始化channel
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        //获取pipeline,pipeline的工作是基于责任链模式
                        ChannelPipeline pipeline=socketChannel.pipeline();
                        //添加一些handler
                        //http编码解码器
                        pipeline.addLast(new HttpServerCodec())
                                //对大数据量的支持
                                .addLast(new ChunkedWriteHandler())
                                //对http消息进行聚合
                                .addLast(new HttpObjectAggregator(1024*24))
                                //对websocket进行支持
                                .addLast(new WebSocketServerProtocolHandler("/"))
                                //websocket具体怎么处理,需要自定义
                                .addLast(new WebSocketHandler());
                    }
                });
        //监听端口
        bootstrap.bind(8080);
    }
}
2.自定义一个WebSocketHandler
package tracy;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
/**
 * TextWebSocketFrame表示消息体为文本类型
 */
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        //先做一个最简单的处理,把消息内容直接打印出来
        System.out.println("客户端消息:"+msg.text());
    }
}
- 启动项目后去websocket在线测试网站http://websocket-test.com/进行测试:

- 成功:

三、功能实现——用户注册上线
功能概述: 用户A要想与用户B进行私聊,首先需要保存用户A和B和Server的连接标识,即需要保存映射关系。
- 优化目录结构:
 首先,我们先来优化一下目录结构,使其更清晰。
 command用来存放客户端的指令,handler用来存放我们的一些处理逻辑,util存放工具类。
  
1.先定义一个工具类Result,用于封装服务端返回消息
package tracy.util;
import com.alibaba.fastjson2.JSON;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.AllArgsConstructor;
import lombok.Data;
import java.time.LocalDateTime;
@Data
@AllArgsConstructor
public class Result {
    private String name;//消息类型
    private LocalDateTime time;
    private String message;//消息内容
    public static TextWebSocketFrame fail(String message){
        return new TextWebSocketFrame(JSON.toJSONString(new Result("系统消息",LocalDateTime.now(),message)));
    }
    public static TextWebSocketFrame success(String message){
        return new TextWebSocketFrame(JSON.toJSONString(new Result("系统消息",LocalDateTime.now(),message)));
    }
	public static TextWebSocketFrame success(String name,String message){
        return new TextWebSocketFrame(JSON.toJSONString(new Result(name,LocalDateTime.now(),message)));
    }
}
2.封装客户端指令
- 定义一个Command类,用于描述客户端的指令:
package tracy.command;
import lombok.Data;
@Data
public class Command {
    //用户昵称
    private String nickname;
    //指令
    private Integer code;
}
- 定义一个CommandType枚举类型,用于罗列客户端的指令类型:
package tracy.command;
import lombok.AllArgsConstructor;
import lombok.Getter;
@Getter
@AllArgsConstructor
public enum CommandType{
    //指令:建立连接
    CONNECTION(1),
    //指令:错误指令
    ERROR(0),
    ;
    private final Integer code;
    public static CommandType match(Integer code){
        //遍历枚举类型的所有值,看输入的code是否能与某一个匹配上
        for (CommandType value:CommandType.values()){
            if(value.getCode().equals(code))return value;
        }
        //匹配不上,说明输入的指令不是合法的枚举类
        return ERROR;
    }
}
3.完善WebSocketHandler
package tracy.handler;
import com.alibaba.fastjson2.JSON;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import tracy.command.*;
import tracy.util.Result;
/**
 * TextWebSocketFrame表示消息体为文本类型
 */
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        //将消息解析成一个Command能兼容的对象
        try{
            //将json形式的文本解析成Command类
            Command command= JSON.parseObject(msg.text(),Command.class);
            //每一种指令都定义一个对应的Handler来进行处理
            switch(CommandType.match(command.getCode())){
                case CONNECTION: ConnectionHandler.execute(command,ctx);break;
                default: ctx.channel().writeAndFlush(Result.fail("不支持的CODE"));
            }
        }catch (Exception e){
            e.printStackTrace();
            ctx.channel().writeAndFlush(Result.fail("不支持的CODE"));
        }
    }
}
4.给Server添加一个存放用户映射关系的Map
//用于保存映射关系
public static final Map<String,Channel> USERS=new ConcurrentHashMap<>();
5.定义ConnectionHandler,用于处理用户上线功能的逻辑
package tracy.handler;
import com.alibaba.fastjson2.JSON;
import io.netty.channel.ChannelHandlerContext;
import tracy.Server;
import tracy.command.Command;
import tracy.util.Result;
public class ConnectionHandler {
    public static void execute(Command command, ChannelHandlerContext ctx){
        //容错处理:避免相同昵称的用户重复上线
        if(Server.USERS.containsKey(command.getNickname())){
            ctx.channel().writeAndFlush(Result.fail("该用户已上线,请更换昵称后重试!"));
            ctx.channel().disconnect();
            return;
        }
        //将用户加入到服务端的映射队列中
        Server.USERS.put(command.getNickname(),ctx.channel());
        //返回一条表示用户上线成功的消息
        ctx.channel().writeAndFlush(Result.success("与服务端连接建立成功!"));
        //再以json字符串的形式返回当前在线的用户列表
        ctx.channel().writeAndFlush(Result.success(JSON.toJSONString(Server.USERS.keySet())));
    }
}
6.最后,运行项目来进行测试
http://websocket-test.com/
wang001上线成功
{
	"nickname": "tracy001",
	"code": 1
}

 新开一个测试窗口,tracy002上线成功
{
	"nickname": "tracy002",
	"code": 1
}

 新开一个测试窗口,tracy001上线失败,原因是昵称重复
{
	"nickname": "tracy001",
	"code": 1
}

 新开一个测试窗口,tracy003上线失败,原因是指令=2不合法
{
	"nickname": "tracy003",
	"code": 2
}

四、功能实现——私聊
1.扩展Command类,增加封装后的消息协议
- Command:
package tracy.command;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Command {
    //用户昵称
    private String nickname;
    //指令
    private Integer code;
    //消息
    private Message message;
}
- CommandType也需要增加相应的枚举值:
//指令:聊天
CHAT(2),
- Message:
package tracy.command;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Message{
    //消息类型
    private Integer type;
    //接收对象
    private String target;
    //内容
    private String content;
    public Message(Integer type,String target){
        this.type=type;
        this.target=target;
    }
    public Message(Integer type){
        this.type=type;
    }
}
- MessageType:
package tracy.command;
import lombok.AllArgsConstructor;
import lombok.Getter;
@Getter
@AllArgsConstructor
public enum MessageType {
    //私聊
    PRIVATE(1),
    //群聊
    GROUP(2),
    //错误
    ERROR(0);
    private Integer type;
    public static MessageType match(Integer type){
        //遍历枚举类型的所有值,看输入的type否能与某一个匹配上
        for (MessageType value:MessageType.values()){
            if(value.getType().equals(type))return value;
        }
        //匹配不上,说明输入的type不是合法的枚举类
        return ERROR;
    }
}
2.修改WebSocketHandler,增加对聊天功能的处理ChatHandler
package tracy.handler;
import com.alibaba.fastjson2.JSON;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import tracy.command.*;
import tracy.util.Result;
/**
 * TextWebSocketFrame表示消息体为文本类型
 */
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        try{
            //将json形式的文本解析成Command类
            Command command= JSON.parseObject(msg.text(),Command.class);
            //每一种指令都定义一个对应的Handler来进行处理
            switch(CommandType.match(command.getCode())){
                case CONNECTION: ConnectionHandler.execute(command,ctx);break;
                case CHAT: ChatHandler.execute(command,ctx);break;
                default: ctx.channel().writeAndFlush(Result.fail("不支持的CODE"));
            }
        }catch (Exception e){
            e.printStackTrace();
            ctx.channel().writeAndFlush(Result.fail("发送消息格式错误,请确认后再试"));
        }
    }
}
3.定义ChatHandler用于处理聊天任务
package tracy.handler;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import tracy.Server;
import tracy.command.Command;
import tracy.command.Message;
import tracy.command.MessageType;
import tracy.util.Result;
public class ChatHandler {
    //用户聊天逻辑
    public static void execute(Command command, ChannelHandlerContext ctx){
        try{
            Message message=command.getMessage();
            //按不同聊天类型进行处理
            switch(MessageType.match(message.getType())){
                //私聊
                case PRIVATE: {
                    //信息接收对象为空
                    String target=message.getTarget();
                    Channel channel=Server.USERS.get(target);
                    if(target==null||target.isEmpty()){
                        ctx.channel().writeAndFlush(Result.fail("接收者信息为空,请确认后再试"));
                        return;
                    }
                    //信息接收对象不存在
                    else if(channel==null){
                        ctx.channel().writeAndFlush(Result.fail("接收者"+target+"不存在,请确认后再试"));
                        return;
                    }
                    //信息接收对象下线了
                    else if(!channel.isActive()){
                        ctx.channel().writeAndFlush(Result.fail("接收者"+target+"已下线,请确认后再试"));
                        return;
                    }
                    else{
                        channel.writeAndFlush(Result.success("私聊消息("+command.getNickname()+")",message.getContent()));
                    }
                };break;
                //群聊,先空着,下一章实现
                case GROUP: ;break;
                default: ctx.channel().writeAndFlush(Result.fail("不支持的TYPE"));
            }
        }catch (Exception e){
            e.printStackTrace();
            ctx.channel().writeAndFlush(Result.fail("发送消息格式错误,请确认后再试"));
        }
    }
}
4.运行项目进行测试
http://websocket-test.com/
wang001、wang002、wang003上线成功
 
 
 
wang001向wang002发送消息,wang002接收到了
{
	"nickname": "wang001",
	"code": 2,
	"message": {
		"type": 1,
		"target": "wang002",
		"content": "你好,我是1号"
	}
}

 
wang001向并不存在的wang004发送消息,失败

wang003下线,然后wang001向下线的wang003发送消息,失败

五、功能实现——群聊
功能概述:系统提供一个群里组,但是用户需要有加入群聊这个操作,才能进行后续的收发群聊消息。
1.给Server类添加一个channel组,实现系统默认群聊组
//添加一个channel组,用于实现群聊一对多通信
public static final ChannelGroup CHANNEL_GROUP=new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
2.给CommandType增加一个加入群聊指令的枚举类型
//指令:加入群聊
JOIN(3),
3.在WebSocketHandler中增加加入群组功能的处理
            switch(CommandType.match(command.getCode())){
                case CONNECTION: ConnectionHandler.execute(command,ctx);break;
                case CHAT: ChatHandler.execute(command,ctx);break;
                case JOIN: JoinHandler.execute(ctx);break;
                default: ctx.channel().writeAndFlush(Result.fail("不支持的CODE"));
            }
4.定义JoinHandler,用于处理加入群聊的业务逻辑
package tracy.handler;
import io.netty.channel.ChannelHandlerContext;
import tracy.Server;
import tracy.util.Result;
public class JoinHandler {
    public static void execute(ChannelHandlerContext ctx){
        Server.CHANNEL_GROUP.add(ctx.channel());
        ctx.channel().writeAndFlush(Result.success("加入系统默认群聊成功"));
    }
}
5.在ChatHandler中增加发送群聊消息的代码
            switch(MessageType.match(message.getType())){
                //私聊
                case PRIVATE: {
                	……
                    };break;
                //群聊
                case GROUP: {
                    Server.CHANNEL_GROUP.writeAndFlush(Result.success("群聊消息("+command.getNickname()+")",message.getContent()));
                };break;
                default: ctx.channel().writeAndFlush(Result.fail("不支持的TYPE"));
            }
6.运行项目进行测试
http://websocket-test.com/
 自行完成这一工作。
- 加入群聊json:
{
	"nickname": "wang001",
	"code": 3
}
- 发送群聊消息json:
{
	"nickname": "wang001",
	"code": 2,
	"message": {
		"type": 2,
		"content": "你好,我是1号"
	}
}
成功!
 
到这里,我们就完成了mini版微信聊天工具SmartChat的开发工作了,在此基础上,可以增加前端的开发,以及更多功能的实现,实际上开发步骤都是类似的,只是针对不同的功能具体的业务逻辑不同罢了,感兴趣的同学可以尝试着扩展一下SmartChat的功能。
 源码请看文章的最顶部。感谢阅读。



















