RPC框架引入zookeeper服务注册与服务发现

Zookeeper概念及其作用

ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是大数据生态中的重要组件。它是集群的管理者,监视着集群中各个节点的状态根据节点提交的反馈进行下一步合理操作。最终,将简单易用的接口和性能高效、功能稳定的系统提供给用户。

它是一个为分布式应用提供一致性协调服务的中间件
zookeeper入门参考链接:https://www.cnblogs.com/xinyonghu/p/11031729.html

在分布式系统中,zookeeper提供了非常丰富的应用,本文只是剖析其中一小部分,但也是非常重要的一个部分,即服务注册和服务发现。
在RPC框架中,如果没有服务注册和服务发现,那么这个RPC框架几乎变得不实用,浅显的思路是在RPCConsumer(服务调用端)维护一个服务的列表,这个列表包含了所有分布式节点服务的ip和端口,但考虑这么一种情况,如果其中某个节点由于某种原因down掉了或者将这个节点的服务删除了,但是RPCConsumer本地还维护的列表中还存在这个服务结点,并且还尝试请求这个服务,那么显然会调用出错。
在这里插入图片描述

类似这种肯定需要动态的维护每个分布式服务节点的状态,在该节点down掉或者被撤销时应及时删除这个服务,避免RPC调用端继续请求不存在的服务。这就是zookeeper服务注册和服务发现所做的事。

在这里插入图片描述
Zookeeper组织数据的格式类似于一个文件系统,每个znode结点都可以是一个分布式服务结点,一般组织的结构是XXXXService/login、 XXXXService/registe,即service_name/method_name,znode结点的数据就是该服务所在节点的ip和port
在这里插入图片描述

Zookeeper服务注册和发现的流程:

step1:Rpc服务端先通过zkClient向zkServer端注册服务,也即创建XXXXService/login、 XXXXService/registe节点,并填充相应的数据。
step2:Rpc调用端再调用某个服务之前,通过zkClient向ZkServer查询这个服务节点是否存在,如果存在则返回这个服务节点的ip和port。然后进行远程rpc调用,否则返回错误终止调用过程。
step3:这一步其实zookeeper已经帮我们做了,step1中注册服务的过程中,zkServer会与这个节点建立一个session,并且zkServer以1/3 * timeout 的时间定期为每个与之简历的节点发送心跳包,如果得不到回应那么zkServer会认为这个节点已经不存在了,会动态的把这个节点上的所有服务都进行删除。

RPC框架引入zookeeper

1、封装zkclient(用于与zkServer通信的句柄、例如创建结点和删除结点、以及一些心跳回调操作)

#pragma once


#include <semaphore.h>
#include <zookeeper/zookeeper.h>
#include <string>


class ZkClient
{
public: 
    ZkClient();
    ~ZkClient();
    // zkClient启动连接zkserver
    void Start();
    // 在zkserver上根据指定的path创建Znode节点
    void Create(const char *path, const char* data, int datalen, int state=0);
    // 根据参数指定的znode节点路径,获取znode节点的值
    std::string GetData(const char* path);

private:
    // zk客户端句柄
    zhandle_t *m_zhandle;
};

// .cc
#include "zookeeperutil.h"
#include "rpcapplication.h"
#include  <iostream>

//全局的watcher观察器     zkserver给zkclient的通知回调
void global_watcher(zhandle_t *zh, int type, int state, const char* path, void *watcherCtx)
{
    if(type == ZOO_SESSION_EVENT)    //回调的消息类型是和会话相关的消息类型
    {
        if(state == ZOO_CONNECTED_STATE)  //zkserver和zkclient连接成功
        {
            sem_t *sem = (sem_t*) zoo_get_context(zh);
            sem_post(sem);
        }
    }
}

