分布式部署可能遇到的问题
常见 nginx 反向代理方案
 
 
- 假设按照上述架构方案来
- A用户接入后connA(ws客户端) 由节点1来维护
- B用户接入后connA(ws客户端) 由节点2来维护
- 流程: A->B 发信息: A -> connA -> 分析处理 -> connB -> B
- 实际上,上述流程是没有办法通信的,因为 A找不到B在哪里
- 核心问题:系统如何将消息投递到 connB?
常用解决方案
1 ) 使用消息总线
 
 
- 优点:简单
- 去电:各个节点不知道彼此节点状态
- 例子:redis/kafka/MQ
2 ) 局域网通信协议
 
 
- 节点间通过通信协议来通信
- 优点:简单,成本低
- 缺点:不知道节点状态
- 例子:UDP
3 ) 实现调度应用
 
 
- 自己实现调度应用,保存各个客户端的状态
- 优点:可靠
- 缺点:复杂
基于局域网通信UDP协议解决
- 以上三种方案,我们选择第二种来解决
- 首先,回顾单体应用 
  - 开启ws接收协程recvproc/ws发送协程sendproc
- websocket收到消息->dispatch发送给dstid
 
- 基于UDP的分布式应用 
  - 开启ws接收协程recvproc/ws发送协程sendproc
- 开启udp接收协程udprecvproc/udp发送协程udpsendproc
- websocket收到消息->broadMsg广播到局域网
- udp接收到收到消息->dispatch发送给dstid
- 自己是局域网一份子,所以也能接收到消息
 
