分布式网络通信框架-rpc (九)zookeeper服务注册与发现

为什么需要zookeeper?

由于rpc调用的服务可能分布在不同的服务器上,我们需要知道所要调用的服务在哪个服务器上,因此我们需要一个类似路由功能的东西能够进行统一的服务的注册,当我需要调用某个服务方法时,这个东西能够告诉我应该去哪台服务器上调用。能够做到这个功能的其中一项技术是zookeeper。

zookeeper介绍

Zookeeper在分布式环境中应用非常广泛,它的优秀功能很多,比如分布式环境中全局命名服务,服务注册中心,全局分布式锁等等。
zookeeper介绍
在这个项目中,zookeeper主要用来做服务的注册和发现。

编写zookeeperutil类

实现功能:
– zookeeper客户端的启动、连接zookeeper server
– 在zookeeper server上按照指定的路径创建znode
– 根据指定的znode的路径获取znode的节点值

// zookeeperutil.h
#pragma once

#include <semaphore.h>
#include <zookeeper/zookeeper.h>
#include <string>

// 封装的zk客户端类
class ZkClient
{
public:
    ZkClient();
    ~ZkClient();
    // zkclient启动连接zkserver
    void Start();
    // 在zkserver上根据指定的path创建znode节点
    void Create(const char *path, const char *data, int datalen, int state = 0); // state表明创建的是临时性节点还是永久性节点
    // 根据参数指定的znode节点路径,或者znode节点的值
    std::string GetData(const char *path);

private:
    // zk的客户端句柄
    zhandle_t *m_zhandle;
};

zookeeper的watcher机制

ZooKeeper 的每个节点上都有一个 Watcher 用于监控节点数据的变化,当节点状态发生改变时(Znode 新增、删除、修改)将会触发 Wahcher 所对应的操作。在 Watcher 被触发时,ZooKeeper 会向监控该节点的客户端发送一条通知说明节点的变化情况。

具体实现流程就是,客户端向 ZooKeeper 服务器注册 Watcher 的同时,会将 Watcher 对象存储在客户端的 WatchManager 中。当 ZooKeeper 服务器端触发 Watcher 事件后,会向客户端发送通知,客户端线程从 WatchManager 中取出对应的 Watcher 对象来执行回调逻辑。

zookeeper启动、创建节点、查找数据

需要注意的是,zookeeper官方提供的开源实现中我们使用的是多线程版本(zookeeper_mt),zookeeper_init之后就会产生三个线程:
– API调用线程
– 网络I/O线程 pthread_create poll
– watcher回调线程

#include "zookeeperutil.h"
#include "mprpcapplication.h"
#include <semaphore.h>
#include <iostream>

// 全局的watcher观察器   zkserver给zkclient的通知
void global_watcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx)
{
    if (type == ZOO_SESSION_EVENT) // 回调的消息类型是和会话相关的消息类型
    {
        if (state == ZOO_CONNECTED_STATE) // zkclient和zkserver连接成功
        {
            sem_t *sem = (sem_t *)zoo_get_context(zh);
            sem_post(sem);
        }
    }
}

ZkClient::ZkClient() : m_zhandle(nullptr)
{
}

ZkClient::~ZkClient()
{
    if (m_zhandle != nullptr)
    {
        zookeeper_close(m_zhandle); // 关闭句柄,释放资源
    }
}

// zkclient启动连接zkserver
void ZkClient::Start()
{
    std::string host = MprpcApplication::GetInstance().GetConfig().Load("zookeeperip");
    std::string port = MprpcApplication::GetInstance().GetConfig().Load("zookeeperport");
    std::string connstr = host + ":" + port;

    /*
    zookeeper_mt:多线程版本
    zookeeper的API客户端程序提供了三个线程
    - API调用线程
    - 网络I/O线程 pthread_create  poll
    - watcher回调线程
    */
    m_zhandle = zookeeper_init(connstr.c_str(), global_watcher, 30000, nullptr, nullptr, 0);
    if (nullptr == m_zhandle)
    {
        std::cout << "zookeeper_init error!" << std::endl;
        exit(EXIT_FAILURE);
    }

    sem_t sem;
    sem_init(&sem, 0, 0);
    zoo_set_context(m_zhandle, &sem);

    sem_wait(&sem);
    std::cout << "zookeeper_init success!" << std::endl;
}

