用go从零构建写一个RPC(仿gRPC,tRPC)--- 版本1

news2025/7/7 11:09:57

希望借助手写这个go的中间件项目,能够理解go语言的特性以及用go写中间件的优势之处,同时也是为了更好的使用和优化公司用到的trpc,并且作者之前也使用过grpc并有一定的兴趣,所以打算从0构建一个rpc系统,对于生产环境已经投入使用的项目抽丝剥茧后,再从0构建,从而更好的理解这个项目和做一个RPC需要注意的地方

打算分为多个版本,从最基本的功能,到逐渐加入新功能和新特性,不断的完善。其中也有一些作者本身的思考优化,其中不足和有误之处还望大家指正

代码地址(目前已经有两个版本): https://github.com/karatttt/MyRPC

Server端

rpc首先有多个service,每一个service对应多个方法,当请求到来时再正确路由到对应的方法,通过server端处理后返回client端。所以server端主要做的就是一:注册service和对应的method,二:解析配置文件启动Server, 三:能够正确路由到来的请求并返回client。

service和Method的注册

grpc和trpc都是使用protobuf作为序列化格式,这里我们的项目也用protobuf格式进行序列化,成熟的rpc项目正常会有对应的工具,我们写好proto文件和对应的service的实现类后,使用自动化构建工具可以生成桩代码,包括以下部分:

  1. 消息类(Message Struct): 把你 .proto 里面定义的请求、响应对象变成对应的语言结构体,比如 UserRequest、UserReply
  2. 服务接口(Service Interface): 把你 .proto 里面定义的方法变成一组接口或基类,供你实现,比如 GetUser(ctx, req)
  3. 客户端 Stub :客户端可以直接用来调用远程方法的代码(自动封装了序列化、网络传输、重试等逻辑),类似于java的动态代理
  4. 服务端 Stub :服务端接收到请求后,自动反序列化,然后回调你实现的业务逻辑,也类似于java的动态代理

这里我们尝试通过一个proto文件,自己实现一个server端的桩代码

syntax = "proto3";

package myrpc.helloworld;
option go_package="/pb";

service Greeter {
  rpc Hello (HelloRequest) returns (HelloReply) {}
}

message HelloRequest {
  string msg = 1;
}

message HelloReply {
  string msg = 1;
}

第一步,根据填的方法写一个接口:

// 具体方法接口
type HelloServer interface {
	// SayHello is the method that will be called by the client.
	Hello(req *HelloRequest) (*HelloReply, error)
}

第二步,我们对每一个方法写一个handler,写实际上的处理逻辑,即如何反序列化,然后回调实际写的业务逻辑,再返回结构体。具体的如何序列化反序列化实现我们后面再看

func HelloServer_Hello_Handler(srv interface{}, req []byte) (interface{}, error) {
	
	// 这里的srv是HelloServer的实现类,我们自己写的
	// 通过类型断言将srv转换为HelloServer类型
	helloServer, ok := srv.(HelloServer)
	if !ok {
		return nil, fmt.Errorf("HelloServer_Hello_Handler: %v", "type assertion failed")
	}
	// 调用HelloServer的Hello方法
	// 将req反序列化
	reqBody := &HelloRequest{}
	err := codec.Unmarshal(req, reqBody)
	if err != nil {
		return nil, fmt.Errorf("HelloServer_Hello_Handler: %v", err)
	}
	// 调用实际我们写的业务逻辑
	reply, err := helloServer.Hello(reqBody)
	
	if err != nil {
		fmt.Printf("HelloServer_Hello_Handler: %v", err)
		return nil, err
	}
	return reply, nil
}

第三步,我们写了Handler,当然要让server端能够路由到这个Handler,所以这个Handler需要绑定一个方法名和服务名,作为key保存再server端的一个map里,这样就可以正确路由。所以我们可以写一个方法,将这个映射关系注册到server里。

这个server的Register方法,我们后面再来实现。

