C++实现分布式网络通信框架RPC(3)--rpc调用端

news2025/6/12 21:15:36

目录

一、前言

二、UserServiceRpc_Stub

三、 CallMethod方法的重写

头文件

实现

四、rpc调用端的调用

实现

五、 google::protobuf::RpcController *controller

头文件

实现

六、总结


一、前言

在前边的文章中,我们已经大致实现了rpc服务端的各项功能代码,接下来我们就来看看,如果一个rpc调用端想要调用都要干什么。

二、UserServiceRpc_Stub

我们在预备知识就提到过我们通过在 .proto 文件中使用service关键字定义了用来描述rpc方法的类型后

 service UserServiceRpc
 {
    rpc Login(LoginRequest) returns(LoginResponse);
 }

使用 protoc 编译后会自动生成两个服务类,分别是 UserServiceRpc 、UserServiceRpc_Stub

UserServiceRpc是继承自Service这个类的,我们讲到过这个类是在rpc发布端使用的,而UserServiceRpc_Stub是继承自UserServiceRpc的,这个类是在rpc调用端使用的。

可以看到 UserServiceRpc_Stub这个类是没有默认构造函数的,想要构造这个类它必须传入  ::PROTOBUF_NAMESPACE_ID::RpcChannel* channel

我们再来看看这个RpcChannel是什么

class PROTOBUF_EXPORT RpcChannel {
 public:
  inline RpcChannel() {}
  virtual ~RpcChannel();

  // Call the given method of the remote service.  The signature of this
  // procedure looks the same as Service::CallMethod(), but the requirements
  // are less strict in one important way:  the request and response objects
  // need not be of any specific class as long as their descriptors are
  // method->input_type() and method->output_type().
  virtual void CallMethod(const MethodDescriptor* method,
                          RpcController* controller, const Message* request,
                          Message* response, Closure* done) = 0;

 private:
  GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(RpcChannel);
};

可以看到它是一个抽象类,所以我们肯定还要定义一个类Mprpcchannel来继承这个类并对该类中的CallMethod方法进行重写。

在预备知识一节中,我们已经提到了,无论是用service 关键字定义用来描述rpc方法的类型,无论定义几个该服务下的方法,最后对这些方法的调用都最终调用的是channel的CallMethod方法,即是在调用在创建 UserServiceRpc_Stub对象时传入的我们创建的Mrpcchannel中的CallMethod方法,所以我们就对CallMethod方法进行重写,集中来做所有rpc方法调用的参数的序列化和反序列化操作。

三、 CallMethod方法的重写

我们创建一个类Mrpcchannel用来继承UserServiceRpc_Stub类中的RpcChannel类并对类中的 CallMethod方法进行重写,在该方法中我们集中来做所有rpc方法调用的参数的序列化和反序列化操作。

CallMethod方法中需要做的事情:

  1. 将用户发送的消息按照双方协定的消息格式(header_size + header_str + args_str)序列化好。
  2. 通过网络发送,因为我们是客户端是不需要处理高并发的情况的,所以这里我们采用tcp编程就行。
  3. 接下来就是阻塞等待数据的响应。
  4. 最后拿到响应数据后反序列化之后返回给用户。

头文件

//mprpcchannel.h
class MprpcChannel:public google::protobuf::RpcChannel
{
    //所有通过stub代理对象调用的rpc方法,都是走到了这里,统一做rpc方法调用的数据序列化和网络发送
    void CallMethod(const google::protobuf::MethodDescriptor* method,
        google::protobuf::RpcController* controller, 
        const google::protobuf::Message* request,
        google::protobuf::Message* response, 
        google::protobuf::Closure* done);
};
      

实现

