分布式网络通信框架-rpc (六)服务端初始化操作及RpcProvider编写

从需求出发考虑需要哪些功能

  • 调用框架的初始化操作:包括rpc服务端所在的服务器地址的设置、zookeeper所在服务器地址的设置。将配置信息读进来;
  • 服务的发布和管理;
  • 服务的启动;
  • 服务的调用:包括接收参数、服务端运行服务得到结果以及结果的返回。

调用框架的初始化操作(MprpcApplication类)

MprpcApplication是mprpc框架的基础类,负责框架的一些初始化操作,例如读入一些后续需要用到的参数。因此可以发现,该类只需要构建一次,我们将其设计为单例模式。

# mprpcapplication.h
#pragma once

#include "mprpcconfig.h"
#include "mprpcchannel.h"
#include "mprpccontroller.h"

// mprpc框架的基础类,负责框架的一些初始化操作  设计成单例模式
class MprpcApplication
{
public:
    static void Init(int argc, char **argv);
    static MprpcApplication &GetInstance();
    static MprpcConfig &GetConfig();

private:
    static MprpcConfig m_config;

    MprpcApplication() {}
    MprpcApplication(const MprpcApplication &) = delete; // 设计成单例模式,因此和拷贝相关的函数都删掉
    MprpcApplication(MprpcApplication &&) = delete;      // 设计成单例模式,因此和拷贝相关的函数都删掉
};

服务的发布和管理

  • 服务
    一个业务模块的集合。比如说撩条服务器中的好友模块,他可以包含很多的方法。
  • 服务方法
    一个业务模块中可以包含很多的服务方法。比如好友模块中,就可以包含:查找好友列表、加好友等方法。
    客户端要调用的服务需要提前在rpc服务端进行注册,当客户端进行服务调用时我们能够通过方法名称在rpc服务端查询到相关的方法并进行调用,rpc服务端需要能够提供服务发布注册的功能。
    服务发布的具体实现是:设计一个NotifyService方法,在该方法中我们将服务(Service)、方法对应的MethodDescriptor存下来,放到一个ServiceInfo中,之后在客户端调用时可以通过服务名称找到对应的服务信息,从服务信息中可以通过方法名称找到对应方法的MethodDescriptor。
// Service服务类型信息
struct ServiceInfo
{
    google::protobuf::Service *m_service;                                                    // 保存服务对象
    std::unordered_map<std::string, const google::protobuf::MethodDescriptor *> m_methodMap; // 保存服务方法
};
// 存储注册成功的服务对象和其服务方法的所有信息
std::unordered_map<std::string, ServiceInfo> m_serviceMap;

之后服务会分布在不同的服务器上,那么可以使用zookeeper来进行服务的注册和分发。通过zookeeper来找到服务在哪个服务器上。这个后面会专门进行总结。

服务的启动

这一步主要是启动rpc服务节点,开始提供rpc远程服务调用。主要是实现muduo网络库的OnConnection方法和OnMessage方法。

配置网络信息、启动网络服务

std::string ip = MprpcApplication::GetInstance().GetConfig().Load("rpcserverip");
uint16_t port = atoi(MprpcApplication::GetInstance().GetConfig().Load("rpcserverport").c_str());
muduo::net::InetAddress address(ip, port);

// 创建TcpServer对象
muduo::net::TcpServer server(&m_eventLoop, address, "RpcProvider");
// 绑定连接回调和消息读写回调方法   分离了网络代码和业务代码
server.setConnectionCallback(std::bind(&RpcProvider::OnConnection, this, std::placeholders::_1));
server.setMessageCallback(std::bind(&RpcProvider::OnMessage, this, std::placeholders::_1,
                                    std::placeholders::_2, std::placeholders::_3));

// 设置muduo库的线程数量
server.setThreadNum(4);

// 把当前rpc节点上要发布的服务全部注册到zk上面,让rpc client可以从zk上发现服务
// 目前省略。。。

// 启动网络服务
server.start();
m_eventLoop.loop();

OnConnection方法

如果连接不成功,我们就断开连接。

// 新的socket连接回调
void RpcProvider::OnConnection(const muduo::net::TcpConnectionPtr &conn)
{
    if (!conn->connected())
    {
        // 和rpc client的连接断开了
        conn->shutdown();
    }
}

OnMessage方法

消息拆包

这部分是已建立连接用户的读写事件回调。如果远程有一个rpc服务的调用请求,那么OnMessage方法就会响应。
当rpcprovider收到rpc服务请求时,首先会对接收到的信息进行解包。服务调用端的信息格式和rpc服务提供端的信息格式应该是对应起来的。即在框架内部,RpcProvider和RpcConsumer协商好之间通信用的protobuf数据类型。如下图所示。