// 映射关系
var HelloServer_ServiceDesc = server.ServiceDesc{
		ServiceName: "helloworld",
		HandlerType: (*HelloServer)(nil),
		Methods: []server.MethodDesc{
			{
				MethodName: "Hello",
				// 当接受到客户端调用的Hello方法时,server将会调用这个方法
				Func:    HelloServer_Hello_Handler,
			},
		},
	}

// 绑定方法
func RegisterHelloServer(s *server.Server, svr interface{}) error {
	if err := s.Register(&HelloServer_ServiceDesc, svr); err != nil {
		panic(fmt.Sprintf("Greeter register error:%v", err))
	}
	return nil
}

Server端的启动

  • Server启动的时候,需要根据我们写的配置文件以得知每一个service的name,以及他们对应的ip和端口号(当然后续还有其他的配置),正常多个service的ip和端口号是一样的,也就是说serve启动的时候,统一暴露一个端口用于rpc调用。
  • 所以server启动的流程是:一:读取配置,二:根据配置名创建多个service并保存
func NewServer() *Server {
	// 1. 创建一个Server实例
	server := &Server{
		services: make(map[string]Service),
	}

	// 2. 读取配置文件
	config, err := loadConfig("./rpc.yaml")
	if err != nil {
		fmt.Print("读取配置文件出错")
	}

	// 3. 创建服务
	for _, svc := range config.Server.Service {
		// 创建服务,这里创建了service实例
		service := NewService(svc.Name, WithAddress(fmt.Sprintf("%s:%d", svc.IP, svc.Port)))

		// 添加到服务映射
		server.services[svc.Name] = service
	}
	return server
}

Service类的实现

前面server端启动的时候创建了所有的service类,这里我们看看具体service应该做什么。

当请求进来时,首先找到service,再找到对应的Method,所以service应该持有method的map,以及在这里实现前面提到的Register逻辑。

同时,每一个service应该有一个serve方法,即提供服务,就是这里开始监听请求,路由和处理,这个后续会详细展开。

我们再为service实现Handler接口,赋予处理业务逻辑的能力,这个接口就是为了路由找到service里的method并调用它,这个Handler详细我们后面再看

// 定义接口,提供一些服务的注册和开启服务的功能
type Service interface {
	// Register registers a service with the server.
	// The serviceName is the name of the service, and service is the implementation of the service.
	Register(serviceDesc *ServiceDesc, service interface{}) error
	// Serve starts the server and listens for incoming connections.
	Serve(address string) error
}
// 定义一个Handler接口,service实现了这个接口
type Handler interface {
	Handle(ctx context.Context, frame []byte) (rsp []byte, err error)
}
	

我们先看看比较简单的regsiter方法,虽然registerMethods看起来复杂,但是实际上就是将前面桩代码的Handler作为一个函数存在map里

// 实现service的Register方法,填充service的各个属性
func (s *service) Register(serviceDesc *ServiceDesc, service interface{}) error {

	// 初始化Transport
	s.opts.Transport = transport.DefaultServerTransport

	s.registerMethods(serviceDesc.Methods, service)

	return nil
}

// 注册普通方法
func (s *service) registerMethods(methods []MethodDesc, serviceImpl interface{}) error {
	for _, method := range methods {
		if _, exists := s.handler[method.MethodName]; exists {
			return fmt.Errorf("duplicate method name: %s", method.MethodName)
		}
		s.handler[method.MethodName] = func(req []byte) (rsp interface{}, err error) {
			if fn, ok := method.Func.(func(svr interface{}, req []byte) (rsp interface{}, err error)); ok {
				// 这里调用的就是rpc.go里面的实际的handler方法
				return fn(serviceImpl, req)
			}
			return nil, fmt.Errorf("method.Func is not a valid function")
		}
	}
	return nil
}

Service类的Server方法处理请求

