Unity-Socket通信实例详解

news2025/5/9 17:20:06

今天我们来讲解socket通信。

首先我们需要知道什么是socket通信:

Socket本质上就是一个个进程之间网络通信的基础,每一个Socket由IP+端口组成,熟悉计网的同学应该知道IP主要是应用于IP协议而端口主要应用于TCP协议,这也证明了Socket通信是一个多个层共同工作的过程。

总结:Socket是网络编程的基石,通过简单API抽象底层协议,实现进程间灵活高效的数据交换。

现在我们用一个实例来看看具体的一个Socket通信是如何实现的,既然涉及到了网络通信,那当然需要一个客户端和一个服务器,我们就拿自己的电脑来同时作为客户端和服务器即可。

Server

我们从服务器开始。

首先来看一个大体服务器代码的作用:

Main

using System.Collections;
using System.Collections.Generic;
using UnityEngine;

public class Main : MonoBehaviour
{
    SocketServer _server;
    private void Awake()
    {
        _server = new SocketServer("127.0.0.1", 6854);
        _server.OnConnect += (client) =>
        {
            UnityEngine.Debug.LogFormat("连接成功 >> IP:{0}", client.LocalEndPoint.ToString());
        };
        _server.OnDisconnect += (client) =>
        {
            UnityEngine.Debug.LogFormat("连接断开 >> IP:{0}", client.LocalEndPoint.ToString());
        };
        _server.OnReceive += (client, data) =>
        {
            UnityEngine.Debug.LogFormat("[{0}]接收到数据>>>{1} {2}", client.LocalEndPoint.ToString(), (SocketEvent)data.Type, data.Buff.Length);

            switch ((SocketEvent)data.Type)
            {
                case SocketEvent.sc_test:
                    UnityEngine.Debug.LogFormat("接收到测试数据 >>> {0}", System.Text.Encoding.UTF8.GetString(data.Data));
                    break;
            }
        };

    }
    private void Update()
    {
        if (Input.GetKeyDown(KeyCode.A))
        {
            // 踢出连接
            foreach (var item in _server.ClientInfoDic.Keys)
            {
                _server.KickOutAll();
            }

        }
    }
    private void OnDestroy()
    {
        // 注意由于Unity编译器环境下,游戏开启/关闭只影响主线程的开关,游戏关闭回调时需要通过Close函数来关闭服务端/客户端的线程。
        if (_server != null)
        {
            _server.Close();
        }
    }
}

光看这一段代码的话其实也看不出什么名堂,我们只知道有一个SocketServer类的实例_server,我们在Awake()函数中_server分别注册了三个事件,分别对应连接、断连和接受信息。在Update中我们如果检测到按键A我们把_server中的客户端信息全部清空,以及最后关闭服务器。

那显然我们的重心是来看看SocketServer类的代码内容了。

SocketServer



using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Timers;

public class SocketInfo
{
    public Socket Client;
    public Thread ReceiveThread;
    public long HeadTime;
}

/// <summary>
/// Socket服务端
/// </summary>
public class SocketServer
{
    /// <summary>
    /// 主线程
    /// </summary>
    private SynchronizationContext _mainThread;



    public string IP;
    public int Port;

    private const int HEAD_TIMEOUT = 5000;    // 心跳超时 毫秒
    private const int HEAD_CHECKTIME = 5000;   // 心跳包超时检测 毫秒

    public Dictionary<Socket, SocketInfo> ClientInfoDic = new Dictionary<Socket, SocketInfo>();

    private Socket _server;
    private Thread _connectThread;
    private System.Timers.Timer _headCheckTimer;
    private DataBuffer _dataBuffer = new DataBuffer();

    public event Action<Socket> OnConnect;  //客户端建立连接回调
    public event Action<Socket> OnDisconnect;  // 客户端断开连接回调
    public event Action<Socket, SocketDataPack> OnReceive;  // 接收报文回调
    public event Action<Socket, SocketDataPack> OnSend;  // 发送报文回调

    // 目前捕获异常将触发OnDisconnect回调 暂不单独处理
    // public event Action<SocketException> OnError;   // 异常捕获回调

    private bool _isValid = true;

    public SocketServer(string ip, int port)
    {
        _mainThread = SynchronizationContext.Current;

        IP = ip;
        Port = port;

        _server = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        IPAddress ipAddress = IPAddress.Parse(IP);//解析IP地址
        _server.Bind(new IPEndPoint(ipAddress, Port));  //绑定IP地址:端口  

        _server.Listen(10);    //设定最多10个排队连接请求

        // 启动线程监听连接
        _connectThread = new Thread(ListenClientConnect);
        _connectThread.Start();

        // 心跳包定时检测
        _headCheckTimer = new System.Timers.Timer(HEAD_CHECKTIME);
        _headCheckTimer.AutoReset = true;
        _headCheckTimer.Elapsed += delegate (object sender, ElapsedEventArgs args)
        {
            CheckHeadTimeOut();
        };
        _headCheckTimer.Start();
    }
    /// <summary>  
    /// 监听客户端连接  
    /// </summary>  
    private void ListenClientConnect()
    {
        while (true)
        {
            try
            {
                if (!_isValid) break;
                Socket client = _server.Accept();
                Thread receiveThread = new Thread(ReceiveEvent);
                ClientInfoDic.Add(client, new SocketInfo() { Client = client, ReceiveThread = receiveThread, HeadTime = GetNowTime() });
                receiveThread.Start(client);

                PostMainThreadAction<Socket>(OnConnect, client);


            }
            catch
            {
                break;
            }

        }

    }
    /// <summary>
    /// 获取当前时间戳
    /// </summary>
    /// <returns></returns>
    private long GetNowTime()
    {
        TimeSpan ts = DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0, 0);
        return Convert.ToInt64(ts.TotalMilliseconds);
    }

    public void Send(Socket client, UInt16 e, byte[] buff = null, Action<SocketDataPack> onTrigger = null)
    {
        buff = buff ?? new byte[] { };
        var dataPack = new SocketDataPack(e, buff);
        var data = dataPack.Buff;
        try
        {
            client.BeginSend(data, 0, data.Length, SocketFlags.None, new AsyncCallback((asyncSend) =>
            {
                Socket c = (Socket)asyncSend.AsyncState;
                c.EndSend(asyncSend);
                PostMainThreadAction<SocketDataPack>(onTrigger, dataPack);
                PostMainThreadAction<Socket, SocketDataPack>(OnSend, client, dataPack);
            }), client);
        }
        catch (SocketException ex)
        {
            CloseClient(client);
            // onError(ex);
        }

    }
    /// <summary>
    /// 线程内接收数据的函数
    /// </summary>
    private void ReceiveEvent(object client)
    {
        Socket tsocket = (Socket)client;
        while (true)
        {
            if (!_isValid) return;
            if (!ClientInfoDic.ContainsKey(tsocket))
            {
                return;
            }
            try
            {
                byte[] rbytes = new byte[8 * 1024];
                int len = tsocket.Receive(rbytes);
                if (len > 0)
                {
                    _dataBuffer.AddBuffer(rbytes, len); // 将收到的数据添加到缓存器中
                    var dataPack = new SocketDataPack();
                    if (_dataBuffer.TryUnpack(out dataPack)) // 尝试解包
                    {
                        if (dataPack.Type == (UInt16)SocketEvent.sc_head)
                        {
                            // 接收到心跳包
                            ReceiveHead(tsocket);
                        }
                        else if (dataPack.Type == (UInt16)SocketEvent.sc_disconn)
                        {
                            // 客户端断开连接
                            CloseClient(tsocket);
                        }
                        else
                        {
                            // 收到消息
                            PostMainThreadAction<Socket, SocketDataPack>(OnReceive, tsocket, dataPack);
                        }

                    }
                }
                else
                {
                    if (tsocket.Poll(-1, SelectMode.SelectRead))
                    {
                        CloseClient(tsocket);
                        return;
                    }
                }
            }
            catch (SocketException ex)
            {
                CloseClient(tsocket);
                // onError(ex);
                return;
            }
        }
    }
    /// <summary>
    /// 接收到心跳包
    /// </summary>
    private void ReceiveHead(Socket client)
    {
        SocketInfo info;
        if (ClientInfoDic.TryGetValue(client, out info))
        {
            long now = GetNowTime();
            long offset = now - info.HeadTime;
            UnityEngine.Debug.Log("更新心跳时间戳 >>>" + now + "  间隔>>>" + offset);
            if (offset > HEAD_TIMEOUT)
            {
                // 心跳包收到但超时逻辑
            }
            info.HeadTime = now;
        }
    }
    /// <summary>
    /// 检测心跳包超时
    /// </summary>
    private void CheckHeadTimeOut()
    {
        var tempList = new List<Socket>();
        foreach (var socket in ClientInfoDic.Keys)
        {
            tempList.Add(socket);
        }
        foreach (var socket in tempList)
        {
            var info = ClientInfoDic[socket];
            long now = GetNowTime();
            long offset = now - info.HeadTime;
            if (offset > HEAD_TIMEOUT)
            {
                // 心跳包超时
                KickOut(socket);
            }
        }
    }
    public void KickOut(Socket client)
    {
        // 踢出连接
        Send(client, (UInt16)SocketEvent.sc_kickout, null, (dataPack) =>
        {
            CloseClient(client);
        });
    }
    public void KickOutAll()
    {
        var tempList = new List<Socket>();
        foreach (var socket in ClientInfoDic.Keys)
        {
            tempList.Add(socket);
        }
        foreach (var socket in tempList)
        {
            KickOut(socket);
        }
    }
    /// <summary>
    /// 清理客户端连接
    /// </summary>
    /// <param name="client"></param>
    private void CloseClient(Socket client)
    {
        PostMainThreadAction<Socket>((socket) =>
        {
            if (OnDisconnect != null) OnDisconnect(socket);
            ClientInfoDic.Remove(socket);
            socket.Close();
        }, client);

    }
    /// <summary>
    /// 关闭
    /// </summary>
    public void Close()
    {
        if (!_isValid) return;
        _isValid = false;
        // if (_connectThread != null) _connectThread.Abort();
        var tempList = new List<Socket>();
        foreach (var socket in ClientInfoDic.Keys)
        {
            tempList.Add(socket);
        }
        foreach (var socket in tempList)
        {
            CloseClient(socket);
        }
        if (_headCheckTimer != null)
        {
            _headCheckTimer.Stop();
            _headCheckTimer = null;
        }
        _server.Close();
    }

    // /// <summary>
    // /// 错误回调
    // /// </summary>
    // /// <param name="e"></param>
    // private void onError(SocketException ex)
    // {
    //     PostMainThreadAction<SocketException>(OnError, ex);
    // }


    // <summary>
    /// 通知主线程回调
    /// </summary>
    private void PostMainThreadAction(Action action)
    {
        _mainThread.Post(new SendOrPostCallback((o) =>
        {
            Action e = (Action)o.GetType().GetProperty("action").GetValue(o);
            if (e != null) e();
        }), new { action = action });
    }
    private void PostMainThreadAction<T>(Action<T> action, T arg1)
    {
        _mainThread.Post(new SendOrPostCallback((o) =>
        {
            Action<T> e = (Action<T>)o.GetType().GetProperty("action").GetValue(o);
            T t1 = (T)o.GetType().GetProperty("arg1").GetValue(o);
            if (e != null) e(t1);
        }), new { action = action, arg1 = arg1 });
    }
    public void PostMainThreadAction<T1, T2>(Action<T1, T2> action, T1 arg1, T2 arg2)
    {
        _mainThread.Post(new SendOrPostCallback((o) =>
        {
            Action<T1, T2> e = (Action<T1, T2>)o.GetType().GetProperty("action").GetValue(o);
            T1 t1 = (T1)o.GetType().GetProperty("arg1").GetValue(o);
            T2 t2 = (T2)o.GetType().GetProperty("arg2").GetValue(o);
            if (e != null) e(t1, t2);
        }), new { action = action, arg1 = arg1, arg2 = arg2 });
    }
}