我们规定客户端在传输参数时按照上图的格式进行组织参数,rpc服务端在解包时也按照上面的格式进行参数的解包。
tcp粘包问题:tcp返回回来的包可能时当前的数据和下一条数据混在一起的。为了解决tcp粘包的问题,我们需要记录参数的长度,即args_size字段。
– 我们如果直接把header_size存成字符串类型,那么我们读4个字节也有可能出现错误:比如“10000”读四个字节将会出错。因此我们直接存储整型,整型不会超过4个字节。
按照上述格式进行读取就能将消息字节流拆包成需要的参数。

void RpcProvider::OnMessage(const muduo::net::TcpConnectionPtr &conn,
                            muduo::net::Buffer *buffer,
                            muduo::Timestamp)
{
    // 网络上接受的远程rpc调用请求的字符流   包含rpc调用方法的名字,参数args
    std::string recv_buf = buffer->retrieveAllAsString();

    // 从字符流中读取前4个字节的内容
    uint32_t header_size = 0;
    recv_buf.copy((char *)&header_size, 4, 0);

    // 根据header_size读取数据头的原始字符流,反序列化数据,得到rpc请求的详细信息
    std::string rpc_header_str = recv_buf.substr(4, header_size);
    mprpc::RpcHeader rpcHeader;
    std::string service_name;
    std::string method_name;
    uint32_t args_size;
    if (rpcHeader.ParseFromString(rpc_header_str))
    {
        // 数据头反序列化成功
        service_name = rpcHeader.service_name();
        method_name = rpcHeader.method_name();
        args_size = rpcHeader.args_size();
    }
    else
    {
        // 数据头反序列化失败
        std::cout << "rpc_header_str : " << rpc_header_str << " parse error!" << std::endl;
        return;
    }

    // 获取rpc方法参数的字符流数据
    std::string args_str = recv_buf.substr(4 + header_size, args_size);

    // ...
}

服务调用

从上面的消息拆包后的结果,可以找到对应的服务名称和方法名称,之后我们就可以根据方法名称将参数传入,进行服务的调用了。

void RpcProvider::OnMessage(const muduo::net::TcpConnectionPtr &conn,
                            muduo::net::Buffer *buffer,
                            muduo::Timestamp)
{
    // 拆包
    // 。。。

    // 获取service对象和method对象
    auto it = m_serviceMap.find(service_name);
    if (it == m_serviceMap.end())
    {
        std::cout << service_name << " is not exist!" << std::endl;
        return;
    }

    auto method_it = it->second.m_methodMap.find(method_name);
    if (method_it == it->second.m_methodMap.end())
    {
        std::cout << service_name << " : " << method_name << " is not exist!" << std::endl;
        return;
    }

    google::protobuf::Service *service = it->second.m_service;            // 获取service对象
    const google::protobuf::MethodDescriptor *method = method_it->second; // 获取method对象

    // 生成rpc方法调用的请求request和响应response参数
    google::protobuf::Message *request = service->GetRequestPrototype(method).New();
    if (!request->ParseFromString(args_str))
    {
        std::cout << "request parse error, content : " << args_str << std::endl;
        return;
    }
    google::protobuf::Message *response = service->GetResponsePrototype(method).New();

    // 给下面的method方法的调用,绑定一个Closure的回调函数
    google::protobuf::Closure *done = google::protobuf::NewCallback<RpcProvider,
                                                                    const muduo::net::TcpConnectionPtr &,
                                                                    google::protobuf::Message *>(this,
                                                                                                 &RpcProvider::SendRpcResponse,
                                                                                                 conn, response);

    // 在框架上根据远端rpc请求,调用当前rpc节点上发布的方法
    // 用户端 new UserService().Login(controller, request, response, done)
    service->CallMethod(method, nullptr, request, response, done);
}

结果返回/回调函数的编写

// Call a method of the service specified by MethodDescriptor.  This is
  // normally implemented as a simple switch() that calls the standard
  // definitions of the service's methods.
  //
  // Preconditions:
  // * method->service() == GetDescriptor()
  // * request and response are of the exact same classes as the objects
  //   returned by GetRequestPrototype(method) and
  //   GetResponsePrototype(method).
  // * After the call has started, the request must not be modified and the
  //   response must not be accessed at all until "done" is called.
  // * "controller" is of the correct type for the RPC implementation being
  //   used by this Service.  For stubs, the "correct type" depends on the
  //   RpcChannel which the stub is using.  Server-side Service
  //   implementations are expected to accept whatever type of RpcController
  //   the server-side RPC implementation uses.
  //
  // Postconditions:
  // * "done" will be called when the method is complete.  This may be
  //   before CallMethod() returns or it may be at some point in the future.
  // * If the RPC succeeded, "response" contains the response returned by
  //   the server.
  // * If the RPC failed, "response"'s contents are undefined.  The
  //   RpcController can be queried to determine if an error occurred and
  //   possibly to get more information about the error.
  virtual void CallMethod(const MethodDescriptor* method,
                          RpcController* controller, const Message* request,
                          Message* response, Closure* done) = 0;