func (s *service) Serve(address string) error {
	fmt.Printf("Server is listening on %s\n", address)

	// 将service作为Handler传入transport,后续接收到请求,会调用service的Handle方法
	s.opts.Transport.RegisterHandler(s)
	err := s.opts.Transport.ListenAndServe(context.Background(), "tcp", address)
	if err != nil {
		return fmt.Errorf("failed to listen: %v", err)
	}
	return nil
}
  • 这个Serve方法会在Server端启动的时候,依次触发每一个service类的这个Serve方法,意即为每一个service提供处理请求的能力
  • 这里做了一个serverTransport主要负责网络请求,我们重点关注ListenAndServe
// ListenAndServe 监听并处理 TCP 连接
func (t *serverTransport) ListenAndServe(ctx context.Context, network, address string) error {

	ln, err := net.Listen(network, address)
	if err != nil {
		return fmt.Errorf("failed to listen: %w", err)
	}
	defer ln.Close()

	go func() {
		<-ctx.Done()
		ln.Close()
	}()

	return t.serveTCP(ctx, ln)
}

// serveTCP 处理 TCP 连接
func (t *serverTransport) serveTCP(ctx context.Context, ln net.Listener) error {
	fmt.Print("开始监听TCP连接")
	for {
		conn, err := ln.Accept()
		if err != nil {
			select {
			case <-ctx.Done():
				return nil // 退出监听
			default:
				fmt.Println("accept error:", err)
			}
			continue
		}
		go t.handleConnection(ctx, conn)
	}
}
// handleConnection 处理单个连接
func (t *serverTransport) handleConnection(ctx context.Context, conn net.Conn) {
	//TODO 这里可以做一个处理业务逻辑的协程池
	// 实际上每个连接一个协程,同时负责读取请求并直接处理业务逻辑也是可行的,读取请求时如果阻塞,Go调度器会自动切换到其他协程执行
	// 但是协程池可以限制同时处理业务逻辑的协程数量,避免请求量大时,过多协程导致的资源消耗

	// 这里是处理完一个请求就释放连接,后续可以考虑长连接
	defer conn.Close()
	fmt.Println("New connection from", conn.RemoteAddr())
	// 读取帧
	frame, err := codec.ReadFrame(conn)
	if err != nil {
		fmt.Println("read frame error:", err)
		return
	}
	// 调用service的Handler执行结果
	response, err := t.ConnHandler.Handle(ctx, frame)
	if err != nil {
		fmt.Println("handle error:", err)
		return
	}
	// 发送响应,此时已经是完整帧
	conn.Write(response)
}
  • 以上的代码简单来说就是,开启一个coonection,for循环accept请求,一旦请求到达,开启协程进行实际的业务逻辑处理
  • 这个 t.ConnHandler.Handle(ctx, frame),实际上就是service里的Handler方法,当transport收到请求时,回到我们的service的Handler方法执行。
  • 对于codec.ReadFrame(conn)我们下面重点看看

Service类的Handler方法

接收到请求,我们的处理过程应该是这样:

  1. 接收codec.ReadFrame后得到原始字节流(frame)
  2. 解码frame
  3. 调用对应的业务方法 handler(其间反序列化)
  4. 把业务返回结果序列化
  5. 编码生成frame返回给调用方

首先设计Frame结构如下:
在这里插入图片描述

ReadFrame

即根据帧头读取一段完整的自定义协议数据,解决半包和粘包问题,先读16字节的帧头解析各字段,校验魔数和版本号,再根据帧头中记录的协议数据长度和消息体长度继续读取剩下的内容,最后把帧头和帧体拼成一个完整的字节数组返回。