//mprpcchannel.cc
/*我们约定好在进行网络传输的时候的规则是 header_size + service_name + method_name + args_size +args*/
void MprpcChannel::CallMethod(const google::protobuf::MethodDescriptor *method,
                              google::protobuf::RpcController *controller,
                              const google::protobuf::Message *request,
                              google::protobuf::Message *response,
                              google::protobuf::Closure *done)//在调用端最后一个参数没用
{
     //通过method的service方法得到方法属于的服务类
    const google::protobuf::ServiceDescriptor *sd = method->service();
    std::string service_name = sd->name();    // service_name
    std::string method_name = method->name(); // method_name

    // 获取参数的序列化字符串长度 args_size
    uint32_t args_size = 0;
    std::string args_str;
    if (request->SerializeToString(&args_str))
    {
        // 序列化成功
        args_size = args_str.size();
    }
    else
    {
        controller->SetFailed("Serialize request error!");
        return;
    }

    // 定义rpc的请求header
    mprpc::RpcHeader rpcHeader;
    rpcHeader.set_service_name(service_name);
    rpcHeader.set_method_name(method_name);
    rpcHeader.set_args_size(args_size);

    // 进行序列化
    uint32_t header_size = 0;
    std::string rpc_header_str;
    if (rpcHeader.SerializeToString(&rpc_header_str)) // 将东西设置进rpcHeader后进行序列化
    {
        header_size = rpc_header_str.size();
    }
    else
    {
        controller->SetFailed("Serialize rpc header error!");
        return;
    }

    // 组织待发送的rpc请求字符串
    std::string send_rpc_str;
    send_rpc_str.insert(0, std::string((char *)&header_size, 4));
    send_rpc_str += rpc_header_str;
    send_rpc_str += args_str;

    // 打印调试信息
    std::cout << "==================================================" << std::endl;
    std::cout << "header_size::" << header_size << std::endl;
    std::cout << "rpc_header_str::" << rpcHeader.DebugString() << std::endl;
    std::cout << "service_name::" << service_name << std::endl;
    std::cout << "method_name::" << method_name << std::endl;
    std::cout << "args_size::" << args_size << std::endl;
    std::cout << "args_str::" << args_str << std::endl;
    std::cout << "==================================================" << std::endl;

    // 由于这里是客户端,所以不需要处理高并发的情况,所以这边的网络发送我们采用tcp编程,完成rpc方法的远程调用
    int clientfd = socket(AF_INET, SOCK_STREAM, 0);
    if (-1 == clientfd)
    {
        char errtxt[512] = {0};
        sprintf(errtxt, "creat socket error! errno:%d", errno);
        controller->SetFailed(errtxt);
        return;
    }
    // 将服务器的ip地址和端口号读一下
    // std::string ip = MprpcApplication::GetInstance().GetConfig().Load("rpcserverip");
    // uint16_t port= atoi(MprpcApplication::GetInstance().GetConfig().Load("rpcserverport").c_str());
    // 之前都是使用上面的方法来获取配置文件中的参数,但是现在我们想调用什么,就去zk上查询该服务所在的host信息
    zkClient zkCli;
    zkCli.Start();
    std::string method_path = "/" + service_name + "/" + method_name;
    std::string host_data = zkCli.GetData(method_path.c_str());
    if (host_data == "")
    {
        controller->SetFailed(method_name + "is not exist!");
        return;
    }
    int idx = host_data.find(":");
    if (idx == -1)
    {
        controller->SetFailed(method_path + "address is invalid!");
         return;
    }
    std::string ip = host_data.substr(0, idx);
    uint16_t port = atoi(host_data.substr(idx + 1, host_data.size() - idx).c_str());

    struct sockaddr_in server_addr;
    server_addr.sin_addr.s_addr = inet_addr(ip.c_str());
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(port);

    // 连接rpc服务节点
    if (-1 == connect(clientfd, (struct sockaddr *)&server_addr, sizeof(server_addr)))
    {
        close(clientfd);
        char errtxt[512] = {0};
        sprintf(errtxt, "connect error! errno:%d", errno);
        controller->SetFailed(errtxt);
        return;
    }

    // 发送rpc请求
    if (-1 == send(clientfd, send_rpc_str.c_str(), send_rpc_str.size(), 0))
    {
        close(clientfd);
        char errtxt[512] = {0};
        sprintf(errtxt, "send error! errno:%d", errno);
        controller->SetFailed(errtxt);
        // exit(EXIT_FAILURE);这里不能因为没有成功发送而整个的服务全部推出了
        return;
    }
    // 接收rpc端发来的请求响应值
    char recv_buf[1024] = {0};
    int recv_size = 0; // 整个接收的缓冲区中不可能全是发送过来的数据吧,所以记录一下多少数据
    if (-1 == (recv_size = recv(clientfd, recv_buf, 1024, 0)))
    {
        close(clientfd);
        char errtxt[512] = {0};
        sprintf(errtxt, "recv error! errno:%d", errno);
        controller->SetFailed(errtxt);
        return;
    }

    // 接下来就是把接收到的数据填进response中,这样框架就可以通过response知道响应值并返回给用户
    // std::string response_str(recv_buf,0,recv_size);//string有构造函数可以使用recv_buf来初始化,即将recv_buf的从0到recv_size的这一段数据初始化response_str
    // 有bug,recv_buf在遇到\0后面的数据就存不下来了****std::string response_str(recv_buf,recv_size);也可以
    // if(!response->ParseFromString(response_str))反序列化rpc调用的响应数据
    if (!response->ParseFromArray(recv_buf, recv_size))
    {
        close(clientfd);
        char errtxt[512] = {0};
        sprintf(errtxt, "parse error! response_str:%d", recv_buf);
        controller->SetFailed(errtxt);
        return;
    }
    close(clientfd);
}