ZkClient::ZkClient():m_zhandle(nullptr)
{
    
}
ZkClient::~ZkClient()
{
    if(m_zhandle != nullptr)
    {
        zookeeper_close(m_zhandle);   //关闭句柄, 释放资源
    }
}
// zkClient启动连接zkserver
void ZkClient::Start()
{
    std::string host = RpcApplication::GetInstance().GetConfig().Load("zookeeperip");
    std::string port = RpcApplication::GetInstance().GetConfig().Load("zookeeperport");
    std::string connstr = host + ":" + port;

    /*
        zookeeper_mt:多线程版本
        zookeeper的API客户端程序提供了三个线程
        APT调用线程
        网络I/O线程  pthread_create (使用的poll-IO多路复用)
        watcher回调线程 pthread_create
    */
   m_zhandle = zookeeper_init(connstr.c_str(), global_watcher, 30000, nullptr, nullptr, 0);
   if(nullptr == m_zhandle)
   {
        std::cout << "zookeeper_init error !" << std::endl;
        exit(EXIT_FAILURE);
   }

   sem_t sem;
   sem_init(&sem, 0, 0);
   zoo_set_context(m_zhandle, &sem);

   sem_wait(&sem);
   std::cout << "zookeeper_init success !" << std::endl;
}

// 在zkserver上根据指定的path创建Znode节点
void ZkClient::Create(const char *path, const char* data, int datalen, int state)
{
    char path_buffer[128];
    int bufferlen = sizeof(path_buffer);
    int flag;
    //先判断path表示的znode节点是否存在, 如果存在, 就不能重复创建了
    flag = zoo_exists(m_zhandle, path, 0, nullptr);
    if(ZNONODE == flag)   //表示path的znode节点不存在
    {
        // 创建指定path的znode节点
        flag = zoo_create(m_zhandle, path, data, datalen, &ZOO_OPEN_ACL_UNSAFE, state, path_buffer, bufferlen);
        if(flag == ZOK)
        {
            std::cout << "znode create success .... path:" << path << std::endl;
        }
        else
        {
            std::cout << "flag : " << flag <<std::endl;
            std::cout << "znode create error...path: " << path << std::endl;
            exit(EXIT_FAILURE);
        }
    }
}


// 根据参数指定的znode节点路径,获取znode节点的值
std::string ZkClient::GetData(const char* path)
{
    char buffer[64];
    int bufferlen = sizeof(buffer);
    int flag = zoo_get(m_zhandle, path, 0, buffer, &bufferlen, nullptr);
    if(flag != ZOK)
    {
        std::cout << "get znode error ...... path" << path << std::endl;
        return "";
    }
    else
    {
        return buffer;
    }
}

2、在RPCProvider端进行服务注册

//把当前rpc节点上要发布的服务全部注册到zk上面, 让rpc client可以从zk上发现服务
        // session timeout 30s          zkclient 的网络I/O线程 会定时以1/3 * timeout 时间去给zkserver发送ping心跳包
        ZkClient zkCli;
        zkCli.Start();
        //service name为永久性节点      method name 为临时性节点
        for(auto& sp : m_serviceMap)
        {
                // /service_name   ---> /UserServiceRPc
                std::string service_path = "/" + sp.first;
                zkCli.Create(service_path.c_str(), nullptr, 0);
                for(auto &mp : sp.second.m_methodMap)
                {
                        // /service_name/method_name /UserServiceRPc/Login 存储当前这个rpc服务节点主机的ip和port
                        std::string method_path = service_path + "/" + mp.first;
                        char method_path_data[128] = {0};
                        sprintf(method_path_data, "%s:%d", ip.c_str(), port);
                        //ZOO_EPHEMERAL 表示znode是一个临时性节点
                        zkCli.Create(method_path.c_str(), method_path_data, strlen(method_path_data), ZOO_EPHEMERAL);
                }

        }

3、RPCConsumer端进行服务发现

//rpc调用方想调用service_name的method_name的服务, 需要查询zk上该服务所在的host信息
     ZkClient zkCli;
     zkCli.Start();
     // /UserServiceRpc/Login
     std::string method_path = "/" + service_name + "/" + method_name;
     // 127.0.0.1:8000
     std::string host_data = zkCli.GetData(method_path.c_str());
     if(host_data == "")
     {
        controller->SetFailed(method_path + "is not exist!");
        return;
     }
     int idx = host_data.find(":");
     if(idx == -1)
     {
        controller->SetFailed(method_path + "address is invalid!");
        return;
     }
     std::string ip = host_data.substr(0, idx);
     uint16_t port = atoi(host_data.substr(idx + 1, host_data.size() - idx).c_str());