非常长的代码内容啊,我们一点一点来看:

public class SocketInfo
{
    public Socket Client;
    public Thread ReceiveThread;
    public long HeadTime;
}

这是我们的Socket的信息,可以看到有Socket类的实例,对于服务器来说要处理的Socket类当然就是客户端的Socket,有一个线程和一个时间值,这个时间值的作用我们暂时按下不表。

    /// <summary>
    /// 主线程
    /// </summary>
    private SynchronizationContext _mainThread;
    public string IP;
    public int Port;
    private const int HEAD_TIMEOUT = 5000;    // 心跳超时 毫秒
    private const int HEAD_CHECKTIME = 5000;   // 心跳包超时检测 毫秒
    public Dictionary<Socket, SocketInfo> ClientInfoDic = new Dictionary<Socket, SocketInfo>();
    private Socket _server;
    private Thread _connectThread;
    private System.Timers.Timer _headCheckTimer;
    private DataBuffer _dataBuffer = new DataBuffer();
    public event Action<Socket> OnConnect;  //客户端建立连接回调
    public event Action<Socket> OnDisconnect;  // 客户端断开连接回调
    public event Action<Socket, SocketDataPack> OnReceive;  // 接收报文回调
    public event Action<Socket, SocketDataPack> OnSend;  // 发送报文回调
    // 目前捕获异常将触发OnDisconnect回调 暂不单独处理
    // public event Action<SocketException> OnError;   // 异常捕获回调
    private bool _isValid = true;

可以看到密密麻麻的一系列参数啊,这里就是我们SocketServer类的成员变量了,首先是这个我们似乎第一次见的类:SynchronizationContext。

看名字也知道这个类和异步操作以及上下文有关系,概括来说:

然后是我们的IP和端口,这个不多说。 然后是两个int时间值,还记得之前SocketInfo里定义的HeadTime吗?我们称其为心跳时间:

那在一个Socket网络通信中心跳时间的意义不用多说了吧,就是检查连接是否正常的一个时间阈值,具体是怎么个检查法我们后续介绍。

然后是一个 存储客户端Socket信息的字典,代表服务器的Socket类实例,一个线程,一个计时器,然后是一个数据缓冲类(自定义的),然后是一系列event(大家应该都知道什么是event吧?),更准确的说是Action,分别代表连接,断连,接收和发送。最后一个bool变量表示能否建立连接。

    public SocketServer(string ip, int port)
    {
        _mainThread = SynchronizationContext.Current;

        IP = ip;
        Port = port;

        _server = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        IPAddress ipAddress = IPAddress.Parse(IP);//解析IP地址
        _server.Bind(new IPEndPoint(ipAddress, Port));  //绑定IP地址:端口  

        _server.Listen(10);    //设定最多10个排队连接请求

        // 启动线程监听连接
        _connectThread = new Thread(ListenClientConnect);
        _connectThread.Start();

        // 心跳包定时检测
        _headCheckTimer = new System.Timers.Timer(HEAD_CHECKTIME);
        _headCheckTimer.AutoReset = true;
        _headCheckTimer.Elapsed += delegate (object sender, ElapsedEventArgs args)
        {
            CheckHeadTimeOut();
        };
        _headCheckTimer.Start();
    }

SocketServer的有参构造,参数是IP和端口号。

我们把当前线程上下文给到_mainThread,IP和端口也给到。然后是服务器的初始化:

        _headCheckTimer.Elapsed += delegate (object sender, ElapsedEventArgs args)
        {
            CheckHeadTimeOut();
        };

这一系列初始化服务器的操作都是在调用Socket类内部的函数。

然后是启动我们线程的监听状态,然后启动我们的心跳包定时检测,注意我们在new一个计时器的构造函数的参数:

_headCheckTimer = new System.Timers.Timer(HEAD_CHECKTIME);

这里的HEAD_CHECKTIME代表时间间隔。

我们开启计时器的自动重置之后可以看到:

        _headCheckTimer.Elapsed += delegate (object sender, ElapsedEventArgs args)
        {
            CheckHeadTimeOut();
        };

 这是一个匿名委托:用法类似于匿名函数,我们直接写委托内容,随写随用,每次委托触发时执行CheckHeadTimeOut()函数。