我们在调用rpc服务的时候, 我们用的是相应的描述rpc的服务的 UserServiceRpc_Stub这个类,在使用这个类的时候,我们需要传入一个Rpcchannel这个类

UserServiceRpc_Stub(::PROTOBUF_NAMESPACE_ID::RpcChannel* channel);

然后在这个UserServiceRpc_Stub这个类中调用的所有的 rpc 方法,最终都跑到了调用channel的CallMethod方法下。举个例子如下

UserServiceRpc_Stub stub(new MprpcChannel());
stub.Login(nullptr,&request,&response,nullptr);

像这样的通过stub调用的Login方法都是调用的channel的CallMethod方法。

四、rpc调用端的调用

在这里我们需要考虑如何去发起一个rpc调用,去调用某个服务下的某个方法,下面都以UserServiceRpc服务下的Login方法为例,我们同样要定义一个与发布端相同的 user.proto文件。然后使用 protoc 编译,接着

  1. 对调用的rpc框架进行初始化操作
  2. 实例化出一个代理对象  fixbug::UserServiceRpc_Stub stub;后面都会通过stub来调用rpc方法。
  3. 填写请求方法的参数
  4. 用stub代理调用Login方法
  5. 等待读取返回值。

实现

//calleruserservice.cc
int main(int argc,char* argv[])
{
    //先调用框架的初始化类且全局只调用这一次
    //MprpcApplication& myin= MprpcApplication::GetInstance();
    //myin.Init(argc,argv);它是静态函数,也就是说它和具体对象无关,可以直接通过类名调用。
    //因为你拿到对象后并没有使用它的状态,只是用来调用静态函数。下面的方法就做到了单例懒加载,且只初始化执行一次
    MprpcApplication::Init(argc,argv);

    //演示调用远程发布rpc的方法Login,UserServiceRpc_Stub是专门用来协助rpc客户端的
    fixbug::UserServiceRpc_Stub stub(new MprpcChannel());//生成一个代理对象,以后通过stub来调用rpc方法

    //rpc方法的请求参数
    fixbug::LoginRequest request;//用户端发起调用,request肯定是这边给
    request.set_name("zhangsan");
    request.set_pwd("123456");

    //rpc方法的响应
    fixbug::LoginResponse response;

    //发起rpc方法的调用,下面是同步的rpc方法调用过程,即它的底层是MprpcChannel::callMethod
    stub.Login(nullptr,&request,&response,nullptr);//集中来做rpc方法调用的参数的序列化和网络发送了

    //走到这里表示一次rpc调用完成,读取调用的结果
    if(0 == response.result().errcode())
    {
        std::cout<<"rpc login response success:"<<response.success()<<std::endl;
    }
    else
    {
        std::cout<<"rpc login response error:"<<response.result().errmsg()<<std::endl;
    }

    //演示调用远程发布的rpc方法Register
    fixbug::RegisterRequset req;
    req.set_id(2000);
    req.set_name("lisi");
    req.set_pwd("12314");

    fixbug::RegisterResponse rsp;

    //以同步的方式发起rpc调用请求,等待返回结果
    stub.Register(nullptr,&req,&rsp,nullptr);
    if(0 == rsp.result().errcode())
    {
        std::cout<<"rpc register response success:"<<rsp.success()<<std::endl;
    }
    else
    {
        std::cout<<"rpc register response error:"<<rsp.result().errmsg()<<std::endl;
    }

    return 0;
}

