Curator分布式锁

系列文章目录


文章目录

  • 系列文章目录
  • 前言


前言

前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站,这篇文章男女通用,看懂了就去分享给你的码吧。
在这里插入图片描述


分布式锁服务宕机,ZooKeeper一般是以集群部署,如果出现ZooKeeper宕机,那么只要当前正常的服务器超过集群的半数,依然可以正常提供服务持有锁资源服务器宕机,假如一台服务器获取锁之后就宕机了, 那么就会导致其他服务器无法再获取该锁. 就会造成死锁问题, 在Curator中, 锁的信息都是保存在临时节点上, 如果持有锁资源的服务器宕机, 那么ZooKeeper 就会移除它的信息, 这时其他服务器就能进行获取锁操作。
在这里插入图片描述
zookeeper安装单机模式

http://www.javacui.com/opensource/445.html

SpringBoot集成Curator实现Zookeeper基本操作

http://www.javacui.com/tool/615.html

SpringBoot集成Curator实现Watch事件监听

http://www.javacui.com/tool/616.html

Zookeeper实现分布式锁的机制

使用zk的临时节点和有序节点,每个线程获取锁就是在zk创建一个临时有序的节点,比如在/lock/目录下。

创建节点成功后,获取/lock目录下的所有临时节点,再判断当前线程创建的节点是否是所有的节点的序号最小的节点。

如果当前线程创建的节点是所有节点序号最小的节点,则认为获取锁成功。

比如当前线程获取到的节点序号为/lock/003,然后所有的节点列表为[/lock/001,/lock/002,/lock/003],则对/lock/002这个节点添加一个事件监听器。

如果锁释放了,会唤醒下一个序号的节点,然后重新执行第3步,判断是否自己的节点序号是最小。

比如/lock/001释放了,/lock/002监听到时间,此时节点集合为[/lock/002,/lock/003],则/lock/002为最小序号节点,获取到锁。

锁分类

InterProcessSemaphoreMutex:分布式不可重入排它锁

InterProcessMutex:分布式可重入排它锁

InterProcessReadWriteLock:分布式读写锁

InterProcessMultiLock:多重共享锁,将多个锁作为单个实体管理的容器

InterProcessSemaphoreV2:共享信号量

Shared Lock 分布式非可重入锁

官网地址:http://curator.apache.org/curator-recipes/shared-lock.html

InterProcessSemaphoreMutex是一种不可重入的互斥锁,也就意味着即使是同一个线程也无法在持有锁的情况下再次获得锁,所以需要注意,不可重入的锁很容易在一些情况导致死锁,比如你写了一个递归。

Shared Reentrant Lockf分布式可重入锁

官网地址:http://curator.apache.org/curator-recipes/shared-reentrant-lock.html

此锁可以重入,但是重入几次需要释放几次。

InterProcessMutex通过在zookeeper的某路径节点下创建临时序列节点来实现分布式锁,即每个线程(跨进程的线程)获取同一把锁前,都需要在同样的路径下创建一个节点,节点名字由uuid+递增序列组成。而通过对比自身的序列数是否在所有子节点的第一位,来判断是否成功获取到了锁。当获取锁失败时,它会添加watcher来监听前一个节点的变动情况,然后进行等待状态。直到watcher的事件生效将自己唤醒,或者超时时间异常返回。

Shared Reentrant Read Write Lock可重入读写锁

官网地址:http://curator.apache.org/curator-recipes/shared-reentrant-read-write-lock.html

读写锁维护一对关联的锁,一个用于只读操作,一个用于写操作。只要没有写锁,读锁可以被多个用户同时持有,而写锁是独占的。

读写锁允许从写锁降级为读锁,方法是先获取写锁,然后就可以获取读锁。但是,无法从读锁升级到写锁。

Multi Shared Lock 多共享锁

官网地址:http://curator.apache.org/curator-recipes/multi-shared-lock.html

多个锁作为一个锁,可以同时在多个资源上加锁。一个维护多个锁对象的容器。当调用acquire()时,获取容器中所有的锁对象,请求失败时,释放所有锁对象。同样调用release()也会释放所有的锁。

Shared Semaphore共享信号量

官网地址:http://curator.apache.org/curator-recipes/shared-semaphore.html

一个计数的信号量类似JDK的Semaphore,所有使用相同锁定路径的jvm中所有进程都将实现进程间有限的租约。此外,这个信号量大多是“公平的” - 每个用户将按照要求的顺序获得租约。

