IPC-CFX 有两种主要的通信方式:一种是通过 RabbitMQ 进行发布/订阅,另一种是通过 request/response 进行点对点通信。本文主要介绍点对点的通信方式。
在 VS Code 里建立新的 .NET 项目:可以在终端中输入 `dotnet new console` 来建立,文件目录为 CFXDemo/machine1 和 CFXDemo/machine2。
通过 NuGet 插件为两个项目分别安装 CFX.CFXSDK、AMQPNetLite.Core 和 Newtonsoft.Json 这几个包。
我们将 machine1 作为发送端(SendRequest),machine2 作为接收端(Receive)。machine1 的代码如下所示:
using System.Threading;
using CFX;
using CFX.Transport;
using System;
using System.Security.Principal;
namespace machine1
{
    /// <summary>
    /// Sender side of the point-to-point CFX demo: opens an AMQP CFX endpoint and
    /// sends several <c>GetEndpointInformationRequest</c> messages to the receiver.
    /// The receive-side helpers are also present here but are not called by <see cref="Main"/>.
    /// </summary>
    class Program
    {
        // CFX handles of the two demo endpoints.
        static readonly string sendCFXHandle = "a.b.001";
        static readonly string receiveCFXHandle = "a.b.002";

        // AMQP URIs of the two demo endpoints (sendRequestUri is not used by this program).
        static readonly string sendRequestUri = "amqp://127.0.0.1:1235";
        static readonly string receivRequestUri = "amqp://127.0.0.1:1234";

        /// <summary>
        /// Opens the sending endpoint, waits for Enter, sends five requests two seconds
        /// apart, and finally closes the endpoint.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            OpenRequest();
            try
            {
                // Block until the user presses Enter — presumably so the receiver
                // (machine2) can be started before the first request is sent.
                Console.ReadLine();
                for (int i = 0; i < 5; i++)
                {
                    SendRequest();
                    Thread.Sleep(2000);
                }
            }
            finally
            {
                // Release the endpoint opened by OpenRequest() instead of leaking it at exit.
                endpointSendRequest?.Close();
                endpointSendRequest = null;
            }
        }

        #region send request
        static AmqpCFXEndpoint endpointSendRequest;

        /// <summary>
        /// (Re)creates the sending endpoint, opens it under <c>sendCFXHandle</c> and
        /// sets the request timeout to 20 seconds.
        /// </summary>
        static void OpenRequest()
        {
            // Close any endpoint left over from a previous call before replacing it.
            if (endpointSendRequest != null)
            {
                endpointSendRequest.Close();
                endpointSendRequest = null;
            }

            endpointSendRequest = new AmqpCFXEndpoint();
            if (!endpointSendRequest.IsOpen)
            {
                Console.WriteLine("endpointSendRequest.IsOpen:" + endpointSendRequest.IsOpen.ToString());
                // This call binds the endpoint's CFXHandle to the value of sendCFXHandle.
                endpointSendRequest.Open(sendCFXHandle);
                Console.WriteLine("Request.Source is : {0}", endpointSendRequest.CFXHandle);
                Console.WriteLine("endpointSendRequest.IsOpen:" + endpointSendRequest.IsOpen.ToString());
            }

            // Set a timeout of 20 seconds.  If the target endpoint does not
            // respond in this time, the request will time out.
            AmqpCFXEndpoint.RequestTimeout = TimeSpan.FromSeconds(20);
        }

