分布式网络通信框架-rpc (七)服务请求端对RpcChannel的重写

rpc框架在服务调用端需要完成什么任务

这是之前放过的一张图,可以看到客户端在调用服务时主要是[服务名]_stub(stub代理对象)这个类调用CallMethod方法,而stub代理对象需要向其传入一个继承自RpcChannel的子类作为参数,stub代理对象会将所有通过stub代理对象调用的rpc方法,统一在RpcChannel的CallMethod方法中做rpc方法调用的数据序列化和网络发送,简单点说就是:所有通过stub代理对象调用的rpc方法,都走到了CallMethod方法,由CallMethod方法统一做rpc方法调用的数据序列化和网络发送。
因此服务调用端主要的工作就是RpcChannel类的重写。

具体在调用端需要完成的任务

  • 参数的打包
  • 使用网络将参数发出
  • 收到结果

RpcChannel类的重写

#pragma once

#include <google/protobuf/service.h>
#include <google/protobuf/descriptor.h>
#include <google/protobuf/message.h>

// RpcChannel是抽象类,需要继承重写CallMethod方法
class MprpcChannel : public google::protobuf::RpcChannel
{
public:
private:
    // 所有通过stub代理对象调用的rpc方法,都走到这里了,统一做rpc方法调用的数据序列化和网络发送
    void CallMethod(const google::protobuf::MethodDescriptor *method,
                    google::protobuf::RpcController *controller,
                    const google::protobuf::Message *request,
                    google::protobuf::Message *response,
                    google::protobuf::Closure *done);
};

CallMethod方法主要接收的参数是
– 需要调用的方法的MethodDescriptor
– RpcController,当你调用的服务的过程中会出现问题,RpcController提供了一些需要你实现的方法来对错误信息进行记录。可以先不管。
– request:请求参数
– response:返回值
– Closure:方法完成后的回调函数

参数打包

那么第一步我们要做的就是将请求参数request按照约定好的消息结构进行打包。

void MprpcChannel::CallMethod(const google::protobuf::MethodDescriptor *method,
                              google::protobuf::RpcController *controller,
                              const google::protobuf::Message *request,
                              google::protobuf::Message *response,
                              google::protobuf::Closure *done)
{
const google::protobuf::ServiceDescriptor *sd = method->service();
std::string service_name = sd->name();
std::string method_name = method->name();
// 获取参数的序列化字符串长度 args_size
uint32_t args_size = 0;
std::string args_str;
if (request->SerializeToString(&args_str))
{
    args_size = args_str.size();
}
else
{
    controller->SetFailed("serialize request error!");
    return;
}
// 定义rpc的请求header
mprpc::RpcHeader rpcHeader;
rpcHeader.set_service_name(service_name);
rpcHeader.set_method_name(method_name);
rpcHeader.set_args_size(args_size);
uint32_t header_size = 0;
std::string rpc_header_str;
if (rpcHeader.SerializeToString(&rpc_header_str))
{
    header_size = rpc_header_str.size();
}
else
{
    controller->SetFailed("serialize request error!");
    return;
}
// 组织待发送的rpc请求的字符串
std::string send_rpc_str;
send_rpc_str.insert(0, std::string((char *)&header_size, 4)); // header_size
send_rpc_str += rpc_header_str;                               // rpcheader
send_rpc_str += args_str;
//...
}

网络发送

由于在服务调用端不涉及高并发,我们可以直接使用socket编程来进行网络通信。

// 因为这里只是客户端进行调用rpc,不涉及高并发等要求,因此使用socket编程即可
    // 使用tcp编程,完成rpc方法的远程调用
    int clientfd = socket(AF_INET, SOCK_STREAM, 0);
    if (-1 == clientfd)
    {
        char errtxt[512] = {0};
        sprintf(errtxt, "create socket error! errno : %d", errno);
        controller->SetFailed(errtxt);
        return;
    }

    // 读取配置文件rpcserver的信息
    // std::string ip = MprpcApplication::GetInstance().GetConfig().Load("rpcserverip");
    // uint16_t port = atoi(MprpcApplication::GetInstance().GetConfig().Load("rpcserverport").c_str());

    // rpc调用方想调用service_name的method_name服务,需要查询zk上该服务所在的host信息
    ZkClient zkCli;
    zkCli.Start();
    std::string method_path = "/" + service_name + "/" + method_name;
    std::string host_data = zkCli.GetData(method_path.c_str());
    if (host_data == "")
    {
        controller->SetFailed(method_path + " is not exit!");
        return;
    }
    int idx = host_data.find(":");
    if (idx == -1)
    {
        controller->SetFailed(method_path + " address is invalid!");
        return;
    }
    std::string ip = host_data.substr(0, idx);
    uint16_t port = atoi(host_data.substr(idx + 1, host_data.size() - idx).c_str());

    struct sockaddr_in server_addr;
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(port);
    server_addr.sin_addr.s_addr = inet_addr(ip.c_str());

    // 连接rpc服务节点
    if (-1 == connect(clientfd, (struct sockaddr *)&server_addr, sizeof(server_addr)))
    {
        char errtxt[512] = {0};
        sprintf(errtxt, "connect error! errno : %d", errno);
        controller->SetFailed(errtxt);
        close(clientfd);
        return;
    }

    // 发送rpc请求
    if (-1 == send(clientfd, send_rpc_str.c_str(), send_rpc_str.size(), 0))
    {
        char errtxt[512] = {0};
        sprintf(errtxt, "send error! errno : %d", errno);
        controller->SetFailed(errtxt);
        close(clientfd);
        return;
    }

    // 接收rpc请求的响应值
    char recv_buf[1024] = {0};
    int recv_size = 0;
    if (-1 == (recv_size = recv(clientfd, recv_buf, 1024, 0)))
    {
        char errtxt[512] = {0};
        sprintf(errtxt, "recive error! errno : %d", errno);
        controller->SetFailed(errtxt);
        close(clientfd);
        return;
    }

获得响应

// 反序列化rpc调用的响应数据
    // std::string response_str(recv_buf, 0, recv_size); // bug出现问题,recv_buf中遇到\0后面的数据就存不下来了,当值反序列化失败
    // if (!response->ParseFromString(response_str))
    if (!response->ParseFromArray(recv_buf, recv_size)) // ParseFromString存在问题,直接使用ParseFromArray来解析参数
    {
        char errtxt[512] = {0};
        sprintf(errtxt, "parse error! errno : %d", errno);
        controller->SetFailed(errtxt);
        close(clientfd);
        return;
    }

    close(clientfd);

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注