Zookeeper 三、Zookeeper基本使用

1.Zookeeper系统模型

1)Zookeeper数据模型之ZNode

在Zookeeper中,数据信息被保存在一个个数据节点上,这些节点被称为ZNode。ZNode是Zookeeper中最小数据单位,在ZNode下面又可以再挂ZNode,这样一层层下去就形成了一个层次化命名空间ZNode树,我们称为ZNode Tree,它采用了类似文件系统的层级树状结构进行管理。
image.png

在Zookeeper中,每一个数据节点都是一个ZNode,上图根目录下有两个节点,分别是:app1和app2,其中app1下面又有三个子节点,所有ZNode按层次化进行组织,形成这么一棵树,ZNode的节点路径标识方式和Unix文件系统路径非常相似,都是由一系列使用斜杠(/)进行分割的路径表示,开发人员可以向这个节点写入数据,也可以在这个节点下面创建子节点

2)ZNode的类型

Zookeeper节点类型可以分为三大类:

  • 持久性节点(Persistent)
  • 临时性节点(Ephemeral)
  • 顺序性节点(Sequential)

在开发中在创建节点的时候通过组合可以生成以下四种节点类型:持久节点、持久顺序节点、临时节点、临时顺序节点。不同类型的节点则会有不同的生命周期。

(1)持久节点

是Zookeeper中最常见的一种节点类型,所谓持久节点,就是指节点被创建后会一直存在服务器,直到删除操作主动清除

(2)持久顺序节点

就是顺序的持久节点,节点特性和持久节点是一样的,只是额外特性表现在顺序上。顺序特性实质是在创建节点的时候,会在节点名后面加上一个数字后缀,来表示其顺序

(3)临时节点

就是会被自动清理掉的节点,它的生命周期和客户端会话绑在一起,客户端会话结束,节点会被删除掉。与持久性节点不同的是,临时节点不能创建子节点

(4)临时顺序节点

就是有顺序的临时节点,和持久顺序节点相同,在其创建的时候会在名字后面加上数字后缀

3)事务ID

事务是对物理和抽象的应用状态上的操作集合。往往在现在的概念中,狭义上的事务通常指的是数据库事务,一般包含了一系列对数据库有序的读写操作,这些数据库事务具有所谓的ACID特性,即原子性(Atomic)、一致性(Consistency)、隔离性(Isolation)和持久性(Durability)

而在Zookeeper中,事务是指能够改变Zookeeper服务器状态的操作,我们也称之为事务操作或更新操作,一般包括数据节点创建与删除、数据节点内容更新等操作。对于每一个事务请求,Zookeeper都会为其分配一个全局唯一的事务ID,用ZXID来表示,通常是一个64位的数字。每一个ZXID对应一次更新操作,从这些ZXID中可以间接地识别出Zookeeper处理这些更新操作请求的全局顺序。

4)ZNode的状态信息

image.png
整个ZNode节点内容包括两部分:节点数据内容和节点状态信息。图中的quota是数据内容,其他的属于状态信息。那么这些状态信息都有什么含义呢?

  • cZxid:Create ZXID,表示节点被创建时的事务ID
  • ctime:Create Time,表示节点创建时间
  • mZxid:Modified ZXID,表示节点最后一次被修改时的事务ID
  • mtime:Modified Time,表示节点最后一次被修改的时间
  • pZxid:表示该节点的子节点列表最后一次被修改时的事务ID,只有子节点列表变更才会更新pZxid,子节点内容变更不会更新
  • cversion:表示子节点的版本号
  • dataVersion:表示内容版本号
  • aclVersion:标识acl版本
  • ephemeralOwner:表示创建该临时节点时的会话sessionID,如果是持久性节点那么值为0
  • dataLength:表示数据长度
  • numChildren:表示直系节点数
5)Watcher–数据变更通知

Zookeeper使用Watcher机制实现分布式数据的发布/订阅功能
一个典型的发布/订阅模式系统定义了一种一对多的订阅关系,能够让多个订阅者同时监听某一个主题对象,当这个主题对象自身状态变化时,会通知所有订阅者,使它们能够做出相应的处理。

在Zookeeper中,引入了Watcher机制来实现这种分布式的通知方式。Zookeeper允许客户端向服务端注册一个Watcher监听,当服务端的一些指定事件触发了这个Watcher,那么就会向指定客户端发送一个事件通知来实现分布式的通知功能。
整个Watcher注册与通知过程如图所示
image.png
Zookeeper的Watcher机制主要包括客户端线程、客户端WatcherManager、Zookeeper服务器三部分。
具体工作流程为:客户端在向Zookeeper服务器注册的同时,会将Watcher对象存储在客户端的WatcherManager当中。当Zookeeper服务器触发Watcher事件后,会向客户端发送通知,客户端线程从WatcherManager中取出对应的Watcher对象来执行回调逻辑

6)ACL–保障数据的安全

Zookeeper作为一个分布式协调框架,其内部存储了分布式系统运行时状态的元数据,这些元数据会直接影响基于Zookeeper进行构造的分布式系统的运行状态,因此,如何保障系统中数据的安全,从而避免因误操作所带来的数据随意变更而导致的数据库异常十分重要,在Zookeeper中,提供了一套完善的ACL(Access Control List)权限控制机制来保障数据的安全
我们可以从三个方面来理解ACL机制:权限模式(Scheme)、授权对象(ID)、权限)Permission),通常使用“scheme : id : permission"来标识一个有效的ACL信息

(1)权限模式:Scheme