        /// <summary>
        /// Builds a <c>GetEndpointInformationRequest</c> addressed to <c>receiveCFXHandle</c>,
        /// sends it to <c>receivRequestUri</c> and prints the response as JSON.
        /// Any exception raised while executing the request is reported by message only.
        /// </summary>
        static void SendRequest()
        {
            // Guard: without an open endpoint there is nothing to send with.
            if (endpointSendRequest == null)
            {
                Console.WriteLine("endpointSendRequest is not open; call OpenRequest() first.");
                return;
            }

            // Build a GetEndpointInformation request.
            CFXEnvelope request = CFXEnvelope.FromCFXMessage(new GetEndpointInformationRequest()
            {
                CFXHandle = receiveCFXHandle
            });
            request.Source = endpointSendRequest.CFXHandle;
            request.Target = receiveCFXHandle;

            try
            {
                CFXEnvelope response = endpointSendRequest.ExecuteRequest(receivRequestUri, request);
                if (response == null)
                {
                    // Report a missing reply explicitly rather than via a NullReferenceException message.
                    Console.WriteLine("response: <none>");
                    return;
                }
                Console.WriteLine($"response:\n{response.ToJson(true)}");
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
        }
        #endregion send request

        #region receive request
        static AmqpCFXEndpoint endpointReceiveRequest;

        /// <summary>
        /// (Re)creates the receiving endpoint, registers the request handler and opens
        /// the endpoint under <c>receiveCFXHandle</c> at <c>receivRequestUri</c>.
        /// </summary>
        static void OpenReceive()
        {
            // Close any endpoint left over from a previous call before replacing it.
            if (endpointReceiveRequest != null)
            {
                endpointReceiveRequest.Close();
                endpointReceiveRequest = null;
            }

            // A freshly constructed endpoint has no handlers yet, so subscribing once is enough.
            endpointReceiveRequest = new AmqpCFXEndpoint();
            endpointReceiveRequest.OnRequestReceived += Endpoint_OnRequestReceived;

            if (!endpointReceiveRequest.IsOpen)
            {
                Console.WriteLine("endpointReceiveRequest.IsOpen:" + endpointReceiveRequest.IsOpen.ToString());
                endpointReceiveRequest.Open(receiveCFXHandle, new Uri(receivRequestUri));
                Console.WriteLine("endpointReceiveRequest.IsOpen:" + endpointReceiveRequest.IsOpen.ToString());
            }
        }

        /// <summary>
        /// Handles an incoming request and builds the matching response envelope.
        /// </summary>
        /// <param name="request">The received request envelope.</param>
        /// <returns>
        /// A <c>WhoIsThereResponse</c> envelope for a <c>WhoIsThereRequest</c>, a
        /// <c>GetEndpointInformationResponse</c> envelope for a
        /// <c>GetEndpointInformationRequest</c>, or <c>null</c> for any other message body.
        /// </returns>
        static CFXEnvelope Endpoint_OnRequestReceived(CFXEnvelope request)
        {
            Console.WriteLine($"Endpoint_OnRequestReceived:  {request.ToString()}");

            // Process request.  Return result.
            if (request.MessageBody is WhoIsThereRequest)
            {
                CFXEnvelope result = CFXEnvelope.FromCFXMessage(new WhoIsThereResponse()
                { CFXHandle = receiveCFXHandle, RequestNetworkUri = receivRequestUri, RequestTargetAddress = "" });
                result.Source = receiveCFXHandle;
                result.Target = request.Source;
                result.TimeStamp = DateTime.Now;
                return result;
            }

            if (request.MessageBody is GetEndpointInformationRequest)
            {
                // Answer with the response type that pairs with the request
                // (previously a WhoIsThereResponse was returned here).
                CFXEnvelope result = CFXEnvelope.FromCFXMessage(new GetEndpointInformationResponse()
                { CFXHandle = receiveCFXHandle, RequestNetworkUri = receivRequestUri, RequestTargetAddress = "...." });
                result.Source = receiveCFXHandle;
                result.Target = request.Source;
                result.TimeStamp = DateTime.Now;
                return result;
            }

            // Unsupported request type: no response.
            return null;
        }
        #endregion receive request
    }
}
作为接收端,machine2 的代码如下所示:
using CFX;
using CFX.Transport;
using System;
namespace machine2
{
    /// <summary>
    /// Receiver side of the point-to-point CFX demo: opens an AMQP CFX endpoint that
    /// listens for requests and answers them until the user presses Enter.
    /// The send-side helpers are also present here but are not called by <see cref="Main"/>.
    /// </summary>
    class Program
    {
        // CFX handles of the two demo endpoints.
        static readonly string sendCFXHandle = "a.b.001";
        static readonly string receiveCFXHandle = "a.b.002";

        // AMQP URIs of the two demo endpoints (sendRequestUri is not used by this program).
        static readonly string sendRequestUri = "amqp://127.0.0.1:1235";
        static readonly string receivRequestUri = "amqp://127.0.0.1:1234";

        /// <summary>
        /// Opens the receiving endpoint, waits for Enter, then closes the endpoint.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            Console.WriteLine("ReceivEndPoint is waiting Request......");
            OpenReceive();
            try
            {
                Console.WriteLine("Press Enter Key to end the App");
                // ReadLine matches the "Press Enter" prompt; ReadKey returned on any key
                // and throws when standard input is redirected.
                Console.ReadLine();
            }
            finally
            {
                // Release the endpoint opened by OpenReceive() instead of leaking it at exit.
                endpointReceiveRequest?.Close();
                endpointReceiveRequest = null;
            }
        }

        #region send request
        static AmqpCFXEndpoint endpointSendRequest;

        /// <summary>
        /// (Re)creates the sending endpoint, opens it and sets the request timeout to 20 seconds.
        /// </summary>
        static void OpenRequest()
        {
            // Close any endpoint left over from a previous call before replacing it.
            if (endpointSendRequest != null)
            {
                endpointSendRequest.Close();
                endpointSendRequest = null;
            }

            endpointSendRequest = new AmqpCFXEndpoint();
            if (!endpointSendRequest.IsOpen)
            {
                Console.WriteLine("endpointSendRequest.IsOpen:" + endpointSendRequest.IsOpen.ToString());
                // NOTE(review): this opens the *sending* endpoint with the receiver's handle and
                // URI, the same pair OpenReceive() uses. It looks like a copy-paste slip
                // (machine1 calls Open(sendCFXHandle)); confirm the intent before calling both.
                endpointSendRequest.Open(receiveCFXHandle, new Uri(receivRequestUri));
                Console.WriteLine("endpointSendRequest.IsOpen:" + endpointSendRequest.IsOpen.ToString());
            }

            // Set a timeout of 20 seconds.  If the target endpoint does not
            // respond in this time, the request will time out.
            AmqpCFXEndpoint.RequestTimeout = TimeSpan.FromSeconds(20);
        }

