Zookeeper Java 开发,自定义分布式锁示例

文章目录

    • 一、概述
    • 二、导入依赖包
    • 三、创建锁的过程
      • 3.1 通过 create 创建节点信息
      • 3.2 AsyncCallback.StringCallback 回调函数
      • 3.3 AsyncCallback.Children2Callback 的回调函数
      • 3.4 Watcher 的回调函数
    • 四、完整示例
      • 4.1 完整分布式锁代码
      • 4.2 测试类

如果您还没有安装Zookeeper请看ZooKeeper 安装说明,Zookeeper 命令使用方法和数据说明,Zookeeper Java 开发入门。

一、概述

  • 情景:假设有10个客户端(分散的10台主机)要执行一个任务,这个任务某些过程需要保持原子性。那么我们就需要一个分布式锁。

  • 原理:通过在Zookeeper中创建序列节点来实现获得锁,删除节点来释放锁。其实质是一个按先来后到的排序过程,实现过程如下:

    • 客户端发起请求,创建锁序列节点(/lock/xxxxxxxx)

    • 获取所有锁节点,判断自己是否为最小节点

      • 如果自己是最小序列节点,则立即获得锁
      • 否则不能获得锁,但要监控前一个序列节点的状态
    • 获得锁的客户端开始执行任务。

    • 执行完任务后释放锁。

      • 由于后一个节点监控了前一个节点,当前一个节点删除时,后一个客户端会收到回调。

      • 在这个回调中再获取所有锁节点,判断自己是否为最小节点。

      • 以此类推,直到全部结束。

  • 流程如下

在这里插入图片描述

  • 如果您对没有做过 Zookeeper 开发,强烈建立先看 Zookeeper Java 开发入门。

二、导入依赖包

  • 在 pom.xml 文件中导入 Zookeeper 包,注意一般这个包的版本要和您安装的 Zookeeper 服务端版本一致。

    <dependency>
       <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.8.2</version>
    </dependency>
    

三、创建锁的过程

3.1 通过 create 创建节点信息

  • 通过 create 创建序列节点信息。他是异步方式,创建成功后会调用 AsyncCallback.StringCallback.processResult 回调函数。
    public void lock() throws InterruptedException, LockException {
        zooKeeper.create("/lock", "xxx".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this, context);
        countDownLatch.await();

        if(StringUtils.isEmpty(this.lockNodePath)){
            throw new LockException("创建锁失败");
        }

        System.out.println(this.appName + " 获得锁");
    }

3.2 AsyncCallback.StringCallback 回调函数

  • 在 AsyncCallback.StringCallback 的回调函数中通过 getChildren 方法获取 ZooKeeper 锁节点下的所有节点信息。这个方法是异步的,调用成功后会调用 AsyncCallback.Children2Callback.processResult 回调函数。
    // AsyncCallback.StringCallback
    @Override
    public void processResult(int i, String s, Object o, String s1) {
        if(StringUtils.isEmpty(s1)){
            // 这里是创建锁失败的情况。
            this.countDownLatch.countDown();
            return;
        }
        System.out.println(this.appName + " create lock node="+s1);

        this.lockNodePath = s1;
        // 获取 ZooKeeper 锁节点下的所有节点信息,以此来判断我是不是第一个创建节点,如果就获得锁,否则监控前一个节点信息。
        zooKeeper.getChildren("/", false, this, context);
    }

