分布式网络通信框架-rpc (六)服务端初始化操作及RpcProvider编写
Contents
从需求出发考虑需要哪些功能
- 调用框架的初始化操作:包括rpc服务端所在的服务器地址的设置、zookeeper所在服务器地址的设置。将配置信息读进来;
- 服务的发布和管理;
- 服务的启动;
- 服务的调用:包括接收参数、服务端运行服务得到结果以及结果的返回。
调用框架的初始化操作(MprpcApplication类)
MprpcApplication是mprpc框架的基础类,负责框架的一些初始化操作,例如读入一些后续需要用到的参数。因此可以发现,该类只需要构建一次,我们将其设计为单例模式。
# mprpcapplication.h
#pragma once
#include "mprpcconfig.h"
#include "mprpcchannel.h"
#include "mprpccontroller.h"
// mprpc框架的基础类,负责框架的一些初始化操作 设计成单例模式
class MprpcApplication
{
public:
static void Init(int argc, char **argv);
static MprpcApplication &GetInstance();
static MprpcConfig &GetConfig();
private:
static MprpcConfig m_config;
MprpcApplication() {}
MprpcApplication(const MprpcApplication &) = delete; // 设计成单例模式,因此和拷贝相关的函数都删掉
MprpcApplication(MprpcApplication &&) = delete; // 设计成单例模式,因此和拷贝相关的函数都删掉
};
服务的发布和管理
- 服务
一个业务模块的集合。比如说撩条服务器中的好友模块,他可以包含很多的方法。 - 服务方法
一个业务模块中可以包含很多的服务方法。比如好友模块中,就可以包含:查找好友列表、加好友等方法。
客户端要调用的服务需要提前在rpc服务端进行注册,当客户端进行服务调用时我们能够通过方法名称在rpc服务端查询到相关的方法并进行调用,rpc服务端需要能够提供服务发布注册的功能。
服务发布的具体实现是:设计一个NotifyService方法,在该方法中我们将服务(Service)、方法对应的MethodDescriptor存下来,放到一个ServiceInfo中,之后在客户端调用时可以通过服务名称找到对应的服务信息,从服务信息中可以通过方法名称找到对应方法的MethodDescriptor。
// Service服务类型信息
struct ServiceInfo
{
google::protobuf::Service *m_service; // 保存服务对象
std::unordered_map<std::string, const google::protobuf::MethodDescriptor *> m_methodMap; // 保存服务方法
};
// 存储注册成功的服务对象和其服务方法的所有信息
std::unordered_map<std::string, ServiceInfo> m_serviceMap;
之后服务会分布在不同的服务器上,那么可以使用zookeeper来进行服务的注册和分发。通过zookeeper来找到服务在哪个服务器上。这个后面会专门进行总结。
服务的启动
这一步主要是启动rpc服务节点,开始提供rpc远程服务调用。主要是实现muduo网络库的OnConnection方法和OnMessage方法。
配置网络信息、启动网络服务
std::string ip = MprpcApplication::GetInstance().GetConfig().Load("rpcserverip");
uint16_t port = atoi(MprpcApplication::GetInstance().GetConfig().Load("rpcserverport").c_str());
muduo::net::InetAddress address(ip, port);
// 创建TcpServer对象
muduo::net::TcpServer server(&m_eventLoop, address, "RpcProvider");
// 绑定连接回调和消息读写回调方法 分离了网络代码和业务代码
server.setConnectionCallback(std::bind(&RpcProvider::OnConnection, this, std::placeholders::_1));
server.setMessageCallback(std::bind(&RpcProvider::OnMessage, this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3));
// 设置muduo库的线程数量
server.setThreadNum(4);
// 把当前rpc节点上要发布的服务全部注册到zk上面,让rpc client可以从zk上发现服务
// 目前省略。。。
// 启动网络服务
server.start();
m_eventLoop.loop();
OnConnection方法
如果连接不成功,我们就断开连接。
// 新的socket连接回调
void RpcProvider::OnConnection(const muduo::net::TcpConnectionPtr &conn)
{
if (!conn->connected())
{
// 和rpc client的连接断开了
conn->shutdown();
}
}
OnMessage方法
消息拆包
这部分是已建立连接用户的读写事件回调。如果远程有一个rpc服务的调用请求,那么OnMessage方法就会响应。
当rpcprovider收到rpc服务请求时,首先会对接收到的信息进行解包。服务调用端的信息格式和rpc服务提供端的信息格式应该是对应起来的。即在框架内部,RpcProvider和RpcConsumer协商好之间通信用的protobuf数据类型。如下图所示。
我们规定客户端在传输参数时按照上图的格式进行组织参数,rpc服务端在解包时也按照上面的格式进行参数的解包。
– tcp粘包问题:tcp返回回来的包可能时当前的数据和下一条数据混在一起的。为了解决tcp粘包的问题,我们需要记录参数的长度,即args_size字段。
– 我们如果直接把header_size存成字符串类型,那么我们读4个字节也有可能出现错误:比如“10000”读四个字节将会出错。因此我们直接存储整型,整型不会超过4个字节。
按照上述格式进行读取就能将消息字节流拆包成需要的参数。
void RpcProvider::OnMessage(const muduo::net::TcpConnectionPtr &conn,
muduo::net::Buffer *buffer,
muduo::Timestamp)
{
// 网络上接受的远程rpc调用请求的字符流 包含rpc调用方法的名字,参数args
std::string recv_buf = buffer->retrieveAllAsString();
// 从字符流中读取前4个字节的内容
uint32_t header_size = 0;
recv_buf.copy((char *)&header_size, 4, 0);
// 根据header_size读取数据头的原始字符流,反序列化数据,得到rpc请求的详细信息
std::string rpc_header_str = recv_buf.substr(4, header_size);
mprpc::RpcHeader rpcHeader;
std::string service_name;
std::string method_name;
uint32_t args_size;
if (rpcHeader.ParseFromString(rpc_header_str))
{
// 数据头反序列化成功
service_name = rpcHeader.service_name();
method_name = rpcHeader.method_name();
args_size = rpcHeader.args_size();
}
else
{
// 数据头反序列化失败
std::cout << "rpc_header_str : " << rpc_header_str << " parse error!" << std::endl;
return;
}
// 获取rpc方法参数的字符流数据
std::string args_str = recv_buf.substr(4 + header_size, args_size);
// ...
}
服务调用
从上面的消息拆包后的结果,可以找到对应的服务名称和方法名称,之后我们就可以根据方法名称将参数传入,进行服务的调用了。
void RpcProvider::OnMessage(const muduo::net::TcpConnectionPtr &conn,
muduo::net::Buffer *buffer,
muduo::Timestamp)
{
// 拆包
// 。。。
// 获取service对象和method对象
auto it = m_serviceMap.find(service_name);
if (it == m_serviceMap.end())
{
std::cout << service_name << " is not exist!" << std::endl;
return;
}
auto method_it = it->second.m_methodMap.find(method_name);
if (method_it == it->second.m_methodMap.end())
{
std::cout << service_name << " : " << method_name << " is not exist!" << std::endl;
return;
}
google::protobuf::Service *service = it->second.m_service; // 获取service对象
const google::protobuf::MethodDescriptor *method = method_it->second; // 获取method对象
// 生成rpc方法调用的请求request和响应response参数
google::protobuf::Message *request = service->GetRequestPrototype(method).New();
if (!request->ParseFromString(args_str))
{
std::cout << "request parse error, content : " << args_str << std::endl;
return;
}
google::protobuf::Message *response = service->GetResponsePrototype(method).New();
// 给下面的method方法的调用,绑定一个Closure的回调函数
google::protobuf::Closure *done = google::protobuf::NewCallback<RpcProvider,
const muduo::net::TcpConnectionPtr &,
google::protobuf::Message *>(this,
&RpcProvider::SendRpcResponse,
conn, response);
// 在框架上根据远端rpc请求,调用当前rpc节点上发布的方法
// 用户端 new UserService().Login(controller, request, response, done)
service->CallMethod(method, nullptr, request, response, done);
}
结果返回/回调函数的编写
// Call a method of the service specified by MethodDescriptor. This is
// normally implemented as a simple switch() that calls the standard
// definitions of the service's methods.
//
// Preconditions:
// * method->service() == GetDescriptor()
// * request and response are of the exact same classes as the objects
// returned by GetRequestPrototype(method) and
// GetResponsePrototype(method).
// * After the call has started, the request must not be modified and the
// response must not be accessed at all until "done" is called.
// * "controller" is of the correct type for the RPC implementation being
// used by this Service. For stubs, the "correct type" depends on the
// RpcChannel which the stub is using. Server-side Service
// implementations are expected to accept whatever type of RpcController
// the server-side RPC implementation uses.
//
// Postconditions:
// * "done" will be called when the method is complete. This may be
// before CallMethod() returns or it may be at some point in the future.
// * If the RPC succeeded, "response" contains the response returned by
// the server.
// * If the RPC failed, "response"'s contents are undefined. The
// RpcController can be queried to determine if an error occurred and
// possibly to get more information about the error.
virtual void CallMethod(const MethodDescriptor* method,
RpcController* controller, const Message* request,
Message* response, Closure* done) = 0;
对照上面代码注释可以看到,CallMethod方法支持在方法完成时定义一个回调函数done,该,我们可以在CallMethod方法调用结束(response已经拿到了服务调用结束后的返回值了)通过回调函数done将response中的结果发送给调用方。
我们可以查看一下done函数的类型Closure*需要如何编写
// Abstract interface for a callback. When calling an RPC, you must provide
// a Closure to call when the procedure completes. See the Service interface
// in service.h.
//
// To automatically construct a Closure which calls a particular function or
// method with a particular set of parameters, use the NewCallback() function.
// Example:
// void FooDone(const FooResponse* response) {
// ...
// }
//
// void CallFoo() {
// ...
// // When done, call FooDone() and pass it a pointer to the response.
// Closure* callback = NewCallback(&FooDone, response);
// // Make the call.
// service->Foo(controller, request, response, callback);
// }
//
// Example that calls a method:
// class Handler {
// public:
// ...
//
// void FooDone(const FooResponse* response) {
// ...
// }
//
// void CallFoo() {
// ...
// // When done, call FooDone() and pass it a pointer to the response.
// Closure* callback = NewCallback(this, &Handler::FooDone, response);
// // Make the call.
// service->Foo(controller, request, response, callback);
// }
// };
//
// Currently NewCallback() supports binding zero, one, or two arguments.
//
// Callbacks created with NewCallback() automatically delete themselves when
// executed. They should be used when a callback is to be called exactly
// once (usually the case with RPC callbacks). If a callback may be called
// a different number of times (including zero), create it with
// NewPermanentCallback() instead. You are then responsible for deleting the
// callback (using the "delete" keyword as normal).
//
// Note that NewCallback() is a bit touchy regarding argument types. Generally,
// the values you provide for the parameter bindings must exactly match the
// types accepted by the callback function. For example:
// void Foo(string s);
// NewCallback(&Foo, "foo"); // WON'T WORK: const char* != string
// NewCallback(&Foo, string("foo")); // WORKS
// Also note that the arguments cannot be references:
// void Foo(const string& s);
// string my_str;
// NewCallback(&Foo, my_str); // WON'T WORK: Can't use referecnes.
// However, correctly-typed pointers will work just fine.
class PROTOBUF_EXPORT Closure {
public:
Closure() {}
virtual ~Closure();
virtual void Run() = 0;
private:
GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(Closure);
};
根据上面注释的提示,To automatically construct a Closure which calls a particular function or method with a particular set of parameters, use the NewCallback() function. 我们需要定义一个NewCallback()。
void RpcProvider::OnMessage(const muduo::net::TcpConnectionPtr &conn,
muduo::net::Buffer *buffer,
muduo::Timestamp)
{
// 省略了……
// 给下面的method方法的调用,绑定一个Closure的回调函数
google::protobuf::Closure *done = google::protobuf::NewCallback<RpcProvider,
const muduo::net::TcpConnectionPtr &,
google::protobuf::Message *>(this,
&RpcProvider::SendRpcResponse,
conn, response);
// 在框架上根据远端rpc请求,调用当前rpc节点上发布的方法
// 用户端 new UserService().Login(controller, request, response, done)
service->CallMethod(method, nullptr, request, response, done);
}
// Closure的回调操作,用于序列化rpc的响应和网络发送,需要一个TcpConnectionPtr和用户传回来的response(Message类型)
void RpcProvider::SendRpcResponse(const muduo::net::TcpConnectionPtr &conn, google::protobuf::Message *response)
{
std::string response_str;
if (response->SerializeToString(&response_str)) // response进行序列化
{
// 序列化成功后,通过网络把rpc方法执行的结果发送给rpc调用方
conn->send(response_str);
}
else
{
std::cout << "serialize response_str error!" << std::endl;
}
conn->shutdown(); // 调用结束后,rpcprovider主动断开连接
}
回调函数将结果返回后,自动断开连接。
总结
本文介绍了服务提供方(RpcProvider)的编写方法。主要有以下几个方面:
– 服务、方法的注册发布。将服务、方法名称统一存在一个数据结构中,当有服务调用时进行查找,找到其对应的MethodDescriptor。之后会使用zookeeper来进行跨服务器的服务注册和服务分发。
– 服务的调用。在框架设计上,服务提供方和服务调用方要统一消息的结构;消息拆包要注意tcp粘包问题;调用方法结束后,编写回调函数将结果返回服务调用方。