func ReadFrame(conn net.Conn) ([]byte, error) {
	buf := bufio.NewReader(conn)
	// 读取帧头
	headerBuf := make([]byte, HeaderLength)
	n, err := io.ReadFull(buf, headerBuf)
	if err != nil {
		return nil, fmt.Errorf("read header error: %v, read %d bytes", err, n)
	}
	// 正确解析所有字段
	header := FrameHeader{
		MagicNumber:    binary.BigEndian.Uint16(headerBuf[0:2]),
		Version:        headerBuf[2],
		MessageType:    headerBuf[3],
		SequenceID:     binary.BigEndian.Uint32(headerBuf[4:8]),
		ProtocolLength: binary.BigEndian.Uint32(headerBuf[8:12]),
		BodyLength:     binary.BigEndian.Uint32(headerBuf[12:16]),
	}
	if header.MagicNumber != MagicNumber {
		return nil, fmt.Errorf("invalid magic number: %d", header.MagicNumber)
	}
	if header.Version != Version {
		return nil, fmt.Errorf("unsupported version: %d", header.Version)
	}
	// 读取协议数据 + 消息体
	frameBody := make([]byte, header.ProtocolLength+header.BodyLength)
	_, err = io.ReadFull(buf, frameBody)
	if err != nil {
		return nil, fmt.Errorf("read body error: %v", err)
	}
	// 拼接完整帧
	frame := append(headerBuf, frameBody...)
	return frame, nil
}

Decode

读取到Frame后,需要解析出其中的消息体,并将读取到的协议数据存起来

func (c *servercodec) Decode(msg internel.Message, frame []byte) ([]byte, error) {
	// 解析帧头
	header := FrameHeader{
		MagicNumber:    binary.BigEndian.Uint16(frame[0:]),
		Version:        frame[2],
		MessageType:    frame[3],
		SequenceID:     binary.BigEndian.Uint32(frame[4:]),
		ProtocolLength: binary.BigEndian.Uint32(frame[8:]),
		BodyLength:     binary.BigEndian.Uint32(frame[12:]),
	}

	// 验证魔数和版本
	if header.MagicNumber != MagicNumber {
		return nil, fmt.Errorf("invalid magic number: %d", header.MagicNumber)
	}
	if header.Version != Version {
		return nil, fmt.Errorf("unsupported version: %d", header.Version)
	}

	// 提取协议数据
	protocolData := frame[HeaderLength : HeaderLength+header.ProtocolLength]

	// 解析协议数据
	proto, err := DeserializeProtocolData(protocolData)
	if err != nil {
		return nil, fmt.Errorf("parse protocol data error: %v", err)
	}

	// 设置到消息中
	msg.WithServiceName(proto.ServiceName)
	msg.WithMethodName(proto.MethodName)

	// 返回消息体
	return frame[HeaderLength+header.ProtocolLength:], nil
}

Unmarshal

得到消息体后,还是字节数组,这个时候根据protobuf的格式,反序列化成对应的结构体(这个方法的调用在前面的桩代码的HelloServer_Hello_Handler里)

// Unmarshal 将 protobuf 字节数组反序列化为结构体
func Unmarshal(rspDataBuf []byte, rspBody interface{}) error {
	msg, ok := rspBody.(proto.Message)
	if !ok {
		return fmt.Errorf("Unmarshal: rspBody does not implement proto.Message")
	}
	return proto.Unmarshal(rspDataBuf, msg)
}

反序列化后,即可处理业务逻辑,返回的响应的结构仍需要序列化,编码(补充协议数据和帧头),返回客户端,这里就不再详细说明

这里先写server,client下一篇文章再讲

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2344007.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

django之账号管理功能

账号管理功能 目录 1.账号管理页面 2.新增账号 3.修改账号 4.账号重置密码 5.删除账号功能 6.所有代码展示集合 7.运行结果 这一片文章, 我们需要新增账号管理功能, 今天我们写到的代码, 基本上都是用到以前所过的知识, 不过也有需要注意的细节。 一、账号管理界面 …

月之暗面开源 Kimi-Audio-7B-Instruct,同时支持语音识别和语音生成