3.3 AsyncCallback.Children2Callback 的回调函数

  • 在 AsyncCallback.Children2Callback 的回调函数判断我是不是第一个创建节点,如果就获得锁,否则监控前一个节点信息。监控前一个节点信息使用 exists 方法,这个方法设置了 Watcher 的 processResult 回调函数
    // AsyncCallback.Children2Callback
    @Override
    public void processResult(int i, String s, Object o, List<String> list, Stat stat) {

        Collections.sort(list);

//        for (String s1 : list) {
//            System.out.println("\t "+this.lockNodePath+" previous lock node="+s1);
//        }

        int index = list.indexOf(lockNodePath.substring(1));
        if(0 == index){
            // 如果我现在是第一个节点,则获得锁
            try {
                zooKeeper.setData("/", this.lockNodePath.getBytes(), -1);
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            this.countDownLatch.countDown();
        }
        else {
            // 我不是第一个节点,监控前一个节点信息(等他删除后我就是可能是第一个了)
            String watchNodePath = "/" + list.get(index - 1);
            System.out.println("\t "+this.lockNodePath+" watch node:"+ watchNodePath);
            zooKeeper.exists(watchNodePath, this, new StatCallback() {
                @Override
                public void processResult(int i, String s, Object o, Stat stat) {

                }
            }, context);
        }
    }

3.4 Watcher 的回调函数

  • 在 Watcher 的回调函数,我们通过判断 watchedEvent.getType() 为 NodeDeleted 类型时,重新获取 ZooKeeper 锁节点下的所有节点信息,这使得消息回到了 “3.3”步,判断谁是第一个节点,然后获得得,完成整个流程。
    // Watcher
    @Override
    public void process(WatchedEvent watchedEvent) {
        switch (watchedEvent.getType()) {
            case None:
                break;
            case NodeCreated:
                break;
            case NodeDeleted:
                zooKeeper.getChildren("/", false, this, context);
                break;
            case NodeDataChanged:
                break;
            case NodeChildrenChanged:
                break;
            case DataWatchRemoved:
                break;
            case ChildWatchRemoved:
                break;
            case PersistentWatchRemoved:
                break;
        }
    }

四、完整示例

4.1 完整分布式锁代码

package top.yiqifu.study.p131;


import org.apache.commons.lang3.StringUtils;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class ZookeeperLock implements Watcher, AsyncCallback.StringCallback,
        AsyncCallback.Children2Callback {

    private String appName;
    private ZooKeeper zooKeeper;

    private Object context;
    private String lockNodePath;
    private CountDownLatch countDownLatch = new CountDownLatch(1);

    public ZookeeperLock(String name, ZooKeeper zk){
        this.appName = name;
        this.zooKeeper = zk;
        this.context = this;
    }


    public void lock() throws InterruptedException, LockException {
        zooKeeper.create("/lock", "xxx".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this, context);
        countDownLatch.await();

        if(StringUtils.isEmpty(this.lockNodePath)){
            throw new LockException("创建锁失败");
        }

        System.out.println(this.appName + " 获得锁");
    }

    public void unlock() throws KeeperException, InterruptedException, LockException {
        if(StringUtils.isEmpty(this.lockNodePath)){
            throw new LockException("没有获得锁,无法释放锁");
        }
        zooKeeper.delete(lockNodePath, -1);

        System.out.println(this.appName + " 释放锁");
    }


    // AsyncCallback.StringCallback
    @Override
    public void processResult(int i, String s, Object o, String s1) {
        if(StringUtils.isEmpty(s1)){
            // 这里是创建锁失败的情况。
            this.countDownLatch.countDown();
            return;
        }
        System.out.println(this.appName + " create lock node="+s1);

        this.lockNodePath = s1;
        // 获取 ZooKeeper 锁节点下的所有节点信息,以此来判断我是不是第一个创建节点,如果就获得锁,否则监控前一个节点信息。
        zooKeeper.getChildren("/", false, this, context);
    }

    // AsyncCallback.Children2Callback
    @Override
    public void processResult(int i, String s, Object o, List<String> list, Stat stat) {

        Collections.sort(list);

//        for (String s1 : list) {
//            System.out.println("\t "+this.lockNodePath+" previous lock node="+s1);
//        }

        int index = list.indexOf(lockNodePath.substring(1));
        if(0 == index){
            // 如果我现在是第一个节点,则获得锁
            try {
                zooKeeper.setData("/", this.lockNodePath.getBytes(), -1);
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            this.countDownLatch.countDown();
        }
        else {
            // 我不是第一个节点,监控前一个节点信息(等他删除后我就是可能是第一个了)
            String watchNodePath = "/" + list.get(index - 1);
            System.out.println("\t "+this.lockNodePath+" watch node:"+ watchNodePath);
            zooKeeper.exists(watchNodePath, this, new StatCallback() {
                @Override
                public void processResult(int i, String s, Object o, Stat stat) {

                }
            }, context);
        }
    }

    // Watcher
    @Override
    public void process(WatchedEvent watchedEvent) {
        switch (watchedEvent.getType()) {
            case None:
                break;
            case NodeCreated:
                break;
            case NodeDeleted:
                zooKeeper.getChildren("/", false, this, context);
                break;
            case NodeDataChanged:
                break;
            case NodeChildrenChanged:
                break;
            case DataWatchRemoved:
                break;
            case ChildWatchRemoved:
                break;
            case PersistentWatchRemoved:
                break;
        }
    }



    public class LockException extends  Exception
    {
        public LockException(String message){
            super(message);
        }
    }
}

4.2 测试类

package top.yiqifu.study.p131;


import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class Test06_ZookeeperLock {

    public static void main(String[] args) {
        try {
            // 创建 ZooKeeper 对象
            final ZooKeeper zooKeeper = testCreateZookeeper();

            int clientCount = 10;
            final CountDownLatch countDownLatch = new CountDownLatch(clientCount);
            for (int i = 0; i < clientCount; i++) {
                new Thread(new Runnable(){
                    @Override
                    public void run() {
                        TestLock(zooKeeper);
                        countDownLatch.countDown();
                    }
                }).start();
            }

            countDownLatch.await();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    private static void  TestLock(ZooKeeper zooKeeper){
        try {
            String appName = Thread.currentThread().getName();
            ZookeeperLock zookeeperLock = new ZookeeperLock(appName, zooKeeper);
            
            // 加锁(获得分布式锁)
            zookeeperLock.lock();
            System.out.println(appName + " 执行任务");
            Thread.sleep(1000);
		   // 释放锁
            zookeeperLock.unlock();

        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (ZookeeperLock.LockException e) {
            e.printStackTrace();
        }
    }

    private static ZooKeeper testCreateZookeeper() throws IOException, InterruptedException, KeeperException {
        final CountDownLatch countDownLatch = new CountDownLatch(1);

        // ZooKeeper 集群地址(没连接池的概念,是Session的概念)
        //String connectionString = "192.168.8.51:2181,192.168.8.52:2181,192.168.8.53:2181";
        String connectionString = "192.168.8.51:2181,192.168.8.52:2181,192.168.8.53:2181/aaa";
        // ZooKeeper Session 数据超时时间(也就是这个Session关闭后,与这个Session相关的数据在ZooKeeper中保存多信)。
        Integer sessionTimeout = 3000;
        // ZooKeeper Session 级别 Watcher(Watch只发生在读方法上,如 get、exists)
        final ZooKeeper zooKeeper = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                try {
                    Event.KeeperState state = watchedEvent.getState();
                    Event.EventType type = watchedEvent.getType();
                    String path = watchedEvent.getPath();

                    switch (state) {
                        case Unknown:
                            break;
                        case Disconnected:
                            break;
                        case NoSyncConnected:
                            break;
                        case SyncConnected:
                            countDownLatch.countDown();
                            break;
                        case AuthFailed:
                            break;
                        case ConnectedReadOnly:
                            break;
                        case SaslAuthenticated:
                            break;
                        case Expired:
                            break;
                        case Closed:
                            break;
                    }
                    switch (type) {
                        case None:
                            break;
                        case NodeCreated:
                            break;
                        case NodeDeleted:
                            break;
                        case NodeDataChanged:
                            break;
                        case NodeChildrenChanged:
                            break;
                        case DataWatchRemoved:
                            break;
                        case ChildWatchRemoved:
                            break;
                        case PersistentWatchRemoved:
                            break;
                    }

                    System.out.println("Session watch state=" + state);
                    System.out.println("Session watch type=" + type);
                    System.out.println("Session watch path=" + path);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        countDownLatch.await();
        ZooKeeper.States state = zooKeeper.getState();
        switch (state) {
            case CONNECTING:
                break;
            case ASSOCIATING:
                break;
            case CONNECTED:
                break;
            case CONNECTEDREADONLY:
                break;
            case CLOSED:
                break;
            case AUTH_FAILED:
                break;
            case NOT_CONNECTED:
                break;
        }
        System.out.println("ZooKeeper state=" + state);

        return zooKeeper;
    }


}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/155640.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

第四章 串【24王道数据结构笔记】

1.串的基本概念 串&#xff0c;即字符串 (String) 是由零个或多个字符组成的有限序列。一般记为Sa1a2.....an(n>0) S"HelloWorld!" TiPhone 11 Pro Max? 其中&#xff0c;S是串名&#xff0c;单引号括起来的字符序列是串的值;a;可以是字母、数字或其他字符;串中…

智能售货柜:小本投资的不二之选

智能售货柜&#xff1a;小本投资的不二之选 智能售货柜的运营优势在于&#xff1a;一是降低运营成本&#xff0c;不需要大量员工&#xff1b;二是具备自动识别和智能结算功能&#xff0c;提高运营效率&#xff1b;三是提供数据分析&#xff0c;优化产品和服务。相比传统零售店&…

初学UE5 C++②

目录 导入csv表格数据 创建、实例化、结构体 GameInstance Actor camera 绑定滚轮控制摇臂移动 碰撞绑定 角色碰撞设定 按钮 UI显示 单播代理 多播和动态多播 写一个接口 其他 NewObject 和 CreateDefaultSubobject区别 导入csv表格数据 创建一个object的C类 …

怎样备份电脑文件比较安全

域智盾软件是一款功能强大的电脑监控软件&#xff0c;它不仅具备实时屏幕监控、行为审计等功能&#xff0c;还能够对电脑文件进行备份和管理。下面将介绍域智盾软件如何备份电脑文件&#xff0c;以确保数据安全。 1、开启文档备份功能 部署后台&#xff0c;然后点击文档安全&a…

30天黑客(网络安全)自学

前言 前几天发布了一篇 网络安全&#xff08;黑客&#xff09;自学 没想到收到了许多人的私信想要学习网安黑客技术&#xff01;却不知道从哪里开始学起&#xff01;怎么学 今天给大家分享一下&#xff0c;很多人上来就说想学习黑客&#xff0c;但是连方向都没搞清楚就开始学习…

科技创新 共铸典范 | 江西卫健办邓敏、飞图影像董事长洪诗诗一行到访拓世科技集团,提振公共卫生事业发展

2023年11月15日&#xff0c;拓世科技集团总部迎来了江西省卫健项目办项目负责人邓敏、江西飞图影像科技有限公司董事长洪诗诗一行的考察参观&#xff0c;集团董事长李火亮、集团高级副总裁方高强进行热情接待。此次多方交流&#xff0c;旨在共同探讨携手合作&#xff0c;激发科…

Win7安装nvme协议的SSD硬盘方法

自家用的电脑硬盘不够用&#xff0c;于是想买块硬盘扩展下存储。市面上&#xff0c;我比较了下SSD&#xff0c;一类是原来的SATA协议的固态硬盘&#xff0c;一类是M2的固态硬盘&#xff0c;我发现SATA的硬盘比M2的贵&#xff0c;我的主板较老&#xff0c;又不没有原生支持M2的接…

Python---列表 集合 字典 推导式(本文以 列表 为主)

推导式&#xff1a; 推导式comprehensions&#xff08;又称解析式&#xff09;&#xff0c;是Python的一种独有特性。推导式是可以从一个数据序列构建另一个新的数据序列&#xff08;一个有规律的列表或控制一个有规律列表&#xff09;的结构体。 共有三种推导&#xff1a;列表…

windows监控打印机状态工具

windows监控打印机状态工具 实时监控打印机状态&#xff0c;打印总页数&#xff0c;以及打印故障提醒。 工具下载地址

《硅基物语.AI写作高手:从零开始用ChatGPT学会写作》《从零开始读懂相对论》

文章目录 《硅基物语.AI写作高手&#xff1a;从零开始用ChatGPT学会写作》内容简介核心精华使用ChatGPT可以高效搞定写作的好处如下 《从零开始读懂相对论》内容简介关键点书摘最后 《硅基物语.AI写作高手&#xff1a;从零开始用ChatGPT学会写作》 内容简介 本书从写作与ChatG…

ORB SLAM3 使用二进制文件 ORBvoc.bin 加载Vocabulary

使用 二进制文件 ORBvoc.bin 加载Vocabulary&#xff0c;将比ORBvoc.txt 速度快很多倍&#xff01; 实测1秒内完成加载&#xff1a; 一、下载ORBvoc.bin 百度网盘&#xff1a; ORBvoc.bin下载链接 提取码&#xff1a;dyyk 解压后&#xff0c;将ORBvoc.bin拷贝到Vocabulary文…

5G与中国的海

今年国庆假期&#xff0c;香港迎来了阔别5年的国庆维港烟花汇演 10月1日晚上9点&#xff0c;“HKT x FWD 2023 年国庆烟花汇演”在维多利亚港上空上演。在23分钟时间里&#xff0c;燃放了超过3万枚烟花。而与以往维港烟花秀不同的是&#xff0c;为了让更多民众欣赏这次表演&…

【C++面向对象】15. 模板

文章目录 【 1. 函数模板 】【 2. 类模板 】 模板是泛型编程的基础&#xff0c;泛型编程即以一种独立于任何特定类型的方式编写代码。模板是指创建泛型类或函数的蓝图或公式。库容器&#xff0c;比如迭代器和算法&#xff0c;都是泛型编程的例子&#xff0c;它们都使用了模板的…

Milvus Standalone安装

使用Docker Compose安装 Milvus standalone&#xff08;即单机版&#xff09;&#xff0c;进行一个快速milvus的体验。 前提条件&#xff1a; 1.系统可以使用centos 2.系统已经安装docker和docker-compose 3.milvus版本这里选择2.3.1 由于milvus依赖etcd和minio&#xff0c…

翻译: 人工智能代理 Agents in Artificial Intelligence

在人工智能中&#xff0c;代理是一种计算机程序或系统&#xff0c;旨在感知其环境、做出决策并采取行动以实现特定目标或一组目标。该代理自主运行&#xff0c;这意味着它不受人类操作员的直接控制。 智能体可以根据其特征分为不同类型&#xff0c;例如它们是被动的还是主动的…

CUDA学习笔记8——GPU硬件资源

简单来说就是为了充分利用GPU&#xff0c;不要让分出去的CUDA核心摸鱼闲置&#xff1b;GPU每次干活&#xff0c;都是以最小的组分配的&#xff0c;因此分派任务的时候就尽量充分发挥每个小组里CUDA核心的作用。这里的每个小组就是一个SM&#xff08;stream multi-processor&…

Python基础:正则表达式(regular expression)详解

在Python中&#xff0c;正则表达式是一种强大的工具&#xff0c;可用于匹配和操作字符串。什么是正则表达式&#xff1f; 正则表达式是一种模式匹配语言&#xff0c;用于匹配字符串中的特定模式。这些模式可以是字母、数字、字符组合或其他符号。正则表达式通常用于文本处理、网…

短视频账号矩阵系统源码/技术源码分享/技术搭建架构

短视频账号矩阵系统----技术源码分享/技术搭建架构 抖音seo又叫抖音搜索引擎&#xff0c;只要能做到布词&#xff0c;和过去的百度seo优化一样&#xff0c;布词&#xff0c;布关键词&#xff0c;当搜索栏搜索时可以搜索到该短视频。优化视频关键词&#xff0c;做好关键词的优化…

Python实现视频字幕时间轴格式转换

自己喜欢收藏电影&#xff0c;有时网上能找到的中文字幕文件都不满足自己电影版本。在自己下载的压制版电影中已内封非中文srt字幕时&#xff0c;可以选择自己将srt的时间轴转为ass并替换ass中的时间轴。自己在频繁 复制粘贴改格式的时候想起可以用Python代码完成转换这一操作&…

基于操作系统讨论Java线程与进程、浅谈Go的线程与管程

文章目录 操作系统中的进程进程概念进程的状态 Java中的进程Java进程的概念Java进程的特性Java进程的状态Java进程与操作系统进程的通信 操作系统的进程和Java进程的区别联系操作系统进程Java 进程区别和联系 操作系统中的线程动机优点多核编程 Java中的线程定义&#xff1a;特…