ZooKeeper 实战(四) Curator Watch事件监听

文章目录

  • ZooKeeper 实战(四) Curator Watch事件监听
    • 0.前言
    • 1.Watch 事件监听概念
    • 2.NodeCache
      • 2.1.全参构造器参数
      • 2.2.代码DEMO
      • 2.3.日志输出
    • 3.PathChildrenCache
      • 3.1.全参构造器参数
      • 3.2.子节点监听时间类型
      • 3.2.代码DEMO
    • 4.TreeCache
      • 4.1.构造器参数
      • 4.2.代码DEMO
      • 4.3.日志输出

ZooKeeper 实战(四) Curator Watch事件监听

0.前言

上一篇博客只介绍了有关Curator中对ZNode的CRUD操作,从本篇起开始逐步介绍更加高级的API操作。

1.Watch 事件监听概念

ZooKeeper 中引入了Watcher机制来实现了发布/订阅功能能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者。虽然ZooKeeper原生支持通过注册Watcher来进行事件监听,但是其使用并不是特别方便,需要开发人员反复注册Watcher,比较繁琐。

而 Curator 引入了Cache 来实现对 ZooKeeper 服务端事件的监听。

Curator 中提供了三种 Cache(Watcher)来监听不同节点变化类型:

  • NodeCache:监听指定的节点。
  • PathChildrenCache:监听指定节点的子节点。
  • TreeCache:监听指定节点及其子孙节点。

2.NodeCache

监听指定的节点,增删改都会监听。

2.1.全参构造器参数

/**
 * @param: client 注册监听的客户端
 * @param: path 节点路径
 * @param: dataIsCompressed 是否开启数据压缩。传递的数据会进行压缩,传递速度快,但取数据时需要把压缩的数据进行转换,默认为false
 */
public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed);

2.2.代码DEMO

    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("。。。。。。。。。。。。。容器初始化完毕。。。。。。。。。。。。。。");
        String path = "/ahao/watcher";
        TimeUnit.SECONDS.sleep(3);

        // 创建NodeCache对象
        NodeCache nodeCache = new NodeCache(client,path);
        // 添加监听器
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                ChildData currentData = nodeCache.getCurrentData();
                if (currentData != null){
                    String s = new String(currentData.getData(),StandardCharsets.UTF_8);
                    log.info("监听{}节点发生变化,数据内容:{}",path,s);
                }else {
                    log.info("监听{}节点被删除了",path);
                }
            }
        });
      	// 开启监听
        nodeCache.start();

        TimeUnit.SECONDS.sleep(2);
        // 创建节点
        client.create().creatingParentsIfNeeded().forPath(path,"第一次新增".getBytes(StandardCharsets.UTF_8));
        TimeUnit.SECONDS.sleep(2);
        // 更新节点
        client.setData().forPath(path,"数据修改了".getBytes(StandardCharsets.UTF_8));
        TimeUnit.SECONDS.sleep(2);
        // 删除节点
        client.delete().deletingChildrenIfNeeded().forPath(path);
    }

2.3.日志输出

在这里插入图片描述

3.PathChildrenCache

监听指定节点的子节点。当一个子节点增删改时, PathChildrenCache会包含最新的子节点的数据和状态。

3.1.全参构造器参数

/**
 * @param: client 注册监听的客户端
 * @param: path 节点路径
 * @param: cacheData 是否缓存节点内容(包含节点状态)
 * @param: dataIsCompressed 是否开启数据压缩。传递的数据会进行压缩,传递速度快,但取数据时需要把压缩的数据进行转换,默认为false
 * @param: executorService 用于PathChildrenCache的后台线程的线程池。该线程池应该是单线程的,否则缓存可能会看到不一致的结果
 */
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final CloseableExecutorService executorService)

3.2.子节点监听时间类型

public enum Type
{
  	// 子节点添加
    CHILD_ADDED,
  	// 子节点的数据变更
    CHILD_UPDATED,
		// 子节点被删除
    CHILD_REMOVED,
 