我们向您介绍在音频理解、生成和对话方面表现出色的开源音频基础模型–Kimi-Audio。该资源库托管了 Kimi-Audio-7B-Instruct 的模型检查点。 Kimi-Audio 被设计为通用的音频基础模型&#xff0c;能够在单一的统一框架内处理各种音频处理任务。主要功能包括&#xff1a; 通用功…

IDEA配置将Servlet真正布署到Tomcat

刚开始只能IDEA运行完Servlet web application 并保持IDEA运行才能通过浏览器访问到我的Servlet&#xff0c;跟想象中的不一样&#xff0c;不应该是IDEA运行完项目以后只要打开Tomcat就能访问吗&#xff1f;事实时运行完项目只要关掉IDEA就不能再访问到应用了&#xff0c;而且T…

刚体运动 (位置向量 - 旋转矩阵) 笔记 1.1~1.3 (台大机器人学-林沛群)

目录 1. 理解刚体的“自由度”&#xff08;Degrees of Freedom, DOF&#xff09; 1.1 平面运动 (2D) 1.2 空间运动 (3D) 2. 统一描述&#xff1a;引入“体坐标系”&#xff08;Body Frame&#xff09; 3. 从“状态”到“运动”&#xff1a;引入微分 3.1 补充&#xff1a;…

openAICEO山姆奥特曼未来预测雄文之三个观察

《三个观察》 山姆奥特曼 这篇文章主要讲的是关于AGI&#xff08;人工通用智能&#xff09;的未来发展及其对社会的影响&#xff0c;用大白话总结如下&#xff1a; 核心观点&#xff1a; AGI是什么&#xff1f; AGI是一种能像人类一样解决各种复杂问题的智能系统&#xff0c;比…

比象AI创作系统,多模态大模型:问答分析+AI绘画+管理后台系统

比象AI创作系统是新一代集智能问答、内容创作与商业运营于一体的综合型AI平台。本系统深度融合GPT-4.0/GPT-4o多模态大模型技术&#xff0c;结合实时联网搜索与智能分析能力&#xff0c;打造了从内容生产到商业变现的完整闭环解决方案。 智能问答中枢 系统搭载行业领先的对话…

Docker-高级使用

前言 书接上文Docker-初级安装及使用_用docker安装doccano-CSDN博客&#xff0c;我们讲解了Docker的基本操作&#xff0c;下面我们讲解的是高级使用&#xff0c;请大家做好准备&#xff01; 大家如果是从初级安装使用过来的话&#xff0c;建议把之前镜像和搭载的容器数据卷里面…

计算机网络 | Chapter1 计算机网络和因特网

&#x1f493;个人主页&#xff1a;mooridy-CSDN博客 &#x1f493;文章专栏&#xff1a;《计算机网络&#xff1a;自定向下方法》 大纲式阅读笔记_mooridy的博客-CSDN博客 &#x1f339;关注我&#xff0c;和我一起学习更多计算机网络的知识 &#x1f51d;&#x1f51d; 目录 …

开源项目实战学习之YOLO11:ultralytics-cfg-datasets-Objects365、open-images-v7.yaml文件(六)

&#x1f449; 点击关注不迷路 &#x1f449; 点击关注不迷路 &#x1f449; 点击关注不迷路 medical - pills.yaml 通常用于配置与医学药丸检测任务相关的参数和信息 Objects365.yaml 用于配置与 Objects365 数据集相关信息的文件。Objects365 数据集包含 365 个不同的物体类别…

蚂蚁集团“Plan A”重磅登场,开启AI未来

近期&#xff0c;蚂蚁集团面向全球高潜AI人才&#xff0c;正式发布顶级专项招募计划——“Plan A”。作为其“蚂蚁星”校招体系的全新升级模块&#xff0c;Plan A聚焦人工智能领域科研精英&#xff0c;旨在与全球高校AI研究者协同突破AGI前沿&#xff0c;共绘技术未来图谱。 蚂…

高中数学联赛模拟试题精选第18套几何题