这个过程中涉及到两个函数:

    /// <summary>  
    /// 监听客户端连接  
    /// </summary>  
    private void ListenClientConnect()
    {
        while (true)
        {
            try
            {
                if (!_isValid) break;
                Socket client = _server.Accept();
                Thread receiveThread = new Thread(ReceiveEvent);
                ClientInfoDic.Add(client, new SocketInfo() { Client = client, ReceiveThread = receiveThread, HeadTime = GetNowTime() });
                receiveThread.Start(client);

                PostMainThreadAction<Socket>(OnConnect, client);


            }
            catch
            {
                break;
            }

        }

    }

用try catch避免异常,从服务器处获取接受的客户端Socket类和线程,这里可以看到构造新线程的参数为ReceiveEvent,代表这个线程构造时就会绑定一个委托。

   /// <summary>
   /// 检测心跳包超时
   /// </summary>
   private void CheckHeadTimeOut()
   {
       var tempList = new List<Socket>();
       foreach (var socket in ClientInfoDic.Keys)
       {
           tempList.Add(socket);
       }
       foreach (var socket in tempList)
       {
           var info = ClientInfoDic[socket];
           long now = GetNowTime();
           long offset = now - info.HeadTime;
           if (offset > HEAD_TIMEOUT)
           {
               // 心跳包超时
               KickOut(socket);
           }
       }
   }

这个是我们在构造函数中关于心跳包超时检测的函数,我们创建一个存储Socket的list,把存储客户端信息的字典中的键更新到list中,然后获取当前时间之后减去客户端socket信息之中的HeadTime来得到时间偏差,如果这个时间偏差大于我们的允许的时间值我们就认为这个心跳包超时并执行KickOut函数。

这里又涉及到了两个函数:GetNowTime()和KickOut();

   /// <summary>
   /// 获取当前时间戳
   /// </summary>
   /// <returns></returns>
   private long GetNowTime()
   {
       TimeSpan ts = DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0, 0);
       return Convert.ToInt64(ts.TotalMilliseconds);
   }

这一段代码是一个获取当前时间的方法:

而关于KickOut():

    public void KickOut(Socket client)
    {
        // 踢出连接
        Send(client, (UInt16)SocketEvent.sc_kickout, null, (dataPack) =>
        {
            CloseClient(client);
        });
    }

似乎牵扯的函数越来越多了,我们来看看Send是什么:

    public void Send(Socket client, UInt16 e, byte[] buff = null, Action<SocketDataPack> onTrigger = null)
    {
        buff = buff ?? new byte[] { };
        var dataPack = new SocketDataPack(e, buff);
        var data = dataPack.Buff;
        try
        {
            client.BeginSend(data, 0, data.Length, SocketFlags.None, new AsyncCallback((asyncSend) =>
            {
                Socket c = (Socket)asyncSend.AsyncState;
                c.EndSend(asyncSend);
                PostMainThreadAction<SocketDataPack>(onTrigger, dataPack);
                PostMainThreadAction<Socket, SocketDataPack>(OnSend, client, dataPack);
            }), client);
        }
        catch (SocketException ex)
        {
            CloseClient(client);
            // onError(ex);
        }

    }

回到我们的KickOut()函数:

 我们实现了一个异步的解除客户端连接的方法:我们向客户端发送事件码告知其断开连接之后不会阻塞当前线程。

然后就是两个负责关闭连接的函数:

    private void CloseClient(Socket client)
    {
        PostMainThreadAction<Socket>((socket) =>
        {
            if (OnDisconnect != null) OnDisconnect(socket);
            ClientInfoDic.Remove(socket);
            socket.Close();
        }, client);

    }

这是关闭客户端连接的代码,其中的PostMainThreadAction<Socket>:

 的作用就是把这个Socket类型的实例放在主线程上进行操作。操作的内容包括检查是否有方法注册在OnDisconnect上,有的话就执行;然后从字典中删除相关Socket信息,关闭相关socket即可。

这里可能涉及到一个问题就是:为什么我们Socket的关闭一定要在主线程上执行呢?

   /// <summary>
   /// 关闭
   /// </summary>
   public void Close()
   {
       if (!_isValid) return;
       _isValid = false;
       // if (_connectThread != null) _connectThread.Abort();
       var tempList = new List<Socket>();
       foreach (var socket in ClientInfoDic.Keys)
       {
           tempList.Add(socket);
       }
       foreach (var socket in tempList)
       {
           CloseClient(socket);
       }
       if (_headCheckTimer != null)
       {
           _headCheckTimer.Stop();
           _headCheckTimer = null;
       }
       _server.Close();
   }

这个是关闭整个服务器的函数,我们把所有的客户端socket先关闭掉,然后停止计时器后最后关闭服务器。

这个是整个SocketServer类的内容,其中还有几个自定义类的内容我们没有介绍:


DataBuffer

代码如下:

using System;
/// <summary>
/// Socket传输过程的缓冲区,尝试拆包获得数据
/// </summary>
public class DataBuffer
{
    // 缓存区长度
    private const int MIN_BUFF_LEN = 1024;

    private byte[] _buff;
    private int _buffLength = 0;

    public DataBuffer(int minBuffLen = MIN_BUFF_LEN)
    {
        if (minBuffLen <= 0)
        {
            minBuffLen = MIN_BUFF_LEN;
        }
        _buff = new byte[minBuffLen];
    }

    /// <summary>
    /// 添加缓存数据
    /// </summary>
    public void AddBuffer(byte[] data, int len)
    {
        byte[] buff = new byte[len];
        Array.Copy(data, buff, len);
        if (len > _buff.Length - _buffLength)  //超过当前缓存
        {
            byte[] temp = new byte[_buffLength + len];
            Array.Copy(_buff, 0, temp, 0, _buffLength);
            Array.Copy(buff, 0, temp, _buffLength, len);
            _buff = temp;
        }
        else
        {
            Array.Copy(data, 0, _buff, _buffLength, len);
        }
        _buffLength += len;//修改当前数据标记
    }

    public bool TryUnpack(out SocketDataPack dataPack)
    {
        dataPack = SocketDataPack.Unpack(_buff);
        if (dataPack == null)
        {
            return false;
        }
        // 清理旧缓存
        _buffLength -= dataPack.BuffLength;
        byte[] temp = new byte[_buffLength < MIN_BUFF_LEN ? MIN_BUFF_LEN : _buffLength];
        Array.Copy(_buff, dataPack.BuffLength, temp, 0, _buffLength);
        _buff = temp;
        return true;
    }
}

我们来一点点解读:
 

    // 缓存区长度
    private const int MIN_BUFF_LEN = 1024;

    private byte[] _buff;
    private int _buffLength = 0;

定义了缓冲区的长度:注意这里的长度针对的是字节数,也就是至少1024个字节的缓冲区大小,其实也就是初始的缓冲区大小,一个字节数组和一个当前缓冲区长度。

    public DataBuffer(int minBuffLen = MIN_BUFF_LEN)
    {
        if (minBuffLen <= 0)
        {
            minBuffLen = MIN_BUFF_LEN;
        }
        _buff = new byte[minBuffLen];
    }

public的构造函数,给参数提供了默认参数,构造时自动生成一个设定好大小的字节数组。

    /// <summary>
    /// 添加缓存数据
    /// </summary>
    public void AddBuffer(byte[] data, int len)
    {
        byte[] buff = new byte[len];
        Array.Copy(data, buff, len);
        if (len > _buff.Length - _buffLength)  //超过当前缓存
        {
            byte[] temp = new byte[_buffLength + len];
            Array.Copy(_buff, 0, temp, 0, _buffLength);
            Array.Copy(buff, 0, temp, _buffLength, len);
            _buff = temp;
        }
        else
        {
            Array.Copy(data, 0, _buff, _buffLength, len);
        }
        _buffLength += len;//修改当前数据标记
    }

我们新生成一个长度为len的数组,然后把data数组拷贝到buff中,如果这个时候我们的len超过了缓冲区的大小我们需要去新开辟一个数组并把现有的数据拷贝到新开辟的数组中;否则我们直接复制即可,然后修改缓冲区长度。这里有一个C#的内置函数Copy。