// 在zkserver上根据指定的path创建znode节点
// state表明创建的是临时性节点还是永久性节点
void ZkClient::Create(const char *path, const char *data, int datalen, int state)
{
    char path_buffer[128];
    int bufferlen = sizeof(path_buffer);
    int flag;
    // 先判断path表示的znode节点是否存在,如果存在,就不再重复创建了
    flag = zoo_exists(m_zhandle, path, 0, nullptr);
    if (ZNONODE == flag) // 表示path的znode节点不存在
    {
        // 创建指定path的znode节点
        // 默认创建的是永久性节点,如果设置了 ZOO_EPHEMERAL flags,那么就是创建临时性节点
        flag = zoo_create(m_zhandle, path, data, datalen, &ZOO_OPEN_ACL_UNSAFE, state, path_buffer, bufferlen);
        if (flag == ZOK)
        {
            std::cout << "znode create success... path: " << path << std::endl;
        }
        else
        {
            std::cout << "flag: " << flag << std::endl;
            std::cout << "znode create error... path: " << path << std::endl;
            exit(EXIT_FAILURE);
        }
    }
}

// 根据参数指定的znode节点路径,获取znode节点的值
std::string ZkClient::GetData(const char *path)
{
    char buffer[64];
    int bufferlen = sizeof(buffer);
    int flag = zoo_get(m_zhandle, path, 0, buffer, &bufferlen, nullptr);
    if (flag != ZOK)
    {
        std::cout << "get znode error... path: " << path << std::endl;
        return "";
    }
    else
    {
        return buffer;
    }
}

在RpcProvider中将注册的服务和方法添加到zookeeper中

把服务器上的可供调用的rpc方法通过zookeeper client注册到zookeeper server中。

// 启动rpc服务节点,开始提供rpc远程网络调用服务
void RpcProvider::Run()
{
    // ……

    // 把当前rpc节点上要发布的服务全部注册到zk上面,让rpc client可以从zk上发现服务
    // session timeout   30s   zkclient 网络I/O线程  1/3 * timeout 时间发送ping消息
    ZkClient zkCli;
    zkCli.Start();
    // service_name为永久性节点   method_name为临时性节点
    for (auto &sp : m_serviceMap)
    {
        //  /service_name
        std::string service_path = "/" + sp.first;
        zkCli.Create(service_path.c_str(), nullptr, 0);
        for (auto &mp : sp.second.m_methodMap)
        {
            //  /service_name/method_name
            std::string method_path = service_path + "/" + mp.first;
            char method_path_data[128] = {0};
            sprintf(method_path_data, "%s:%d", ip.c_str(), port);
            // ZOO_EPHEMERAL表示znode是一个临时性节点
            zkCli.Create(method_path.c_str(), method_path_data, strlen(method_path_data), ZOO_EPHEMERAL);
        }
    }

    // ……

}

在服务调用方RpcChannel中查找服务和方法

// 所有通过stub代理对象调用的rpc方法,都走到这里了,统一做rpc方法调用的数据序列化和网络发送
void MprpcChannel::CallMethod(const google::protobuf::MethodDescriptor *method,
                              google::protobuf::RpcController *controller,
                              const google::protobuf::Message *request,
                              google::protobuf::Message *response,
                              google::protobuf::Closure *done)
{
    // ......
    // rpc调用方想调用service_name的method_name服务,需要查询zk上该服务所在的host信息
    ZkClient zkCli;
    zkCli.Start();
    std::string method_path = "/" + service_name + "/" + method_name;
    std::string host_data = zkCli.GetData(method_path.c_str());  // 在zookeeper中查找调用的方法的相关信息
    if (host_data == "")
    {
        controller->SetFailed(method_path + " is not exit!");
        return;
    }
    int idx = host_data.find(":");
    if (idx == -1)
    {
        controller->SetFailed(method_path + " address is invalid!");
        return;
    }
    // ......

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注