        /// <summary>
        /// Builds a <c>GetEndpointInformationRequest</c> addressed to <c>receiveCFXHandle</c>,
        /// sends it to <c>receivRequestUri</c> and prints the response as JSON.
        /// Any exception raised while executing the request is reported by message only.
        /// </summary>
        static void SendRequest()
        {
            // Guard: without an open endpoint there is nothing to send with.
            if (endpointSendRequest == null)
            {
                Console.WriteLine("endpointSendRequest is not open; call OpenRequest() first.");
                return;
            }

            // Build a GetEndpointInformation request.
            CFXEnvelope request = CFXEnvelope.FromCFXMessage(new GetEndpointInformationRequest()
            {
                CFXHandle = receiveCFXHandle
            });
            request.Source = endpointSendRequest.CFXHandle;
            request.Target = receiveCFXHandle;

            try
            {
                CFXEnvelope response = endpointSendRequest.ExecuteRequest(receivRequestUri, request);
                if (response == null)
                {
                    // Report a missing reply explicitly rather than via a NullReferenceException message.
                    Console.WriteLine("response: <none>");
                    return;
                }
                Console.WriteLine($"response:\n{response.ToJson(true)}");
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
        }
        #endregion send request

        #region receive request
        static AmqpCFXEndpoint endpointReceiveRequest;

        /// <summary>
        /// (Re)creates the receiving endpoint, registers the request handler and opens
        /// the endpoint under <c>receiveCFXHandle</c> at <c>receivRequestUri</c>.
        /// </summary>
        static void OpenReceive()
        {
            // Close any endpoint left over from a previous call before replacing it.
            if (endpointReceiveRequest != null)
            {
                endpointReceiveRequest.Close();
                endpointReceiveRequest = null;
            }

            // A freshly constructed endpoint has no handlers yet, so subscribing once is enough.
            endpointReceiveRequest = new AmqpCFXEndpoint();
            endpointReceiveRequest.OnRequestReceived += Endpoint_OnRequestReceived;

            if (!endpointReceiveRequest.IsOpen)
            {
                Console.WriteLine("endpointReceiveRequest.IsOpen:" + endpointReceiveRequest.IsOpen.ToString());
                endpointReceiveRequest.Open(receiveCFXHandle, new Uri(receivRequestUri));
                Console.WriteLine("endpointReceiveRequest.IsOpen:" + endpointReceiveRequest.IsOpen.ToString());
            }
        }

        /// <summary>
        /// Logs an incoming request (summary and JSON) and builds the matching response envelope.
        /// </summary>
        /// <param name="request">The received request envelope.</param>
        /// <returns>
        /// A <c>WhoIsThereResponse</c> envelope for a <c>WhoIsThereRequest</c>, a
        /// <c>GetEndpointInformationResponse</c> envelope for a
        /// <c>GetEndpointInformationRequest</c>, or <c>null</c> for any other message body.
        /// </returns>
        static CFXEnvelope Endpoint_OnRequestReceived(CFXEnvelope request)
        {
            Console.WriteLine($"Endpoint_OnRequestReceived:  {request.ToString()}");
            Console.WriteLine($"request:\n{request.ToJson(true)}");

            // Process request.  Return result.
            if (request.MessageBody is WhoIsThereRequest)
            {
                CFXEnvelope result = CFXEnvelope.FromCFXMessage(new WhoIsThereResponse()
                { CFXHandle = receiveCFXHandle, RequestNetworkUri = receivRequestUri, RequestTargetAddress = "" });
                result.Source = receiveCFXHandle;
                result.Target = request.Source;
                result.TimeStamp = DateTime.Now;
                return result;
            }

            if (request.MessageBody is GetEndpointInformationRequest)
            {
                // Answer with the response type that pairs with the request
                // (previously a WhoIsThereResponse was returned here).
                CFXEnvelope result = CFXEnvelope.FromCFXMessage(new GetEndpointInformationResponse()
                { CFXHandle = receiveCFXHandle, RequestNetworkUri = receivRequestUri, RequestTargetAddress = "..." });
                result.Source = receiveCFXHandle;
                result.Target = request.Source;
                result.TimeStamp = DateTime.Now;
                return result;
            }

            // Unsupported request type: no response.
            return null;
        }
        #endregion receive request
    }
}
运行结果:可以用 JSON 格式对 response 和 request 的内容进行解析。



