在 △ A B C \triangle ABC △ABC 中, A B < A C AB< AC AB<AC, 点 K K K, L L L, M M M 分别是边 B C BC BC, C A C A CA, A B AB AB 的中点. △ A B C \triangle ABC △ABC 的内切圆圆心为 I I I, 且与边 B C BC BC 相切于点 D D D. 直线 l l l 经过线段…

Java 富文本转word

前言&#xff1a; 本文的目的是将传入的富文本内容(html标签&#xff0c;图片)并且分页导出为word文档。 所使用的为docx4j 一、依赖导入 <!-- 富文本转word --><dependency><groupId>org.docx4j</groupId><artifactId>docx4j</artifactId&…

多模态大语言模型arxiv论文略读(四十三)

InteraRec: Screenshot Based Recommendations Using Multimodal Large Language Models ➡️ 论文标题&#xff1a;InteraRec: Screenshot Based Recommendations Using Multimodal Large Language Models ➡️ 论文作者&#xff1a;Saketh Reddy Karra, Theja Tulabandhula …

GPU加速-系统CUDA12.5-Windows10

误区注意 查看当前系统可支持的最高版本cuda&#xff1a;nvidia-smi 说明&#xff1a; 此处显示的12.7只是驱动对应的最高版本&#xff0c;不一定是 / 也不一定需要是 当前Python使用的版本。但我们所安装的CUDA版本需要 小于等于它&#xff08;即≤12.7&#xff09;因此即使…

kafka课后总结

Kafka是由LinkedIn开发的分布式发布 - 订阅消息系统&#xff0c;具备高吞吐量、低延迟、可扩展性、持久性、可靠性、容错性和高并发等特性。其主要角色包括Broker、Topic、Partition、Producer、Consumer、Consumer Group、replica、leader、follower和controller。消息系统中存…

【股票系统】使用docker本地构建ai-hedge-fund项目,模拟大师炒股进行分析。人工智能的对冲基金的开源项目

股票系统: https://github.com/virattt/ai-hedge-fund 镜像地址: https://gitcode.com/gh_mirrors/ai/ai-hedge-fund 项目地址: https://gitee.com/pythonstock/docker-run-ai-hedge-fund 这是一个基于人工智能的对冲基金的原理验证项目。本项目旨在探讨利用人工智能进行…

施工安全巡检二维码制作

进入新时代以来&#xff0c;人们对安全的重视程度越来越高。特别在建筑施工行业&#xff0c;安全不仅是关乎着工人的性命&#xff0c;更是承载着工人背后家庭的幸福生活。此时就诞生了安全巡检的工作&#xff0c;而巡检过程中内容庞杂&#xff0c;安全生产检查、隐患排查、施工…

基于 Google Earth Engine (GEE) 的土地利用变化监测

一、引言 土地利用变化是全球环境变化的重要组成部分&#xff0c;对生态系统、气候和人类社会产生深远影响。利用遥感技术可以快速、准确地获取土地利用信息&#xff0c;监测其变化情况。本文将详细介绍如何使用 GEE 对特定区域的 Landsat 影像进行处理&#xff0c;实现土地利…

BT169-ASEMI无人机专用功率器件BT169

编辑&#xff1a;ll BT169-ASEMI无人机专用功率器件BT169 型号&#xff1a;BT169 品牌&#xff1a;ASEMI 封装&#xff1a;SOT-23 批号&#xff1a;最新 引脚数量&#xff1a;3 特性&#xff1a;单向可控硅 工作温度&#xff1a;-40℃~150℃ BT169单向可控硅&#xff…

AI图像编辑器 Luminar Neo 便携版 Win1.24.0.14794

如果你对图像编辑有兴趣&#xff0c;但又不想花费太多时间学习复杂的软件操作&#xff0c;那么 Luminar Neo 可能就是你要找的完美工具。作为一款基于AI技术的创意图像编辑器&#xff0c;Luminar Neo简化了复杂的编辑流程&#xff0c;即使是没有任何图像处理经验的新手&#xf…