权限模式用来确定权限验证过程中使用的检验策略,有如下四种模式:

  1. IP
    1. IP模式就是通过IP地址粒度来进行权限控制
    2. 如“ip:192.168.0.110"表示权限控制针对该IP地址
    3. 同时IP模式可以支持按照网段方式进行配置,如"ip:192.168.0.1/24"表示针对192.168.0.*这个网段进行权限控制
  2. Digest
    1. Digest是最常用的权限控制模式,要更符合我们对权限控制的认识,其使用"username:password"形式的权限标识来进行权限配置,便于区分不同应用来进行权限控制。
    2. 当我们通过"username:password"形式配置了权限标识后,Zookeeper会先后对其进行SHA-1加密和BASE64编码
  3. World
    1. World是一种最开放的权限控制模式,这种权限控制方式几乎没有任何作用,数据节点的访问权限对所有用户开放,即所有用户都可以在不进行任何权限校验的情况下操作Zookeeper上的数据。
    2. 另外,World模式也可以看作是一种特殊的Digest模式,它只有一个权限标识,即"world:anyone"
  4. Super
    1. Super模式,顾名思义就是超级用户的意思,也是一种特殊的Digest模式。在Super模式下,超级用户可以对任意Zookeeper上的数据节点进行任何操作
(2)授权对象:ID

授权对象指的是权限赋予的用户或一个指定实体,例如IP地址或是机器等。在不同的权限模式下,授权对象是不同的,表中列出了各个权限模式和授权对象之间的对应关系。

权限模式授权对象
IP通常是一个Ip地址或IP段,例如:192.168.10.110或192.168.10.1/24
Digest自定义,通常是username:BASE64(SHA-1((username:password))
World只有一个ID:anyone
Super超级用户
(3)权限:Permission

权限就是指那些通过权限检查后可以被允许执行的操作。在Zookeeper中,所有对数据的操作权限分为以下五大类:

  • CREATE(C):数据节点的创建权限,允许授权对象在该数据节点下创建子节点
  • DELETE(D):子节点的删除权限,允许授权对象删除该数据节点的子节点
  • READ(R):数据节点的读取权限,允许授权对象访问该数据节点并读取其数据内容或子节点列表等
  • WAITE(W):数据节点的更新权限,允许授权对象对该数据节点进行更新操作
  • ADMIN(A):数据节点的管理权限,允许授权对象对该数据节点进行ACL相关的设置操作

2.Zookeeper命令行操作

首先,进入到zookeeper的bin目录之后
通过zkClient进入Zookeeper客户端命令行

./zkcli.sh    连接本地的zookeeper服务器
./zkCli.sh -server ip:port    连接指定的服务器

连接成功之后,系统会输出Zookeeper的相关环境及配置信息等信息。输入help之后,屏幕会输出可用的zookeeper命令,如下图所示

image.png

1)创建节点

使用create命令,可以创建一个Zookeeper节点,如

create [-s][-e] path data acl
其中,-s或-e分别指定节点特性,顺序或临时节点,若不指定,则创建持久节点;acl用来进行权限控制
(1)创建顺序节点
create -s /zk-test 123

image.png

(2)创建临时节点
create -e /zk-temp 123

image.png

临时节点在客户端会话结束后,就会自动删除,下面使用quit命令退出客户端

quit

再次使用客户端连接服务器,并使用ls/命令查看根目录下的节点
image.png
可以看到根目录下已经不存在zk-temp临时节点了

(3)创建永久节点
create /zk-permanent 123

image.png

可以看到永久节点不同于顺序节点,不会自动在后面添加一串数字

2)读取节点

与读取相关的命令有 ls 命令和 get 命令
ls 命令可以列出Zookeeper指定节点下的所有子节点,但只能查看指定节点下的第一级的所有子节点

ls path
#其中,path表示的是指定数据字节的节点路径

get 命令可以获取Zookeeper指定节点的数据内容和属性信息

get path

若获取根节点下面的所有子节点,使用 ls / 命令即可
image.png
若想获取 /zk-permanent 的数据内容和属性,可使用如下命令:get /zk-permanent
image.png

image.png

3)更新节点

使用set命令,可以更新指定节点的数据内容,用法如下:

set path data [version]

其中,data就是要更新的新内容,version表示数据版本,在zookeeper中,节点的数据是有版本概念的,这个参数用于指定本次更新操作是基于ZNode的哪一个数据版本进行的,如将/zk-permanent节点的数据更新为456,可以使用如下命令:set /zk-permanent 456
image.png
现在dataVersion已经变为1了,表示进行了更新

4)删除节点

使用delete命令可以删除Zookeeper上的指定节点,用法如下

delete path [version]

其中version表示数据版本,使用delete /zk-permanent 命令即可删除 /zk-permanent节点
image.png

【注】若删除节点存在子节点,那么无法删除该节点,必须先删除子节点,再删除父节点

3.Zookeeper的api使用

Zookeeper作为一个分布式框架,主要用来解决分布式一致性问题,它提供了简单的分布式源语,并且对多种编程语言提供了API,所以接下来重点来看下Zookeeper的Java客户端API使用方式
Zookeeper API共包含五个包,分别为:

  • org.apache.zookeeper
  • org.apache.zookeeper.data
  • org.apache.zookeeper.server
  • org.apache.zookeeper.server.quorum
  • org.apache.zookeeper.server.upgrade

包含Zookeeper类,他是我们编程时最常用的类文件。这个类是Zookeeper客户端的主要类文件。如果要使用Zookeeper服务,应用程序首先必须创建一个Zookeeper实例,这是就需要使用此类。一旦客户端和Zookeeper服务端建立起了连接,Zookeeper系统将会给本次连接会话分配一个ID值,并且客户端将会周期性的向服务器端发送心跳维持会话连接。主要连接有效,客户端就可以使用Zookeeper API来做相应处理了。