最后的一个函数:

    public bool TryUnpack(out SocketDataPack dataPack)
    {
        dataPack = SocketDataPack.Unpack(_buff);
        if (dataPack == null)
        {
            return false;
        }
        // 清理旧缓存
        _buffLength -= dataPack.BuffLength;
        byte[] temp = new byte[_buffLength < MIN_BUFF_LEN ? MIN_BUFF_LEN : _buffLength];
        Array.Copy(_buff, dataPack.BuffLength, temp, 0, _buffLength);
        _buff = temp;
        return true;
    }

这是一个拆包的函数,我们将缓冲区的Socket数据包进行拆包,如果包是空的则返回false(表示缓冲区内数据不足,无法组成完整的包),否则将数据从缓冲区移除,具体来说首先更新缓冲区大小,检查缓冲区剩余容量保证不低于最低容量,将未使用的缓冲数据从后续位置移动(复制到)缓冲区前端方便使用。

可以看到这个函数中有一个我们没有说过的SocketDataPack类。

SocketDataPack

Socket数据包的代码如下,以下简称数据包。

using System;
using System.IO;
/// <summary>
/// Socket通信过程中的数据包 处理具体拆包装包逻辑
/// </summary>
public class SocketDataPack
{
    // 消息:数据总长度(4byte) + 数据类型(2byte) + 数据(N byte)
    public static int HEAD_DATA_LEN = 4;
    public static int HEAD_TYPE_LEN = 2;
    public static int HEAD_LEN
    {
        get { return HEAD_DATA_LEN + HEAD_TYPE_LEN; }
    }

    /// <summary>
    /// 数据包类型
    /// </summary>
    public UInt16 Type;
    /// <summary>
    /// 数据包数据
    /// </summary>
    public byte[] Data;
    public byte[] Buff;

    public int BuffLength
    {
        get { return Buff.Length; }
    }
    public int DataLength
    {
        get { return Data.Length; }
    }

    public SocketDataPack()
    {

    }
    public SocketDataPack(UInt16 type, byte[] data)
    {
        Type = type;
        Data = data;

        Buff = GetBuff(Type, Data);
    }

    public static byte[] GetBuff(UInt16 type, byte[] data)
    {
        byte[] buff = new byte[data.Length + HEAD_LEN];
        byte[] temp;
        temp = BitConverter.GetBytes(buff.Length);
        Array.Copy(temp, 0, buff, 0, HEAD_DATA_LEN);
        temp = BitConverter.GetBytes(type);
        Array.Copy(temp, 0, buff, HEAD_DATA_LEN, HEAD_TYPE_LEN);

        Array.Copy(data, 0, buff, HEAD_LEN, data.Length);

        return buff;
    }

    public static SocketDataPack Unpack(byte[] buff)
    {
        try
        {
            if (buff.Length < HEAD_LEN)
            {
                // 头部没取完则返回
                return null;
            }
            byte[] temp;
            // 取数据长度
            temp = new byte[HEAD_DATA_LEN];
            Array.Copy(buff, 0, temp, 0, HEAD_DATA_LEN);
            int buffLength = BitConverter.ToInt32(temp, 0);
            if (buffLength <= 0) return null;
            if (buffLength > buff.Length)
            {
                // 数据没取完
                return null;
            }
            int dataLength = buffLength - HEAD_LEN;
            // 取数据类型
            temp = new byte[HEAD_TYPE_LEN];
            Array.Copy(buff, HEAD_DATA_LEN, temp, 0, HEAD_TYPE_LEN);
            UInt16 dataType = BitConverter.ToUInt16(temp, 0);
            // 取数据
            byte[] data = new byte[dataLength];
            Array.Copy(buff, HEAD_LEN, data, 0, dataLength);

            var dataPack = new SocketDataPack(dataType, data);
            // UnityEngine.Debug.LogFormat("buffLen:{0} type:{1} dataLength:{2}", buffLength, dataType, data.Length);

            return dataPack;

        }
        catch
        {
            // 存在不完整数据解包 则返回null
            return null;
        }


    }
}

首先看看成员变量:

    // 消息:数据总长度(4byte) + 数据类型(2byte) + 数据(N byte)
    public static int HEAD_DATA_LEN = 4;
    public static int HEAD_TYPE_LEN = 2;
    public static int HEAD_LEN
    {
        get { return HEAD_DATA_LEN + HEAD_TYPE_LEN; }
    }

    /// <summary>
    /// 数据包类型
    /// </summary>
    public UInt16 Type;
    /// <summary>
    /// 数据包数据
    /// </summary>
    public byte[] Data;
    public byte[] Buff;

定义了数据包的格式:数据长度为4,类型长度为2,然后是数据本身,设置为一个只读的属性,长度为前二者之和。

数据包类型使用一个UInt16的数据类型来表示,数据分为Data和Buff两种。

    public int BuffLength
    {
        get { return Buff.Length; }
    }
    public int DataLength
    {
        get { return Data.Length; }
    }

这两个也是只读的属性,返回的是Data和Buff类型数据的长度。

    public SocketDataPack(UInt16 type, byte[] data)
    {
        Type = type;
        Data = data;

        Buff = GetBuff(Type, Data);
    }

有参构造,参数就是类型和数据,然后缓冲由GetBuff函数得到。

    public static byte[] GetBuff(UInt16 type, byte[] data)
    {
        byte[] buff = new byte[data.Length + HEAD_LEN];
        byte[] temp;
        temp = BitConverter.GetBytes(buff.Length);
        Array.Copy(temp, 0, buff, 0, HEAD_DATA_LEN);
        temp = BitConverter.GetBytes(type);
        Array.Copy(temp, 0, buff, HEAD_DATA_LEN, HEAD_TYPE_LEN);

        Array.Copy(data, 0, buff, HEAD_LEN, data.Length);

        return buff;
    }

GetBuff函数就是一个根据类型和数据来获取缓冲的函数,我们新生成一个长度为数据长度加上头部长度的数组,然后我们使用BitConverter.GetBytes函数来生成字节流之后把这些字节流丢到中,更准确地说,我们把缓冲区的长度信息,类型信息(都转换为字节流)以及具体的数据都拷贝到数组中。

关于为什么要转换为字节流:

public static SocketDataPack Unpack(byte[] buff)
{
    try
    {
        if (buff.Length < HEAD_LEN)
        {
            // 头部没取完则返回
            return null;
        }
        byte[] temp;
        // 取数据长度
        temp = new byte[HEAD_DATA_LEN];
        Array.Copy(buff, 0, temp, 0, HEAD_DATA_LEN);
        int buffLength = BitConverter.ToInt32(temp, 0);
        if (buffLength <= 0) return null;
        if (buffLength > buff.Length)
        {
            // 数据没取完
            return null;
        }
        int dataLength = buffLength - HEAD_LEN;
        // 取数据类型
        temp = new byte[HEAD_TYPE_LEN];
        Array.Copy(buff, HEAD_DATA_LEN, temp, 0, HEAD_TYPE_LEN);
        UInt16 dataType = BitConverter.ToUInt16(temp, 0);
        // 取数据
        byte[] data = new byte[dataLength];
        Array.Copy(buff, HEAD_LEN, data, 0, dataLength);

        var dataPack = new SocketDataPack(dataType, data);
        // UnityEngine.Debug.LogFormat("buffLen:{0} type:{1} dataLength:{2}", buffLength, dataType, data.Length);

        return dataPack;

    }
    catch
    {
        // 存在不完整数据解包 则返回null
        return null;
    }


}

这里是我们数据包的拆包函数,参数是一个字节数组,我们首先检测这个数组长度如果没有头部长度大的话说明这个数组的内容根本不完整,直接返回null。接着我们分别从参数传递的数组中取长度信息、类型信息以及数据本身,将其复制到数组中,最后生成SocketDataPack类型的数据包并返回。

小小的总结一下我们的Server代码的作用:

对于我们的服务器来说,最重要的部分是通过心跳包来检查连接是否正常,以及接收信息之后的各种回调事件。

Client

客户端这边的DataBuffer和SocketDataPack的内容是完全相同的,主要是Main和SocketClient的区别:

Main

内容如下:
 

using System.Collections;
using System.Collections.Generic;
using UnityEngine;

