ZooKeeper 客户端API操作

文章目录

  • 一、节点信息
    • 1、创建节点
    • 2、获取子节点并监听节点变化
    • 3、判断节点是否存在
    • 4、客户端向服务端写入数据
      • 写入请求直接发给 Leader 节点
      • 写入请求直接发给 follow 节点
  • 二、服务器动态上下线监听
    • 1、监听过程
    • 2、代码
  • 三、分布式锁
    • 1、什么是分布式锁?
    • 2、Curator 框架实现分布式锁

一、节点信息

前提:centos102、centos103、centos104 服务器都已经开启

pom.xml 依赖

<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13.2</version>
    </dependency>

    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>2.17.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.5.7</version>
    </dependency>
    <dependency>
        <groupId>org.junit.jupiter</groupId>
        <artifactId>junit-jupiter</artifactId>
        <version>RELEASE</version>
        <scope>compile</scope>
    </dependency>
</dependencies>

log4j.properties 配置

# 设置全局的日志记录级别为 INFO
log4j.rootLogger=INFO, stdout

# 控制台输出
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n

# 文件输出
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

1、创建节点

zkClient.java 代码

// 注意:逗号后面不能有空格
private String connectString = "centos102:2181,centos103:2181,centos104:2181";
private int sessionTimeout = 2000;
private ZooKeeper zkClient;

// 创建客户端
@Before
public void init() throws IOException {

    zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
        @Override
        public void process(WatchedEvent event) {

        }
    });
}

// 创建子节点
@Test
public void create() throws InterruptedException, KeeperException {
    String nodeCreated = zkClient.create("/frost", "cat".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    
}

运行创建子节点,看看是否创建了该节点

在这里插入图片描述

2、获取子节点并监听节点变化

@Test
public void getChildren() throws InterruptedException, KeeperException {
    List<String> children = zkClient.getChildren("/", true);

    for (String child : children) {
        System.out.println(child);
    }
}

那如果此时我再创建一个节点,此时控制台没有任何变化,我想要创建一个节点控制台能够看到相关变化怎么办?此时只需要将程序保持不结束,然后将客户端查看子节点函数放入监听器中。

3、判断节点是否存在

@Test
public void exit() throws InterruptedException, KeeperException {
    Stat stat = zkClient.exists("/frost", false);
    System.out.println(stat == null ? "not exits" : "exits");
}

4、客户端向服务端写入数据

写入请求直接发给 Leader 节点

  1. 客户端发送写入请求,leader节点执行写入操作
  2. leader通知follow1执行写入操作
  3. folllow1写入完毕给leader返回确认ack
  4. 现在半数以上服务器完成写入,leader给客户端发送确认ack
  5. leader通知follow2写入
  6. follow2写入完毕给leader发送确认ack
    在这里插入图片描述

写入请求直接发给 follow 节点

  1. 客户端发送写入请求,
  2. follow1 将写入请求发送给leader
  3. leader节点执行写入操作,然后leader通知follow1执行写入操作
  4. folllow1写入完毕给leader返回确认ack
  5. 现在半数以上服务器完成写入,leader给follow1发送确认ack
  6. follow1给客户端发送确认ack
  7. leader通知follow2写入
  8. follow2写入完毕给leader发送确认ack

在这里插入图片描述

二、服务器动态上下线监听

1、监听过程

在这里插入图片描述

以下红色字体写错,应该是下线则通知注册监听器的客户端
在这里插入图片描述

对于ZooKeeper集群来说,客户端和服务器都相当于客户端,区别在于:服务器在ZooKeeper集群中是创建节点,客户端在ZooKeeper是监听信息。

在这里插入图片描述

2、代码

服务器注册到zk集群

import org.apache.zookeeper.*;
import java.io.IOException;

public class DistributeServer {

    private String connectString = "centos102:2181,centos103:2181,centos104:2181";
    private int sessionTimeout = 2000;
    ZooKeeper zk;

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {

        DistributeServer server = new DistributeServer();

        // 1. 获取zk连接
        server.getConnect();

        // 2. 注册服务器到 zk 集群
        server.regist(args[0]);

        // 3. 启动业务逻辑(睡觉)
        server.business();
    }

    private void business() throws InterruptedException {
        Thread.sleep(Long.MAX_VALUE);
    }

    private void regist(String hostname) throws InterruptedException, KeeperException {
        String create = zk.create("/servers", hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); // 临时带序号的节点
        System.out.println(hostname + "is online");

    }

    private void getConnect() throws IOException {
        zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {

            }
        });
    }
}

