使用 Tonic 构建高性能异步 gRPC 服务
使用 Tonic 构建高性能异步 gRPC 服务在分布式系统开发中gRPC 作为 Google 开源的高性能 RPC 框架凭借 Protobuf 二进制序列化的高效性和 HTTP/2 传输的优势成为服务间通信的首选方案。而在 Rust 生态中Tonic 框架以其原生异步支持、类型安全、高性能的特性成为实现 gRPC 服务的最优选择之一。本文将从 Tonic 基础入手带你从零搭建 gRPC 服务与客户端。搭建 Tonic 开发环境安装 Protobuf 编译器Tonic 依赖 Protobuf 编译器来解析.proto文件并生成 Rust 代码不同系统的安装方式如下# LinuxUbuntu/Debiansudoapt-getinstallprotobuf-compiler# macOSbrewinstallprotobuf# Windowswingetinstall-e--idGoogle.Protobuf安装完成后执行protoc --version验证是否成功。创建项目并配置依赖接下来创建一个包含服务端和客户端的 Rust 项目模拟真实的服务通信场景cargonew tonic-democdtonic-demo修改Cargo.toml添加核心依赖和构建依赖# 二进制目标服务端和客户端 [[bin]] name server path src/server.rs [[bin]] name client path src/client.rs [dependencies] tokio { version 1, features [full] } # tokio 的流式处理需要用到 tokio-stream { version 0.1, features [full] } tonic 0.14 tonic-prost 0.14 prost 0.14 prost-types 0.14 anyhow 1.0 chrono 0.4 [build-dependencies] tonic-prost-build 0.14 anyhow 1.0创建build.rs文件用于配置 Protobuf 编译规则告诉 Tonic 在构建时自动生成代码useanyhow::Result;fnmain()-Result(){println!(cargo:rerun-if-changedbuild.rs);println!(cargo:rerun-if-changedproto/chat.proto);// 编译 proto 目录下的所有 .proto 文件tonic_prost_build::compile_protos(proto/chat.proto)?;Ok(())}入门案例实现一个双向流式聊天服务gRPC 支持四种服务方法一元 RPC单次请求-响应、服务端流式、客户端流式、双向流式。其中双向流式适合实时聊天、即时数据推送等场景本文将基于 Tonic 实现一个简单的双向流式聊天服务完整覆盖 Protobuf 定义、服务端实现、客户端测试全流程。定义 Protobuf 接口gRPC 是接口定义优先的首先需要通过 Protobuf 定义服务接口和数据结构。在项目根目录创建proto文件夹新建chat.proto文件syntax proto3; package chat; // 包名用于生成 Rust 模块 // 定义聊天消息结构 message ChatMessage { string username 1; // 用户名 string content 2; // 消息内容 string timestamp 3; // 发送时间戳 } // 定义聊天服务接口 service ChatService { // 双向流式 RPC客户端和服务端可同时发送消息 rpc ChatStream (stream ChatMessage) returns (stream ChatMessage); }生成 Rust 代码执行cargo build命令Tonic 会根据build.rs的配置编译chat.proto并生成对应的 Rust 代码生成的代码位于target/debug/build/tonic-demo-*/out/chat.rs需要注意的是在使用时在代码中可通过tonic::include_proto!(chat)引入生成的模块。实现服务端逻辑服务端需要实现 Protobuf 定义的ChatService特征核心逻辑是接收客户端消息并将消息广播给所有连接的客户端。这里利用 Tokio 的广播通道实现消息分发新建src/server.rsusetokio::sync::Mutex;usetokio::sync::broadcast;usetokio::sync::mpsc;usetokio_stream::StreamExt;usetokio_stream::wrappers::BroadcastStream;usetokio_stream::wrappers::ReceiverStream;usetokio_stream::wrappers::errors::BroadcastStreamRecvError;usetonic::transport::Server;usetonic::{Request,Response,Status,Streaming};usecrate::chat::ChatMessage;usecrate::chat::chat_service_server::ChatService;usecrate::chat::chat_service_server::ChatServiceServer;// 引入生成的 Rust 代码pubmodchat{tonic::include_proto!(chat);}#[derive(Debug, Default)]pubstructChatServer{// 广播通道发送端用于向所有客户端广播消息// 用 Mutex 包裹实现异步环境的内部可变性broadcaster:MutexOptionbroadcast::SenderChatMessage,}#[tonic::async_trait]implChatServiceforChatServer{// 双向流式方法的实现返回值为 StreamItem ResultChatMessage, StatustypeChatStreamStreamReceiverStreamResultChatMessage,Status;asyncfnchat_stream(self,request:RequestStreamingChatMessage,)-ResultResponseSelf::ChatStreamStream,Status{// 获取客户端发送的消息流letmutstreamrequest.into_inner();// 初始化广播通道首次连接时创建letmutbroadcaster_lockself.broadcaster.lock().await;lettxmatch*broadcaster_lock{Some(sender)sender.clone(),None{// 首次连接创建广播通道并存储到服务端实例中let(sender,_)broadcast::channel(128);*broadcaster_lockSome(sender.clone());sender}};// 释放锁避免长时间占用drop(broadcaster_lock);// 当前客户端订阅广播通道letrxtx.subscribe();// 处理 broadcast 通道的 Lagged 错误消费者消息落后letbroadcast_streamBroadcastStream::new(rx).filter_map(|msg|matchmsg{Ok(msg)Some(Ok(msg)),Err(BroadcastStreamRecvError::Lagged(_)){eprintln!(客户端消息落后跳过旧消息);None}});// 创建客户端下行通道let(client_tx,client_rx)mpsc::channel(128);letresponse_streamReceiverStream::new(client_rx);// 监听客户端发送的消息广播给所有人tokio::spawn(asyncmove{whileletSome(result)stream.next().await{matchresult{Ok(msg){// 广播消息忽略发送错误无客户端时正常let_tx.send(msg);}Err(e){eprintln!(接收客户端消息失败: {},e);break;}}}eprintln!(客户端断开连接上行);});// 转发广播消息给当前客户端tokio::spawn(asyncmove{tokio::pin!(broadcast_stream);whileletSome(msg)broadcast_stream.next().await{ifclient_tx.send(msg).await.is_err(){// 客户端断开连接终止任务break;}}eprintln!(客户端断开连接下行);});// 返回流给客户端Ok(Response::new(response_stream))}}// 服务端入口函数#[tokio::main]asyncfnmain()-Result(),Boxdynstd::error::Error{// 绑定服务地址letaddr[::1]:50051.parse()?;// 创建服务实例letchat_serverChatServer::default();println!(Chat server running on {},addr);// 启动 gRPC 服务Server::builder().add_service(ChatServiceServer::new(chat_server)).serve(addr).await?;Ok(())}实现客户端逻辑客户端需要连接服务端同时执行两个操作读取用户输入并发送消息、监听服务端广播的消息并打印。新建src/client.rsuseanyhow::Result;usetokio::{io::{self,AsyncBufReadExt,AsyncWriteExt,BufReader},sync::mpsc,};usetokio_stream::{StreamExt,wrappers::ReceiverStream};usecrate::chat::{ChatMessage,chat_service_client::ChatServiceClient};// 引入生成的 Rust 代码pubmodchat{tonic::include_proto!(chat);}#[tokio::main]asyncfnmain()-Result(){// 连接服务端letmutclientChatServiceClient::connect(http://[::1]:50051).await?;println!(Connected to chat server);// 获取用户名println!(Enter your username: );io::stdout().flush().await?;letmutreaderBufReader::new(io::stdin());letmutusernameString::new();reader.read_line(mutusername).await?;letusernameusername.trim().to_string();// 创建消息发送通道let(tx,rx)mpsc::channel(128);letrequesttonic::Request::new(ReceiverStream::new(rx));// 发起双向流请求获取服务端响应流letmutresponse_streamclient.chat_stream(request).await?.into_inner();// 启动异步任务监听服务端消息并打印tokio::spawn(asyncmove{whileletSome(Ok(msg))response_stream.next().await{println!([{}] {}: {},msg.timestamp,msg.username,msg.content);}println!(Disconnected from server);});// 读取用户输入并发送消息letmutlinesreader.lines();whileletSome(line)lines.next_line().await?{ifline.eq_ignore_ascii_case(exit){println!(Exiting chat...);break;}// 构造聊天消息letmsgChatMessage{username:username.clone(),content:line,timestamp:chrono::Local::now().format(%H:%M:%S).to_string(),};// 发送消息到服务端iftx.send(msg).await.is_err(){println!(Failed to send message);break;}}Ok(())}测试服务分别启动服务端和客户端测试双向流式通信# 启动服务端cargorun--binserver# 新终端启动多个客户端cargorun--binclient输入用户名后即可发送消息所有连接的客户端都会实时收到广播消息实现简单的群聊功能。Tonic 进阶拦截器中间件拦截器类似于 Web 框架的中间件可在请求/响应处理前后执行自定义逻辑如认证、日志记录、指标收集等。下面实现一个简单的 Token 认证拦截器usetonic::service::InterceptorLayer;usetonic::{Request,Status};// 自定义认证拦截器#[derive(Debug, Clone, Copy)]pubstructAuthInterceptor;implInterceptorforAuthInterceptor{fncall(mutself,request:Request())-ResultRequest(),Status{// 从请求头中获取 Tokenlettokenrequest.metadata().get(authorization).and_then(|v|v.to_str().ok()).unwrap_or();// 简单验证 Token实际场景需结合密钥验证iftoken!Bearer tonic-demo-token{returnErr(Status::unauthenticated(Invalid or missing token));}Ok(request)}}usetonic::service::InterceptorLayer;// 服务端启动时添加拦截器Server::builder().layer(InterceptorLayer::new(AuthInterceptor)).add_service(ChatServiceServer::new(chat_server)).serve(addr).await?;客户端发送请求时需在请求头中添加 Tokenletmutrequesttonic::Request::new(ReceiverStream::new(rx));request.metadata_mut().insert(authorization,Bearer tonic-demo-token.parse()?);TLS 加密传输生产环境中服务间通信需要加密Tonic 基于 rustls 原生支持 TLS 加密。只需修改服务端和客户端的连接配置即可实现加密通信具体可参考 Tonic 官方示例。四种 gRPC 服务方法对比Tonic 完全支持 gRPC 的四种服务方法适用于不同场景方法类型描述适用场景一元 RPC客户端发送单次请求服务端返回单次响应简单查询、接口调用如用户登录服务端流式 RPC客户端发送单次请求服务端返回流式响应大数据量返回如日志查询、文件下载客户端流式 RPC客户端发送流式请求服务端返回单次响应大数据量上传如文件上传、批量数据提交双向流式 RPC客户端和服务端同时发送流式消息双向独立通信实时通信如聊天、实时监控、行情推送总结Tonic 作为 Rust 生态中成熟的 gRPC 框架凭借其异步原生、类型安全、高性能的优势极大地降低了 Rust 开发 gRPC 服务的门槛。如果你正在用 Rust 构建分布式服务Tonic 绝对值得你深入学习和实践。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2557817.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!