public class Main : MonoBehaviour
{
    SocketClient _client;
    private void Awake()
    {

        _client = new SocketClient("127.0.0.1", 6854);
        _client.OnDisconnect += () =>
        {
            UnityEngine.Debug.Log("断开连接");
        };

        _client.OnReceive += (dataPack) =>
        {
            UnityEngine.Debug.LogFormat("接收数据>>>{0}", (SocketEvent)dataPack.Type);
        };
        _client.OnSend += (dataPack) =>
        {
            UnityEngine.Debug.LogFormat("发送数据>>>{0}", (SocketEvent)dataPack.Type);
        };
        _client.OnError += (ex) =>
        {
            UnityEngine.Debug.LogFormat("出现异常>>>{0}", ex);
        };

        _client.OnReConnectSuccess += (num) =>
        {
            UnityEngine.Debug.LogFormat("第{0}次重连成功", num);
        };
        _client.OnReConnectError += (num) =>
        {
            UnityEngine.Debug.LogFormat("第{0}次重连失败", num);
        };
        _client.OnReconnecting += (num) =>
        {
            UnityEngine.Debug.LogFormat("正在进行第{0}次重连", num);
        };


        _client.Connect(() =>
        {
            UnityEngine.Debug.Log("连接成功");

            // _client.DisConnect();
        }, () =>
        {
            UnityEngine.Debug.Log("连接失败");
        });

    }

    private void Update()
    {
    }

    public void ClickSendTest()
    {
        var bytes = System.Text.Encoding.UTF8.GetBytes("我是测试数据");
        _client.Send((System.UInt16)SocketEvent.sc_test, bytes);
    }

    public void ClickDisConnect()
    {
        _client.DisConnect();
    }

    private void OnDestroy()
    {
        // 注意由于Unity编译器环境下,游戏开启/关闭只影响主线程的开关,游戏关闭回调时需要通过Close函数来关闭服务端/客户端的线程。
        if (_client != null)
        {
            _client.Close();
        }

    }
}

可以看到客户端的Main函数内容多得多。

首先是一个SocketClient类的实例,然后就是一系列的委托事件,分别代表:断开连接、接受消息、发送消息、发现错误、第num次重连成功、第num次重连失败、正在进行第num次重连。

然后是一个关于连接成功与否的包含两个lambda参数的方法:将是否连接成功打印出来。

然后是三个函数:点击发送测试消息、点击断开连接、关闭客户端socket。

SocketClient

这是客户端的Socket代码:



using System;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Timers;



/// <summary>
/// Socket客户端
/// </summary>
public class SocketClient
{
    /// <summary>
    /// 主线程
    /// </summary>
    private SynchronizationContext _mainThread;


    public string IP;
    public int Port;

    private const int TIMEOUT_CONNECT = 3000;   // 连接超时时间 毫秒
    private const int TIMEOUT_SEND = 3000;  // 发送超时时间 毫秒
    private const int TIMEOUT_RECEIVE = 3000;   //接收超时时间 毫秒

    private const int HEAD_OFFSET = 2000; //心跳包发送间隔 毫秒
    private const int RECONN_MAX_SUM = 3;   //最大重连次数

    private Socket _client;
    private Thread _receiveThread;
    private System.Timers.Timer _connTimeoutTimer;
    private System.Timers.Timer _headTimer;
    private DataBuffer _dataBuffer = new DataBuffer();

    public event Action OnConnectSuccess;    // 连接成功回调
    public event Action OnConnectError;    // 连接失败回调
    public event Action OnDisconnect;  // 断开回调
    public event Action<SocketDataPack> OnReceive;  // 接收报文回调
    public event Action<SocketDataPack> OnSend;  // 发送报文回调
    public event Action<SocketException> OnError;   // 异常捕获回调
    public event Action<int> OnReConnectSuccess; // 重连成功回调
    public event Action<int> OnReConnectError; // 单次重连失败回调
    public event Action<int> OnReconnecting;  // 单次重连中回调

    private bool _isConnect = false;
    private bool _isReconnect = false;


    public SocketClient(string ip, int port)
    {
        _mainThread = SynchronizationContext.Current;

        IP = ip;
        Port = port;
    }
    public void Connect(Action success = null, Action error = null)
    {
        Action<bool> onTrigger = (flag) =>
        {
            if (flag)
            {
                PostMainThreadAction(success);
                PostMainThreadAction(OnConnectSuccess);
            }
            else
            {
                PostMainThreadAction(error);
                PostMainThreadAction(OnConnectError);
            }
            if (_connTimeoutTimer != null)
            {
                _connTimeoutTimer.Stop();
                _connTimeoutTimer = null;
            }
        };
        try
        {
            _client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);//创建套接字
            _client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.SendTimeout, TIMEOUT_SEND);
            _client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReceiveTimeout, TIMEOUT_RECEIVE);
            IPAddress ipAddress = IPAddress.Parse(IP);//解析IP地址
            IPEndPoint ipEndpoint = new IPEndPoint(ipAddress, Port);
            IAsyncResult result = _client.BeginConnect(ipEndpoint, new AsyncCallback((iar) =>
            {
                try
                {
                    Socket client = (Socket)iar.AsyncState;
                    client.EndConnect(iar);

                    _isConnect = true;
                    // 开始发送心跳包
                    _headTimer = new System.Timers.Timer(HEAD_OFFSET);
                    _headTimer.AutoReset = true;
                    _headTimer.Elapsed += delegate (object sender, ElapsedEventArgs args)
                    {
                        Send((UInt16)SocketEvent.sc_head);
                    };
                    _headTimer.Start();

                    // 开始接收数据
                    _receiveThread = new Thread(new ThreadStart(ReceiveEvent));
                    _receiveThread.IsBackground = true;
                    _receiveThread.Start();

                    onTrigger(true);
                }
                catch (SocketException ex)
                {
                    onTrigger(false);
                }
            }), _client);//异步连接

            _connTimeoutTimer = new System.Timers.Timer(TIMEOUT_CONNECT);
            _connTimeoutTimer.AutoReset = false;
            _connTimeoutTimer.Elapsed += delegate (object sender, ElapsedEventArgs args)
            {
                onTrigger(false);
            };
            _connTimeoutTimer.Start();

        }
        catch (SocketException ex)
        {
            onTrigger(false);
            // throw;
        }
    }
    /// <summary>
    /// 断线重连
    /// </summary>
    /// <param name="num"></param>
    public void ReConnect(int num = RECONN_MAX_SUM, int index = 0)
    {
        _isReconnect = true;

        num--;
        index++;
        if (num < 0)
        {
            onDisconnect();
            _isReconnect = false;
            return;
        }
        PostMainThreadAction<int>(OnReconnecting, index);
        Connect(() =>
        {
            PostMainThreadAction<int>(OnReConnectSuccess, index);
            _isReconnect = false;
        }, () =>
        {
            PostMainThreadAction<int>(OnReConnectError, index);
            ReConnect(num, index);
        });

    }
    public void Send(UInt16 e, byte[] buff = null, Action<SocketDataPack> onTrigger = null)
    {
        buff = buff ?? new byte[] { };
        var dataPack = new SocketDataPack(e, buff);
        var data = dataPack.Buff;
        try
        {
            _client.BeginSend(data, 0, data.Length, SocketFlags.None, new AsyncCallback((asyncSend) =>
            {
                Socket c = (Socket)asyncSend.AsyncState;
                c.EndSend(asyncSend);
                PostMainThreadAction<SocketDataPack>(onTrigger, dataPack);
                PostMainThreadAction<SocketDataPack>(OnSend, dataPack);
            }), _client);
        }
        catch (SocketException ex)
        {
            onError(ex);
        }

    }
    /// <summary>
    /// 线程内接收数据的函数
    /// </summary>
    private void ReceiveEvent()
    {
        while (true)
        {
            try
            {
                if (!_isConnect) break;
                if (_client.Available <= 0) continue;
                byte[] rbytes = new byte[8 * 1024];
                int len = _client.Receive(rbytes);
                if (len > 0)
                {
                    _dataBuffer.AddBuffer(rbytes, len); // 将收到的数据添加到缓存器中
                    var dataPack = new SocketDataPack();
                    if (_dataBuffer.TryUnpack(out dataPack)) // 尝试解包
                    {
                        if (dataPack.Type == (UInt16)SocketEvent.sc_kickout)
                        {
                            // 服务端踢出
                            onDisconnect();
                        }
                        else
                        {
                            // 收到消息
                            PostMainThreadAction<SocketDataPack>(OnReceive, dataPack);
                        }
                    }
                }
            }
            catch (SocketException ex)
            {
                onError(ex);
                // throw;
            }
        }
    }
    /// <summary>
    /// 业务逻辑 - 客户端主动断开
    /// </summary>
    public void DisConnect()
    {
        Send((UInt16)SocketEvent.sc_disconn);
        onDisconnect();
    }

    /// <summary>
    /// 缓存数据清理
    /// </summary>
    public void Close()
    {
        if (!_isConnect) return;
        _isConnect = false;
        if (_headTimer != null)
        {
            _headTimer.Stop();
            _headTimer = null;
        }
        // if (_receiveThread != null)
        // {
        //     _receiveThread.Abort();
        //     _receiveThread = null;
        // }
        if (_connTimeoutTimer != null)
        {
            _connTimeoutTimer.Stop();
            _connTimeoutTimer = null;
        }
        if (_client != null)
        {
            _client.Close();
            _client = null;
        }



    }


    /// <summary>
    /// 错误回调
    /// </summary>
    /// <param name="e"></param>
    private void onError(SocketException ex)
    {
        Close();

        PostMainThreadAction<SocketException>(OnError, ex);

        if (!_isReconnect)
        {
            ReConnect();
        }
    }


    /// <summary>
    /// 断开回调
    /// </summary>
    private void onDisconnect()
    {

        Close();

        PostMainThreadAction(OnDisconnect);
    }

    /// <summary>
    /// 通知主线程回调
    /// </summary>
    private void PostMainThreadAction(Action action)
    {
        _mainThread.Post(new SendOrPostCallback((o) =>
        {
            Action e = (Action)o.GetType().GetProperty("action").GetValue(o);
            if (e != null) e();
        }), new { action = action });
    }
    private void PostMainThreadAction<T>(Action<T> action, T arg1)
    {
        _mainThread.Post(new SendOrPostCallback((o) =>
        {
            Action<T> e = (Action<T>)o.GetType().GetProperty("action").GetValue(o);
            T t1 = (T)o.GetType().GetProperty("arg1").GetValue(o);
            if (e != null) e(t1);
        }), new { action = action, arg1 = arg1 });
    }
    public void PostMainThreadAction<T1, T2>(Action<T1, T2> action, T1 arg1, T2 arg2)
    {
        _mainThread.Post(new SendOrPostCallback((o) =>
        {
            Action<T1, T2> e = (Action<T1, T2>)o.GetType().GetProperty("action").GetValue(o);
            T1 t1 = (T1)o.GetType().GetProperty("arg1").GetValue(o);
            T2 t2 = (T2)o.GetType().GetProperty("arg2").GetValue(o);
            if (e != null) e(t1, t2);
        }), new { action = action, arg1 = arg1, arg2 = arg2 });
    }
}