客户端进行监听

import org.apache.zookeeper.*;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class DistributeClient {

    private String connectString = "centos102:2181,centos103:2181,centos104:2181";
    private int sessionTimeout = 2000;
    ZooKeeper zk;

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {

        DistributeClient client = new DistributeClient();

        // 1. 获取zk连接
        client.getConnect();

        // 2. 监听/servers下子节点的增加和删除
        client.getServerList();

        // 3. 启动业务逻辑(睡觉)
        client.business();
    }

    private void business() throws InterruptedException {
        Thread.sleep(Long.MAX_VALUE);
    }

    private void getServerList() throws InterruptedException, KeeperException {
        List<String> children = zk.getChildren("/servers", true);
        ArrayList<String> servers = new ArrayList<>();
        for (String child : children) {
            byte[] data = zk.getData("/servers/" + child, false, null);
            servers.add(new String(data));
        }
        System.out.println(servers);
    }

    private void getConnect() throws IOException {
        zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                try {
                    getServerList();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (KeeperException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

启动客户端,然后在服务器上进行增加节点监听
在这里插入图片描述

删除节点监听
在这里插入图片描述

因为我们服务端的代码传参了,所以我们需要设置一下这个参数:
在这里插入图片描述

下图代表服务端启动的是hadoop102节点
在这里插入图片描述

先把客户端启动起来,发现有一个节点hadoop101:
在这里插入图片描述

在启动服务端,hadoop102上线:
在这里插入图片描述

然后返回看客户端的监听,发现节点有变化,打印出所有节点 [hadoop102, hadoop101]
在这里插入图片描述

此时我们修改一下再此启动服务端让 hadoop103 上线:
在这里插入图片描述

返回客户端查看发现 hadoop102 下线,hadoop103 上线
在这里插入图片描述

三、分布式锁

1、什么是分布式锁?

比如说"进程1"在使用该资源的时候,会先去获得锁,"进程1"获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,"进程1"用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。

在这里插入图片描述

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class DistributedLock {

    private final String connectString = "centos102:2181,centos103:2181,centos104:2181";
    private final int sessionTimeout = 2000;
    ZooKeeper zk;

    private CountDownLatch connectLatch = new CountDownLatch(1);
    private CountDownLatch waitLatch = new CountDownLatch(1);

    // 前一个节点
    private String waitPath;
    // 当前节点
    String currentMode;

    public DistributedLock() throws IOException, InterruptedException, KeeperException {
        // 1. 获取连接
         zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                // connectLatch,如果连接上zk,可以释放
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    connectLatch.countDown();
                }

                // waitLatch,需要释放
                if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {
                    waitLatch.countDown();
                }
            }
        });

         // 等待zk正常连接后往下走
         connectLatch.await();

        // 2. 判断根节点/lock是否存在
        Stat stat = zk.exists("/locks", false);

        if (stat == null) {
            // 创建根节点(永久节点)
            zk.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }

    }

    // 对 zk 加锁
    public void zkLock() {
        // 创建对应的临时带序号的节点
        try {
            currentMode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

            // 判断创建的节点是否是最小的序号节点,如果是,获取到锁;如果不是,监听前一个节点
            List<String> children = zk.getChildren("/locks", false);

            // 如果 children 只有一个节点,直接获取锁;如果有多个节点,需要判断,谁最小
            if (children.size() == 1) {
                return;
            }else {
                // 排序
                Collections.sort(children);

                // 获取节点名称seq00000001
                String thisNode = currentMode.substring("/locks/".length());
                // 通过seq00000001获取该节点在children当中的位置
                int index = children.indexOf(thisNode);

                if (index == -1) {
                    System.out.println("数据异常");
                }else if (index == 0) {
                    // 该节点为第一个,获取锁直接返回
                    return;
                }else {
                    // 不是第一个,监听前一个节点
                    waitPath = "/locks/" + children.get(index - 1);
                    zk.getData(waitPath, true, null);

                    // 等待监听结束
                    waitLatch.await();
                    return;
                }
            }

        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    // 解锁
    public void unZkLock() throws InterruptedException, KeeperException {
        // 删除节点
        zk.delete(currentMode, -1);
    }
}

测试

import org.apache.zookeeper.KeeperException;

import java.io.IOException;

public class DistributedLockTest {

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {

        final DistributedLock lock1 = new DistributedLock();

        final DistributedLock lock2 = new DistributedLock();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock1.zkLock();
                    System.out.println("线程1启动,获取到锁");

                    Thread.sleep(5000);

                    lock1.unZkLock();
                    System.out.println("线程1释放锁");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (KeeperException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock2.zkLock();
                    System.out.println("线程2启动,获取到锁");

                    Thread.sleep(5000);

                    lock2.unZkLock();
                    System.out.println("线程2释放锁");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (KeeperException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

}

2、Curator 框架实现分布式锁

原生JAVA API出现的问题:
(1)会话是异步的,需要自己去连接
(2)Watch需要重复注册,不然就不能生效
(3)开发的复杂性还是比较高
(4)不支持多节点的删除和创建,需要自己去递归

Curator 是一个专门解决分布式锁的框架,解决了原生JAVA API开发分布式遇到的的问题
curator 官方文档:https://curator.apache.org/index.html

pom.xml 文件添加依赖

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-client</artifactId>
    <version>4.3.0</version>
</dependency>
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorLockTest {
    public static void main(String[] args) {
        // 创建分布式锁1
        InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");

        // 创建分布式锁2
        InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock1.acquire();
                    System.out.println("线程1获取到锁");

                    lock1.acquire();
                    System.out.println("线程1获取到锁");

                    Thread.sleep(5 * 1000);

                    lock1.release();
                    System.out.println("线程1释放锁");

                    lock1.release();
                    System.out.println("线程1再次释放锁");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock2.acquire();
                    System.out.println("线程2获取到锁");

                    lock2.acquire();
                    System.out.println("线程2获取到锁");

                    Thread.sleep(5 * 1000);

                    lock2.release();
                    System.out.println("线程2释放锁");

                    lock2.release();
                    System.out.println("线程2再次释放锁");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

    }

    private static CuratorFramework getCuratorFramework() {

        ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);

        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("centos102:2181,centos103:2181,centos104:2181")
                .connectionTimeoutMs(2000)
                .sessionTimeoutMs(2000)
                .retryPolicy(policy).build();

        // 启动客户端
        client.start();
        System.out.println("zookeeper 启动成功");

        return client;
    }
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/906482.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

网关如何传递信息给微服务

前情回顾 上篇我们已经完成了网关对所有微服务请求的拦截以及JWT的登录校验。 客户端和微服务之间的桥梁--网关&#xff08;身份校验&#xff09;https://mp.csdn.net/mp_blog/creation/editor/143425484 问题引入 现在的问题是在一些微服务业务中&#xff0c;需要用到用户…

vue3学习记录-nextTick

vue3学习记录-nextTick 1. 案例场景2. 使用方法2.1 回调方式2.2 async&#xff0c;await 3.原理 1. 案例场景 聊天框实现输入内容&#xff0c;滚动条默认滚到最底部。 <template><div class"chat_box"><div class"chat_list" ref"chat…

Nature Medicine病理AI汇总|TORCH:预测未知原发部位癌症的肿瘤起源|顶刊精析·24-11-01

小罗碎碎念 今天分析Nature Medicine病理AI系列的第三篇文章——《Prediction of tumor origin in cancers of unknown primary origin with cytology-based deep learning》 这篇文章报道了一种基于细胞学图像的深度学习方法TORCH&#xff0c;用于预测未知原发部位癌症的肿瘤…

关于SQLServer在局域网内无法连接的问题的解决思路

针对SQL Server 2008在局域网内无法连接的问题&#xff0c;以下是一些详细的解决办法。我们在过程中需要用到Microsoft SQL Server 2008和Microsoft SQL Server tools 2008数据库软件中的配置管理器以及SQL Server Management Studio工具&#xff0c;入下截图所示。 一、检查网…

Ubuntu22.04 安装图形界面以及XRDP教程

一、准备环境 1.一台服务器安装系统ubuntu&#xff08;这里大部分ubuntu系统可以同用&#xff09; 2.安装的ubuntu系统未安装图形界面 二、操作步骤 1.远程ssh或者直接登录服务器命令行界面 ssh -p 远程端口 rootIP 2.更新系统软件包 sudo apt update # 更新本地的软件包…

【运动的&足球】足球运动员球守门员裁判检测系统源码&数据集全套:改进yolo11-DBBNCSPELAN

改进yolo11-FocalModulation等200全套创新点大全&#xff1a;足球运动员球守门员裁判检测系统源码&#xff06;数据集全套 1.图片效果展示 项目来源 人工智能促进会 2024.10.28 注意&#xff1a;由于项目一直在更新迭代&#xff0c;上面“1.图片效果展示”和“2.视频效果展示…

【特征值处理】

【特征值处理】 &#x1f490;The Begin&#x1f490;点点关注&#xff0c;收藏不迷路&#x1f490; 在处理机器学习的相关数据时&#xff0c;需要把特征值与目标组成二组&#xff0c;请您输出处理后的结果。 输入 第一行输入特征值向量&#xff0c;第二行输入目标向量。 输…

异步电机转差率和工作原理,异步电机和同步电机的区别

一、异步电机 异步电机的工作原理基于转差率&#xff08; s s s&#xff09;&#xff0c;而转差率的大小决定了电机是作为电动机还是发电机运行。为了深入理解其中的原理&#xff0c;我们可以从电磁感应和转速关系来分析&#xff1a; 1. 电动机工作原理 异步电机工作时&…

《双指针篇》---移动零

题目传送门 这道题可以归类为 数组划分/数组分块 。 题目制定了一个规则&#xff0c;我们可以在这个规则下&#xff0c;将数组划分为若干个区间。 这道题让我们把所有非零元素移动到左边。所有零元素移动到右边。 将数组划分为&#xff1a; 左区间非0&#xff1b; 右区间&…

龙迅#LT6211适用于HDMI转4PORT LVDS,分辨率高达4K60HZ,可提供技术支持!

1.特点HDMI1.4接收器 符合HDMI 1.4规范&#xff0c;TMDS数据速率每通道高达3.4Gbps 支持HDCP 1.4 自适应接收器均衡的PCB、电缆和连接器损耗 单/双端口/四端口LVDS发射机 兼容VESA和JEIDA标准 1/2/4可配置端口 1时钟通道和每个端口有4个可配置的数据通道 数据通道…

Linux——Ubuntu的基础操作

压缩与解压缩 gzip压缩工具 创建文件 a.c和b.c touch a.c touch b.c 压缩文件a.c和b.c gzip a.c gzip b.c 解压缩a.c.gz和b.c.gz gzip -d a.c.gz 对文件夹进行压缩 gzip -r 对文件夹进行解压缩 gzip -rd 注意&#xff1a;这只是对文件夹里所有文件进行压缩&#xff0c…

HTML静态网页成品作业(HTML+CSS)——自行车介绍网页设计制作(1个页面)

&#x1f389;不定期分享源码&#xff0c;关注不丢失哦 文章目录 一、作品介绍二、作品演示三、代码目录四、网站代码HTML部分代码CSS部分代码 五、源码获取 一、作品介绍 &#x1f3f7;️本套采用HTMLCSS&#xff0c;未使用Javacsript代码&#xff0c;共有1个页面。 二、作品…

基于Transformer的路径规划 - 第五篇 GPT生成策略_解码方法优化

上一篇&#xff1a;基于Transformer的路径规划 - 第四篇 GPT模型优化 在上一篇中&#xff0c;我尝试优化GPT路径生成模型&#xff0c;但没有成功。在随机生成的测试集上&#xff0c;路径规划成功率只有99%左右。而使用传统的路径规划算法&#xff0c;例如A*&#xff0c;路径规划…

【HarmonyOS】鸿蒙系统

文章目录 前言一、鸿蒙OS概述1. 定义与特性2. 核心技术理念3. 技术架构设计1. 应用层2. 框架层3. 系统服务层4. 内核层 二、分布式架构分布式架构的核心理念分布式能力的实现关键技术 三、 总结 前言 鸿蒙OS是由华为推出的一款开源操作系统&#xff0c;旨在满足智能终端设备的…

[MySQL]介绍与基础指令

介绍 现在常见的数据库如:Oracle、DB 2、SQL Server、MySQL等都是关系型数据库&#xff0c;使用二维表格来存储数据。 关系结构型数据库系统 管理员 仓库 MySQL的数据存储目录为data&#xff0c;在data下的每个目录都代表一个数据库。 MySQL的安装目录下&#xff1a; bin目录…

智慧农业云平台:大数据赋能现代农业的未来

近年来&#xff0c;随着科技的迅速发展&#xff0c;农业作为传统行业正面临着前所未有的变革。智慧农业&#xff0c;作为现代农业发展的重要方向&#xff0c;借助云计算、大数据、物联网等技术&#xff0c;正在为农业生产、管理和服务提供全新的解决方案。在这个背景下&#xf…

Windows基础(1)

声明&#xff1a;学习视频来自b站up主 泷羽sec&#xff0c;如涉及侵权马上删除文章 声明&#xff1a;本文主要用作技术分享&#xff0c;所有内容仅供参考。任何使用或依赖于本文信息所造成的法律后果均与本人无关。请读者自行判断风险&#xff0c;并遵循相关法律法规。 感谢泷…

Pandas CSV学习

1.CSV文件简介 CSV&#xff08;Comma-Separated Values&#xff0c;逗号分隔值&#xff0c;有时也称为字符分隔值&#xff0c;因为分隔字符也可以不是逗号&#xff09;&#xff0c;其文件以纯文本形式存储表格数据&#xff08;数字和文本&#xff09;。CSV 是一种通用的、相对简…

Visual Studio | 配置管理

文章目录 一、配置管理1、项目属性1.1、常规1.2、VC 目录1.3、C/C -> 常规1.4、C/C -> 预处理器1.5、C/C -> 预编译头1.6、连接器 -> 常规1.7、连接器 -> 输入 2、编辑2.1、显示空格或tab符 一、配置管理 1、项目属性 1.1、常规 字段功能目标平台版本用于生成…

数据采集-Kepware 安装证书异常处理

这里写目录标题 一、 问题描述二、原因分析三、处理方案3.1 1.执行根证书的更新3.2 安装KepServerEx 资源 一、 问题描述 在进行KepServerEx进行安装的情况下&#xff0c;出现了如下的报错&#xff1a; The installer was unable to find required root certificates ,please …