至此基本上完整RPC应该具备的核心东西都有了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/58424.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

(自控原理)线性系统的时域分析法

目录 一、系统时间响应的性能指标 1、典型输入信号 2、动态性能与稳态性能 二、一阶系统的时域分析 1、一阶系统的数学模型 2、一阶系统的单位阶跃响应 三、二阶系统的时域分析 1、二阶系统的数学模型 2、二阶系统的单位阶跃响应 3、欠阻尼二阶系统的动态过程分析 4…

ORB-SLAM2学习笔记6之D435i双目IR相机运行ROS版ORB-SLAM2并发布位姿pose的rostopic

文章目录 0 引言1 D435i相机配置2 新增发布双目位姿功能2.1 新增d435i_stereo.cc代码2.2 修改CMakeLists.txt2.3 新增配置文件D435i.yaml 3 编译运行和结果3.1 编译运行3.2 结果3.3 可能出现的问题 0 引言 ORB-SLAM2学习笔记1已成功编译安装ROS版本ORB-SLAM2到本地&#xff0c…

Redis面试题2

Redis面试题-2 10、统计高并发网站每个网页每天的 UV 数据&#xff0c;结合Redis你会如何实现&#xff1f; 选用方案&#xff1a;HyperLogLog 如果统计 PV 那非常好办&#xff0c;给每个网页一个独立的 Redis 计数器就可以了&#xff0c;这个计数器的 key 后缀加上当天的日期…

【eNSP】静态路由

【eNSP】静态路由 原理网关路由表 实验根据图片连接模块配置路由器设备R1R2R3R4 配置PC的IP地址、掩码、网关PC1PC2PC3 配置静态路由查看路由表R1R2R3R4测试能否通信 原理 网关 网关与路由器地址相同&#xff0c;一般路由地址为.1或.254。 网关是当电脑发送的数据的目标IP不在…

芯片制造详解.光刻技术与基本流程.学习笔记(四)

本篇文章是看了以下视频后的笔记提炼&#xff0c;欢迎各位观看原视频&#xff0c;这里附上地址 芯片制造详解04&#xff1a;光刻技术与基本流程&#xff5c;国产之路不容易 芯片制造详解.光刻技术与基本流程.学习笔记 四 一、引子二、光刻(1).光掩膜(2).光刻机(3).光刻胶(4).挖…

【深度学习】High-Resolution Image Synthesis with Latent Diffusion Models,论文

13 Apr 2022 论文&#xff1a;https://arxiv.org/abs/2112.10752 代码&#xff1a;https://github.com/CompVis/latent-diffusion 文章目录 PS基本概念运作原理 AbstractIntroductionRelated WorkMethodPerceptual Image CompressionLatent Diffusion Models Conditioning Mec…

什么是线程?为什么需要线程?和进程的区别?

目录 前言 一.线程是什么&#xff1f; 1.1.为什么需要线程 1.2线程的概念 1.3线程和进程的区别 二.线程的生命周期 三.认识多线程 总结 &#x1f381;个人主页&#xff1a;tq02的博客_CSDN博客-C语言,Java,Java数据结构领域博主 &#x1f3a5; 本文由 tq02 原创&#xf…

深入探索Vue.js核心技术与跨平台开发uni-app实战

&#x1f482; 个人网站:【工具大全】【游戏大全】【神级源码资源网】&#x1f91f; 前端学习课程&#xff1a;&#x1f449;【28个案例趣学前端】【400个JS面试题】&#x1f485; 寻找学习交流、摸鱼划水的小伙伴&#xff0c;请点击【摸鱼学习交流群】 前言 在当今Web应用不断…

UGUI图文混排超链接