有两种方式决定信号号的最大租约数。一种是由用户指定的路径来决定最大租约数,一种是通过SharedCountReader来决定。

如果未使用SharedCountReader,则不会进行内部检查比如A表现为有10个租约,进程B表现为有20个。因此,请确保所有进程中的所有实例都使用相同的numberOfLeases值。

acuquire()方法返回的是Lease对象,客户端在使用完后必须要关闭该lease对象(一般在finally中进行关闭),否则该对象会丢失。如果进程session丢失(如崩溃),该客户端拥有的所有lease会被自动关闭,此时其他端能够使用这些lease。

编码测试

package com.example.springboot;
 
import com.example.springboot.tool.ZkConfiguration;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.*;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
 
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
 
/**
 * @Auther: Java小强
 * @Date: 2022/2/4 - 19:33
 * @Decsription: com.example.springboot
 * @Version: 1.0
 */
@SpringBootTest(classes = Application.class)
public class CuratorTest {
    @Autowired
    private ZkConfiguration zk;
 
    // 共享信号量,多个信号量
    @Test
    public void testInterProcessSemaphoreV22() throws Exception {
        CuratorFramework client = zk.curatorFramework();
        // 创建一个信号量, Curator 以公平锁的方式进行实现
        final InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/lock", 3);
 
        new Thread(new Runnable() {
            @Override
            public void run() {
                // 获取锁对象
                try {
                    String threadName = Thread.currentThread().getName();
                    // 获取2个许可
                    Collection<Lease> acquire = semaphore.acquire(2);
                    System.out.println(threadName + "获取信号量>>>>>>>>>>>>>>>>>>>>>");
                    Thread.sleep(2 * 1000);
                    semaphore.returnAll(acquire);
                    System.out.println(threadName + "释放信号量>>>>>>>>>>>>>>>>>>>>>");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
 
        new Thread(new Runnable() {
            @Override
            public void run() {
                // 获取锁对象
                try {
                    String threadName = Thread.currentThread().getName();
                    // 获取一个许可
                    Lease lease = semaphore.acquire();
                    System.out.println(threadName + "获取信号量>>>>>>>>>>>>>>>>>>>>>");
                    Thread.sleep(2 * 1000);
                    semaphore.returnLease(lease);
                    System.out.println(threadName + "释放信号量>>>>>>>>>>>>>>>>>>>>>");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
 
        while (true) {
        }
    }
 
    // 共享信号量
    @Test
    public void testInterProcessSemaphoreV2() throws Exception {
        CuratorFramework client = zk.curatorFramework();
        // 创建一个信号量, Curator 以公平锁的方式进行实现
        final InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/lock", 1);
 
        new Thread(new Runnable() {
            @Override
            public void run() {
                // 获取锁对象
                try {
                    String threadName = Thread.currentThread().getName();
                    // 获取一个许可
                    Lease lease = semaphore.acquire();
                    System.out.println(threadName + "获取信号量>>>>>>>>>>>>>>>>>>>>>");
                    Thread.sleep(2 * 1000);
                    semaphore.returnLease(lease);
                    System.out.println(threadName + "释放信号量>>>>>>>>>>>>>>>>>>>>>");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
 
        new Thread(new Runnable() {
            @Override
            public void run() {
                // 获取锁对象
                try {
                    String threadName = Thread.currentThread().getName();
                    // 获取一个许可
                    Lease lease = semaphore.acquire();
                    System.out.println(threadName + "获取信号量>>>>>>>>>>>>>>>>>>>>>");
                    Thread.sleep(2 * 1000);
                    semaphore.returnLease(lease);
                    System.out.println(threadName + "释放信号量>>>>>>>>>>>>>>>>>>>>>");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
 
        while (true) {
        }
    }
 
    // 多重共享锁
    @Test
    public void testInterProcessMultiLock() throws Exception {
        CuratorFramework client = zk.curatorFramework();
        // 可重入锁
        final InterProcessLock interProcessLock1 = new InterProcessMutex(client, "/lock");
        // 不可重入锁
        final InterProcessLock interProcessLock2 = new InterProcessSemaphoreMutex(client, "/lock");
        // 创建多重锁对象
        final InterProcessLock lock = new InterProcessMultiLock(Arrays.asList(interProcessLock1, interProcessLock2));
 
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    String threadName = Thread.currentThread().getName();
                    // 获取参数集合中的所有锁
                    lock.acquire();
 
                    // 因为存在一个不可重入锁, 所以整个 InterProcessMultiLock 不可重入
                    System.out.println(threadName + "----->" + lock.acquire(2, TimeUnit.SECONDS));
                    // interProcessLock1 是可重入锁, 所以可以继续获取锁
                    System.out.println(threadName + "----->" + interProcessLock1.acquire(2, TimeUnit.SECONDS));
                    // interProcessLock2 是不可重入锁, 所以获取锁失败
                    System.out.println(threadName + "----->" + interProcessLock2.acquire(2, TimeUnit.SECONDS));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
 
        while (true) {
        }
    }
 
    // 分布式读写锁
    @Test
    public void testReadWriteLock() throws Exception {
        CuratorFramework client = zk.curatorFramework();
        // 创建共享可重入读写锁
        final InterProcessReadWriteLock locl1 = new InterProcessReadWriteLock(client, "/lock");
        final InterProcessReadWriteLock lock2 = new InterProcessReadWriteLock(client, "/lock");
 
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    String threadName = Thread.currentThread().getName();
                    locl1.writeLock().acquire(); // 获取锁对象
                    System.out.println(threadName + "获取写锁>>>>>>>>>>>>>>>>>>>>>");
                    Thread.sleep(1 * 1000);
                    locl1.readLock().acquire(); // 获取读锁,锁降级
                    System.out.println(threadName + "获取读锁>>>>>>>>>>>>>>>>>>>>>");
                    Thread.sleep(1 * 1000);
                    locl1.readLock().release();
                    System.out.println(threadName + "释放读锁<<<<<<<<<<<<<<<<<<<<<");
                    locl1.writeLock().release();
                    System.out.println(threadName + "释放写锁<<<<<<<<<<<<<<<<<<<<<");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
 
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    String threadName = Thread.currentThread().getName();
                    lock2.writeLock().acquire(); // 获取锁对象
                    System.out.println(threadName + "获取写锁>>>>>>>>>>>>>>>>>>>>>");
                    Thread.sleep(1 * 1000);
                    lock2.readLock().acquire(); // 获取读锁,锁降级
                    System.out.println(threadName + "获取读锁>>>>>>>>>>>>>>>>>>>>>");
                    Thread.sleep(1 * 1000);
                    lock2.readLock().release();
                    System.out.println(threadName + "释放读锁<<<<<<<<<<<<<<<<<<<<<");
                    lock2.writeLock().release();
                    System.out.println(threadName + "释放写锁<<<<<<<<<<<<<<<<<<<<<");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
 
        while (true) {
        }
    }
 
    // 分布式可重入排它锁
    @Test
    public void testInterProcessMutex() throws Exception {
        CuratorFramework client = zk.curatorFramework();
        // 分布式可重入排它锁
        final InterProcessLock lock = new InterProcessMutex(client, "/lock");
        final InterProcessLock lock2 = new InterProcessMutex(client, "/lock");
 
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    String threadName = Thread.currentThread().getName();
                    lock.acquire(); // 获取锁对象
                    System.out.println(threadName + "获取锁>>>>>>>>>>>>>>>>>>>>>");
                    lock.acquire(); // 测试锁重入
                    System.out.println(threadName + "获取锁>>>>>>>>>>>>>>>>>>>>>");
                    Thread.sleep(1 * 1000);
                    lock.release();
                    System.out.println(threadName + "释放锁<<<<<<<<<<<<<<<<<<<<<");
                    lock.release();
                    System.out.println(threadName + "释放锁<<<<<<<<<<<<<<<<<<<<<");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
 
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    String threadName = Thread.currentThread().getName();
                    lock.acquire(); // 获取锁对象
                    System.out.println(threadName + "获取锁>>>>>>>>>>>>>>>>>>>>>");
                    lock.acquire(); // 测试锁重入
                    System.out.println(threadName + "获取锁>>>>>>>>>>>>>>>>>>>>>");
                    Thread.sleep(1 * 1000);
                    lock.release();
                    System.out.println(threadName + "释放锁<<<<<<<<<<<<<<<<<<<<<");
                    lock.release();
                    System.out.println(threadName + "释放锁<<<<<<<<<<<<<<<<<<<<<");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
 
        while (true) {
        }
//        顺序不一定,但是同一个线程可以多次获取,获取几次就必须释放几次,其他线程才能获取到锁
    }
 
 
    // 分布式不可重入排它锁
    @Test
    void testInterProcessSemaphoreMutex() throws Exception {
        CuratorFramework client = zk.curatorFramework();
        // 分布式不可重入排它锁
        final InterProcessLock lock = new InterProcessSemaphoreMutex(client, "/lock");
        final InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, "/lock");
 
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    String threadName = Thread.currentThread().getName();
                    lock.acquire(); // 获取锁对象
                    System.out.println(threadName + "获取锁>>>>>>>>>>>>>>>>>>>>>");
                    // 测试锁重入
                    Thread.sleep(2 * 1000);
                    lock.release();
                    System.out.println(threadName + "释放锁<<<<<<<<<<<<<<<<<<<<<");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
 
        new Thread(new Runnable() {
            @Override
            public void run() {
                // 获取锁对象
                try {
                    String threadName = Thread.currentThread().getName();
                    lock2.acquire();
                    System.out.println(threadName + "获取锁>>>>>>>>>>>>>>>>>>>>>");
                    Thread.sleep(2 * 1000);
                    lock2.release();
                    System.out.println(threadName + "释放锁<<<<<<<<<<<<<<<<<<<<<");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
 
        while (true) {
        }
//        顺序不一定,但是必须是获取后再释放其他线程才能获取到锁
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/616318.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

RZ9692实训开发通信系统构建(含配置json配置文件)

实验名称 通信系统的构建 实验目的&#xff1a; 实现一个通信系统的构建&#xff0c;要求传输两路正弦波&#xff0c;和一路视频信号&#xff0c;要求在接受端完整接受正弦信号和视频信号。 一、实验原理&#xff1a; 数字通信系统的一般模型&#xff1a; 数字通信系统的一…

验证搜索二叉树

目录 题目 方法一 思路 优化 方法二 思维误区 递归关系推导 代码实现 题目 98. 验证二叉搜索树 难度&#xff1a;中等 给你一个二叉树的根节点root &#xff0c;判断其是否是一个有效的二叉搜索树。 有效 二叉搜索树定义如下&#xff1a; 节点的左子树只包含…

Python 开发 框架安全:Django SQL注入漏洞测试.(CVE-2021-35042)

什么是 Django 框架 Django 是一个用 Python 编写的 Web 应用程序框架。它提供了许多工具和库&#xff0c;使得开发 Web 应用程序变得更加容易和高效。Django 遵循了“MTV”&#xff08;模型-模板-视图&#xff09;的设计模式&#xff0c;将应用程序的不同组件分离开来&#x…

QT的C++版本是如何从ui文件编译成C++可以使用的.h文件的

Desktop_Qt_6_7_0_MinGW_64_bit是一个编译器&#xff0c;可以将ui文件编译为.h文件。我们可以在项目文件下看到这一样一个文件&#xff1a; 这里的ui_mainwindow.h文件我们可以打开看一下&#xff1a;你会发现你所有的ui设计都被记录在了这里。 /***************************…

最新网页版USB转串口芯片CH340中文规格书手册(20240511)

前言 南京沁恒的产品已经很成熟了&#xff0c;完全可替代国外USB转串口产品&#xff0c;不必迷信FT232&#xff0c;CP2102之类了。 另外&#xff0c;急着买芯片&#xff0c;直接跑过去的&#xff0c;看过几次妹子了:) CH340手册&#xff0c;基于网页3.3版本&#xff0c;规格书…

作为一名新能源汽车热管理仿真工程师需要具备哪些素养与技能

作为一名新能源汽车热管理仿真工程师&#xff0c;需要具备多方面的素养与技能&#xff0c;才能胜任这一岗位的工作。从工程素养到技术技能&#xff0c;再到沟通能力和团队合作&#xff0c;以下是对这些方面的探讨。 理论知识基础 首先&#xff0c;工程素养是新能源汽车热管理仿…

现代制造之数控机床篇

现代制造 有现代技术支撑的制造业&#xff0c;即无论是制造还是服务行业&#xff0c;添了现代两个字不过是因为有了现代科学技术的支撑&#xff0c;如发达的通信方式&#xff0c;不断发展的互联网&#xff0c;信息化程度加强了&#xff0c;因此可以为这两个行业增加了不少优势…

Spring-Cloud-OpenFeign源码解析-01-OpenFeign简介

OpenFeign简介 OpenFeign是一种声明式、模板化的HTTP客户端(仅在Application Client中使用)。声明式调用是指&#xff0c;就像调用本地方法一样调用远程方法&#xff0c;无需感知操作远程http请求。 OpenFeign和Feign的区别 Feign是Spring Cloud组件中一个轻量级RESTful的HT…

[算法][差分][延迟相差][leetcode]2960. 统计已测试设备

题目地址&#xff1a; https://leetcode.cn/problems/count-tested-devices-after-test-operations/description/ 解法一&#xff1a;暴力解法 class Solution {public int countTestedDevices(int[] batteryPercentages) {//特殊条件判断if(null batteryPercentages || ba…

图论(洛谷刷题)

目录 前言&#xff1a; 题单&#xff1a; P3386 【模板】二分图最大匹配 P1525 [NOIP2010 提高组] 关押罪犯 P3385 【模板】负环 P3371 【模板】单源最短路径&#xff08;弱化版&#xff09; SPFA写法 Dij写法&#xff1a; P3385 【模板】负环 P5960 【模板】差分约束…

深度解析Nginx:高性能Web服务器的奥秘(上)

&#x1f407;明明跟你说过&#xff1a;个人主页 &#x1f3c5;个人专栏&#xff1a;《洞察之眼&#xff1a;ELK监控与可视化》&#x1f3c5; &#x1f516;行路有良友&#xff0c;便是天堂&#x1f516; 目录 一、前言 1、Nginx概述 2、Nginx的历史与发展 3、Nginx的…

Kubernetes学习-深入Pod篇(一) 创建Pod,Pod配置文件详解

&#x1f3f7;️个人主页&#xff1a;牵着猫散步的鼠鼠 &#x1f3f7;️系列专栏&#xff1a;Kubernetes渐进式学习-专栏 &#x1f3f7;️个人学习笔记&#xff0c;若有缺误&#xff0c;欢迎评论区指正 1.前言 我们在前面的文章讲解了Kubernetes的核心概念和服务部署&#x…

ViLT 浅析

ViLT 浅析 论文链接&#xff1a;ViLT 文章目录 ViLT 浅析创新点网络结构总结 创新点 本文先分析了4种不同类型的Vision-and-Language Pretraining(VLP) 其中每个矩形的高表示相对计算量大小&#xff0c;VE、TE和MI分别是visual embedding、text embedding和modality interact…

2024年数维杯数学建模

高质量原创论文已完成 需要的私我

解决“电脑开机黑屏Explorer进程卡死“问题

今天&#xff0c;给台式机按电源键&#xff0c;进入windows系统时&#xff0c;发现电脑黑屏了&#xff0c;昨天还好好的&#xff0c;怎么今天电脑桌面进不去了&#xff1f;想起Windows XP、Windows 7、Windows 10 、Windows 11等系统&#xff0c;在使用多个文件拷贝时&#xff…

python的import导入规则

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、pycharm只能看到当前工作路径父目录下所有文件和项目根目录下所有文件二、sys或者图形界面添加解释器路径&#xff08;搜寻路径&#xff09;三、import导入…

乡村旅游指标-最美乡村数、旅游示范县数、旅行社数、景区数、农家乐数(2007-2021年)

01、数据介绍 乡村旅游也是促进乡村经济发展的有效途径。通过发展乡村旅游&#xff0c;可以带动乡村相关产业的发展&#xff0c;提高乡村居民的收入&#xff0c;促进乡村的经济发展和社会进步。此外&#xff0c;乡村旅游还能促进城乡交流&#xff0c;推动城乡统筹发展。 数据…

SEO之为什么研究关键词(一)

初创企业需要建站的朋友看这篇文章&#xff0c;谢谢支持&#xff1a; 我给不会敲代码又想搭建网站的人建议 新手上云 初做网站的人很容易犯的最大错误之一是&#xff0c;脑袋一拍就贸然进入某个领域&#xff0c;跳过竞争研究&#xff0c;没规划好目标关键词就开始做网站。这样做…

ICode国际青少年编程竞赛- Python-4级训练场-while语句综合

ICode国际青少年编程竞赛- Python-4级训练场-while语句综合 1、 for i in range(4):while not Flyer[i].disappear():wait()Spaceship.step(6)Spaceship.turnLeft()2、 Dev.turnLeft() for i in range(4):Spaceship.step(2)while Flyer[i].disappear():wait()Dev.step(4)Dev.…

【SpringBoot】Redis Lua脚本实战指南:简单高效的构建分布式多命令原子操作、分布式锁

文章目录 一.Lua脚本1.Lua特性2.Lua优势 二.Lua语法1.注释2.变量3.数据类型&#xff1a;3.1.基本类型3.2.对象类型&#xff1a;表&#xff08;table&#xff09; 4.控制结构&#xff1a;4.1.条件语句: 使用if、else和elseif来实现条件分支。4.2.循环结构&#xff1a;Lua支持for…