客户端和服务端的交互
客户端与服务端建立连接
- 客户端:客户端通过浏览器或者其他应用程序发起一个 HTTP 请求到服务端的
/socket.io
路径。在请求中会携带用户的 UUID 作为参数(通过c.Query("user")
获取)。
// router/socket.go
func RunSocket(c *gin.Context) {
user := c.Query("user") // 获取用户标识
if user == "" {
return // 无用户标识则拒绝连接
}
log.Info("newUser", log.String("newUser", user))
// 升级 HTTP 连接为 WebSocket 连接
ws, err := upGrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
return
}
// 创建客户端对象
client := &server.Client{
Name: user,
Conn: ws,
Send: make(chan []byte),
}
// 注册客户端到服务器
server.MyServer.Register <- client
// 启动读写协程
go client.Read()
go client.Write()
}
- 服务端:服务端接收到请求后,使用
websocket.Upgrader
将 HTTP 连接升级为 WebSocket 连接。然后创建一个Client
实例,并将其发送到Server
的Register
通道。
客户端注册到服务端
// internal/server/server.go
func (s *Server) Start() {
for {
select {
// 处理客户端注册
case conn := <-s.Register:
log.Info("login", log.String("login", conn.Name))
s.Clients[conn.Name] = conn
msg := &protocol.Message{
From: "System",
To: conn.Name,
Content: "welcome!",
}
protoMsg, _ := proto.Marshal(msg) // 序列化为字节切片
conn.Send <- protoMsg
}
}
}
- 客户端:无特定操作,等待服务端响应。
- 服务端:服务端的
Start
方法会监听Register
通道,当有新的客户端注册时,将客户端信息保存到Clients
映射中,并向客户端发送欢迎消息。
客户端发送消息到服务端
- 客户端:客户端通过
Client
结构体的Read
方法从 WebSocket 连接读取消息。如果是心跳消息,客户端会发送 Pong 响应;否则,根据配置将消息发送到 Kafka 或者服务端的Broadcast
通道。
// internal/server/client.go
func (c *Client) Read() {
// 消息读取
for {
c.Conn.PongHandler()
_, message, err := c.Conn.ReadMessage()
if err != nil {
log.Error("client read message error", log.Err(err))
MyServer.Ungister <- c
c.Conn.Close()
break
}
msg := &protocol.Message{}
proto.Unmarshal(message, msg)
// 处理心跳消息
if msg.Type == constant.HEAT_BEAT {
pong := &protocol.Message{
Content: constant.PONG,
Type: constant.HEAT_BEAT,
}
pongBytes, err2 := proto.Marshal(pong)
if err2 != nil {
log.Error("client marshal message error", log.Err(err))
}
c.Conn.WriteMessage(websocket.BinaryMessage, pongBytes)
} else {
MyServer.Broadcast <- message
}
}
}
- 服务端:服务端的
Start
方法会监听Broadcast
通道,当接收到消息时,根据消息的类型(单聊、群聊)将消息转发给相应的客户端。
// internal/server/server.go
func (s *Server) Start() {
for {
select {
// 处理消息广播
case message := <-s.Broadcast:
msg := &protocol.Message{}
proto.Unmarshal(message, msg)
if msg.To != "" {
// 有指定接收者的消息处理
if msg.ContentType >= constant.TEXT && msg.ContentType <= constant.VIDEO {
// 一般消息,比如文本消息,视频文件消息等
_, exists := s.Clients[msg.From]
if exists { // 检查发送者是否在连接列表中
saveMessage(msg)
}
if msg.ContentType == constant.MESSAGE_TYPE_USER {
// 单人消息处理
s.sendUserMessage(msg)
} else if msg.ContentType == constant.MESSAGE_TYPE_GROUP {
// 多人消息处理
s.sendGroupMessage(msg)
} else {
// 语音电话,视频电话等,仅支持单人聊天,不支持群聊
// 不保存文件,直接进行转发
client, ok := s.Clients[msg.To]
if ok {
client.Send <- message
}
}
} else {
// 无指定接收者的广播消息处理
for id, conn := range s.Clients {
log.Info("allUser", log.String("allUser", id))
select {
case conn.Send <- message: // 发送消息给客户端,成功继续处理
default: // 失败关闭客户端
close(conn.Send)
delete(s.Clients, conn.Name)
}
}
}
}
}
}
}
服务端发送消息到客户端
- 服务端:服务端将消息发送到客户端的
Send
通道。
// internal/server/server.go
client.Send <- msgByte
client.Send <- message
- 客户端:客户端的
Write
方法会监听Send
通道,当接收到消息时,将消息通过 WebSocket 连接发送给客户端。
// internal/server/client.go
func (c *Client) Write() {
defer func() {
c.Conn.Close()
}()
for message := range c.Send {
c.Conn.WriteMessage(websocket.BinaryMessage, message)
}
}
客户端断开连接
- 客户端:无特定操作,等待服务端响应。
- 服务端:服务端的
Start
方法会监听Ungister
通道,当有客户端断开连接时,将客户端信息从Clients
映射中删除,并关闭客户端的Send
通道。
// internal/server/server.go
func (s *Server) Start() {
for {
// 处理客户端注销
case conn := <-s.Ungister:
log.Info("loginout", log.String("loginout", conn.Name))
if _, ok := s.Clients[conn.Name]; ok {
close(conn.Send)
delete(s.Clients, conn.Name)
}
}
}
客户端和服务端通过 WebSocket 连接进行实时通信,通过 Register
、Ungister
和 Broadcast
通道进行客户端的注册、注销和消息广播,通过客户端的 Send
通道进行消息的发送。整个交互过程基于 Go 语言的协程和通道机制,实现了高效、并发的通信。
代码地址:server.go,client.go
言的协程和通道机制,实现了高效、并发的通信。
代码地址:server.go,client.go