目录 一、LinkSpriteText二、EmojiText1、EmojiText2、支持超链接的EmojiText出现的问题 三、通用版EmojiText1、使用方法 之前做web项目有个需求需要通过某种方式打开试题中所提到的关键字介绍,当时是在试题旁边放个小按钮点击打开,后来要求把图标放在题干中,或者直接点击关键…

【C++奇遇记】函数探幽(上)

&#x1f3ac; 博客主页&#xff1a;博主链接 &#x1f3a5; 本文由 M malloc 原创&#xff0c;首发于 CSDN&#x1f649; &#x1f384; 学习专栏推荐&#xff1a;LeetCode刷题集 数据库专栏 初阶数据结构 &#x1f3c5; 欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd; 如…

誉天程序员-瀑布模型-敏捷开发模型-DevOps模型比较

文章目录 2. 项目开发-开发方式2.1. 瀑布开发模型2.2. 敏捷开发模型2.3. DevOps开发模型2.4. 区别 自增主键策略1、数据库支持主键自增自增和uuid方案优缺点 2. 项目开发-开发方式 由传统的瀑布开发模型、敏捷开发模型&#xff0c;一跃升级到DevOps开发运维一体化开发模型。 …

swagger相关问题

swagger相关问题 swagger版本为&#xff1a; <dependency><groupId>com.github.xiaoymin</groupId><artifactId>swagger-bootstrap-ui</artifactId><version>1.9.6</version> </dependency> <dependency><groupId&…

Grafana集成prometheus(4.Grafana添加预警)

上文已经完成了grafana对prometheus的集成及数据导入&#xff0c;本文主要记录grafana的预警功能&#xff08;以内存为例&#xff09; 添加预警 添加入口&#xff08;2个&#xff09; databorard面板点击edit&#xff0c;下方有个Alert的tab&#xff0c;创建Alert rules依赖…

IDEA偶尔编译的时候不识别lombok

偶尔IDEA启动项目的时候会识别不到lombok,识别不到get()跟set()方法 方案 在settings添加下面代码 -Djps.track.ap.dependenciesfalse

自然语言处理学习笔记(二)————语料库与开源工具

目录 1.语料库 2.语料库建设 &#xff08;1&#xff09;规范制定 &#xff08;2&#xff09;人员培训 &#xff08;3&#xff09;人工标注 3.中文处理中的常见语料库 &#xff08;1&#xff09;中文分词语料库 &#xff08;2&#xff09;词性标注语料库 &#xff08;3…

我理解的音响设备音频放大器地线环路共地回路造成交流声干扰哼声的分析,信号接地,工业仪表接地的问题

我理解的音响设备音频放大器地线环路共地回路造成交流声干扰哼声的分析&#xff0c;信号接地&#xff0c;工业仪表接地的问题 wxleasyland 2023.8 一、地线环路造成交流声哼声 家里插座中有一个的PE地线&#xff0c;相当于大地。 设备1的“信号地”接到家里三脚插座的PE地线…

【雕爷学编程】MicroPython动手做(39)——机器视觉之图像基础2

MixPY——让爱(AI)触手可及 MixPY布局 主控芯片&#xff1a;K210&#xff08;64位双核带硬件FPU和卷积加速器的 RISC-V CPU&#xff09; 显示屏&#xff1a;LCD_2.8寸 320*240分辨率&#xff0c;支持电阻触摸 摄像头&#xff1a;OV2640&#xff0c;200W像素 扬声器&#…

unity TextMeshPro 富文本

<b>粗体标签</b> <i>斜体标签</i> <u>下划线标签</u> <s>删除线标签</s> <sup>上标标签</sup>前面后边上标签 5<sup>。</sup>C <sub>下标标签&#xff0c;如&#xff1a;</sub>H<sub&…

【练】要求定义一个全局变量 char buf[] = “1234567“,创建两个线程,不考虑退出条件,打印buf

要求定义一个全局变量 char buf[] "1234567"&#xff0c;创建两个线程&#xff0c;不考虑退出条件&#xff0c;另&#xff1a; A线程循环打印buf字符串&#xff0c;B线程循环倒置buf字符串&#xff0c;即buf中本来存储1234567&#xff0c;倒置后buf中存储7654321. 不…