  	// 以下三个事件类型表示:当连接断开时,PathChildrenCache将继续保持其断开连接之前的状态,并且在连接恢复后,PathChildrenCache将为断开连接期间发生的所有添加、删除和更新发出正常的子事件。
  	// 当连接状态处于ConnectionState.SUSPENDED。
    CONNECTION_SUSPENDED,
  	// 当连接状态处于ConnectionState.RECONNECTED
    CONNECTION_RECONNECTED,
  	// 当连接状态处于ConnectionState.LOST
    CONNECTION_LOST,
  	
  	// 当通过PathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT)启动监听时,该事件表示PathChildrenCache初始化完成
  This event signals that the initial cache has been populated.
    INITIALIZED
}

3.2.代码DEMO

    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("。。。。。。。。。。。。。容器初始化完毕。。。。。。。。。。。。。。");
        String path = "/ahao/watcher";
        TimeUnit.SECONDS.sleep(3);

        // 创建PathChildrenCache对象
        // 此处的cacheData参数一定要设置为true,不然Curator不会缓存数据当本地,
        // 那么后续pathChildrenCache.getCurrentData()得到的数据都为null
        PathChildrenCache pathChildrenCache = new PathChildrenCache(client,path,true);
        // 添加监听器
        pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                if (event.getType() == PathChildrenCacheEvent.Type.INITIALIZED){
                    log.info("PathChildrenCache初始化完,事件类型:{}", event.getType());
                }else {
                    ChildData currentData = event.getData();
                    log.info("事件类型:{},监听到的子节点发生变化:{}",event.getType(),currentData.getPath());
                }
            }
        });
        // 开启监听
        pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);

        // 创建子节点
        TimeUnit.SECONDS.sleep(2);
        client.create().creatingParentsIfNeeded().forPath(path+"/c1");
        client.create().creatingParentsIfNeeded().forPath(path+"/c2");
        client.create().creatingParentsIfNeeded().forPath(path+"/c3/age");
        // 修改子节点
        TimeUnit.SECONDS.sleep(2);
        client.setData().forPath(path+"/c1","c1更新了".getBytes(StandardCharsets.UTF_8));
        client.setData().forPath(path+"/c2","c2更新了".getBytes(StandardCharsets.UTF_8));
        // 删除子节点
        TimeUnit.SECONDS.sleep(2);
        client.delete().deletingChildrenIfNeeded().forPath(path+"/c3");
    }

3.3.日志输出

可以看出,PathChildrenCache只会监听直属子节点的变化,其非直属子节点的后代节点如/c3/age,没有发布通知。

在这里插入图片描述

4.TreeCache

监听指定节点及其子孙节点。

4.1.构造器参数

/**
 * @param: client 注册监听的客户端
 * @param: path 节点路径
 */
public TreeCache(CuratorFramework client, String path)
  
/**
 * @param: client 注册监听的客户端
 * @param: path 节点路径
 * @param: cacheData 是否缓存节点内容(包含节点状态)
 * @param: dataIsCompressed 是否开启数据压缩。传递的数据会进行压缩,传递速度快,但取数据时需要把压缩的数据进行转换,默认为false
 * @param: maxDepth 最大深度。最深的那个后代节点到path所需要经过的节点数
 * @param: executorService 用于PathChildrenCache的后台线程的线程池。该线程池应该是单线程的,否则缓存可能会看到不一致的结果
 * @param: createParentNodes 是否需要创建父节点。如果父节点不存在泽创建父节点(容器节点)
 * @param: TreeCacheSelector TreeCache选择器。根据指定的策略和条件,选择适合的缓存树来创建和维护TreeCache
 */
TreeCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, int maxDepth, final ExecutorService executorService, boolean createParentNodes, TreeCacheSelector selector)