我们依然先从成员变量开始说起:

    /// <summary>
    /// 主线程
    /// </summary>
    private SynchronizationContext _mainThread;


    public string IP;
    public int Port;

    private const int TIMEOUT_CONNECT = 3000;   // 连接超时时间 毫秒
    private const int TIMEOUT_SEND = 3000;  // 发送超时时间 毫秒
    private const int TIMEOUT_RECEIVE = 3000;   //接收超时时间 毫秒

    private const int HEAD_OFFSET = 2000; //心跳包发送间隔 毫秒
    private const int RECONN_MAX_SUM = 3;   //最大重连次数

    private Socket _client;
    private Thread _receiveThread;
    private System.Timers.Timer _connTimeoutTimer;
    private System.Timers.Timer _headTimer;
    private DataBuffer _dataBuffer = new DataBuffer();

    public event Action OnConnectSuccess;    // 连接成功回调
    public event Action OnConnectError;    // 连接失败回调
    public event Action OnDisconnect;  // 断开回调
    public event Action<SocketDataPack> OnReceive;  // 接收报文回调
    public event Action<SocketDataPack> OnSend;  // 发送报文回调
    public event Action<SocketException> OnError;   // 异常捕获回调
    public event Action<int> OnReConnectSuccess; // 重连成功回调
    public event Action<int> OnReConnectError; // 单次重连失败回调
    public event Action<int> OnReconnecting;  // 单次重连中回调

    private bool _isConnect = false;
    private bool _isReconnect = false;

依然是主线程的线程上下文,端口,IP,然后是连接超时的时间、发送超时时间、接收超时时间,都设置为3000ms(就是3s),然后是心跳包发送的最大间隔为2000ms,以及最大的重连次数为3。

然后是一个Socket类的实例client,一个接收线程,一个用于连接的计时器和一个心跳计时器,以及一个数据缓冲区。

然后是一系列event,注释里都有写明,我就不多赘述。

最后是两个bool变量表示是否连接以及是否重连。

    public SocketClient(string ip, int port)
    {
        _mainThread = SynchronizationContext.Current;

        IP = ip;
        Port = port;
    }

有参构造:把线程上下文给到主线程,IP和端口都同步。

    public void Connect(Action success = null, Action error = null)
    {
        Action<bool> onTrigger = (flag) =>
        {
            if (flag)
            {
                PostMainThreadAction(success);
                PostMainThreadAction(OnConnectSuccess);
            }
            else
            {
                PostMainThreadAction(error);
                PostMainThreadAction(OnConnectError);
            }
            if (_connTimeoutTimer != null)
            {
                _connTimeoutTimer.Stop();
                _connTimeoutTimer = null;
            }
        };
        try
        {
            _client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);//创建套接字
            _client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.SendTimeout, TIMEOUT_SEND);
            _client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReceiveTimeout, TIMEOUT_RECEIVE);
            IPAddress ipAddress = IPAddress.Parse(IP);//解析IP地址
            IPEndPoint ipEndpoint = new IPEndPoint(ipAddress, Port);
            IAsyncResult result = _client.BeginConnect(ipEndpoint, new AsyncCallback((iar) =>
            {
                try
                {
                    Socket client = (Socket)iar.AsyncState;
                    client.EndConnect(iar);

                    _isConnect = true;
                    // 开始发送心跳包
                    _headTimer = new System.Timers.Timer(HEAD_OFFSET);
                    _headTimer.AutoReset = true;
                    _headTimer.Elapsed += delegate (object sender, ElapsedEventArgs args)
                    {
                        Send((UInt16)SocketEvent.sc_head);
                    };
                    _headTimer.Start();

                    // 开始接收数据
                    _receiveThread = new Thread(new ThreadStart(ReceiveEvent));
                    _receiveThread.IsBackground = true;
                    _receiveThread.Start();

                    onTrigger(true);
                }
                catch (SocketException ex)
                {
                    onTrigger(false);
                }
            }), _client);//异步连接

            _connTimeoutTimer = new System.Timers.Timer(TIMEOUT_CONNECT);
            _connTimeoutTimer.AutoReset = false;
            _connTimeoutTimer.Elapsed += delegate (object sender, ElapsedEventArgs args)
            {
                onTrigger(false);
            };
            _connTimeoutTimer.Start();

        }
        catch (SocketException ex)
        {
            onTrigger(false);
            // throw;
        }
    }

这是我们的连接函数,参数中包含了两个action,分别表示连接是否成功。

然后是一个名为onTrigger的接收参数类型为bool类型的Action委托,接收flag参数来决定回调函数以及计时器的处理。如果flag为true则在主线程中触发success委托和OnConnectSuccess委托,否则触发error委托和OnConnectError委托,此时如果存在连接超时计时器则暂停计时并清空。

为什么要清空计时器?

然后是一系列的Socket对象实例和设置,我们创建一个基于IPV4的TCP流式套接字对象,并设置他的发送消息超时阈值和接收消息超时阈值,最后解析得到源IP和目标IP。

接着是发起连接的函数,这里我们采用异步连接的方式,也就是APM模式(Asynchronous Programming Model,异步编程模型):