五、 google::protobuf::RpcController *controller

前面还有最后一个坑需要填上,google::protobuf::RpcController *controller是个什么东西呢?

我们先不着急回答这个问题,我们思考一下在  calleruserservice.cc 中的代码有没有什么问题?

我们是在使用stub对象发起调用Login方法后,就开始等待着读取返回的响应了,但是我们在CallMethod方法中也看到了,在许多地方失败时都会有return直接返回的情况,比如反序列化失败,套接字创建失败等,所以这里一旦失败,直接返回。我们还有必要进行后面的工作吗,即访问响应response,此时还没有response呢。所以我们需要在这里得到一些控制信息。

我们可以看到调用的这个Login方法的第一个参数就是google::protobuf::RpcController *controller

从他的名字就可以猜到它可以存储一些控制信息,让我们清楚地知道当前rpc调用处于什么状态。

下面是 RpcController类,可以看到它的成员函数都是虚函数,它是一个抽象类,无法实例化出对象,所以还需要我们创建一个类来继承它,并且重写它的成员函数。

也就是说这个 RpcController可以携带我们rpc调用过程中的一些信息,所以接下来我们创建类继承它并对方法进行重写。

头文件

//mprpccontroller.h

class MprpcController : public google::protobuf::RpcController
{
    public:
        MprpcController();
        void Reset();
        bool Failed() const;
        std::string ErrorText() const;
        void SetFailed(const std::string& reason);

        //对于这三个方法目前未实现具体的功能,所以暂时不用
        void StartCancel();
        bool IsCanceled() const;
        void NotifyOnCancel(google::protobuf::Closure* callback);
    private:
        bool m_failed;//RPC方法执行过程中的状态
        std::string m_errText;//RPC方法执行过程中的错误信息

};  

实现

//mprpccontroller.cc
MprpcController::MprpcController()
{
    m_failed=false;
    m_errText="";
}

void MprpcController::Reset()//重置成刚开始的样子
{
    m_failed=false;
    m_errText="";
}
bool MprpcController::Failed() const//判断当前调用成功与否,返回状态,true就是发生问题了
{
    return m_failed;
}
std::string MprpcController::ErrorText() const//返回错误信息
{
    return m_errText;
}

void MprpcController::SetFailed(const std::string&reason)//设置错误
{
    m_failed=true;
    m_errText=reason;
}

//对于我们目前尚未实现的功能,我们实现成空函数就可以了
void MprpcController::StartCancel(){};
bool MprpcController::IsCanceled() const{};
void MprpcController::NotifyOnCancel(google::protobuf::Closure* callback){};

 定义很简单,使用方法就是在 mprpcchannel.cc 中的使用,也比较简单。

之前我们在 calleruserservice.cc文件中使用stub调用Login方法时,第一个参数传的是空指针,现在传controller就行了

MprpcController controller;//实例化出一个控制对象
stub.Login(&controller,&request,&response,nullptr);

这样在 return之前就可以知道rpc调用过程中的状态了,后面我们就要这样子调用了

if (controller.Failed())
{
    // 表示出问题
    std::cout << controller.ErrorText() << std::endl;
}
else
{
    if (0 == response.result().errcode())
    {
        std::cout << "rpc login response success:" << response.success() << std::endl;
    }
    else
    {
        std::cout << "rpc login response error:" << response.result().errmsg() << std::endl;
    }
}

六、总结

到这里我们基本已经实现了整个框架,包括rpc发布端rpc方法的发布和rpc调用端的调用。后面我们还要给整个框架添加必不可少的一个模块——日志模块,还有zookeeper在本项目上的应用。见下一文!