准备工作:导入依赖

    <dependencies>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.9.2</version>
        </dependency>
    </dependencies>

1)建立会话
public class CreateSession implements Watcher {
    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    /*
     * 建立会话
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        /*
        客户端可以通过创建⼀个zk实例来连接zk服务器
        new Zookeeper(connectString,sesssionTimeOut,Wather)
        connectString: 连接地址:IP:端⼝
        sesssionTimeOut:会话超时时间:单位毫秒
        Wather:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端)
        */

        ZooKeeper zooKeeper = new ZooKeeper("192.168.101.3:2181", 5000, new CreateSession());
//        System.out.println(zooKeeper.getState());

        //计数工具类:CountDownLatch:不让main方法结束,让线程处于等待阻塞
        countDownLatch.await();
        System.out.println("客户端与服务端会话真正建立了"+zooKeeper.getState());

    }

    /*
     * 回调方法:处理来自服务器端的Watcher通知
     **/
    @Override
    public void process(WatchedEvent watchedEvent) {
        // SyncConnectedif
        if (watchedEvent.getState() == Event.KeeperState.SyncConnected){
            //解除主程序在CountDownLatch的等待阻塞
            countDownLatch.countDown();

        }

    }
}

【注】
Zookeeper客户端和服务端会话的建立是一个异步的过程,也就是说在程序中,构造方法会在处理完客户端初始化工作后立即返回,在大多数情况下,此时并没有真正建立一个可用的会话,在会话的生命周期中处于“CONNECTING”状态。当该会话真正创建完毕后Zookeeper服务端向会话对应的客户端发送一个事件通知,以告知客户端,客户端只有在获取这个通知之后,才算真正建立了会话

2)创建节点
public class CreateNode implements Watcher {

    private static CountDownLatch countDownLatch = new CountDownLatch(1);
    private static ZooKeeper zooKeeper;

    public static void main(String[] args) throws Exception {
        zooKeeper = new ZooKeeper("192.168.101.3:2181", 5000, new CreateNode());
//        countDownLatch.await();
        Thread.sleep(Integer.MAX_VALUE);
    }




    @Override
    public void process(WatchedEvent watchedEvent) {
        //当连接创建了,服务端发送给客户端SyncConnected事件
        if (watchedEvent.getState() == Event.KeeperState.SyncConnected){
            //解除主程序在CountDownLatch的等待阻塞
//            countDownLatch.countDown();

            //调用创建节点方法
            try {
                createNodeSync();
            }catch (Exception e){
                e.printStackTrace();
            }
        }

    }

    private void createNodeSync() throws Exception {
        /*
         * path :节点创建的路径
         * data[] :节点创建要保存的数据,是个byte类型的
         * acl :节点创建的权限信息(4种类型)
         *      ANYONE_ID_UNSAFE : 表示任何⼈
         *      AUTH_IDS :此ID仅可⽤于设置ACL。它将被客户机验证的ID替换。
         *      OPEN_ACL_UNSAFE :这是⼀个完全开放的ACL(常⽤)--> world:anyone
         *      CREATOR_ALL_ACL :此ACL授予创建者身份验证ID的所有权限
         * createMode :创建节点的类型(4种类型)
         *      PERSISTENT:持久节点
         *      PERSISTENT_SEQUENTIAL:持久顺序节点
         *      EPHEMERAL:临时节点
         *      EPHEMERAL_SEQUENTIAL:临时顺序节点
         String node = zookeeper.create(path,data,acl,createMode);
         **/
        String node_PERSISTENT = zooKeeper.create("/xg_persistent","持久节点内容".getBytes(StandardCharsets.UTF_8),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        String node_PERSISTENT_SEQUENTIAL = zooKeeper.create("/xg_persistent_sequential","持久顺序节点内容".getBytes(StandardCharsets.UTF_8),
                ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT_SEQUENTIAL);
        String node_EPERSISTENT = zooKeeper.create("/xg_ephemeral","临时节点内容".getBytes(StandardCharsets.UTF_8),
                ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL);
        System.out.println("创建的持久节点是:"+node_PERSISTENT);
        System.out.println("创建的持久顺序节点是:"+node_PERSISTENT_SEQUENTIAL);
        System.out.println("创建的临时节点是:"+node_EPERSISTENT);

    }
}
3)获取节点数据
public class GetNodeData implements Watcher {

    private  static CountDownLatch countDownLatch = new CountDownLatch(1);

    private static ZooKeeper zooKeeper;