4.2.代码DEMO

    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("。。。。。。。。。。。。。容器初始化完毕。。。。。。。。。。。。。。");
        String path = "/ahao/watcher/tree";
        TimeUnit.SECONDS.sleep(3);

        // 创建TreeCache对象,也可通过TreeCache.newBuilder()创建
        TreeCache treeCache = new TreeCache(client,path);
        treeCache.getListenable().addListener(new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                if (event.getType() == TreeCacheEvent.Type.INITIALIZED){
                    log.info("TreeCache初始化完,事件类型:{}", event.getType());
                }else {
                    ChildData currentData = event.getData();
                    log.info("事件类型:{},监听到的子节点发生变化:{}",event.getType(),currentData.getPath());
                }
            }
        });
        // 开启监听
        treeCache.start();

        // 创建节点
        TimeUnit.SECONDS.sleep(2);
        client.create().creatingParentsIfNeeded().forPath(path);
        client.create().creatingParentsIfNeeded().forPath(path +"/t1");
        client.create().creatingParentsIfNeeded().forPath(path +"/t2/ccc");
        // 修改子节点
        TimeUnit.SECONDS.sleep(2);
        client.setData().forPath(path,"根节点更新了".getBytes(StandardCharsets.UTF_8));
        client.setData().forPath(path +"/t2/ccc","/t2/ccc更新了".getBytes(StandardCharsets.UTF_8));
        // 删除子节点
        TimeUnit.SECONDS.sleep(2);
        client.delete().deletingChildrenIfNeeded().forPath(path +"/t2");
    }

4.3.日志输出

可以看出TreeCache会监听当前节点和后代节点的变化。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/316830.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

代码随想录算法训练营第四天 |链表总结

1、每次先加判断: if (head null) {return head;} 2、ListNode dummy new ListNode(-1, head);和ListNode dummy new ListNode(-1);区别: 在Java中,ListNode dummy new ListNode(-1, head); 和 ListNode dummy new ListNode(-1); 的主…

软考学习笔记--操作系统-进程管理

进程管理是一个具有独立功能的程序关于数据集合的一次可以并发执行的运行活动,是系统进行资源分配和调度的基本单位。相对于程序,进程是动态的概念,而程序是静态的概念,是指令的集合。进程具有动态性和并发性,需要一定…

LeetCode讲解篇之39. 组合总和

文章目录 题目描述题解思路题解代码 题目描述 题解思路 首先排序数组,然后开始选择数字,当选择数字num后,在去选择大于等于num的合法数字,计算过程中的数字和,直到选数字和等于target, 加入结果集,若数字和…

爬虫案例—表情党图片data-src抓取

爬虫案例—表情党图片data-src抓取 表情党网址:https://qq.yh31.com 抓取心情板块的图片data-src 由于此页面采用的是懒加载技术,为了节省网络带宽和减轻服务器压力。不浏览的图片,页面不加载,统一显示LOADING…。如下图&#x…

tkinter控件中文显示为unicode编码的解决办法

一、背景 最近使用python tkinter编写界面应用时,发现按钮的中文名称在windows上显示正常,但是在linux上显示为中文的unicode编码;文本输入框也是,输入中文输时,text控件上也显示为unicode编码,如下图所示…

【Python数据可视化】matplotlib之设置坐标:添加坐标轴名字、设置坐标范围、设置主次刻度、坐标轴文字旋转并标出坐标值

文章传送门 Python 数据可视化matplotlib之绘制常用图形:折线图、柱状图(条形图)、饼图和直方图matplotlib之设置坐标:添加坐标轴名字、设置坐标范围、设置主次刻度、坐标轴文字旋转并标出坐标值matplotlib之增加图形内容&#x…

02-Dapper

1.2:Dapper 1.2.1:设计要求 1、无处不在的部署: 任何服务都应该被监控到,任何服务出问题都要做到有据可查。2、持续的监控:做到7*24小时全天候监控,任何时候出了问题都要基于监控数据追踪问题根源。1.2.2…

基于SSM+JSP的订餐管理系统的设计与实现

末尾获取源码 开发语言:Java Java开发工具:JDK1.8 后端框架:SSM 前端:采用JSP技术开发 数据库:MySQL5.7和Navicat管理工具结合 服务器:Tomcat8.5 开发软件:IDEA / Eclipse 是否Maven项目&#x…

WordPress企业模板

首页大图wordpress外贸企业模板 橙色的wordpress企业模板 演示 https://www.zhanyes.com/waimao/6250.html

如何领取腾讯云免费服务器?腾讯云服务器免费领取教程