感谢阅读!

    本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2407988.html

    如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

    相关文章

    linux之kylin系统nginx的安装

    一、nginx的作用 1.可做高性能的web服务器 直接处理静态资源&#xff08;HTML/CSS/图片等&#xff09;&#xff0c;响应速度远超传统服务器类似apache支持高并发连接 2.反向代理服务器 隐藏后端服务器IP地址&#xff0c;提高安全性 3.负载均衡服务器 支持多种策略分发流量…

    C++_核心编程_多态案例二-制作饮品

    #include <iostream> #include <string> using namespace std;/*制作饮品的大致流程为&#xff1a;煮水 - 冲泡 - 倒入杯中 - 加入辅料 利用多态技术实现本案例&#xff0c;提供抽象制作饮品基类&#xff0c;提供子类制作咖啡和茶叶*//*基类*/ class AbstractDr…

    深入剖析AI大模型:大模型时代的 Prompt 工程全解析

    今天聊的内容&#xff0c;我认为是AI开发里面非常重要的内容。它在AI开发里无处不在&#xff0c;当你对 AI 助手说 "用李白的风格写一首关于人工智能的诗"&#xff0c;或者让翻译模型 "将这段合同翻译成商务日语" 时&#xff0c;输入的这句话就是 Prompt。…

    使用VSCode开发Django指南

    使用VSCode开发Django指南 一、概述 Django 是一个高级 Python 框架&#xff0c;专为快速、安全和可扩展的 Web 开发而设计。Django 包含对 URL 路由、页面模板和数据处理的丰富支持。 本文将创建一个简单的 Django 应用&#xff0c;其中包含三个使用通用基本模板的页面。在此…

    超短脉冲激光自聚焦效应

    前言与目录 强激光引起自聚焦效应机理 超短脉冲激光在脆性材料内部加工时引起的自聚焦效应&#xff0c;这是一种非线性光学现象&#xff0c;主要涉及光学克尔效应和材料的非线性光学特性。 自聚焦效应可以产生局部的强光场&#xff0c;对材料产生非线性响应&#xff0c;可能…

    Linux 文件类型,目录与路径,文件与目录管理

    文件类型 后面的字符表示文件类型标志 普通文件&#xff1a;-&#xff08;纯文本文件&#xff0c;二进制文件&#xff0c;数据格式文件&#xff09; 如文本文件、图片、程序文件等。 目录文件&#xff1a;d&#xff08;directory&#xff09; 用来存放其他文件或子目录。 设备…

    Flask RESTful 示例

    目录 1. 环境准备2. 安装依赖3. 修改main.py4. 运行应用5. API使用示例获取所有任务获取单个任务创建新任务更新任务删除任务 中文乱码问题&#xff1a; 下面创建一个简单的Flask RESTful API示例。首先&#xff0c;我们需要创建环境&#xff0c;安装必要的依赖&#xff0c;然后…

    C++初阶-list的底层

    目录 1.std::list实现的所有代码 2.list的简单介绍 2.1实现list的类 2.2_list_iterator的实现 2.2.1_list_iterator实现的原因和好处 2.2.2_list_iterator实现 2.3_list_node的实现 2.3.1. 避免递归的模板依赖 2.3.2. 内存布局一致性 2.3.3. 类型安全的替代方案 2.3.…

    【kafka】Golang实现分布式Masscan任务调度系统

    要求&#xff1a; 输出两个程序&#xff0c;一个命令行程序&#xff08;命令行参数用flag&#xff09;和一个服务端程序。 命令行程序支持通过命令行参数配置下发IP或IP段、端口、扫描带宽&#xff0c;然后将消息推送到kafka里面。 服务端程序&#xff1a; 从kafka消费者接收…

    iOS 26 携众系统重磅更新,但“苹果智能”仍与国行无缘

    美国西海岸的夏天&#xff0c;再次被苹果点燃。一年一度的全球开发者大会 WWDC25 如期而至&#xff0c;这不仅是开发者的盛宴&#xff0c;更是全球数亿苹果用户翘首以盼的科技春晚。今年&#xff0c;苹果依旧为我们带来了全家桶式的系统更新&#xff0c;包括 iOS 26、iPadOS 26…

    TDengine 快速体验(Docker 镜像方式)

    简介 TDengine 可以通过安装包、Docker 镜像 及云服务快速体验 TDengine 的功能&#xff0c;本节首先介绍如何通过 Docker 快速体验 TDengine&#xff0c;然后介绍如何在 Docker 环境下体验 TDengine 的写入和查询功能。如果你不熟悉 Docker&#xff0c;请使用 安装包的方式快…

    stm32G473的flash模式是单bank还是双bank?

    今天突然有人stm32G473的flash模式是单bank还是双bank&#xff1f;由于时间太久&#xff0c;我真忘记了。搜搜发现&#xff0c;还真有人和我一样。见下面的链接&#xff1a;https://shequ.stmicroelectronics.cn/forum.php?modviewthread&tid644563 根据STM32G4系列参考手…

    springboot 百货中心供应链管理系统小程序

    一、前言 随着我国经济迅速发展&#xff0c;人们对手机的需求越来越大&#xff0c;各种手机软件也都在被广泛应用&#xff0c;但是对于手机进行数据信息管理&#xff0c;对于手机的各种软件也是备受用户的喜爱&#xff0c;百货中心供应链管理系统被用户普遍使用&#xff0c;为方…

    调用支付宝接口响应40004 SYSTEM_ERROR问题排查

    在对接支付宝API的时候&#xff0c;遇到了一些问题&#xff0c;记录一下排查过程。 Body:{"datadigital_fincloud_generalsaas_face_certify_initialize_response":{"msg":"Business Failed","code":"40004","sub_msg…

    智慧医疗能源事业线深度画像分析(上)

    引言 医疗行业作为现代社会的关键基础设施,其能源消耗与环境影响正日益受到关注。随着全球"双碳"目标的推进和可持续发展理念的深入,智慧医疗能源事业线应运而生,致力于通过创新技术与管理方案,重构医疗领域的能源使用模式。这一事业线融合了能源管理、可持续发…

    Lombok 的 @Data 注解失效,未生成 getter/setter 方法引发的HTTP 406 错误

    HTTP 状态码 406 (Not Acceptable) 和 500 (Internal Server Error) 是两类完全不同的错误&#xff0c;它们的含义、原因和解决方法都有显著区别。以下是详细对比&#xff1a; 1. HTTP 406 (Not Acceptable) 含义&#xff1a; 客户端请求的内容类型与服务器支持的内容类型不匹…

    微信小程序之bind和catch

    这两个呢&#xff0c;都是绑定事件用的&#xff0c;具体使用有些小区别。 官方文档&#xff1a; 事件冒泡处理不同 bind&#xff1a;绑定的事件会向上冒泡&#xff0c;即触发当前组件的事件后&#xff0c;还会继续触发父组件的相同事件。例如&#xff0c;有一个子视图绑定了b…

    RocketMQ延迟消息机制

    两种延迟消息 RocketMQ中提供了两种延迟消息机制 指定固定的延迟级别 通过在Message中设定一个MessageDelayLevel参数&#xff0c;对应18个预设的延迟级别指定时间点的延迟级别 通过在Message中设定一个DeliverTimeMS指定一个Long类型表示的具体时间点。到了时间点后&#xf…

    CTF show Web 红包题第六弹

    提示 1.不是SQL注入 2.需要找关键源码 思路 进入页面发现是一个登录框&#xff0c;很难让人不联想到SQL注入&#xff0c;但提示都说了不是SQL注入&#xff0c;所以就不往这方面想了 ​ 先查看一下网页源码&#xff0c;发现一段JavaScript代码&#xff0c;有一个关键类ctfs…

    docker详细操作--未完待续

    docker介绍 docker官网: Docker&#xff1a;加速容器应用程序开发 harbor官网&#xff1a;Harbor - Harbor 中文 使用docker加速器: Docker镜像极速下载服务 - 毫秒镜像 是什么 Docker 是一种开源的容器化平台&#xff0c;用于将应用程序及其依赖项&#xff08;如库、运行时环…