目录
一、前言
二、UserServiceRpc_Stub
三、 CallMethod方法的重写
头文件
实现
四、rpc调用端的调用
实现
五、 google::protobuf::RpcController *controller
头文件
实现
六、总结
一、前言
在前边的文章中,我们已经大致实现了rpc服务端的各项功能代码,接下来我们就来看看,如果一个rpc调用端想要调用都要干什么。
二、UserServiceRpc_Stub
我们在预备知识就提到过我们通过在 .proto 文件中使用service关键字定义了用来描述rpc方法的类型后
service UserServiceRpc
{
rpc Login(LoginRequest) returns(LoginResponse);
}
使用 protoc 编译后会自动生成两个服务类,分别是 UserServiceRpc 、UserServiceRpc_Stub
UserServiceRpc是继承自Service这个类的,我们讲到过这个类是在rpc发布端使用的,而UserServiceRpc_Stub是继承自UserServiceRpc的,这个类是在rpc调用端使用的。
可以看到 UserServiceRpc_Stub这个类是没有默认构造函数的,想要构造这个类它必须传入 ::PROTOBUF_NAMESPACE_ID::RpcChannel* channel
我们再来看看这个RpcChannel是什么
class PROTOBUF_EXPORT RpcChannel {
public:
inline RpcChannel() {}
virtual ~RpcChannel();
// Call the given method of the remote service. The signature of this
// procedure looks the same as Service::CallMethod(), but the requirements
// are less strict in one important way: the request and response objects
// need not be of any specific class as long as their descriptors are
// method->input_type() and method->output_type().
virtual void CallMethod(const MethodDescriptor* method,
RpcController* controller, const Message* request,
Message* response, Closure* done) = 0;
private:
GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(RpcChannel);
};
可以看到它是一个抽象类,所以我们肯定还要定义一个类Mprpcchannel来继承这个类并对该类中的CallMethod方法进行重写。
在预备知识一节中,我们已经提到了,无论是用service 关键字定义用来描述rpc方法的类型,无论定义几个该服务下的方法,最后对这些方法的调用都最终调用的是channel的CallMethod方法,即是在调用在创建 UserServiceRpc_Stub对象时传入的我们创建的Mrpcchannel中的CallMethod方法,所以我们就对CallMethod方法进行重写,集中来做所有rpc方法调用的参数的序列化和反序列化操作。
三、 CallMethod方法的重写
我们创建一个类Mrpcchannel用来继承UserServiceRpc_Stub类中的RpcChannel类并对类中的 CallMethod方法进行重写,在该方法中我们集中来做所有rpc方法调用的参数的序列化和反序列化操作。
CallMethod方法中需要做的事情:
- 将用户发送的消息按照双方协定的消息格式(header_size + header_str + args_str)序列化好。
- 通过网络发送,因为我们是客户端是不需要处理高并发的情况的,所以这里我们采用tcp编程就行。
- 接下来就是阻塞等待数据的响应。
- 最后拿到响应数据后反序列化之后返回给用户。
头文件
//mprpcchannel.h
class MprpcChannel:public google::protobuf::RpcChannel
{
//所有通过stub代理对象调用的rpc方法,都是走到了这里,统一做rpc方法调用的数据序列化和网络发送
void CallMethod(const google::protobuf::MethodDescriptor* method,
google::protobuf::RpcController* controller,
const google::protobuf::Message* request,
google::protobuf::Message* response,
google::protobuf::Closure* done);
};
实现
//mprpcchannel.cc
/*我们约定好在进行网络传输的时候的规则是 header_size + service_name + method_name + args_size +args*/
void MprpcChannel::CallMethod(const google::protobuf::MethodDescriptor *method,
google::protobuf::RpcController *controller,
const google::protobuf::Message *request,
google::protobuf::Message *response,
google::protobuf::Closure *done)//在调用端最后一个参数没用
{
//通过method的service方法得到方法属于的服务类
const google::protobuf::ServiceDescriptor *sd = method->service();
std::string service_name = sd->name(); // service_name
std::string method_name = method->name(); // method_name
// 获取参数的序列化字符串长度 args_size
uint32_t args_size = 0;
std::string args_str;
if (request->SerializeToString(&args_str))
{
// 序列化成功
args_size = args_str.size();
}
else
{
controller->SetFailed("Serialize request error!");
return;
}
// 定义rpc的请求header
mprpc::RpcHeader rpcHeader;
rpcHeader.set_service_name(service_name);
rpcHeader.set_method_name(method_name);
rpcHeader.set_args_size(args_size);
// 进行序列化
uint32_t header_size = 0;
std::string rpc_header_str;
if (rpcHeader.SerializeToString(&rpc_header_str)) // 将东西设置进rpcHeader后进行序列化
{
header_size = rpc_header_str.size();
}
else
{
controller->SetFailed("Serialize rpc header error!");
return;
}
// 组织待发送的rpc请求字符串
std::string send_rpc_str;
send_rpc_str.insert(0, std::string((char *)&header_size, 4));
send_rpc_str += rpc_header_str;
send_rpc_str += args_str;
// 打印调试信息
std::cout << "==================================================" << std::endl;
std::cout << "header_size::" << header_size << std::endl;
std::cout << "rpc_header_str::" << rpcHeader.DebugString() << std::endl;
std::cout << "service_name::" << service_name << std::endl;
std::cout << "method_name::" << method_name << std::endl;
std::cout << "args_size::" << args_size << std::endl;
std::cout << "args_str::" << args_str << std::endl;
std::cout << "==================================================" << std::endl;
// 由于这里是客户端,所以不需要处理高并发的情况,所以这边的网络发送我们采用tcp编程,完成rpc方法的远程调用
int clientfd = socket(AF_INET, SOCK_STREAM, 0);
if (-1 == clientfd)
{
char errtxt[512] = {0};
sprintf(errtxt, "creat socket error! errno:%d", errno);
controller->SetFailed(errtxt);
return;
}
// 将服务器的ip地址和端口号读一下
// std::string ip = MprpcApplication::GetInstance().GetConfig().Load("rpcserverip");
// uint16_t port= atoi(MprpcApplication::GetInstance().GetConfig().Load("rpcserverport").c_str());
// 之前都是使用上面的方法来获取配置文件中的参数,但是现在我们想调用什么,就去zk上查询该服务所在的host信息
zkClient zkCli;
zkCli.Start();
std::string method_path = "/" + service_name + "/" + method_name;
std::string host_data = zkCli.GetData(method_path.c_str());
if (host_data == "")
{
controller->SetFailed(method_name + "is not exist!");
return;
}
int idx = host_data.find(":");
if (idx == -1)
{
controller->SetFailed(method_path + "address is invalid!");
return;
}
std::string ip = host_data.substr(0, idx);
uint16_t port = atoi(host_data.substr(idx + 1, host_data.size() - idx).c_str());
struct sockaddr_in server_addr;
server_addr.sin_addr.s_addr = inet_addr(ip.c_str());
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(port);
// 连接rpc服务节点
if (-1 == connect(clientfd, (struct sockaddr *)&server_addr, sizeof(server_addr)))
{
close(clientfd);
char errtxt[512] = {0};
sprintf(errtxt, "connect error! errno:%d", errno);
controller->SetFailed(errtxt);
return;
}
// 发送rpc请求
if (-1 == send(clientfd, send_rpc_str.c_str(), send_rpc_str.size(), 0))
{
close(clientfd);
char errtxt[512] = {0};
sprintf(errtxt, "send error! errno:%d", errno);
controller->SetFailed(errtxt);
// exit(EXIT_FAILURE);这里不能因为没有成功发送而整个的服务全部推出了
return;
}
// 接收rpc端发来的请求响应值
char recv_buf[1024] = {0};
int recv_size = 0; // 整个接收的缓冲区中不可能全是发送过来的数据吧,所以记录一下多少数据
if (-1 == (recv_size = recv(clientfd, recv_buf, 1024, 0)))
{
close(clientfd);
char errtxt[512] = {0};
sprintf(errtxt, "recv error! errno:%d", errno);
controller->SetFailed(errtxt);
return;
}
// 接下来就是把接收到的数据填进response中,这样框架就可以通过response知道响应值并返回给用户
// std::string response_str(recv_buf,0,recv_size);//string有构造函数可以使用recv_buf来初始化,即将recv_buf的从0到recv_size的这一段数据初始化response_str
// 有bug,recv_buf在遇到\0后面的数据就存不下来了****std::string response_str(recv_buf,recv_size);也可以
// if(!response->ParseFromString(response_str))反序列化rpc调用的响应数据
if (!response->ParseFromArray(recv_buf, recv_size))
{
close(clientfd);
char errtxt[512] = {0};
sprintf(errtxt, "parse error! response_str:%d", recv_buf);
controller->SetFailed(errtxt);
return;
}
close(clientfd);
}
我们在调用rpc服务的时候, 我们用的是相应的描述rpc的服务的 UserServiceRpc_Stub这个类,在使用这个类的时候,我们需要传入一个Rpcchannel这个类
UserServiceRpc_Stub(::PROTOBUF_NAMESPACE_ID::RpcChannel* channel);
然后在这个UserServiceRpc_Stub这个类中调用的所有的 rpc 方法,最终都跑到了调用channel的CallMethod方法下。举个例子如下
UserServiceRpc_Stub stub(new MprpcChannel());
stub.Login(nullptr,&request,&response,nullptr);
像这样的通过stub调用的Login方法都是调用的channel的CallMethod方法。
四、rpc调用端的调用
在这里我们需要考虑如何去发起一个rpc调用,去调用某个服务下的某个方法,下面都以UserServiceRpc服务下的Login方法为例,我们同样要定义一个与发布端相同的 user.proto文件。然后使用 protoc 编译,接着
- 对调用的rpc框架进行初始化操作
- 实例化出一个代理对象 fixbug::UserServiceRpc_Stub stub;后面都会通过stub来调用rpc方法。
- 填写请求方法的参数
- 用stub代理调用Login方法
- 等待读取返回值。
实现
//calleruserservice.cc
int main(int argc,char* argv[])
{
//先调用框架的初始化类且全局只调用这一次
//MprpcApplication& myin= MprpcApplication::GetInstance();
//myin.Init(argc,argv);它是静态函数,也就是说它和具体对象无关,可以直接通过类名调用。
//因为你拿到对象后并没有使用它的状态,只是用来调用静态函数。下面的方法就做到了单例懒加载,且只初始化执行一次
MprpcApplication::Init(argc,argv);
//演示调用远程发布rpc的方法Login,UserServiceRpc_Stub是专门用来协助rpc客户端的
fixbug::UserServiceRpc_Stub stub(new MprpcChannel());//生成一个代理对象,以后通过stub来调用rpc方法
//rpc方法的请求参数
fixbug::LoginRequest request;//用户端发起调用,request肯定是这边给
request.set_name("zhangsan");
request.set_pwd("123456");
//rpc方法的响应
fixbug::LoginResponse response;
//发起rpc方法的调用,下面是同步的rpc方法调用过程,即它的底层是MprpcChannel::callMethod
stub.Login(nullptr,&request,&response,nullptr);//集中来做rpc方法调用的参数的序列化和网络发送了
//走到这里表示一次rpc调用完成,读取调用的结果
if(0 == response.result().errcode())
{
std::cout<<"rpc login response success:"<<response.success()<<std::endl;
}
else
{
std::cout<<"rpc login response error:"<<response.result().errmsg()<<std::endl;
}
//演示调用远程发布的rpc方法Register
fixbug::RegisterRequset req;
req.set_id(2000);
req.set_name("lisi");
req.set_pwd("12314");
fixbug::RegisterResponse rsp;
//以同步的方式发起rpc调用请求,等待返回结果
stub.Register(nullptr,&req,&rsp,nullptr);
if(0 == rsp.result().errcode())
{
std::cout<<"rpc register response success:"<<rsp.success()<<std::endl;
}
else
{
std::cout<<"rpc register response error:"<<rsp.result().errmsg()<<std::endl;
}
return 0;
}
五、 google::protobuf::RpcController *controller
前面还有最后一个坑需要填上,google::protobuf::RpcController *controller是个什么东西呢?
我们先不着急回答这个问题,我们思考一下在 calleruserservice.cc 中的代码有没有什么问题?
我们是在使用stub对象发起调用Login方法后,就开始等待着读取返回的响应了,但是我们在CallMethod方法中也看到了,在许多地方失败时都会有return直接返回的情况,比如反序列化失败,套接字创建失败等,所以这里一旦失败,直接返回。我们还有必要进行后面的工作吗,即访问响应response,此时还没有response呢。所以我们需要在这里得到一些控制信息。
我们可以看到调用的这个Login方法的第一个参数就是google::protobuf::RpcController *controller
从他的名字就可以猜到它可以存储一些控制信息,让我们清楚地知道当前rpc调用处于什么状态。
下面是 RpcController类,可以看到它的成员函数都是虚函数,它是一个抽象类,无法实例化出对象,所以还需要我们创建一个类来继承它,并且重写它的成员函数。
也就是说这个 RpcController可以携带我们rpc调用过程中的一些信息,所以接下来我们创建类继承它并对方法进行重写。
头文件
//mprpccontroller.h
class MprpcController : public google::protobuf::RpcController
{
public:
MprpcController();
void Reset();
bool Failed() const;
std::string ErrorText() const;
void SetFailed(const std::string& reason);
//对于这三个方法目前未实现具体的功能,所以暂时不用
void StartCancel();
bool IsCanceled() const;
void NotifyOnCancel(google::protobuf::Closure* callback);
private:
bool m_failed;//RPC方法执行过程中的状态
std::string m_errText;//RPC方法执行过程中的错误信息
};
实现
//mprpccontroller.cc
MprpcController::MprpcController()
{
m_failed=false;
m_errText="";
}
void MprpcController::Reset()//重置成刚开始的样子
{
m_failed=false;
m_errText="";
}
bool MprpcController::Failed() const//判断当前调用成功与否,返回状态,true就是发生问题了
{
return m_failed;
}
std::string MprpcController::ErrorText() const//返回错误信息
{
return m_errText;
}
void MprpcController::SetFailed(const std::string&reason)//设置错误
{
m_failed=true;
m_errText=reason;
}
//对于我们目前尚未实现的功能,我们实现成空函数就可以了
void MprpcController::StartCancel(){};
bool MprpcController::IsCanceled() const{};
void MprpcController::NotifyOnCancel(google::protobuf::Closure* callback){};
定义很简单,使用方法就是在 mprpcchannel.cc 中的使用,也比较简单。
之前我们在 calleruserservice.cc文件中使用stub调用Login方法时,第一个参数传的是空指针,现在传controller就行了
MprpcController controller;//实例化出一个控制对象
stub.Login(&controller,&request,&response,nullptr);
这样在 return之前就可以知道rpc调用过程中的状态了,后面我们就要这样子调用了
if (controller.Failed())
{
// 表示出问题
std::cout << controller.ErrorText() << std::endl;
}
else
{
if (0 == response.result().errcode())
{
std::cout << "rpc login response success:" << response.success() << std::endl;
}
else
{
std::cout << "rpc login response error:" << response.result().errmsg() << std::endl;
}
}
六、总结
到这里我们基本已经实现了整个框架,包括rpc发布端rpc方法的发布和rpc调用端的调用。后面我们还要给整个框架添加必不可少的一个模块——日志模块,还有zookeeper在本项目上的应用。见下一文!
感谢阅读!