gRPC 的四种基本通信模式,包括完整的 .proto
文件定义和 Go 语言实现代码:
1. 简单 RPC (Unary RPC) - 请求/响应模式
客户端发送单个请求,服务端返回单个响应
calculator.proto
protobuf
syntax = "proto3";
package calculator;
service Calculator {
// Unary RPC
rpc Add (AddRequest) returns (AddResponse) {}
}
message AddRequest {
int32 a = 1;
int32 b = 2;
}
message AddResponse {
int32 result = 1;
}
服务端实现 (Go)
package main
import (
"context"
"log"
"net"
"google.golang.org/grpc"
pb "path/to/calculator"
)
type server struct {
pb.UnimplementedCalculatorServer
}
func (s *server) Add(ctx context.Context, req *pb.AddRequest) (*pb.AddResponse, error) {
result := req.A + req.B
return &pb.AddResponse{Result: result}, nil
}
func main() {
lis, err := net.Listen("tcp", ":50051")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer()
pb.RegisterCalculatorServer(s, &server{})
log.Printf("server listening at %v", lis.Addr())
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
客户端实现 (Go)
package main
import (
"context"
"log"
"os"
"time"
"google.golang.org/grpc"
pb "path/to/calculator"
)
func main() {
conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := pb.NewCalculatorClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// Unary RPC 调用
r, err := c.Add(ctx, &pb.AddRequest{A: 5, B: 3})
if err != nil {
log.Fatalf("could not add: %v", err)
}
log.Printf("Result: %d", r.GetResult())
}
2. 服务端流式 RPC (Server Streaming RPC)
客户端发送单个请求,服务端返回流式响应
calculator.proto
// Server Streaming RPC
rpc PrimeFactors (PrimeRequest) returns (stream PrimeResponse) {
message PrimeRequest {
int32 number = 1;
}
message PrimeResponse {
int32 factor = 1;
}
服务端实现
func (s *server) PrimeFactors(req *pb.PrimeRequest, stream pb.Calculator_PrimeFactorsServer) error {
n := req.GetNumber()
factor := 2
for n > 1 {
if n%int32(factor) == 0 {
// 发送因数
stream.Send(&pb.PrimeResponse{Factor: int32(factor)})
n /= int32(factor)
} else {
factor++
}
}
return nil
}
客户端实现
// 服务端流式调用
stream, err := c.PrimeFactors(ctx, &pb.PrimeRequest{Number: 120})
if err != nil {
log.Fatalf("could not get prime factors: %v", err)
}
log.Print("Prime factors of 120:")
for {
res, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("error receiving factor: %v", err)
}
log.Printf("- %d", res.GetFactor())
}
3. 客户端流式 RPC (Client Streaming RPC)
客户端发送流式请求,服务端返回单个响应
calculator.proto
// Client Streaming RPC
rpc Average (stream AverageRequest) returns (AverageResponse) {}
message AverageRequest {
double number = 1;
}
message AverageResponse {
double average = 1;
}
服务端实现
func (s *server) Average(stream pb.Calculator_AverageServer) error {
sum := 0.0
count := 0
for {
req, err := stream.Recv()
if err == io.EOF {
// 计算平均值并返回响应
average := sum / float64(count)
return stream.SendAndClose(&pb.AverageResponse{Average: average})
}
if err != nil {
return err
}
sum += req.GetNumber()
count++
}
}
客户端实现
// 客户端流式调用
avgStream, err := c.Average(ctx)
if err != nil {
log.Fatalf("could not calculate average: %v", err)
}
numbers := []float64{1.5, 2.5, 3.5, 4.5, 5.5}
for _, num := range numbers {
if err := avgStream.Send(&pb.AverageRequest{Number: num}); err != nil {
log.Fatalf("error sending number: %v", err)
}
}
avgRes, err := avgStream.CloseAndRecv()
if err != nil {
log.Fatalf("error receiving average: %v", err)
}
log.Printf("Average: %.2f", avgRes.GetAverage())
4. 双向流式 RPC (Bidirectional Streaming RPC)
客户端和服务端同时发送流式消息
calculator.proto
// Bidirectional Streaming RPC
rpc Chat (stream ChatMessage) returns (stream ChatMessage) {}
message ChatMessage {
string text = 1;
int64 timestamp = 2;
}
服务端实现
func (s *server) Chat(stream pb.Calculator_ChatServer) error {
for {
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
// 模拟处理消息
log.Printf("Received: %s", in.GetText())
response := &pb.ChatMessage{
Text: "Echo: " + in.GetText(),
Timestamp: time.Now().UnixNano(),
}
// 发送响应
if err := stream.Send(response); err != nil {
return err
}
}
}
客户端实现
// 双向流式调用
chatStream, err := c.Chat(ctx)
if err != nil {
log.Fatalf("could not start chat: %v", err)
}
// 接收消息的goroutine
go func() {
for {
res, err := chatStream.Recv()
if err == io.EOF {
return
}
if err != nil {
log.Fatalf("error receiving message: %v", err)
}
log.Printf("Server: %s (time: %d)", res.GetText(), res.GetTimestamp())
}
}()
// 发送消息
messages := []string{"Hello", "How are you?", "Goodbye"}
for _, msg := range messages {
if err := chatStream.Send(&pb.ChatMessage{
Text: msg,
Timestamp: time.Now().UnixNano(),
}); err != nil {
log.Fatalf("error sending message: %v", err)
}
time.Sleep(1 * time.Second)
}
chatStream.CloseSend()
项目结构建议
grpc-demo/ ├── proto/ │ └── calculator.proto ├── server/ │ └── main.go ├── client/ │ └── main.go └── gen/ └── calculator.pb.go # 自动生成的代码
编译和运行步骤
-
安装依赖:
go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.28 go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2
-
生成 gRPC 代码:
protoc --go_out=./gen --go_opt=paths=source_relative \
--go-grpc_out=./gen --go-grpc_opt=paths=source_relative \
proto/calculator.proto
-
启动服务端:
cd server go run main.go
-
启动客户端:
cd client go run main.go
各模式适用场景总结
模式 | 特点 | 适用场景 |
---|---|---|
简单RPC | 1请求 → 1响应 | 常规API调用,如用户验证、数据查询 |
服务端流式 | 1请求 → N响应 | 实时数据推送、大文件下载、实时监控 |
客户端流式 | N请求 → 1响应 | 批量上传、日志收集、传感器数据汇总 |
双向流式 | N请求 ↔ N响应 | 实时聊天、游戏状态同步、双向数据流处理 |
这些示例展示了 gRPC 的核心通信模式,您可以根据实际需求组合使用这些模式构建复杂的分布式系统。