分布式网络通信框架-rpc (九)zookeeper服务注册与发现
Contents
为什么需要zookeeper?
由于rpc调用的服务可能分布在不同的服务器上,我们需要知道所要调用的服务在哪个服务器上,因此我们需要一个类似路由功能的东西能够进行统一的服务的注册,当我需要调用某个服务方法时,这个东西能够告诉我应该去哪台服务器上调用。能够做到这个功能的其中一项技术是zookeeper。
zookeeper介绍
Zookeeper在分布式环境中应用非常广泛,它的优秀功能很多,比如分布式环境中全局命名服务,服务注册中心,全局分布式锁等等。
zookeeper介绍
在这个项目中,zookeeper主要用来做服务的注册和发现。
编写zookeeperutil类
实现功能:
– zookeeper客户端的启动、连接zookeeper server
– 在zookeeper server上按照指定的路径创建znode
– 根据指定的znode的路径获取znode的节点值
// zookeeperutil.h
#pragma once
#include <semaphore.h>
#include <zookeeper/zookeeper.h>
#include <string>
// 封装的zk客户端类
class ZkClient
{
public:
ZkClient();
~ZkClient();
// zkclient启动连接zkserver
void Start();
// 在zkserver上根据指定的path创建znode节点
void Create(const char *path, const char *data, int datalen, int state = 0); // state表明创建的是临时性节点还是永久性节点
// 根据参数指定的znode节点路径,或者znode节点的值
std::string GetData(const char *path);
private:
// zk的客户端句柄
zhandle_t *m_zhandle;
};
zookeeper的watcher机制
ZooKeeper 的每个节点上都有一个 Watcher 用于监控节点数据的变化,当节点状态发生改变时(Znode 新增、删除、修改)将会触发 Wahcher 所对应的操作。在 Watcher 被触发时,ZooKeeper 会向监控该节点的客户端发送一条通知说明节点的变化情况。
具体实现流程就是,客户端向 ZooKeeper 服务器注册 Watcher 的同时,会将 Watcher 对象存储在客户端的 WatchManager 中。当 ZooKeeper 服务器端触发 Watcher 事件后,会向客户端发送通知,客户端线程从 WatchManager 中取出对应的 Watcher 对象来执行回调逻辑。
zookeeper启动、创建节点、查找数据
需要注意的是,zookeeper官方提供的开源实现中我们使用的是多线程版本(zookeeper_mt),zookeeper_init之后就会产生三个线程:
– API调用线程
– 网络I/O线程 pthread_create poll
– watcher回调线程
#include "zookeeperutil.h"
#include "mprpcapplication.h"
#include <semaphore.h>
#include <iostream>
// 全局的watcher观察器 zkserver给zkclient的通知
void global_watcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx)
{
if (type == ZOO_SESSION_EVENT) // 回调的消息类型是和会话相关的消息类型
{
if (state == ZOO_CONNECTED_STATE) // zkclient和zkserver连接成功
{
sem_t *sem = (sem_t *)zoo_get_context(zh);
sem_post(sem);
}
}
}
ZkClient::ZkClient() : m_zhandle(nullptr)
{
}
ZkClient::~ZkClient()
{
if (m_zhandle != nullptr)
{
zookeeper_close(m_zhandle); // 关闭句柄,释放资源
}
}
// zkclient启动连接zkserver
void ZkClient::Start()
{
std::string host = MprpcApplication::GetInstance().GetConfig().Load("zookeeperip");
std::string port = MprpcApplication::GetInstance().GetConfig().Load("zookeeperport");
std::string connstr = host + ":" + port;
/*
zookeeper_mt:多线程版本
zookeeper的API客户端程序提供了三个线程
- API调用线程
- 网络I/O线程 pthread_create poll
- watcher回调线程
*/
m_zhandle = zookeeper_init(connstr.c_str(), global_watcher, 30000, nullptr, nullptr, 0);
if (nullptr == m_zhandle)
{
std::cout << "zookeeper_init error!" << std::endl;
exit(EXIT_FAILURE);
}
sem_t sem;
sem_init(&sem, 0, 0);
zoo_set_context(m_zhandle, &sem);
sem_wait(&sem);
std::cout << "zookeeper_init success!" << std::endl;
}
// 在zkserver上根据指定的path创建znode节点
// state表明创建的是临时性节点还是永久性节点
void ZkClient::Create(const char *path, const char *data, int datalen, int state)
{
char path_buffer[128];
int bufferlen = sizeof(path_buffer);
int flag;
// 先判断path表示的znode节点是否存在,如果存在,就不再重复创建了
flag = zoo_exists(m_zhandle, path, 0, nullptr);
if (ZNONODE == flag) // 表示path的znode节点不存在
{
// 创建指定path的znode节点
// 默认创建的是永久性节点,如果设置了 ZOO_EPHEMERAL flags,那么就是创建临时性节点
flag = zoo_create(m_zhandle, path, data, datalen, &ZOO_OPEN_ACL_UNSAFE, state, path_buffer, bufferlen);
if (flag == ZOK)
{
std::cout << "znode create success... path: " << path << std::endl;
}
else
{
std::cout << "flag: " << flag << std::endl;
std::cout << "znode create error... path: " << path << std::endl;
exit(EXIT_FAILURE);
}
}
}
// 根据参数指定的znode节点路径,获取znode节点的值
std::string ZkClient::GetData(const char *path)
{
char buffer[64];
int bufferlen = sizeof(buffer);
int flag = zoo_get(m_zhandle, path, 0, buffer, &bufferlen, nullptr);
if (flag != ZOK)
{
std::cout << "get znode error... path: " << path << std::endl;
return "";
}
else
{
return buffer;
}
}
在RpcProvider中将注册的服务和方法添加到zookeeper中
把服务器上的可供调用的rpc方法通过zookeeper client注册到zookeeper server中。
// 启动rpc服务节点,开始提供rpc远程网络调用服务
void RpcProvider::Run()
{
// ……
// 把当前rpc节点上要发布的服务全部注册到zk上面,让rpc client可以从zk上发现服务
// session timeout 30s zkclient 网络I/O线程 1/3 * timeout 时间发送ping消息
ZkClient zkCli;
zkCli.Start();
// service_name为永久性节点 method_name为临时性节点
for (auto &sp : m_serviceMap)
{
// /service_name
std::string service_path = "/" + sp.first;
zkCli.Create(service_path.c_str(), nullptr, 0);
for (auto &mp : sp.second.m_methodMap)
{
// /service_name/method_name
std::string method_path = service_path + "/" + mp.first;
char method_path_data[128] = {0};
sprintf(method_path_data, "%s:%d", ip.c_str(), port);
// ZOO_EPHEMERAL表示znode是一个临时性节点
zkCli.Create(method_path.c_str(), method_path_data, strlen(method_path_data), ZOO_EPHEMERAL);
}
}
// ……
}
在服务调用方RpcChannel中查找服务和方法
// 所有通过stub代理对象调用的rpc方法,都走到这里了,统一做rpc方法调用的数据序列化和网络发送
void MprpcChannel::CallMethod(const google::protobuf::MethodDescriptor *method,
google::protobuf::RpcController *controller,
const google::protobuf::Message *request,
google::protobuf::Message *response,
google::protobuf::Closure *done)
{
// ......
// rpc调用方想调用service_name的method_name服务,需要查询zk上该服务所在的host信息
ZkClient zkCli;
zkCli.Start();
std::string method_path = "/" + service_name + "/" + method_name;
std::string host_data = zkCli.GetData(method_path.c_str()); // 在zookeeper中查找调用的方法的相关信息
if (host_data == "")
{
controller->SetFailed(method_path + " is not exit!");
return;
}
int idx = host_data.find(":");
if (idx == -1)
{
controller->SetFailed(method_path + " address is invalid!");
return;
}
// ......