分布式网络通信框架-rpc (七)服务请求端对RpcChannel的重写
rpc框架在服务调用端需要完成什么任务

这是之前放过的一张图,可以看到客户端在调用服务时主要是[服务名]_stub(stub代理对象)这个类调用CallMethod方法,而stub代理对象需要向其传入一个继承自RpcChannel的子类作为参数,stub代理对象会将所有通过stub代理对象调用的rpc方法,统一在RpcChannel的CallMethod方法中做rpc方法调用的数据序列化和网络发送,简单点说就是:所有通过stub代理对象调用的rpc方法,都走到了CallMethod方法,由CallMethod方法统一做rpc方法调用的数据序列化和网络发送。
因此服务调用端主要的工作就是RpcChannel类的重写。
具体在调用端需要完成的任务
- 参数的打包
- 使用网络将参数发出
- 收到结果
RpcChannel类的重写
#pragma once
#include <google/protobuf/service.h>
#include <google/protobuf/descriptor.h>
#include <google/protobuf/message.h>
// RpcChannel是抽象类,需要继承重写CallMethod方法
class MprpcChannel : public google::protobuf::RpcChannel
{
public:
private:
// 所有通过stub代理对象调用的rpc方法,都走到这里了,统一做rpc方法调用的数据序列化和网络发送
void CallMethod(const google::protobuf::MethodDescriptor *method,
google::protobuf::RpcController *controller,
const google::protobuf::Message *request,
google::protobuf::Message *response,
google::protobuf::Closure *done);
};
CallMethod方法主要接收的参数是
– 需要调用的方法的MethodDescriptor
– RpcController,当你调用的服务的过程中会出现问题,RpcController提供了一些需要你实现的方法来对错误信息进行记录。可以先不管。
– request:请求参数
– response:返回值
– Closure:方法完成后的回调函数
参数打包
那么第一步我们要做的就是将请求参数request按照约定好的消息结构进行打包。
void MprpcChannel::CallMethod(const google::protobuf::MethodDescriptor *method,
google::protobuf::RpcController *controller,
const google::protobuf::Message *request,
google::protobuf::Message *response,
google::protobuf::Closure *done)
{
const google::protobuf::ServiceDescriptor *sd = method->service();
std::string service_name = sd->name();
std::string method_name = method->name();
// 获取参数的序列化字符串长度 args_size
uint32_t args_size = 0;
std::string args_str;
if (request->SerializeToString(&args_str))
{
args_size = args_str.size();
}
else
{
controller->SetFailed("serialize request error!");
return;
}
// 定义rpc的请求header
mprpc::RpcHeader rpcHeader;
rpcHeader.set_service_name(service_name);
rpcHeader.set_method_name(method_name);
rpcHeader.set_args_size(args_size);
uint32_t header_size = 0;
std::string rpc_header_str;
if (rpcHeader.SerializeToString(&rpc_header_str))
{
header_size = rpc_header_str.size();
}
else
{
controller->SetFailed("serialize request error!");
return;
}
// 组织待发送的rpc请求的字符串
std::string send_rpc_str;
send_rpc_str.insert(0, std::string((char *)&header_size, 4)); // header_size
send_rpc_str += rpc_header_str; // rpcheader
send_rpc_str += args_str;
//...
}
网络发送
由于在服务调用端不涉及高并发,我们可以直接使用socket编程来进行网络通信。
// 因为这里只是客户端进行调用rpc,不涉及高并发等要求,因此使用socket编程即可
// 使用tcp编程,完成rpc方法的远程调用
int clientfd = socket(AF_INET, SOCK_STREAM, 0);
if (-1 == clientfd)
{
char errtxt[512] = {0};
sprintf(errtxt, "create socket error! errno : %d", errno);
controller->SetFailed(errtxt);
return;
}
// 读取配置文件rpcserver的信息
// std::string ip = MprpcApplication::GetInstance().GetConfig().Load("rpcserverip");
// uint16_t port = atoi(MprpcApplication::GetInstance().GetConfig().Load("rpcserverport").c_str());
// rpc调用方想调用service_name的method_name服务,需要查询zk上该服务所在的host信息
ZkClient zkCli;
zkCli.Start();
std::string method_path = "/" + service_name + "/" + method_name;
std::string host_data = zkCli.GetData(method_path.c_str());
if (host_data == "")
{
controller->SetFailed(method_path + " is not exit!");
return;
}
int idx = host_data.find(":");
if (idx == -1)
{
controller->SetFailed(method_path + " address is invalid!");
return;
}
std::string ip = host_data.substr(0, idx);
uint16_t port = atoi(host_data.substr(idx + 1, host_data.size() - idx).c_str());
struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(port);
server_addr.sin_addr.s_addr = inet_addr(ip.c_str());
// 连接rpc服务节点
if (-1 == connect(clientfd, (struct sockaddr *)&server_addr, sizeof(server_addr)))
{
char errtxt[512] = {0};
sprintf(errtxt, "connect error! errno : %d", errno);
controller->SetFailed(errtxt);
close(clientfd);
return;
}
// 发送rpc请求
if (-1 == send(clientfd, send_rpc_str.c_str(), send_rpc_str.size(), 0))
{
char errtxt[512] = {0};
sprintf(errtxt, "send error! errno : %d", errno);
controller->SetFailed(errtxt);
close(clientfd);
return;
}
// 接收rpc请求的响应值
char recv_buf[1024] = {0};
int recv_size = 0;
if (-1 == (recv_size = recv(clientfd, recv_buf, 1024, 0)))
{
char errtxt[512] = {0};
sprintf(errtxt, "recive error! errno : %d", errno);
controller->SetFailed(errtxt);
close(clientfd);
return;
}
获得响应
// 反序列化rpc调用的响应数据
// std::string response_str(recv_buf, 0, recv_size); // bug出现问题,recv_buf中遇到\0后面的数据就存不下来了,当值反序列化失败
// if (!response->ParseFromString(response_str))
if (!response->ParseFromArray(recv_buf, recv_size)) // ParseFromString存在问题,直接使用ParseFromArray来解析参数
{
char errtxt[512] = {0};
sprintf(errtxt, "parse error! errno : %d", errno);
controller->SetFailed(errtxt);
close(clientfd);
return;
}
close(clientfd);