gRPC 与 Protobuf 实战指南
引言gRPC 是 Google 开源的高性能 RPC 框架而 ProtobufProtocol Buffers则是其默认的序列化协议。两者结合带来了高性能、跨语言、契约优先的现代微服务通信方案。传统的 REST API 使用 JSON 或 XML 作为数据格式存在以下问题体积大JSON 文本格式冗余解析慢需要解析字符串无强类型字段变化不易发现代码生成弱缺乏好的工具链gRPC Protobuf 通过二进制格式和代码生成很好地解决了这些问题。本文将深入探讨 Protobuf 语法、gRPC 服务开发、以及生产环境中的最佳实践。一、Protobuf 语法详解1.1 消息定义基础Protobuf 的核心是.proto文件它定义消息的结构syntax proto3; // 指定 protobuf 版本 package user; // 包名用于避免命名冲突 // 定义用户消息 message User { string name 1; // 字段名 字段编号 int32 id 2; // 编号必须唯一用于二进制编码 string email 3; bool active 4; int64 created_at 5; }字段编号规则1-15常用字段使用一个字节编码16-2047非常用字段使用两个字节编码19000-19999保留编号系统使用建议将 1-15 分配给最常用的字段1.2 标量数据类型Protobuf 支持丰富的数据类型message TypesDemo { // 整数类型 int32 var_int32 1; // 变长有符号整数 int64 var_int64 2; // 变长有符号长整数 uint32 var_uint32 3; // 变长无符号整数 uint64 var_uint64 4; // 变长无符号长整数 sint32 var_sint32 5; // 变长有符号整数负数效率更高 sint64 var_sint64 6; // 变长有符号长整数 // 固定长度类型 fixed32 fixed32 7; // 固定4字节无符号整数 fixed64 fixed64 8; // 固定8字节无符号整数 sfixed32 sfixed32 9; // 固定4字节有符号整数 sfixed64 sfixed64 10; // 固定8字节有符号整数 // 浮点数类型 float float_val 11; // 32位浮点数 double double_val 12; // 64位浮点数 // 布尔和字符串 bool bool_val 13; string string_val 14; bytes bytes_val 15; }1.3 嵌套与组合// 嵌套消息 message User { message Address { string street 1; string city 2; string country 3; } string name 1; Address address 2; // 使用嵌套消息 repeated Phone phones 3; // 数组/列表 } // 枚举类型 message Order { enum Status { UNKNOWN 0; // 枚举必须从 0 开始 PENDING 1; PAID 2; SHIPPED 3; DELIVERED 4; CANCELLED 5; } string order_id 1; Status status 2; User buyer 3; repeated Item items 4; }1.4 Map 类型message Product { // 键值对映射 mapstring, string attributes 1; mapint64, User related_users 2; }1.5 oneof 联合类型当一个字段可以是多种类型之一时使用oneofmessage Response { oneof result { User user 1; Order order 2; string error 3; } int64 timestamp 4; }二、proto 文件编译与代码生成2.1 安装 protocWindows 环境安装 protoc# 下载 protoc从 GitHub releases curl -LO https://github.com/protocolbuffers/protobuf/releases/download/v25.1/protoc-25.1-win64.zip unzip protoc-25.1-win64.zip -d $HOME/.local # 设置 PATH export PATH$PATH:$HOME/.local/bin # 验证安装 protoc --version2.2 安装 Go 插件# 安装 protoc-gen-go用于生成 .pb.go 文件 go install google.golang.org/protobuf/cmd/protoc-gen-golatest # 安装 protoc-gen-go-grpc用于生成 gRPC 服务代码 go install google.golang.org/grpc/cmd/protoc-gen-go-grpclatest # 设置 PATH export PATH$PATH:$(go env GOPATH)/bin2.3 编译 proto 文件目录结构project/ ├── proto/ │ ├── user.proto │ └── order.proto ├── generated/ └── main.go编译命令protoc \ --go_outgenerated \ --go_optpathssource_relative \ --go-grpc_outgenerated \ --go-grpc_optpathssource_relative \ proto/*.proto生成的代码结构// user.pb.go - 消息类型定义 type User struct { Name string Id int32 Email string Active bool CreatedAt int64 // ... 序列化/反序列化方法 } // user_grpc.pb.go - gRPC 服务定义 type UserServiceClient interface { GetUser(ctx context.Context, in *GetUserRequest, opts ...grpc.CallOption) (*User, error) ListUsers(ctx context.Context, in *ListUsersRequest, opts ...grpc.CallOption) (*ListUsersResponse, error) // ... }2.4 完整 proto 示例创建proto/user.protosyntax proto3; package user; option go_package github.com/example/project/gen/user;user; import google/protobuf/timestamp.proto; // 用户服务定义 service UserService { // 获取单个用户 rpc GetUser(GetUserRequest) returns (User); // 列出用户支持分页 rpc ListUsers(ListUsersRequest) returns (ListUsersResponse); // 创建用户 rpc CreateUser(CreateUserRequest) returns (User); // 更新用户 rpc UpdateUser(UpdateUserRequest) returns (User); // 删除用户 rpc DeleteUser(DeleteUserRequest) returns (Empty); // 双向流示例批量操作 rpc BatchProcessUsers(stream User) returns (stream OperationResult); } // 用户消息定义 message User { int32 id 1; string name 2; string email 3; bool active 4; google.protobuf.Timestamp created_at 5; repeated string roles 6; } // 请求消息定义 message GetUserRequest { int32 id 1; } message ListUsersRequest { int32 page 1; int32 page_size 2; string search 3; } message ListUsersResponse { repeated User users 1; int32 total 2; int32 page 3; int32 page_size 4; } message CreateUserRequest { string name 1; string email 2; repeated string roles 3; } message UpdateUserRequest { int32 id 1; string name 2; string email 3; bool active 4; } message DeleteUserRequest { int32 id 1; } // 通用响应 message Empty {} // 批量操作结果 message OperationResult { int32 id 1; bool success 2; string message 3; }三、gRPC 服务端开发3.1 项目结构grpc-demo/ ├── proto/ │ └── user.proto ├── gen/ │ ├── user.pb.go │ └── user_grpc.pb.go ├── server/ │ └── main.go ├── client/ │ └── main.go └── go.mod3.2 服务端实现package main import ( context fmt io log net google.golang.org/grpc google.golang.org/grpc/codes google.golang.org/grpc/metadata google.golang.org/grpc/status pb github.com/example/grpc-demo/gen/user google.golang.org/protobuf/types/known/timestamppb ) type UserServer struct { pb.UnimplementedUserServiceServer // 数据库模拟 users map[int32]*pb.User nextID int32 } func NewUserServer() *UserServer { return UserServer{ users: make(map[int32]*pb.User), nextID: 1, } } func (s *UserServer) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) { // 从 metadata 获取调用者信息 if md, ok : metadata.FromIncomingContext(ctx); ok { log.Printf(GetUser called by: %v, md.Get(client-id)) } user, ok : s.users[req.Id] if !ok { return nil, status.Errorf(codes.NotFound, 用户 %d 不存在, req.Id) } return user, nil } func (s *UserServer) ListUsers(ctx context.Context, req *pb.ListUsersRequest) (*pb.ListUsersResponse, error) { var users []*pb.User for _, user : range s.users { // 简单搜索过滤 if req.Search ! { if user.Name ! req.Search user.Email ! req.Search { continue } } users append(users, user) } // 分页 start : (req.Page - 1) * req.PageSize end : start req.PageSize if start len(users) { users []*pb.User{} } else if end len(users) { users users[start:] } else { users users[start:end] } return pb.ListUsersResponse{ Users: users, Total: int32(len(users)), Page: req.Page, PageSize: req.PageSize, }, nil } func (s *UserServer) CreateUser(ctx context.Context, req *pb.CreateUserRequest) (*pb.User, error) { // 参数验证 if req.Name { return nil, status.Error(codes.InvalidArgument, 用户名为必填项) } if req.Email { return nil, status.Error(codes.InvalidArgument, 邮箱为必填项) } user : pb.User{ Id: s.nextID, Name: req.Name, Email: req.Email, Active: true, Roles: req.Roles, CreatedAt: timestamppb.Now(), } s.users[s.nextID] user s.nextID log.Printf(创建用户: ID%d, Name%s, user.Id, user.Name) return user, nil } func (s *UserServer) UpdateUser(ctx context.Context, req *pb.UpdateUserRequest) (*pb.User, error) { user, ok : s.users[req.Id] if !ok { return nil, status.Errorf(codes.NotFound, 用户 %d 不存在, req.Id) } if req.Name ! { user.Name req.Name } if req.Email ! { user.Email req.Email } user.Active req.Active log.Printf(更新用户: ID%d, user.Id) return user, nil } func (s *UserServer) DeleteUser(ctx context.Context, req *pb.DeleteUserRequest) (*pb.Empty, error) { if _, ok : s.users[req.Id]; !ok { return nil, status.Errorf(codes.NotFound, 用户 %d 不存在, req.Id) } delete(s.users, req.Id) log.Printf(删除用户: ID%d, req.Id) return pb.Empty{}, nil } // 双向流 RPC 实现 func (s *UserServer) BatchProcessUsers(stream pb.UserService_BatchProcessUsersServer) error { for { user, err : stream.Recv() if err io.EOF { // 客户端发送完毕发送响应 return nil } if err ! nil { return err } log.Printf(处理用户: ID%d, Name%s, user.Id, user.Name) // 模拟处理 result : pb.OperationResult{ Id: user.Id, Success: true, Message: fmt.Sprintf(用户 %s 处理成功, user.Name), } // 发送响应 if err : stream.Send(result); err ! nil { return err } } }3.3 服务启动与注册func main() { // 创建监听 lis, err : net.Listen(tcp, :50051) if err ! nil { log.Fatalf(监听端口失败: %v, err) } // 创建 gRPC 服务器可以添加选项 opts : []grpc.ServerOption{ grpc.UnaryInterceptor(unaryServerInterceptor), grpc.StreamInterceptor(streamServerInterceptor), } server : grpc.NewServer(opts...) // 注册服务 userServer : NewUserServer() pb.RegisterUserServiceServer(server, userServer) log.Printf(gRPC 服务启动监听端口 :50051) // 启动服务 if err : server.Serve(lis); err ! nil { log.Fatalf(服务启动失败: %v, err) } } // 单元拦截器示例 func unaryServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { log.Printf(调用方法: %s, info.FullMethod) // 前置处理 start : time.Now() // 调用实际方法 resp, err : handler(ctx, req) // 后置处理 log.Printf(方法 %s 耗时: %v, info.FullMethod, time.Since(start)) return resp, err } // 流拦截器示例 func streamServerInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { log.Printf(流方法调用: %s, info.FullMethod) return handler(srv, ss) }四、gRPC 客户端开发4.1 简单客户端package main import ( context log time google.golang.org/grpc google.golang.org/grpc/credentials/insecure pb github.com/example/grpc-demo/gen/user ) func main() { // 连接 gRPC 服务器 conn, err : grpc.Dial( localhost:50051, grpc.WithTransportCredentials(insecure.NewCredentials()), // 测试环境不使用 TLS grpc.WithBlock(), // 阻塞直到连接成功 grpc.WithTimeout(time.Second*10), // 超时设置 ) if err ! nil { log.Fatalf(连接服务器失败: %v, err) } defer conn.Close() // 创建客户端 client : pb.NewUserServiceClient(conn) // 调用 GetUser ctx, cancel : context.WithTimeout(context.Background(), time.Second*5) defer cancel() user, err : client.GetUser(ctx, pb.GetUserRequest{Id: 1}) if err ! nil { log.Printf(获取用户失败: %v, err) } else { log.Printf(获取用户成功: %v, user) } }4.2 带认证的客户端// 认证元数据 type Auth struct { Token string } func (a *Auth) GetRequestMetadata(ctx context.Context, urls ...string) (map[string]string, error) { return map[string]string{ authorization: Bearer a.Token, }, nil } func (a *Auth) RequireTransportSecurity() bool { return false // 测试环境设为 false } // 认证连接示例 func authenticatedClient() (*grpc.ClientConn, error) { creds : Auth{Token: your-jwt-token} return grpc.Dial( localhost:50051, grpc.WithPerRPCCredentials(creds), ) }4.3 客户端流调用// 批量创建用户 func batchCreateUsers(client pb.UserServiceClient, users []*pb.User) error { ctx, cancel : context.WithTimeout(context.Background(), time.Minute) defer cancel() stream, err : client.BatchProcessUsers(ctx) if err ! nil { return err } // 发送请求流 for _, user : range users { if err : stream.Send(user); err ! nil { return err } } // 关闭发送流并接收响应 reply, err : stream.CloseAndRecv() if err ! nil { return err } log.Printf(批量处理完成: %v, reply) return nil }4.4 双向流调用// 双向流实时通信 func bidirectionalStream(client pb.UserServiceClient) error { ctx, cancel : context.WithCancel(context.Background()) defer cancel() stream, err : client.BatchProcessUsers(ctx) if err ! nil { return err } // 使用两个 goroutine 分别处理发送和接收 var wg sync.WaitGroup wg.Add(2) // 发送协程 go func() { defer wg.Done() for i : 0; i 10; i { user : pb.User{ Id: int32(i), Name: fmt.Sprintf(User%d, i), } if err : stream.Send(user); err ! nil { log.Printf(发送失败: %v, err) return } time.Sleep(100 * time.Millisecond) } stream.CloseSend() }() // 接收协程 go func() { defer wg.Done() for { result, err : stream.Recv() if err io.EOF { return } if err ! nil { log.Printf(接收失败: %v, err) return } log.Printf(收到结果: ID%d, Success%t, Message%s, result.Id, result.Success, result.Message) } }() wg.Wait() return nil }五、元数据与拦截器5.1 元数据MetadatagRPC 使用 Metadata 在请求中传递额外信息// 定义可选的消息头 message ExtraInfo { string trace_id 1; string span_id 2; mapstring, string tags 3; }服务端读取元数据func (s *UserServer) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) { // 读取元数据 md, ok : metadata.FromIncomingContext(ctx) if !ok { return nil, status.Error(codes.Internal, 无法获取元数据) } // 获取特定字段 traceID : md.Get(x-trace-id) if len(traceID) 0 { log.Printf(Trace ID: %s, traceID[0]) } // 处理请求... return s.users[req.Id], nil }客户端发送元数据func callWithMetadata(client pb.UserServiceClient) { // 创建元数据 md : metadata.Pairs( x-trace-id, abc123, x-client-version, 1.0.0, ) // 创建带元数据的上下文 ctx : metadata.NewOutgoingContext(context.Background(), md) // 调用 client.GetUser(ctx, pb.GetUserRequest{Id: 1}) }5.2 拦截器实现Unary 拦截器func loggingInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { log.Printf( 收到请求: %s, info.FullMethod) // 添加追踪 md, _ : metadata.FromIncomingContext(ctx) traceID : md.Get(x-trace-id) if len(traceID) 0 { ctx context.WithValue(ctx, trace_id, traceID[0]) } // 调用实际处理函数 resp, err : handler(ctx, req) if err ! nil { log.Printf( 请求失败: %s, error: %v, info.FullMethod, err) } else { log.Printf( 请求成功: %s, info.FullMethod) } return resp, err }Stream 拦截器func streamLoggingInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { log.Printf( 收到流请求: %s, IsServerStream: %v, info.FullMethod, info.IsServerStream) // 包装原始流以添加日志功能 wrapped : loggingServerStream{ServerStream: ss} return handler(srv, wrapped) } type loggingServerStream struct { grpc.ServerStream } func (x *loggingServerStream) SendMsg(m interface{}) error { log.Printf( 发送消息: %T, m) return x.ServerStream.SendMsg(m) } func (x *loggingServerStream) RecvMsg(m interface{}) error { err : x.ServerStream.RecvMsg(m) if err nil { log.Printf( 收到消息: %T, m) } return err }5.3 认证拦截器// 简单 Token 认证 func authInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { // 白名单不需要认证的方法 whitelist : map[string]bool{ /user.UserService/GetUser: true, } if whitelist[info.FullMethod] { return handler(ctx, req) } // 检查 Token md, ok : metadata.FromIncomingContext(ctx) if !ok { return nil, status.Error(codes.Unauthenticated, 缺少元数据) } tokens : md.Get(authorization) if len(tokens) 0 { return nil, status.Error(codes.Unauthenticated, 缺少认证 Token) } token : strings.TrimPrefix(tokens[0], Bearer ) if !validateToken(token) { return nil, status.Error(codes.Unauthenticated, 无效的 Token) } return handler(ctx, req) } func validateToken(token string) bool { // 实际应该验证 JWT 或其他 token return token valid-token }六、双向流 RPC 详解6.1 四种 RPC 类型gRPC 支持四种 RPC 类型1. 一元 RPC (Unary RPC) 客户端 → 服务端 ClientStream → 服务器处理 → ClientStream 2. 客户端流 RPC (Client Streaming RPC) ClientStream → 服务器处理 → ClientStream 客户端发送多个请求服务器返回一个响应 3. 服务端流 RPC (Server Streaming RPC) ClientStream → 服务器处理 → ClientStream 客户端发送一个请求服务器返回多个响应 4. 双向流 RPC (Bidirectional Streaming RPC) ClientStream ↔ 服务器处理 ↔ ClientStream 双方都可以发送多个消息6.2 双向流聊天服务示例定义 protoservice ChatService { // 双向流聊天 rpc Chat(stream ChatMessage) returns (stream ChatMessage); } message ChatMessage { string sender 1; string content 2; int64 timestamp 3; }服务端实现type ChatServer struct { pb.UnimplementedChatServiceServer clients map[string]pb.ChatService_ChatServer mu sync.Mutex } func (s *ChatServer) Chat(stream pb.ChatService_ChatServer) error { var sender string // 等待第一个消息用于注册 firstMsg, err : stream.Recv() if err ! nil { return err } sender firstMsg.Sender // 注册客户端 s.mu.Lock() s.clients[sender] stream s.mu.Unlock() defer func() { // 注销客户端 s.mu.Lock() delete(s.clients, sender) s.mu.Unlock() }() // 启动接收协程 errChan : make(chan error, 1) go func() { for { msg, err : stream.Recv() if err io.EOF { errChan - nil return } if err ! nil { errChan - err return } // 广播消息给所有客户端 s.broadcast(msg) } }() // 等待错误 select { case err : -errChan: return err } } func (s *ChatServer) broadcast(msg *pb.ChatMessage) { s.mu.Lock() defer s.mu.Unlock() for sender, stream : range s.clients { if sender ! msg.Sender { // 不发给自己 stream.Send(msg) } } }七、实战案例微服务通信框架7.1 项目架构microservices/ ├── proto/ │ ├── user.proto │ ├── order.proto │ └── product.proto ├── pkg/ │ ├── grpc/ │ │ ├── client.go │ │ ├── server.go │ │ └── interceptor.go │ └── discovery/ │ └── consul.go ├── services/ │ ├── user-service/ │ ├── order-service/ │ └── product-service/ └── go.mod7.1 通用 gRPC 客户端封装package grpc import ( context crypto/tls fmt time google.golang.org/grpc google.golang.org/grpc/credentials google.golang.org/grpc/credentials/insecure google.golang.org/grpc/resolver ) // ClientConfig 客户端配置 type ClientConfig struct { Name string Address string Timeout time.Duration TLSCert string Token string MaxRecvMsg int MaxSendMsg int } // Dial 创建 gRPC 连接 func Dial(ctx context.Context, cfg ClientConfig) (*grpc.ClientConn, error) { opts : []grpc.DialOption{ // 超时设置 grpc.WithBlock(), grpc.WithTimeout(cfg.Timeout), } // 凭证设置 if cfg.TLSCert ! { creds, err : credentials.NewClientTLSFromFile(cfg.TLSCert, ) if err ! nil { return nil, fmt.Errorf(加载 TLS 证书失败: %w, err) } opts append(opts, grpc.WithTransportCredentials(creds)) } else { opts append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) } // Token 认证 if cfg.Token ! { opts append(opts, grpc.WithPerRPCCredentials(tokenAuth{Token: cfg.Token})) } // 消息大小限制 if cfg.MaxRecvMsg 0 { opts append(opts, grpc.WithDefaultCallOptions( grpc.MaxCallRecvMsgSize(cfg.MaxRecvMsg), )) } if cfg.MaxSendMsg 0 { opts append(opts, grpc.WithDefaultCallOptions( grpc.MaxCallSendMsgSize(cfg.MaxSendMsg), )) } return grpc.DialContext(ctx, cfg.Address, opts...) } // tokenAuth 实现 Token 认证 type tokenAuth struct { Token string } func (t *tokenAuth) GetRequestMetadata(ctx context.Context, urls ...string) (map[string]string, error) { return map[string]string{ authorization: Bearer t.Token, }, nil } func (t *tokenAuth) RequireTransportSecurity() bool { return true }7.2 通用服务端封装package grpc import ( context crypto/tls fmt net time google.golang.org/grpc google.golang.org/grpc/keepalive google.golang.org/grpc/reflection ) // ServerConfig 服务端配置 type ServerConfig struct { Port int TLSCert string TLSKey string MaxRecvMsg int MaxSendMsg int Interceptors []grpc.UnaryServerInterceptor StreamInts []grpc.StreamServerInterceptor } // Server 通用 gRPC 服务器 type Server struct { cfg ServerConfig server *grpc.Server lis net.Listener } // NewServer 创建 gRPC 服务器 func NewServer(cfg ServerConfig) (*Server, error) { opts : []grpc.ServerOption{ // 消息大小限制 grpc.MaxRecvMsgSize(cfg.MaxRecvMsg), grpc.MaxSendMsgSize(cfg.MaxSendMsg), // Keepalive 设置 grpc.KeepaliveParams(keepalive.ServerParameters{ MaxConnectionIdle: 5 * time.Minute, MaxConnectionAge: 2 * time.Hour, MaxConnectionAgeGrace: 30 * time.Second, Time: 5 * time.Minute, Timeout: 30 * time.Second, }), // 拦截器 grpc.ChainUnaryInterceptor(cfg.Interceptors...), grpc.ChainStreamInterceptor(cfg.StreamInts...), } // TLS 配置 if cfg.TLSCert ! cfg.TLSKey ! { creds, err : credentials.NewServerTLSFromFile(cfg.TLSCert, cfg.TLSKey) if err ! nil { return nil, fmt.Errorf(加载 TLS 证书失败: %w, err) } opts append(opts, grpc.Creds(creds)) } server : grpc.NewServer(opts...) // 注册反射服务用于 grpcurl 等工具调试 reflection.Register(server) return Server{ cfg: cfg, server: server, }, nil } // RegisterService 注册 gRPC 服务 func (s *Server) RegisterService(registerFunc func(*grpc.Server)) { registerFunc(s.server) } // Start 启动服务器 func (s *Server) Start(ctx context.Context) error { lis, err : net.Listen(tcp, fmt.Sprintf(:%d, s.cfg.Port)) if err ! nil { return fmt.Errorf(监听端口失败: %w, err) } s.lis lis errCh : make(chan error, 1) go func() { errCh - s.server.Serve(lis) }() select { case err : -errCh: return err case -ctx.Done(): s.server.GracefulStop() return nil } } // Stop 停止服务器 func (s *Server) Stop() { s.server.GracefulStop() }7.3 服务发现集成package discovery import ( context fmt github.com/hashicorp/consul/api google.golang.org/grpc/resolver ) // ConsulResolver Consul 服务发现解析器 type ConsulResolver struct { consulClient *api.Client serviceName string scheme string } // NewConsulResolver 创建 Consul 解析器 func NewConsulResolver(consulAddr, serviceName string) (*ConsulResolver, error) { config : api.DefaultConfig() config.Address consulAddr client, err : api.NewClient(config) if err ! nil { return nil, fmt.Errorf(创建 Consul 客户端失败: %w, err) } return ConsulResolver{ consulClient: client, serviceName: serviceName, scheme: consul, }, nil } // Build 实现 resolver.Builder 接口 func (r *ConsulResolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { return consulResolver{ consulClient: r.consulClient, serviceName: r.serviceName, cc: cc, }, nil } // Scheme 返回解析器 scheme func (r *ConsulResolver) Scheme() string { return r.scheme } type consulResolver struct { consulClient *api.Client serviceName string cc resolver.ClientConn } func (r *consulResolver) ResolveNow(options resolver.ResolveNowOptions) { r.resolve() } func (r *consulResolver) resolve() { services, _, err : r.consulClient.Health().Service(r.serviceName, , true, nil) if err ! nil { r.cc.ReportError(err) return } var addrs []resolver.Address for _, svc : range services { addrs append(addrs, resolver.Address{ Addr: fmt.Sprintf(%s:%d, svc.Service.Address, svc.Service.Port), }) } r.cc.UpdateState(resolver.State{Addresses: addrs}) } func (r *consulResolver) Close() {} // Register 注册服务到 Consul func Register(ctx context.Context, consulAddr, serviceName, addr string, port int) error { config : api.DefaultConfig() config.Address consulAddr client, err : api.NewClient(config) if err ! nil { return err } reg : api.AgentServiceRegistration{ ID: fmt.Sprintf(%s-%s, serviceName, addr), Name: serviceName, Port: port, Address: addr, Check: api.AgentServiceCheck{ GRPC: fmt.Sprintf(%s:%d, addr, port), Interval: 10s, Timeout: 5s, DeregisterCriticalServiceAfter: 30s, }, } return client.Agent().ServiceRegister(reg) }总结gRPC Protobuf 是一套成熟的微服务通信解决方案Protobuf 优势体积小、解析快、类型安全、向前兼容gRPC 优势高性能、双工流、代码生成、协议统一四种 RPC 类型一元、流式客户端、流式服务端、双向流元数据与拦截器实现认证、日志、追踪等横切关注点服务发现通过 resolver 实现负载均衡和服务发现最佳实践proto 文件集中管理统一版本使用拦截器实现横切关注点生产环境务必启用 TLS合理设置消息大小限制使用流式 API 处理大数据量场景
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2583134.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!