    /*
      建立会话
     */
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {

     /*
        客户端可以通过创建一个zk实例来连接zk服务器
        new Zookeeper(connectString,sesssionTimeOut,Wather)
        connectString: 连接地址:IP:端口
        sesssionTimeOut:会话超时时间:单位毫秒
        Wather:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端)
     */

        zooKeeper = new ZooKeeper("192.168.101.3:2181", 5000, new GetNodeData());
        System.out.println(zooKeeper.getState());

        // 计数工具类:CountDownLatch:不让main方法结束,让线程处于等待阻塞
        //countDownLatch.await();\
        Thread.sleep(Integer.MAX_VALUE);

    }
    /*
        回调方法:处理来自服务器端的watcher通知
     */
    public void process(WatchedEvent watchedEvent) {

        /*
            子节点列表发生改变时,服务器端会发生noteChildrenChanged事件通知
            要重新获取子节点列表,同时注意:通知是一次性的,需要反复注册监听
         */
        if(watchedEvent.getType() == Event.EventType.NodeChildrenChanged){

            List<String> children = null;
            try {
                children = zooKeeper.getChildren("/xg_persistent", true);
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(children);

        }
        // SyncConnected
        if(watchedEvent.getState() == Event.KeeperState.SyncConnected){

            //解除主程序在CountDownLatch上的等待阻塞
            System.out.println("process方法执行了...");

            // 获取节点数据的方法
            try {
                getNoteData();

                // 获取节点的子节点列表方法
                getChildren();
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /*
        获取某个节点的内容
     */
    private void getNoteData() throws KeeperException, InterruptedException {

        /**
         * path    : 获取数据的路径
         * watch    : 是否开启监听
         * stat    : 节点状态信息
         *        null: 表示获取最新版本的数据
         *  zk.getData(path, watch, stat);
         */
        byte[] data = zooKeeper.getData("/xg_persistent", false, null);
        System.out.println(new String(data));


    }

    /*
        获取某个节点的子节点列表方法
     */
    public static void getChildren() throws KeeperException, InterruptedException {

        /*
            path:路径
            watch:是否要启动监听,当子节点列表发生变化,会触发监听
            zooKeeper.getChildren(path, watch);
         */
        List<String> children = zooKeeper.getChildren("/xg_persistent", true);
        System.out.println(children);
    }
}
4)修改节点数据
public class UpdateNodeData implements Watcher {

    private  static CountDownLatch countDownLatch = new CountDownLatch(1);

    private static ZooKeeper zooKeeper;

    /*
      建立会话
     */
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {

     /*
        客户端可以通过创建一个zk实例来连接zk服务器
        new Zookeeper(connectString,sesssionTimeOut,Wather)
        connectString: 连接地址:IP:端口
        sesssionTimeOut:会话超时时间:单位毫秒
        Wather:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端)
     */

        zooKeeper = new ZooKeeper("192.168.101.3:2181", 5000, new UpdateNodeData());
        System.out.println(zooKeeper.getState());

        // 计数工具类:CountDownLatch:不让main方法结束,让线程处于等待阻塞
        //countDownLatch.await();\
        Thread.sleep(Integer.MAX_VALUE);
    }
    /*
        回调方法:处理来自服务器端的watcher通知
     */
    public void process(WatchedEvent watchedEvent) {
        // SyncConnected
        if(watchedEvent.getState() == Event.KeeperState.SyncConnected){

            //解除主程序在CountDownLatch上的等待阻塞
            System.out.println("process方法执行了...");

            // 更新数据节点内容的方法
            try {
                updateNoteSync();
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    /*
        更新数据节点内容的方法
     */
    private void updateNoteSync() throws KeeperException, InterruptedException {

         /*
            path:路径
            data:要修改的内容 byte[]
            version:为-1,表示对最新版本的数据进行修改
            zooKeeper.setData(path, data,version);
         */
        byte[] data = zooKeeper.getData("/xg_persistent", false, null);
        System.out.println("修改前的值:" + new String(data));

        //修改/lg-persistent 的数据 stat: 状态信息对象
        Stat stat = zooKeeper.setData("/xg_persistent", "客户端修改了节点数据".getBytes(), -1);

        byte[] data2 = zooKeeper.getData("/xg_persistent", false, null);
        System.out.println("修改后的值:" + new String(data2));
    }
}
5)删除节点
public class DeleteNode implements Watcher {

    private  static CountDownLatch countDownLatch = new CountDownLatch(1);

    private static ZooKeeper zooKeeper;

    /*
      建立会话
     */
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {

     /*
        客户端可以通过创建一个zk实例来连接zk服务器
        new Zookeeper(connectString,sesssionTimeOut,Wather)
        connectString: 连接地址:IP:端口
        sesssionTimeOut:会话超时时间:单位毫秒
        Wather:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端)
     */

        zooKeeper = new ZooKeeper("192.168.101.3:2181", 5000, new DeleteNode());
        System.out.println(zooKeeper.getState());

        // 计数工具类:CountDownLatch:不让main方法结束,让线程处于等待阻塞
        //countDownLatch.await();\
        Thread.sleep(Integer.MAX_VALUE);
    }
    /*
        回调方法:处理来自服务器端的watcher通知
     */
    public void process(WatchedEvent watchedEvent) {
        // SyncConnected
        if(watchedEvent.getState() == Event.KeeperState.SyncConnected){

            //解除主程序在CountDownLatch上的等待阻塞
            System.out.println("process方法执行了...");
            // 删除节点
            try {
                deleteNoteSync();
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    /*
        删除节点的方法
     */
    private void deleteNoteSync() throws KeeperException, InterruptedException {
        /*
      	zooKeeper.exists(path,watch) :判断节点是否存在
      	zookeeper.delete(path,version) : 删除节点
      */
        Stat stat = zooKeeper.exists("/xg_persistent/c1", false);
        System.out.println(stat == null ? "该节点不存在":"该节点存在");

        if(stat != null){
            zooKeeper.delete("/xg_persistent/c1",-1);
        }

        Stat stat2 = zooKeeper.exists("/xg_persistent/c1", false);
        System.out.println(stat2 == null ? "该节点不存在":"该节点存在");
    }
}

4.Zookeeper开源客户端

1)ZkClient

ZkClient是Github上一个开源的zookeeper客户端,在zookeeper原生API接口之上进行了包装,是一个更易用的Zookeeper客户端,同时,ZkClient在内部还实现了诸如Session超时重连、Watcher反复注册等功能

添加依赖

        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.2</version>
        </dependency>
(1)创建会话
public class CreateSession {

    public static void main(String[] args) {
        /*
         *   创建一个zkclient实例就可以完成连接,完成会话的创建
         *   serverString : 服务器连接地址
         *   注意:zkClient通过对zookeeperAPI内部封装,将这个异步创建会话的过程同步化了..
         */
        ZkClient zkClient = new ZkClient("192.168.101.3:2181");
        System.out.println("会话被创建了...");
    }
}
(2)创建节点
public class CreateNode {
    /*
     *   借助zkclient完成会话的创建
     */
    public static void main(String[] args) {

        /*
         *   创建一个zkclient实例就可以完成连接,完成会话的创建
         *   serverString : 服务器连接地址
         *   注意:zkClient通过对zookeeperAPI内部封装,将这个异步创建会话的过程同步化了..
         */

        ZkClient zkClient = new ZkClient("192.168.101.3:2181");
        System.out.println("会话被创建了..");

        /*
         *  createParents : 是否要创建父节点,如果值为true,那么就会递归创建节点
         */
        zkClient.createPersistent("/xg_zkclient/c1",true);
        System.out.println("节点递归创建完成");

    }
}

image.png

【注】
在原生态接口是无法递归创建节点的(父节点不存在),但是通过ZkClient通过设置createParents参数为true可以递归的先创建父节点,再创建子节点。

(3)删除节点

ZkClient提供了递归删除节点的接口,即其帮助开发者先删除所有子节点(存在),再删除父节点

public class DeleteNode {
    /*
     * 借助zkclient完成会话的创建
     */
    public static void main(String[] args) {
        /*
            创建一个zkclient实例就可以完成连接,完成会话的创建
            serverString : 服务器连接地址

            注意:zkClient通过对zookeeperAPI内部封装,将这个异步创建会话的过程同步化了..
         */

        ZkClient zkClient = new ZkClient("192.168.101.3:2181");
        System.out.println("会话被创建了..");

        // 递归删除节点
        String path = "/xg_zkclient/c1";
        zkClient.createPersistent(path+"/c11");
        zkClient.deleteRecursive(path);
        System.out.println("递归删除成功");

    }
}
(4)获取子节点
public class GetNode {

    /*
     * 借助zkclient完成会话的创建
     */
    public static void main(String[] args) throws InterruptedException {

        /*
            创建一个zkclient实例就可以完成连接,完成会话的创建
            serverString : 服务器连接地址

            注意:zkClient通过对zookeeperAPI内部封装,将这个异步创建会话的过程同步化了..
         */

        ZkClient zkClient = new ZkClient("192.168.101.3:2181");
        System.out.println("会话被创建了..");

        // 获取子节点列表
        List<String> children = zkClient.getChildren("/xg_zkclient");
        System.out.println(children);

        // 注册监听事件

        /*
            客户端可以对一个不存在的节点进行子节点变更的监听
            只要该节点的子节点列表发生变化,或者该节点本身被创建或者删除,都会触发监听
         */
        zkClient.subscribeChildChanges("/xg_zkclient_get", new IZkChildListener() {
            /*
                s : parentPath
                list : 变化后子节点列表
             */

            public void handleChildChange(String parentPath, List<String> list) throws Exception {
                System.out.println(parentPath + "的子节点列表发生了变化,变化后的子节点列表为"+ list);

            }
        });
        //测试
        zkClient.createPersistent("/xg_zkclient_get");
        Thread.sleep(1000);

        zkClient.createPersistent("/xg_zkclient_get/c1");
        Thread.sleep(1000);

    }
}

image.png

客户端可以对一个不存在的节点进行子节点变更的监听
一旦客户端对一个节点注册了子节点列表变更监听之后,那么当该节点的子节点列表发生变更时,服务端都会通知客户端,并将最新的子节点列表发送给客户端。
该节点本身的创建或删除也会通知到客户端

(5)获取数据(节点是否存在、更新、删除)
public class GetNodeSample {

    public static void main(String[] args) throws InterruptedException {

        /*
            创建一个zkclient实例就可以完成连接,完成会话的创建
            serverString : 服务器连接地址

            注意:zkClient通过对zookeeperAPI内部封装,将这个异步创建会话的过程同步化了..
         */
        ZkClient zkClient = new ZkClient("192.168.101.3:2181");
        System.out.println("会话被创建了..");

        // 判断节点是否存在
        String path = "/xg_zkClient_ep";
        boolean exists = zkClient.exists(path);

        if(!exists){
            // 创建临时节点
            zkClient.createEphemeral(path,"123");
        }
        // 读取节点内容
        Object o = zkClient.readData(path);
        System.out.println(o);
        // 注册监听
        zkClient.subscribeDataChanges(path, new IZkDataListener() {
            /*
                当节点数据内容发生变化时,执行的回调方法
                s: path
                o: 变化后的节点内容
             */
            public void handleDataChange(String s, Object o) throws Exception {
                System.out.println(s+"该节点内容被更新,更新的内容"+o);
            }
            /*
                当节点被删除时,会执行的回调方法
                s : path
             */
            public void handleDataDeleted(String s) throws Exception {
                System.out.println(s+"该节点被删除");
            }
        });
        // 更新节点内容
        zkClient.writeData(path,"456");
        Thread.sleep(1000);

        // 删除节点
        zkClient.delete(path);
        Thread.sleep(1000);
    }
}

运行结果:

123
/xg_zkClient_ep该节点内容被更新,更新后的内容4567
/xg_zkClient_ep该节点被删除

表明可以成功监听节点数据变化或删除事件

2)Curator客户端

curator是Netflix公司开源的一套Zookeeper客户端框架,和ZkClient一样,Curator解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连,反复注册Watcher和NodeExistsException异常等,是最流行的Zookeeper客户端之一。从编码风格上来讲,它提供了给予Fluent的变成风格支持

依赖:

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.12.0</version>
        </dependency>
(1)创建会话

Curator的创建会话方式与原生的API和ZkClient的创建方式区别很大。Curator创建客户端是通过CuratorFrameworkFactor工厂类来实现的。具体如下:

1.使用CuratorFramework这个工厂类的两个静态方法来创建一个客户端

    public static CuratorFramework newClient(String connectString,
    RetryPolicy retryPolicy)

    public static CuratorFramework newClient(String connectString, 
    int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy) 

其中RetryPolicy提供重试策略的接口,可以让用户实现自定义的重试策略,默认提供了以下实现,分别为ExponentialBackoffRetry(基于backoff的重连策略)、RetryNTimes(重连N次策略)、RetryForever(永远重试策略)

2.通过调用CuratorFramework中的start()方法来启动会话

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
CuratorFramework client =
CuratorFrameworkFactory.newClient("127.0.0.1:2181",retryPolicy);
client.start();
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",
 5000,1000,retryPolicy);
client.start();

其实进一步查看源代码可以得知,其实这两种方法内部实现一样,只是对外包装不同的方法。它们的底层都是通过第三个方法builder来实现的

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
private static CuratorFramework Client = CuratorFrameworkFactory.builder()
 .connectString("server1:2181,server2:2181,server3:2181")
 .sessionTimeoutMs(50000)
 .connectionTimeoutMs(30000)
 .retryPolicy(retryPolicy)
 .build();
client.start();

参数:

  • connectString:zk的server地址,多个server之间使用英文逗号分隔开
  • connectionTimeoutMs:连接超时时间,如上是30s,默认是15s
  • sessionTimeoutMs:会话超时时间,如上是50s,默认是60s
  • retryPolicy:失败重试策略
    • ExponentialBackoffRetry:构造器含有三个参数 ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
      • baseSleepTimeMs:初始的sleep时间,用于计算之后的每次重试的sleep时间,
        • 计算公式:当前sleep时间=baseSleepTimeMs*Math.max(1, random.nextInt(1<<(retryCount+1)))
      • maxRetries:最大重试次数
      • maxSleepMs:最大sleep时间,如果上述的当前sleep计算出来比这个大,那么sleep用这个时间,默认的最大事件是Integer.MAX_VALUE毫秒
  • start():完成会话的创建
public class CreateSession {

    // 创建会话
    public static void main(String[] args) {


        //不使用fluent编程风格

        RetryPolicy exponentialBackoffRetry = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("192.168.101.3:2181", exponentialBackoffRetry);
        curatorFramework.start();
        System.out.println( "会话被建立了");

        // 使用fluent编程风格
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("192.168.101.3:2181")
                .sessionTimeoutMs(50000)
                .connectionTimeoutMs(30000)
                .retryPolicy(exponentialBackoffRetry)
                .namespace("base")  // 独立的命名空间 /base
                .build();

        client.start();

        System.out.println("会话2创建了");
    }

}

【注】
session2会话含有隔离命名空间,即客户端对Zookeeper上数据节点的任何操作都是相对 /base目录进行的,这有利于实现不同的Zookeeper的业务之间的隔离。

(2)创建节点

curator提供了一系列Fluent风格的接口,通过使用Fluent编程风格的接口,开发人员可以进行自由组合来完成各种类型节点的创建。
下面简单介绍一下常用的几个节点创建场景

1.创建一个初始内容为空的节点

client.create().forPath(path);

Curator默认创建的时持久节点,内容为空

2.创建一个包含内容的节点

client.create().forPath(path,"我是内容".getBytes());

Curator和ZkClient不同的是依旧采用Zookeeper原生API的风格,内容使用byte[]作为方法参数

3.递归创建父节点,并选择节点类型

client.create().creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT).forPath(path);

creatingParentsIfNeeded这个接口非常有用,在使用Zookeeper的过程中,开发人员经常会碰到NoNodeException异常,其中一个可能的远呀就是试图对一个不存在的父节点创建子节点。因此,开发人员不得不在每次创建节点之前,都判断一下该父节点是否存在。在使用Curator之后,通过调用creatingParentsIfNeeded接口,Curator就能够自动地递归创建所有需要的父节点。

public class CreateNode {

    // 创建会话
    public static void main(String[] args) throws Exception {
        //不使用fluent编程风格

        RetryPolicy exponentialBackoffRetry = new ExponentialBackoffRetry(1000, 3);

        // 使用fluent编程风格
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("192.168.101.3:2181")
                .sessionTimeoutMs(50000)
                .connectionTimeoutMs(30000)
                .retryPolicy(exponentialBackoffRetry)
                .namespace("base")  // 独立的命名空间 /base
                .build();

        client.start();

        System.out.println("会话2创建了");

        // 创建节点
        String path = "/xg_curator/c1";
        String s = client.create().creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT).forPath(path, "init".getBytes());

        System.out.println("节点递归创建成功,该节点路径" + s);
    }
}
(3)删除节点

删除节点的方法也是基于Fluent方式来进行操作,不同类型的操作调用 新增不同的方法调用即可。

1.删除一个子节点

client.delete().forPath(path);

2.删除节点并递归删除其子节点

client.delete().deletingChildrenIfNeeded().forPath(path);

3.指定版本进行删除

client.delete().withVersion(1).forPath(path);

如果版本已经不存在,则删除异常,异常信息如下

org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode =
BadVersion for

4.强制保证删除一个节点

client.delete().guaranteed().forPath(path);

只要客户端会话有效,那么Curator会在后台持续进行删除操作,直到节点删除成功。比如遇到一些网络异常的情况,此guaranteed的强制删除就会很有效果。

public class DeleteNode {

    // 创建会话
    public static void main(String[] args) throws Exception {
        //不使用fluent编程风格

        RetryPolicy exponentialBackoffRetry = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("192.168.101.3:2181", exponentialBackoffRetry);
        curatorFramework.start();
        System.out.println( "会话被建立了");

        // 使用fluent编程风格
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("192.168.101.3:2181")
                .sessionTimeoutMs(50000)
                .connectionTimeoutMs(30000)
                .retryPolicy(exponentialBackoffRetry)
                .namespace("base")  // 独立的命名空间 /base
                .build();

        client.start();

        System.out.println("会话2创建了");

        // 删除节点
        String path = "/xg_curator";
        client.delete().deletingChildrenIfNeeded().withVersion(-1).forPath(path);

        System.out.println("删除成功,删除的节点" + path);
    }
}
(4)获取数据

获取节点数据内容API相当简单,同时Curator提供了传入一个Stat变量的方式来存储服务器返回的最新的节点状态信息

//普通查询
client.getData().forPath(path);
//包含状态查询
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath(path);
public class GetNode {

    // 创建会话
    public static void main(String[] args) throws Exception {
        //不使用fluent编程风格
        RetryPolicy exponentialBackoffRetry = new ExponentialBackoffRetry(1000, 3);

        // 使用fluent编程风格
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("192.168.101.3:2181")
                .sessionTimeoutMs(50000)
                .connectionTimeoutMs(30000)
                .retryPolicy(exponentialBackoffRetry)
                .namespace("base")  // 独立的命名空间 /base
                .build();

        client.start();

        System.out.println("会话2创建了");

        // 创建节点
        String path = "/xg_curator/c1";
        String s = client.create().creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT).forPath(path, "init".getBytes());

        System.out.println("节点递归创建成功,该节点路径" + s);

        // 获取节点的数据内容及状态信息

        // 数据内容
        byte[] bytes = client.getData().forPath(path);
        System.out.println("获取到的节点数据内容:" + new String(bytes));

        // 状态信息
        Stat stat = new Stat();
        client.getData().storingStatIn(stat).forPath(path);

        System.out.println("获取到的节点状态信息:" + stat );
    }
}
(5)更新数据

更新数据,如果未传入version参数,那么更新当前最新版本,如果传入version则更新指定version,如果version已经变更,则抛出异常。

//普遍更新
client.setData().forPath(path,"新内容".getBytes());
//制定版本更新
client.setData().withVersion(1).forPath(path);

版本不一致异常信息:

org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode =
BadVersion for
public class UpdateNode {

    public static void main(String[] args) throws Exception {
        
        RetryPolicy exponentialBackoffRetry = new ExponentialBackoffRetry(1000, 3);

        // 使用fluent编程风格
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("192.168.101.3:2181")
                .sessionTimeoutMs(50000)
                .connectionTimeoutMs(30000)
                .retryPolicy(exponentialBackoffRetry)
                .namespace("base")  // 独立的命名空间 /base
                .build();

        client.start();

        System.out.println("会话2创建了");

        // 创建节点
        String path = "/xg_curator/c1";

        // 获取节点的数据内容及状态信息

        // 数据内容
        byte[] bytes = client.getData().forPath(path);
        System.out.println("获取到的节点数据内容:" + new String(bytes));

        // 状态信息
        Stat stat = new Stat(); //0
        client.getData().storingStatIn(stat).forPath(path);

        System.out.println("获取到的节点状态信息:" + stat );

        // 更新节点内容 //1
        int version = client.setData().withVersion(stat.getVersion()).forPath(path, "修改内容1".getBytes()).getVersion();
        System.out.println("当前的最新版本是" + version);
        byte[] bytes2 = client.getData().forPath(path);
        System.out.println("修改后的节点数据内容:" + new String(bytes2));

        // BadVersionException
        client.setData().withVersion(stat.getVersion()).forPath(path,"修改内容2".getBytes());
    }
    
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/743402.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

JVM专题十:JVM中的垃圾回收机制

在JVM专题九&#xff1a;JVM分代知识点梳理中&#xff0c;我们主要介绍了JVM为什么采用分代算法&#xff0c;以及相关的概念&#xff0c;本篇我们将详细拆分各个算法。 垃圾回收的概念 垃圾回收&#xff08;Garbage Collection&#xff0c;GC&#xff09;确实是计算机编程中的…

详解如何在分数限制下,选好专业还是选好学校?

系列文章目录 1、选好专业还是选好学校&#xff1f; 2、具体分段考虑 3、博主给学子的建议 文章目录 系列文章目录前言一、稳定军心二、鱼与熊掌真不可兼得吗&#xff1f;1.兴趣和职业规划2.专业实力3.就业前景4.个人发展 三、具体分段考虑1、高分段考生2、次高分段考生3、中…

住宅IP代理服务终极指南:增强安全性和可访问性

在当今的数字安装程序中&#xff0c;隐私和可访问性对于企业和个人都至关重要。满足这些需求的一个强大工具是住宅 IP 代理服务。这些服务为用户提供住宅 IP 地址&#xff0c;这些地址是互联网产品 (ISP) 计算房主的真实 IP。在本综合指南中&#xff0c;我们将探讨住宅 IP 代理…

手机照片压缩到20k以内免费,这几款心动软件快收好!

在数字化时代&#xff0c;手机拍照已成为我们记录生活的重要方式之一。然而&#xff0c;高清的照片也意味着占用着越来越多的手机存储空间。如果你正在为手机内存告急而烦恼&#xff0c;那么这几款手机照片压缩神器或许能成为你的救星&#xff01;它们不仅可以将照片轻松压缩至…

docker-compose部署Flink及Dinky

docker-compose部署Flink及Dinky 服务器环境&#xff1a;centos7 1. 配置hosts vim /etc/hostsx.x.x.x jobmanager x.x.x.x taskmanager x.x.x.x dinky-mysql2. 文件目录结构 . ├── conf │ ├── JobManager │ │ ├── flink-conf.yaml │ │ ├── log…

高考英语3500词

DAY1 DAY2 DAY3 DAY4 DAY5 DAY6 DAY7 DAY8 DAY9 DAY10 DAY11 DAY12 DAY13 DAY14 DAY15 DAY16 DAY17 DAY18 DAY19 DAY20 DAY21 DAY22 DAY23 DAY24 DAY25 DAY26 DAY27 DAY28 DAY29 DAY30 DAY31 DAY32 DAY33 DAY34 DAY35 DAY36 DAY37 DAY38 DAY39 DAY40

这份AI绘画攻略赶紧码住!超适合小白入门的PS AI插件来啦!

有没有小伙伴对AI绘画很感兴趣&#xff0c;但是看到国外的mj和sd总觉得入门困难&#xff01;别担心&#xff0c;米兔挖到一款超级绝的国产PS AI插件&#xff01;适合新手学习&#xff0c;米兔这里还有一份专为小白准备的AI绘画攻略&#xff0c;让你的创意不再受限&#xff01; …

解决vs2022scanf报错问题

vs2022scanf报错问题 大家下完vs2022之后,开心的写下一段简单的代码: #include <stdio.h> #include <stdlib.h>int main() {int a;scanf("%d", &a);printf("%d", a);return 0; } vs2022会毫不犹豫的报错,下面是报错信息: 翻译过来就是v…

打造智慧矿山:整体架构设计与实践探索

随着信息技术的不断发展&#xff0c;智慧矿山作为矿业领域的创新模式&#xff0c;正日益受到关注。在智慧矿山中&#xff0c;先进的传感器、大数据分析、人工智能等技术被广泛应用&#xff0c;以提高矿山生产效率、降低成本&#xff0c;并确保安全环保。本文将深入探讨智慧矿山…

云计算【第一阶段(20)】磁盘管理与文件系统 服务器硬件及RAID配置实战(三)

一、服务器硬件详解 cpu 主板 内存 硬盘 网卡 电源 raid卡 风扇 远程管理卡 1.1、硬盘尺寸 目前生产环境中主流的两种类型硬盘 3.5寸 和2.5寸硬盘 2.5寸硬盘可以通过使用硬盘托架后适用于3.5寸硬盘的服务器 但是3.5寸没法转换成2.5寸 二、RAID阵列详解 独立硬盘冗余阵…

七天速通javaSE:第三天 程序控制结构:顺序、选择、循环

文章目录 前言一、Scanner类1. hasNext()和hasNextLine()2.next()和nextLine()3. Scanner的其他用法 二、顺序结构三、选择结构1. if单选择结构2. if-else双选择结构3. if-else if多选择结构4. switch选择结构 四、循环结构1. while循环2.do while循环3. for循环&#xff08;常…

Linux系统学习——指令四

Linux系统学习——指令四 Linux 系统学习——指令四查看文件MD5校验和fuser 指令基本语法常用选项访问类型使用示例 系统信息 Linux 系统学习——指令四 查看文件MD5校验和 在Linux中&#xff0c;你可以使用 md5sum 命令来查看一个文件的MD5校验和。以下是具体的操作方法&…

超细毛搭配超宽设计,一款更呵护牙龈的牙刷

牙龈敏感的时候&#xff0c;刷牙特别难受&#xff0c;最近试了试惠百施&#xff08;EBISU&#xff09;65孔宽头软毛牙刷&#xff0c;感觉它的口腔护理体验很不错。这款牙刷的设计独特&#xff0c;采用宽头设计&#xff0c;一次就能刷两排牙齿&#xff0c;极大地提高了清洁效率。…

ServBay[中文] 下一代Web开发环境

ServBay是一个集成式、图形化的本地化Web开发环境。开发者通过ServBay几分钟就能部署一个本地化的开发环境。解决了Web开发者&#xff08;比如PHP、Nodejs&#xff09;、测试工程师、小型团队安装和维护开发测试环境的问题&#xff0c;同时可以快速的进行环境的升级以及维护。S…

解决msvcp120.dll问题的详细步骤,分析msvcp120.dll文件

msvcp120.dll文件是Microsoft Visual C Redistributable Package for Visual Studio 2013中的一个组件。如果提示你丢失该文件&#xff0c;通常意味着程序试图调用一个未在你电脑上安装的Visual C版本。下面是解决此问题的详细步骤。 msvcp120.dll丢失的解决方法 方法 1&#…

报餐小程序可以运用在饭堂的哪方面

随着科技的快速发展&#xff0c;智能化、信息化的管理方式逐渐渗透到我们日常生活的方方面面。在饭堂管理中&#xff0c;报餐小程序的应用为传统的餐饮管理方式带来了革命性的变革。本文将探讨报餐小程序在饭堂管理中的应用及其带来的优势。 一、报餐小程序的基本功能 报餐小程…

轮播图的制作大全

例如该样式: 1.Vue的方法(可实现自动轮播和左右按钮和下方原点按钮轮播) <div id="app"><div class="a" ref="b" @mouseenter="MouseFun(c)" @mouseleave="MouseFun(d)">//1.图片显示盒子<div class=&qu…

python-17-零基础自学python-

学习内容&#xff1a;《python编程&#xff1a;从入门到实践》第二版 知识点&#xff1a; 类、子类、继承、调用函数 练习内容&#xff1a; 练习9-6&#xff1a;冰激凌小店 冰激凌小店是一种特殊的餐馆。编写一个名为IceCreamStand的类&#xff0c;让它继承为完成练习9-1或…