代码实现解决,ctrl/chat.go
package ctrl
import (
	"net/http"
	"github.com/gorilla/websocket"
	"gopkg.in/fatih/set.v0"
	"sync"
	"strconv"
	"log"
	"encoding/json"
	"net"
)
const (
	CMD_SINGLE_MSG = 10
	CMD_ROOM_MSG   = 11
	CMD_HEART      = 0
)
type Message struct {
	Id      int64  `json:"id,omitempty" form:"id"` //消息ID
	Userid  int64  `json:"userid,omitempty" form:"userid"` //谁发的
	Cmd     int    `json:"cmd,omitempty" form:"cmd"` //群聊还是私聊
	Dstid   int64  `json:"dstid,omitempty" form:"dstid"`//对端用户ID/群ID
	Media   int    `json:"media,omitempty" form:"media"` //消息按照什么样式展示
	Content string `json:"content,omitempty" form:"content"` //消息的内容
	Pic     string `json:"pic,omitempty" form:"pic"` //预览图片
	Url     string `json:"url,omitempty" form:"url"` //服务的URL
	Memo    string `json:"memo,omitempty" form:"memo"` //简单描述
	Amount  int    `json:"amount,omitempty" form:"amount"` //其他和数字相关的
}
/**
消息发送结构体
1、MEDIA_TYPE_TEXT
{id:1,userid:2,dstid:3,cmd:10,media:1,content:"hello"}
2、MEDIA_TYPE_News
{id:1,userid:2,dstid:3,cmd:10,media:2,content:"标题",pic:"http://www.baidu.com/a/log,jpg",url:"http://www.a,com/dsturl","memo":"这是描述"}
3、MEDIA_TYPE_VOICE,amount单位秒
{id:1,userid:2,dstid:3,cmd:10,media:3,url:"http://www.a,com/dsturl.mp3",anount:40}
4、MEDIA_TYPE_IMG
{id:1,userid:2,dstid:3,cmd:10,media:4,url:"http://www.baidu.com/a/log,jpg"}
5、MEDIA_TYPE_REDPACKAGR //红包amount 单位分
{id:1,userid:2,dstid:3,cmd:10,media:5,url:"http://www.baidu.com/a/b/c/redpackageaddress?id=100000","amount":300,"memo":"恭喜发财"}
6、MEDIA_TYPE_EMOJ 6
{id:1,userid:2,dstid:3,cmd:10,media:6,"content":"cry"}
7、MEDIA_TYPE_Link 6
{id:1,userid:2,dstid:3,cmd:10,media:7,"url":"http://www.a,com/dsturl.html"}
7、MEDIA_TYPE_Link 6
{id:1,userid:2,dstid:3,cmd:10,media:7,"url":"http://www.a,com/dsturl.html"}
8、MEDIA_TYPE_VIDEO 8
{id:1,userid:2,dstid:3,cmd:10,media:8,pic:"http://www.baidu.com/a/log,jpg",url:"http://www.a,com/a.mp4"}
9、MEDIA_TYPE_CONTACT 9
{id:1,userid:2,dstid:3,cmd:10,media:9,"content":"10086","pic":"http://www.baidu.com/a/avatar,jpg","memo":"胡大力"}
*/
//本核心在于形成userid和Node的映射关系
type Node struct {
	Conn *websocket.Conn
	//并行转串行,
	DataQueue chan []byte
	GroupSets set.Interface
}
//映射关系表
var clientMap map[int64]*Node = make(map[int64]*Node,0)
//读写锁
var rwlocker sync.RWMutex
// ws://127.0.0.1/chat?id=1&token=xxxx
func Chat(writer http.ResponseWriter,
	request *http.Request) {
	//fmt.Printf("%+v",request.Header)
	// 检验接入是否合法
    //checkToken(userId int64,token string)
    query := request.URL.Query()
    id := query.Get("id")
    token := query.Get("token")
    userId ,_ := strconv.ParseInt(id,10,64)
	isvalida := checkToken(userId,token)
	//如果isvalida=true
	//isvalida=false
	conn,err :=(&websocket.Upgrader{
		CheckOrigin: func(r *http.Request) bool {
			return isvalida
		},
	}).Upgrade(writer,request,nil)
	if err!=nil{
		log.Println(err.Error())
		return
	}
	// 获得conn
	node := &Node{
		Conn:conn,
		DataQueue:make(chan []byte,50),
		GroupSets:set.New(set.ThreadSafe),
	}
	// 获取用户全部群Id
	comIds := contactService.SearchComunityIds(userId)
	for _,v:=range comIds{
		node.GroupSets.Add(v)
	}
	// userid和node形成绑定关系
	rwlocker.Lock()
	clientMap[userId]=node
	rwlocker.Unlock()
	//todo 完成发送逻辑,con
	go sendproc(node)
	//todo 完成接收逻辑
	go recvproc(node)
    log.Printf("<-%d\n",userId)
	sendMsg(userId,[]byte("hello,world!"))
}
// 添加新的群ID到用户的groupset中
func AddGroupId(userId,gid int64){
	//取得node
	rwlocker.Lock()
	node,ok := clientMap[userId]
	if ok{
		node.GroupSets.Add(gid)
	}
	//clientMap[userId] = node
	rwlocker.Unlock()
	//添加gid到set
}
// ws发送协程
func sendproc(node *Node) {
	for {
		select {
		case data:= <-node.DataQueue:
			err := node.Conn.WriteMessage(websocket.TextMessage,data)
			if err!=nil{
				log.Println(err.Error())
				return
			}
		}
	}
}
// ws接收协程
func recvproc(node *Node) {
	for{
		_,data,err := node.Conn.ReadMessage()
		if err!=nil{
			log.Println(err.Error())
			return
		}
		//dispatch(data)
		//把消息广播到局域网
		broadMsg(data)
		log.Printf("[ws]<=%s\n",data)
	}
}
func init(){
	go udpsendproc()
	go udprecvproc()
}
// 用来存放发送的要广播的数据
var udpsendchan chan []byte=make(chan []byte,1024)
// 将消息广播到局域网
func broadMsg(data []byte){
	udpsendchan<-data
}
// 完成udp数据的发送协程
func udpsendproc(){
	log.Println("start udpsendproc")
	//todo 使用udp协议拨号
	con,err:=net.DialUDP("udp",nil,
		&net.UDPAddr{
			IP:net.IPv4(192,168,0,255), // 这里代表网段,这个可以在部署的时候,抽离出去配置
			Port:3000,
		})
	defer con.Close()
	if err!=nil{
		log.Println(err.Error())
		return
	}
	// 通过的到的con发送消息
	// con.Write()
	for{
		select {
		case data := <- udpsendchan:
			_,err=con.Write(data)
			if err!=nil{
				log.Println(err.Error())
				return
			}
		}
	}
}
// 完成upd接收并处理功能
func udprecvproc(){
	log.Println("start udprecvproc")
	 //todo 监听udp广播端口
	 con,err:=net.ListenUDP("udp",&net.UDPAddr{
	 	IP:net.IPv4zero,
	 	Port:3000,
	 })
	 defer con.Close()
	 if err!=nil{log.Println(err.Error())}
	// 处理端口发过来的数据
	for{
		var buf [512]byte
		n,err:=con.Read(buf[0:])
		if err!=nil{
			log.Println(err.Error())
			return
		}
		//直接数据处理
		dispatch(buf[0:n])
	}
	log.Println("stop updrecvproc")
}
// 后端调度逻辑处理
func dispatch(data[]byte){
	//todo 解析data为message
	msg := Message{}
	err := json.Unmarshal(data,&msg)
	if err!=nil{
		log.Println(err.Error())
		return
	}
	// 根据cmd对逻辑进行处理
	switch msg.Cmd {
	case CMD_SINGLE_MSG:
		sendMsg(msg.Dstid,data)
	case CMD_ROOM_MSG:
		//todo 群聊转发逻辑
		for _,v:= range clientMap{
			if v.GroupSets.Has(msg.Dstid){
				v.DataQueue<-data
			}
		}
	case CMD_HEART:
		//todo 一般啥都不做
	}
}
// 发送消息
func sendMsg(userId int64,msg []byte) {
	rwlocker.RLock()
	node,ok:=clientMap[userId]
	rwlocker.RUnlock()
	if ok{
		node.DataQueue<- msg
	}
}
// 检测是否有效
func checkToken(userId int64,token string)bool{
	//从数据库里面查询并比对
	user := userService.Find(userId)
	return user.Token==token
}
nginx 反向代理
	upstream wsbackend {
			server 192.168.0.102:8080;
			server 192.168.0.100:8080;
			hash $request_uri;
	}
	map $http_upgrade $connection_upgrade {
	      default upgrade;
	      ''      close; 
	}
    server {
	  listen  80;
	  server_name localhost;
	  location / {
	   	   proxy_pass http://wsbackend;
	  }
	  location ^~ /chat {
		   proxy_pass http://wsbackend;
		   proxy_connect_timeout 500s;
	       proxy_read_timeout 500s;
		   proxy_send_timeout 500s;
		   proxy_set_header Upgrade $http_upgrade;
	       proxy_set_header Connection "Upgrade";
	  }
	 }
}
注意,这里在 server 192.168.0.102; server 192.168.0.100; 这两台服务器上启动 chat.exe 程序
 需要提前 go build 并进行相关部署,此处不在赘述,下面会有说明
打包部署
- 我们要打包应用,同时需要 asset 和 view 目录,这两个在 go build 中是不会被打包进去的
- 所以,我们要同时把两者和二进制程序一起进行部署
- 这里,我们写两个脚本文件,分别对应在 window 和 linux 平台的部署文件
- 这里说下,一般会借助 jenkins 来操作
1 )windows 下
rd /s/q release
md release
::go build -ldflags "-H windowsgui" -o chat.exe
go build -o chat.exe
COPY chat.exe release\
COPY favicon.ico release\favicon.ico
XCOPY asset\*.* release\asset\  /s /e
XCOPY view\*.* release\view\  /s /e
2 )Linux 下
#!/bin/sh
rm -rf ./release
mkdir  release
go build -o chat
chmod +x ./chat 
cp chat ./release/
cp favicon.ico ./release/ 
cp -arf ./asset ./release/
cp -arf ./view ./release/
- 注意,linux 下这里使用 nohup: nohup ./chat >>./log.log 2>&1 &
3 ) 总结
- 上面两种是分别在不同平台手动部署的样例
- 实际上,我们如果借助jenkins只需要配置下linux下的相关命令即可



