其中涉及到的核心:IAsyncResult对象的概念:

 

 我们将客户端的socket对象作为状态对象传入函数参数中,可以看到有一个从iar.AsyncState冲取出套接字的过程,然后客户端结束连接的异步操作。

为什么要在这里执行EndConnect呢?

后续就是一系列的连接成功后要处理的内容比如生成计时器,发送心跳包和生成接收消息的线程。当然,如果连接失败的话,我们就返回OnTrigger(false)。 

后续是一个单次触发的连接超时计时器(AutoReset=false代表单次触发),触发后执行OnTrigger(false)。

/// <summary>
/// 断线重连
/// </summary>
/// <param name="num"></param>
public void ReConnect(int num = RECONN_MAX_SUM, int index = 0)
{
    _isReconnect = true;

    num--;
    index++;
    if (num < 0)
    {
        onDisconnect();
        _isReconnect = false;
        return;
    }
    PostMainThreadAction<int>(OnReconnecting, index);
    Connect(() =>
    {
        PostMainThreadAction<int>(OnReConnectSuccess, index);
        _isReconnect = false;
    }, () =>
    {
        PostMainThreadAction<int>(OnReConnectError, index);
        ReConnect(num, index);
    });

}

然后是我们重连的函数,我们接收的参数最大的重连次数和重连次数的序号。每次重连都更新isReconnect和最大重连次数和重连次数的序号,如果已经没有最大的重连次数我们就放弃尝试重连。通知主线程执行重连回调函数,然后返回Connect根据是否连接成功返回的两个回调函数。

    public void Send(UInt16 e, byte[] buff = null, Action<SocketDataPack> onTrigger = null)
    {
        buff = buff ?? new byte[] { };
        var dataPack = new SocketDataPack(e, buff);
        var data = dataPack.Buff;
        try
        {
            _client.BeginSend(data, 0, data.Length, SocketFlags.None, new AsyncCallback((asyncSend) =>
            {
                Socket c = (Socket)asyncSend.AsyncState;
                c.EndSend(asyncSend);
                PostMainThreadAction<SocketDataPack>(onTrigger, dataPack);
                PostMainThreadAction<SocketDataPack>(OnSend, dataPack);
            }), _client);
        }
        catch (SocketException ex)
        {
            onError(ex);
        }

    }

Send函数,上来有一个null的合并运算符:

 我们将数据类型和数据封装成data,然后执行Socket的BeginSend和EndSend异步操作,并通知主线程执行OnTrigger回调和OnSend回调。

    /// <summary>
    /// 线程内接收数据的函数
    /// </summary>
    private void ReceiveEvent()
    {
        while (true)
        {
            try
            {
                if (!_isConnect) break;
                if (_client.Available <= 0) continue;
                byte[] rbytes = new byte[8 * 1024];
                int len = _client.Receive(rbytes);
                if (len > 0)
                {
                    _dataBuffer.AddBuffer(rbytes, len); // 将收到的数据添加到缓存器中
                    var dataPack = new SocketDataPack();
                    if (_dataBuffer.TryUnpack(out dataPack)) // 尝试解包
                    {
                        if (dataPack.Type == (UInt16)SocketEvent.sc_kickout)
                        {
                            // 服务端踢出
                            onDisconnect();
                        }
                        else
                        {
                            // 收到消息
                            PostMainThreadAction<SocketDataPack>(OnReceive, dataPack);
                        }
                    }
                }
            }
            catch (SocketException ex)
            {
                onError(ex);
                // throw;
            }
        }
    }

接收消息的函数,我们利用while(true)来实时监听Socket数据流,检查连接状态以及socket是否有数据,这里使用了一个Available。

 如果缓冲区无数据可读而依然执行Receive的话可能会导致CPU的空转:

有数据的话我们就去接收数据,生成一个新的数组进行数据的接收,接收到的数据我们丢到缓冲区中,并尝试解包,如果解包函数返回的类型是断开连接,意味着服务器主动要求客户端断开连接,这时候我们就会去执行断开连接,否则我们都会通知主线程来执行OnReceive回调。

/// <summary>
/// 业务逻辑 - 客户端主动断开
/// </summary>
public void DisConnect()
{
    Send((UInt16)SocketEvent.sc_disconn);
    onDisconnect();
}

主动断开连接的函数,我们会向服务器发送预定义好的断开连接的事件码,然后执行断开连接的委托。

/// <summary>
/// 缓存数据清理
/// </summary>
public void Close()
{
    if (!_isConnect) return;
    _isConnect = false;
    if (_headTimer != null)
    {
        _headTimer.Stop();
        _headTimer = null;
    }
    // if (_receiveThread != null)
    // {
    //     _receiveThread.Abort();
    //     _receiveThread = null;
    // }
    if (_connTimeoutTimer != null)
    {
        _connTimeoutTimer.Stop();
        _connTimeoutTimer = null;
    }
    if (_client != null)
    {
        _client.Close();
        _client = null;
    }

}

关闭客户端的操作就是将一系列连接状态和心跳包还有计数器和socket本身全部关闭。

    /// <summary>
    /// 错误回调
    /// </summary>
    /// <param name="e"></param>
    private void onError(SocketException ex)
    {
        Close();

        PostMainThreadAction<SocketException>(OnError, ex);

        if (!_isReconnect)
        {
            ReConnect();
        }
    }

错误时执行的回调函数,首先是执行关闭客户端,然后通知主线程执行报错的委托,同时自动尝试重连。

    /// <summary>
    /// 断开回调
    /// </summary>
    private void onDisconnect()
    {

        Close();

        PostMainThreadAction(OnDisconnect);
    }

断开连接的回调。

   /// <summary>
   /// 通知主线程回调
   /// </summary>
   private void PostMainThreadAction(Action action)
   {
       _mainThread.Post(new SendOrPostCallback((o) =>
       {
           Action e = (Action)o.GetType().GetProperty("action").GetValue(o);
           if (e != null) e();
       }), new { action = action });
   }
   private void PostMainThreadAction<T>(Action<T> action, T arg1)
   {
       _mainThread.Post(new SendOrPostCallback((o) =>
       {
           Action<T> e = (Action<T>)o.GetType().GetProperty("action").GetValue(o);
           T t1 = (T)o.GetType().GetProperty("arg1").GetValue(o);
           if (e != null) e(t1);
       }), new { action = action, arg1 = arg1 });
   }
   public void PostMainThreadAction<T1, T2>(Action<T1, T2> action, T1 arg1, T2 arg2)
   {
       _mainThread.Post(new SendOrPostCallback((o) =>
       {
           Action<T1, T2> e = (Action<T1, T2>)o.GetType().GetProperty("action").GetValue(o);
           T1 t1 = (T1)o.GetType().GetProperty("arg1").GetValue(o);
           T2 t2 = (T2)o.GetType().GetProperty("arg2").GetValue(o);
           if (e != null) e(t1, t2);
       }), new { action = action, arg1 = arg1, arg2 = arg2 });
   }

通知主线程执行委托的函数,这里是三个参数不同的版本,这里的代码格式有些复杂:

       _mainThread.Post(new SendOrPostCallback((o) =>
       {
           Action e = (Action)o.GetType().GetProperty("action").GetValue(o);
           if (e != null) e();
       }), new { action = action });

中的(o) =>{...}是一个lambda表达式,表示接收参数为o的一个匿名函数,函数内部的内容是从o处获取运行时类型,从中获取运行时属性中名为“action”的属性,如果有的话获取其值并转换成Action类型给到e。在o处获取的action会赋值给我们新生成的名为action匿名变量,这一步的目的是:

客户端的功能总结如下:

Test

大体上这就是我们整个项目的代码了,我们来看看最终的效果如何吧:

这是服务器的打印内容。 

这是客户端的打印内容。

我们先测试发送测试消息:

 客户端发送测试消息:

服务器接收到测试消息。

客户端断开连接:

 

服务器的打印信息。

现在我们再来测试主动断开服务器:

客户端开始重连。

 到达最大重连次数之后就断开连接。

我们再开启服务器之后发送测试信息:

就这样我们实现了一个基于C#的Socket通信项目。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2371654.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MATLAB仿真定点数转浮点数(对比VIVADO定点转浮点)

