一、双向流处理概述
- 简单来讲客户端可以向服务端发送消息流,服务端也可以向客户端传输响应流,即客户端和服务端可以互相通讯
- 客户端无需发送消息即可开始双向流式处理调用 。 客户端可选择使用 RequestStream.WriteAsync发送消息。 使用ResponseStream.MoveNext()或ResponseStream.ReadAllAsync()可访问从服务流式处理的消息。ResponseStream没有更多消息时,双向流式处理调用完成。
二、案例简介
- 客户端发送请求流通过equestStream.WriteAsync传入到服务端
- 服务端响应到客户端的流通过ResponseStream.WriteAsync写入到客户端
- 服务端使用System.Threading.Channels保证线程安全交互
三、服务端配置(注意:grpc相关配置参考我之前的文章)
- 配置.proto文件
// 1.提供公共的实体proto文件
// 2.服务引用对应的proto文件
// 3.定义三个客户流方法
//定义messages.proto文件令需要注意项目文件中的特性GrpcServices=None;
syntax = "proto3";
option csharp_namespace = "GrpcProject";
package grpc.serviceing;
// 消息推送/接收实体
message ExampleMessage
{
	string msg = 1;
}
// 双向流文件twowaystream.proto
syntax = "proto3";
import "Protos/messages.proto";
option csharp_namespace = "GrpcProject";
package grpc.serviceing;
service BothWaysRpc{
	// 双向流
	rpc StreamingBothWays(stream ExampleMessage) returns (stream ExampleMessage);
}- 1 服务接口实现
    /// <summary>
    /// 双向流服务
    /// </summary>
    public class BothWaysService : BothWaysRpc.BothWaysRpcBase
    {
        /// <summary>
        /// 自动重置事件
        /// </summary>
        private readonly ManualResetEventSlim _event;
        public BothWaysService()
        {
            _event = new ManualResetEventSlim(false);
        }
        public override async Task StreamingBothWays(IAsyncStreamReader<ExampleMessage> requestStream,
                                               IServerStreamWriter<ExampleMessage> responseStream,
                                               ServerCallContext context)
        {
            // 创建线程安全的有限容量通道
            var channel = Channel.CreateBounded<ExampleMessage>(new BoundedChannelOptions(capacity: 5));
            var task = Task.Run(async () =>
            {
                await foreach (var message in requestStream.ReadAllAsync())
                {
                    // 读取消息 写入通道
                    if (!string.IsNullOrWhiteSpace(message.Msg))
                    {
                        await Console.Out.WriteLineAsync($"记录客户端传入消息:{message.Msg}");
                        // todo 消息处理
                        await channel.Writer.WriteAsync(message, context.CancellationToken);
                    }
                }
            }, context.CancellationToken);
            await foreach (var message in channel.Reader.ReadAllAsync())
            {
                // 打印通道接收的消息
                await Console.Out.WriteLineAsync($"通道传入消息:{message.Msg}");
                // 写入响应流
                ExampleMessage exampleMessage = new ExampleMessage() { Msg = $"我已经接收到消息:{message.Msg}" };
                await responseStream.WriteAsync(exampleMessage);
                if (message.Msg.ToLower() == "exit")
                {
                    break;
                }
            }
            // 完结写入通道
            channel.Writer.Complete();
            await task;
        }
    }- 2 Program注入
    public class Program
    {
        public static void Main(string[] args)
        {
            var builder = WebApplication.CreateBuilder(args);
            builder.Services.AddGrpc();
            var app = builder.Build();
            // 一元方法
            //app.MapGrpcService<DollarService>();
            // 客户端流
            //app.MapGrpcService<ClientStreamService>();
            // 服务端流
            //app.MapGrpcService<ServerStreamService>();
            // 双向流
            app.MapGrpcService<BothWaysService>();
            app.Run();
        }
    }四、客户端配置
- 引用proto文件,配置为客户端类型
- 根据编译生成的函数进行传参调用
- 创建WPF测试客户端

button按钮触发grpc
    /// <summary>
    /// BothWaysClient.xaml 的交互逻辑
    /// </summary>
    public partial class BothWaysClient : Window
    {
        public BothWaysClient()
        {
            InitializeComponent();
        }
        private async void Excute_Click(object sender, RoutedEventArgs e)
        {
            Action<string> action = str => { txtValue.Text += $"{str}\r\n"; };
            await WpfClient.Show(action);
            txtValue.Text += "\r\n\r\n";
        }
    }grpc客户端接口调用
        /// <summary>
        /// 双向流
        /// </summary>
        /// <param name="action"></param>
        /// <returns></returns>
        public static async Task Show(Action<string> action)
        {
            var messages = new List<string>()
            {
                "test",
                "one",
                "two",
                "three",
                "false",
                "four",
                "Oooo",
                "dddd",
                "vvvfff",
                "exit"
            };
            Random rnd = new Random(20);
            var channel = GrpcChannel.ForAddress("https://localhost:7188");
            var client = new GrpcProject.BothWaysRpc.BothWaysRpcClient(channel);
            var bothWays = client.StreamingBothWays();
            var requestTask = Task.Run(async () =>
              {
                  while (true)
                  {
                      var index = rnd.Next(messages.Count);
                      var msg = messages[index];
                      await bothWays.RequestStream.WriteAsync(new ExampleMessage { Msg = msg });
                      if (msg == "exit")
                      {
                          break;
                      }
                  }
              });
            await foreach (var item in bothWays.ResponseStream.ReadAllAsync())
            {
                action(item.Msg);
                if (item.Msg == "我已经接收到消息:exit")
                {
                    break;
                }
            }
            await requestTask;
        }五、执行结果
服务端:

客户端:

六、源码地址
链接:https://pan.baidu.com/s/1uCirfbexPJ7C-AujBVtkCQ 
 提取码:sd4y
七、后续进阶简介
- 接下来会讲解客户端工厂,优化客户端请求地址使用依赖注入提取各个服务
- proto文件各个字段详细介绍
- token认证
- 截止时间(中止请求)和请求取消
- AOP切面策略
- 重试策略(policy)
- 负载均衡策略(grpc本身提供的策略及nginx代理)
- 日志记录
- 健康检查
- 后续有更多特色功能会持续补充



