对照上面代码注释可以看到,CallMethod方法支持在方法完成时定义一个回调函数done,该,我们可以在CallMethod方法调用结束(response已经拿到了服务调用结束后的返回值了)通过回调函数done将response中的结果发送给调用方。
我们可以查看一下done函数的类型Closure*需要如何编写

// Abstract interface for a callback.  When calling an RPC, you must provide
// a Closure to call when the procedure completes.  See the Service interface
// in service.h.
//
// To automatically construct a Closure which calls a particular function or
// method with a particular set of parameters, use the NewCallback() function.
// Example:
//   void FooDone(const FooResponse* response) {
//     ...
//   }
//
//   void CallFoo() {
//     ...
//     // When done, call FooDone() and pass it a pointer to the response.
//     Closure* callback = NewCallback(&FooDone, response);
//     // Make the call.
//     service->Foo(controller, request, response, callback);
//   }
//
// Example that calls a method:
//   class Handler {
//    public:
//     ...
//
//     void FooDone(const FooResponse* response) {
//       ...
//     }
//
//     void CallFoo() {
//       ...
//       // When done, call FooDone() and pass it a pointer to the response.
//       Closure* callback = NewCallback(this, &Handler::FooDone, response);
//       // Make the call.
//       service->Foo(controller, request, response, callback);
//     }
//   };
//
// Currently NewCallback() supports binding zero, one, or two arguments.
//
// Callbacks created with NewCallback() automatically delete themselves when
// executed.  They should be used when a callback is to be called exactly
// once (usually the case with RPC callbacks).  If a callback may be called
// a different number of times (including zero), create it with
// NewPermanentCallback() instead.  You are then responsible for deleting the
// callback (using the "delete" keyword as normal).
//
// Note that NewCallback() is a bit touchy regarding argument types.  Generally,
// the values you provide for the parameter bindings must exactly match the
// types accepted by the callback function.  For example:
//   void Foo(string s);
//   NewCallback(&Foo, "foo");          // WON'T WORK:  const char* != string
//   NewCallback(&Foo, string("foo"));  // WORKS
// Also note that the arguments cannot be references:
//   void Foo(const string& s);
//   string my_str;
//   NewCallback(&Foo, my_str);  // WON'T WORK:  Can't use referecnes.
// However, correctly-typed pointers will work just fine.
class PROTOBUF_EXPORT Closure {
 public:
  Closure() {}
  virtual ~Closure();

  virtual void Run() = 0;

 private:
  GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(Closure);
};

根据上面注释的提示,To automatically construct a Closure which calls a particular function or method with a particular set of parameters, use the NewCallback() function. 我们需要定义一个NewCallback()。

void RpcProvider::OnMessage(const muduo::net::TcpConnectionPtr &conn,
                            muduo::net::Buffer *buffer,
                            muduo::Timestamp)
{
    // 省略了……
    // 给下面的method方法的调用,绑定一个Closure的回调函数
    google::protobuf::Closure *done = google::protobuf::NewCallback<RpcProvider,
                                                                    const muduo::net::TcpConnectionPtr &,
                                                                    google::protobuf::Message *>(this,
                                                                                                 &RpcProvider::SendRpcResponse,
                                                                                                 conn, response);

    // 在框架上根据远端rpc请求,调用当前rpc节点上发布的方法
    // 用户端 new UserService().Login(controller, request, response, done)
    service->CallMethod(method, nullptr, request, response, done);
}

// Closure的回调操作,用于序列化rpc的响应和网络发送,需要一个TcpConnectionPtr和用户传回来的response(Message类型)
void RpcProvider::SendRpcResponse(const muduo::net::TcpConnectionPtr &conn, google::protobuf::Message *response)
{
    std::string response_str;
    if (response->SerializeToString(&response_str)) // response进行序列化
    {
        // 序列化成功后,通过网络把rpc方法执行的结果发送给rpc调用方
        conn->send(response_str);
    }
    else
    {
        std::cout << "serialize response_str error!" << std::endl;
    }
    conn->shutdown(); // 调用结束后,rpcprovider主动断开连接
}

回调函数将结果返回后,自动断开连接。

总结

本文介绍了服务提供方(RpcProvider)的编写方法。主要有以下几个方面:
– 服务、方法的注册发布。将服务、方法名称统一存在一个数据结构中,当有服务调用时进行查找,找到其对应的MethodDescriptor。之后会使用zookeeper来进行跨服务器的服务注册和服务分发。
– 服务的调用。在框架设计上,服务提供方和服务调用方要统一消息的结构;消息拆包要注意tcp粘包问题;调用方法结束后,编写回调函数将结果返回服务调用方。

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注