MATLAB仿真定点数转浮点数 定点数可设置位宽&#xff0c;小数位宽&#xff1b;浮点数是单精度浮点数 对比VIVADO定点转浮点 目录 前言 一、定点数 二、浮点数 三、定点数转浮点数 四、函数代码 总结 前言 在FPGA上实现算法时&#xff0c;相比MATLAB实现往往需要更长的开发…

【计算机网络】Cookie、Session、Token之间有什么区别?

大家在日常使用浏览器时可能会遇到&#xff1a;是否清理Cookie&#xff1f;这个问题。 那么什么是Cookie呢&#xff1f;与此相关的还有Session、Token这些。这两个又是什么呢&#xff1f; 本文将对这三个进行讲解区分&#xff0c;如果对小伙伴有帮助的话&#xff0c;也请点赞、…

SpringCloud服务拆分:Nacos服务注册中心 + LoadBalancer服务负载均衡使用

SpringCloud中Nacos服务注册中心 LoadBalancer服务负载均衡使用 前言Nacos工作流程nacos安装docker安装window安装 运行nacos微服务集成nacos高级特性1.服务集群配置方法效果图模拟服务实例宕机 2.权重配置3.环境隔离 如何启动集群节点本地启动多个节点方法 LoadBalancer集成L…

Apache Doris 使用指南:从入门到生产实践

目录 一、Doris 核心概念 1.1 架构组成 1.2 数据模型 二、Doris 部署方式 2.1 单机部署&#xff08;测试环境&#xff09; 2.2 集群部署&#xff08;生产环境&#xff09; 三、数据操作指南 3.1 数据库与表管理 3.2 数据导入方式 3.2.1 批量导入 3.2.2 实时导入 3.…

26届秋招收割offer指南

26届暑期实习已经陆续启动&#xff0c;这也意味着对于26届的同学们来说&#xff0c;“找工作”已经提上了日程。为了帮助大家更好地准备暑期实习和秋招&#xff0c;本期主要从时间线、学习路线、核心知识点及投递几方面给大家介绍&#xff0c;希望能为大家提供一些实用的建议和…

拷贝多个Excel单元格区域为图片并粘贴到Word

Excel工作表Sheet1中有两个报表&#xff0c;相应单元格区域分别定义名称为Report1和Report2&#xff0c;如下图所示。 现在需要将图片拷贝图片粘贴到新建的Word文档中。 示例代码如下。 Sub Demo()Dim oWordApp As ObjectDim ws As Worksheet: Set ws ThisWorkbook.Sheets(&…

【Bluedroid】蓝牙 SDP(服务发现协议)模块代码解析与流程梳理

本文深入剖析Bluedroid蓝牙协议栈中 SDP&#xff08;服务发现协议&#xff09;服务记录的全生命周期管理流程&#xff0c;涵盖初始化、记录创建、服务搜索、记录删除等核心环节。通过解析代码逻辑与数据结构&#xff0c;揭示各模块间的协作机制&#xff0c;包括线程安全设计、回…

中国自动驾驶研发解决方案,第一!

4月28日&#xff0c;IDC《中国汽车云市场(2024下半年)跟踪》报告发布&#xff0c;2024下半年中国汽车云市场整体规模达到65.1亿元人民币&#xff0c;同比增长27.4%。IDC认为&#xff0c;自动驾驶技术深化与生成式AI的发展将为汽车云打开新的成长天花板&#xff0c;推动云计算在…

Kubernetes(k8s)学习笔记(四)--入门基本操作

本文通过kubernetes部署tomcat集群&#xff0c;来学习和掌握kubernetes的一些入门基本操作 前提条件 1.各个节点处于Ready状态&#xff1b; 2.配置好docker镜像库(否则会出现ImagePullBackOff等一些问题)&#xff1b; 3.网络配置正常(否则即使应用发布没问题&#xff0c;浏…

【项目篇之统一硬盘操作】仿照RabbitMQ模拟实现消息队列

统一硬盘操作 创建出实例封装交换机的操作封装队列的操作封装绑定的操作封装消息的操作总的完整代码&#xff1a; 我们之前已经使用了数据库去管理交换机&#xff0c;绑定&#xff0c;队列 还使用了数据文件去管理消息 此时我们就搞一个类去把上述两个部分都整合在一起&#…

基于 GO 语言的 Ebyte 勒索软件——简要分析

一种新的勒索软件变种,采用Go 语言编写,使用ChaCha20进行加密,并使用ECIES进行安全密钥传输,加密用户数据并修改系统壁纸。其开发者EvilByteCode曾开发过多种攻击性安全工具,现已在 GitHub 上公开 EByte 勒索软件。尽管该勒索软件声称仅用于教育目的,但滥用可能会导致严重…

0基础 | STM32 | STM32F103C8T6开发板 | 项目开发

注&#xff1a;本专题系列基于该开发板进行&#xff0c;会分享源代码 F103C8T6核心板链接&#xff1a; https://pan.baidu.com/s/1EJOlrTcProNQQhdTT_ayUQ 提取码&#xff1a;8c1w 图 STM32F103C8T6开发板 1、黑色制版工艺、漂亮、高品质 2、入门级配置STM32芯片(SEM32F103…

南京大学OpenHarmony技术俱乐部正式揭牌 仓颉编程语言引领生态创新

2025年4月24日&#xff0c;由OpenAtom OpenHarmony&#xff08;以下简称“OpenHarmony”&#xff09;项目群技术指导委员会与南京大学软件学院共同举办的“南京大学OpenHarmony技术俱乐部成立大会暨基础软件与生态应用论坛”在南京大学仙林校区召开。 大会聚焦国产自主编程语言…

主场景 工具栏 植物卡牌的渲染

前置知识&#xff1a;使用easyx图形库 1.IMAGE内存变量存储的是一张位图(图像)&#xff0c;存储了像素数据(颜色&#xff0c;尺寸等) 2.loadimage(&变量名&#xff0c;"加载的文件路径")表示从文件中加载图像到变量中 3. saveimage("文件路径", &变…

Java三大基本特征之多态

多态&#xff08;Polymorphism&#xff09;是面向对象编程&#xff08;OOP&#xff09;的三大特性之一&#xff08;另外两个是 封装 和 继承&#xff09;&#xff0c;它允许 同一个行为具有不同的表现形式。在 Java 中&#xff0c;多态主要通过 方法重写&#xff08;Override&a…

OpenCV 基于生物视觉模型的工具------模拟人眼视网膜的生物视觉机制类cv::bioinspired::Retina

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 cv::bioinspired::Retina 是 OpenCV 中用于仿生视觉处理的一个类&#xff0c;它基于生物视觉模型进行图像预处理。该算法特别适用于动态范围调整…

前端跨域问题怎么在后端解决

目录 简单的解决方法&#xff1a; 添加配置类&#xff1a; 为什么会跨域 1. 什么是源 2. URL结构 3. 同源不同源举&#x1f330; 同源例子 不同源例子 4. 浏览器为什么需要同源策略 5. 常规前端请求跨域 简单的解决方法&#xff1a; 添加配置类&#xff1a; packag…

Python小程序:上班该做点摸鱼的事情

系统提醒 上班会忘记一些自己的事&#xff0c;所以你需要在上班的的时候突然给你弹窗&#xff0c;你就知道要做啥了 源码 这里有一个智能家居项目可以看看(开源) # -*- coding:utf-8 -*- """ 作者:YTQ 日期: 2025年04日29 21:51:24 """ impor…

飞云分仓操盘副图指标操作技术图文分解

如上图&#xff0c;副图指标-飞云分仓操盘指标&#xff0c;指标三条线蓝色“首峰线”&#xff0c;红色“引力1”&#xff0c;青色“引力2”&#xff0c;多头行情时“首峰线”和“引力1”之间显示为红色&#xff0c;“引力1”和“引力2”多头是区间颜色显示为紫色。 如上图图标信…

基于vueflow可拖拽元素的示例(基于官网示例的单文件示例)

效果图 代码 <template><div style"width: 100%;height: calc(100vh - 84px)"><VueFlow :nodes"nodes" :edges"edges" drop"onDrop" dragover"onDragOver" dragleave"onDragLeave"><div cl…