腾讯云免费服务器申请入口 https://curl.qcloud.com/FJhqoVDP 免费服务器可选轻量应用服务器和云服务器CVM,轻量配置可选2核2G3M、2核8G7M和4核8G12M,CVM云服务器可选2核2G3M和2核4G3M配置,腾讯云百科txybk.com分享2024年最新腾讯云免费服务器…

SAP SQVI制作报表及SE93创建事务代码

在平时的项目中,财务想查询所有的凭证明细,SAP的查询凭证FB03不能满足需求,所以用SQVI制作一个简易的查询报表。 1、打开SQVI,填写自开发报表的名称“ZFB03”,点击“创建”,输入自开发报表的名称“凭证明细…

79LXX 三端负电源电压调节器,具有一系列固定电压输出,适用于小于100mA电源供给的场合

79LXX系列三端负电源电压调节器是单片双极型线性集成电路,采用TO92、SOT89-3的封装形式封装,有一系列固定的电压输出,适用于小于100mA电源供给的场合。 主要特点: 最大输出电流为100mA 固定输出电压分别为-5V、-6V、-8V、-9V、-1…

回归预测 | Matlab基于SO-GRU蛇群算法优化门控循环单元的数据多输入单输出回归预测

回归预测 | Matlab基于SO-GRU蛇群算法优化门控循环单元的数据多输入单输出回归预测 目录 回归预测 | Matlab基于SO-GRU蛇群算法优化门控循环单元的数据多输入单输出回归预测效果一览基本介绍程序设计参考资料 效果一览 基本介绍 1.Matlab基于SO-GRU蛇群算法优化门控循环单元的数…

7.11、Kali Linux中文版虚拟机安装运行教程

目录 一、资源下载准备工作 二、安装教程 三、kali linux换源 四、apt-get update 报错 一、资源下载准备工作 linux 中文版镜像历史版本下载:http://old.kali.org/kali-images/ 大家可以自行选择版本下载,本人下载的是2021版本 二、安装教程 打开vmvare wokst…

Canopen学习笔记——sync同步报文增加数据域(同步计数器)

1.Canfestival同步报文sync的设置 在OD表中的配置如下: 如果0x1006索引的同步报文循环周期时间设置为0则禁用同步报文,这里要注意的就是,上面第一张图也提到了,时间单位是us。第二张图,我的0x1006就设置为0xF4240,也就…

docker compose安装gitlab

环境 查看GitLab镜像 docker search gitlab 拉取GitLab镜像 docker pull gitlab/gitlab-ce 准备gitlab-docker.yml文件 version: 3.1 services:gitlab:image: gitlab/gitlab-ce:latestcontainer_name: gitlabrestart: alwaysenvironment:GITLAB_OMNIBUS_CONFIG: |external_url…

HarmonyOS开发FA应用模型下多个页面的声明方式

目录 方式1 方式2 HarmonyOS配套的IDE是DevEco Studio,目前的版本是3.1。官网可以直接下载 HUAWEI DevEco Studio和SDK下载和升级 | HarmonyOS开发者 ​ 方式1 ​在DevEco Studio如果是在pages目录通过右键New->ArkTS File生成的文件,需要注意&…

PHP如何拆分中文名字(包括少数民族名字)

/*** param string|null $name* return array|null*/ function splitName($name) {if (empty($name) || empty(trim($name))) {return null;}//该正则是用来提取$name参数里面的中文字符的。preg_match_all(/[\x{4e00}-\x{9fff}]/u, $name, $matchers);$matchersCount isset($…

互联网医院系统|北京线上问诊|线上问诊系统功能解析

随着科技的不断发展,线上问诊系统作为一种快速、便捷的医疗服务方式在近年来越来越受欢迎。本文将重点介绍线上问诊系统的开发功能及其优势,帮助读者更好地了解这一医疗服务方式的价值和好处。 一、线上问诊系统的开发功能: 1、患者注册与登…

Ovtio不同版本下载

关注 M r . m a t e r i a l , \color{Violet} \rm Mr.material\ , Mr.material , 更 \color{red}{更} 更 多 \color{blue}{多} 多 精 \color{orange}{精} 精 彩 \color{green}{彩} 彩! 主要专栏内容包括: †《LAMMPS小技巧》: ‾ \textbf…