【大数据】Zookeeper 数据写入与分布式锁

Zookeeper 数据写入与分布式锁

  • 1.数据是怎么写入的
  • 2.基于 Zookeeper 实现分布式锁

1.数据是怎么写入的

无论是 Zookeeper 自带的客户端 zkCli.sh,还是使用 Python(或者其它语言)实现的客户端,本质上都是连接至集群,然后往里面读写数据。那么问题来了,集群在收到来自客户端的写请求时,是怎么写入数据的呢?

另外客户端在访问集群的时候,本质上是访问集群内的某一个节点,而根据访问的节点是领导者还是追随者,写入数据的过程也会有所不同。

先来看看当 访问的节点是领导者 的情况:

在这里插入图片描述
这里面有一个关键的地方,就是 Leader 不会等到所有的 Follower 都写完,只要有一半的 Follower 写完,就会告知客户端。还是半数机制,一半的 Follower 加上 Leader 正好刚过半数。而这么做的原因也很简单,就是为了快速响应。

再来看另一种情况,如果客户端 访问的节点是追随者,情况会怎么样呢?其实很简单,由于追随者没有写权限,那么会先将写请求转发给领导者,然后接下来的步骤和上面类似,只是最后一步不同。

当 Leader 发现有半数的 Follower 写完,就认为写数据成功,于是返回 ack。但这个 ack 不会返回给客户端,因为客户端访问的不是领导者,最终领导者会将 ack 返回给客户端访问的追随者,再由这个追随者将 ack 返回给客户端,告知写请求已执行完毕。

2.基于 Zookeeper 实现分布式锁

关于分布式锁,我之前介绍过如何基于 Redis 实现分布式锁,里面对分布式锁做了比较详细的解析。下面来聊一聊如何基于 Zookeeper 实现分布式锁。

先来说一下原理,当客户端需要操作共享资源时,需要先往 Zookeeper 集群中创建一个临时顺序节点。然后查看对应的编号,如果没有比它小的,说明最先创建,我们就认为客户端拿到了分布式锁。

如果客户端发现节点的编号不是最小的,说明已经有人先创建了,也就是锁已经被别的客户端拿走了。那么该客户端会对前一个节点进行监听,等待释放。

在这里插入图片描述

所以从概念上还是很好理解的,然后我们来编程实现一下。

from typing import List
import queue
from kazoo.client import KazooClient

class DistributedLock:

    def __init__(self, hosts: List[str]):
        """
        :param hosts: 'ip1:port1,...'
        """
        self.client = KazooClient(",".join(hosts))
        self.client.start()
        # 要在 /lock 节点下面创建临时顺序节点
        # 所以先保证 /lock 节点存在
        if not self.client.exists("/lock"):
            self.client.create("/lock")

        # 要创建的临时顺序节点
        self.cur_node = None
        # 要监听的节点(也就是上一个节点)
        self.prev_node = None
        # 本地队列
        self.q = queue.Queue()

    def acquire(self):
        """
        获取锁
        :return:
        """
        self.cur_node = self.client.create(
            "/lock/seq-",
            # 临时顺序节点
            ephemeral=True,
            sequence=True
        )
        # create 方法会返回创建的节点名称
        # 需要判断编号是不是最小的
        # 因此要拿到所有的节点
        nodes = self.client.get_children("/lock")
        # nodes: ["seq-000..0", "seq-000...1"]
        nodes.sort()
        if len(nodes) == 1:
            return True
        elif "/lock/" + nodes[0] == self.cur_node:
            # 如果 nodes 里面的最小值和 node 相等
            # 说明该客户端创建的节点的编号最小
            # 于是我们就认为它拿到了分布式锁
            return True
        # 否则说明不是最小,因此要找到它的上一个节点
        # 也就是要监听的节点
        index = nodes.index(self.cur_node.split("/")[-1])
        self.prev_node = "/lock/" + nodes[index - 1]
        # 对上一个节点进行监听
        self.client.get(self.prev_node, watch=self.watch)
        # 这一步不是阻塞的,但程序必须要拿到锁之后才可以执行
        # 所以我们要显式地让程序阻塞在这里
        self.q.get()
        return True

    def release(self):
        """
        释放锁
        :return:
        """
        self.client.delete(self.cur_node)

    def watch(self, event):
        """
        监听函数,参数 event 是一个 namedtuple
        kazoo.protocol.states.WatchedEvent
        里面有三个字段:type、state、path

        监听节点的值被改变时,type 为 "CHANGED"
        监听节点被删除时,type 为 "DELETED"

        path 就是监听的节点本身

        state 表示客户端和服务端之间的连接状态
        建立连接时,状态为 LOST
        连接建立成功,状态为 CONNECTED
        如果在整个会话的生命周期里,伴随着网络闪断、服务端异常
        或者其他什么原因导致客户端和服务端连接断开,状态为 SUSPENDED
        与此同时,KazooClient 会不断尝试与服务端建立连接,直至超时
        如果连接建立成功了,那么状态会再次切换到 CONNECTED
        """
        if event.type == "DELETED" and \
            self.prev_node == event.path:
            # 往队列里面扔一个元素
            # 让下一个节点解除阻塞
            self.q.put(None)

# 测试函数
def test(lock, name):
    lock.acquire()
    print(f"{name}获得锁,其它人等着吧")
    print(f"{name}处理业务······")
    print(f"{name}处理完毕,释放锁")
    lock.release()

if __name__ == '__main__':
    import threading
    hosts = [
        "82.157.146.194:2181",  
        "121.37.165.252:2181",  
        "123.60.7.226:2181",    
    ]
    # 创建三把锁
    lock1 = DistributedLock(hosts)
    lock2 = DistributedLock(hosts)
    lock3 = DistributedLock(hosts)
    threading.Thread(
        target=test, args=(lock1, "客户端1")
    ).start()
    threading.Thread(
        target=test, args=(lock2, "客户端2")
    ).start()
    threading.Thread(
        target=test, args=(lock3, "客户端3")
    ).start()

"""
客户端1获得锁,其它人等着吧
客户端1处理业务······
客户端1处理完毕,释放锁
客户端3获得锁,其它人等着吧
客户端3处理业务······
客户端3处理完毕,释放锁
客户端2获得锁,其它人等着吧
客户端2处理业务······
客户端2处理完毕,释放锁
"""

实现起来不是很难,并且使用 Zookeeper 的好处就是,我们不需要担心死锁的问题。因为客户端宕掉之后,临时节点会自动删除,但缺点是性能没有 Redis 高。

另外值得一提的是,kazoo 已经帮我们实现好了分布式锁,开箱即用,我们就不需要再手动实现了。

# 创建客户端
client = KazooClient(",".join(hosts))
client.start()
# 此时需要自己手动给一个唯一标识
lock = client.Lock("/lock", "unique-identifier")
# 获取锁
lock.acquire()
# 处理业务逻辑
...
# 释放锁
lock.release()
# 或者也可以使用上下文管理器
with lock:
    ...

显然就优雅多了,借助于 kazoo 实现好的分布式锁,可以减轻我们的心智负担。此外 kazoo 还提供了 读锁写锁

  • client.ReadLock
  • client.WriteLock

我们一般使用 client.Lock 就行,可以自己测试一下。


关于 Zookeeper 的基础内容就介绍到这里,但伴随着 Zookeeper 还有一系列的协议,比如 Paxos 协议ZAB 协议CAP 定理 等等,这些可谓是分布式系统的重中之重。我们后续来逐一介绍。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/298761.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

1982-2022年GIMMS 标准化差异植被指数

GIMMS 标准化差异植被指数 1982-2022 PKU GIMMS 归一化植被指数数据集(PKU GIMMS NDVI,版本 1.2)提供了从 1982 年到 2022 年以半个月为间隔、分辨率为 1/12 的一致的全球 NDVI 数据。其主要目标是解决现有领域中普遍存在的关键不确定性。全…

BMS电池管理系统带充放电控制过流过压保护

2.4G无线采集BMS开发板(主从一体) 全新升级 (赠送上位机源码TTL 上位机,可以改成自己想要的界面) 12串电池TTL上位机 CAN通信上位机源码有偿开源,供项目二次开发。 增加STM32平台 USB转TTL通信 CAN通信 增加…

C++面向对象核心-继承

1、继承 1.1 概念 继承是面向对象的三大特性之一,体现了代码复用的思想。 继承就是在一个已存在的类的基础上建立一个新的类,并拥有其特性。 已存在的类被称为“基类”或者“父类”新建立的类被称为“派生类”或者“子类”对象间没有继承关系 #include &…

数据恢复与并发控制例题

例1: (1)重做(REDO):T1,T2,T3; 撤销(UNDO):T4。 (2)重做:T1,T2; 撤销:T3。 (3)重做:T1; 撤销:T2,T3. (4)重做:T1; 撤销…

手机上下载 Linux 系统

我们首先要下载 Ternux 点击下载以及vnc viewer (提取码:d9sX),需要魔法才行 下载完以后我们打开 Ternux 敲第一个命令 pkg upgrade 这个命令是用来跟新软件的 敲完命令就直接回车,如果遇到需要输入 Y/N 的地方全部输入 Y 下一步 #启动TMOE…

java SSM问卷调查系统myeclipse开发mysql数据库springMVC模式java编程计算机网页设计

一、源码特点 java SSM问卷调查管理系统是一套完善的web设计系统(系统采用SSM框架进行设计开发,springspringMVCmybatis),对理解JSP java编程开发语言有帮助,系统具有完整的源代 码和数据库,系统主要采…

vite + vue3引入ant design vue 报错

npm install ant-design-vue --save下载插件并在main.ts 全局引入 报错 解决办法一: main.ts注释掉全局引入 模块按需引入 解决办法二 将package.json中的ant-design-vue的版本^4.0.0-rc.4改为 ^3.2.15版本 同时将将package-lock.json中的ant-design-vue的版本…

华为云服务器试用领取

系列文章目录 华为云服务器试用领取 领取的试用云服务器在哪 文章目录 系列文章目录介绍 介绍 我将会用该系列文章讲述如何在云服务器中安装大数据软件及其环境搭建。如有不足之处,还望指点。 本篇文章讲述的是华为云服务器的免费试用。 华为弹性云服务器 ECS 该云…

metaSPAdes,megahit,IDBA-UB:宏基因组装软件安装与使用

metaSPAdes,megahit,IDBA-UB是目前比较主流的宏基因组组装软件 metaSPAdes安装 GitHub - ablab/spades: SPAdes Genome Assembler #3.15.5的预编译版貌似有问题,使用源码安装试试 wget http://cab.spbu.ru/files/release3.15.5/SPAdes-3.15.5.tar.gz tar -xzf SP…

数据分析——快递电商

一、任务目标 1、任务 总体目的——对账 本项目解决同时使用多个快递发货,部分隔离区域出现不同程度涨价等情形下,如何快速准确核对账单的问题。 1、在订单表中新增一列【运费差异核对】来表示订单运费实际有多少差异,结果为数值。 2、将…

【书生·浦语大模型实战营02】《轻松玩转书生·浦语大模型趣味Demo》学习笔记

《轻松玩转书生浦语大模型趣味Demo》 1、InternLM-Chat-7B 智能对话:生成 300 字的小故事 本节中我们将使用InternLM-Chat-7B 模型部署一个智能对话 Demo。 1.1 环境准备 在InternStudio平台中选择 A100(1/4) 的配置,镜像选择 Cuda11.7-conda&#x…

idea中使用Lombok 失效,@Slf4j 找不到符号的解决办法

文章目录 一、前言二、问题排查和解决方案三、 其他解决方案3.1 另一种解决方案3.2 参考文章 一、前言 今天在一个多module工程中,新增了一个 springboot(版本 2.2.4.RELEASE) module,像往常一样,我引入了lombok依赖&…

电脑开启虚拟化如何查看自己的主机主板型号

问题描述 在使用virtualbox、vmware安装虚拟机的时候,需要本机电脑能够支持虚拟化。 但是不同厂家的主机(主板)幸好并不一致,所以需要先了解自己的电脑主板型号 操作方法 1、win r 键打开运行窗口,输入cmd并确定打开…

关于“Python”的核心知识点整理大全64

目录 20.2.15 确保项目的安全 settings.py 20.2.16 提交并推送修改 20.2.17 创建自定义错误页面 1. 创建自定义模板 500.html settings.py settings.py 注意 views.py 20.2.18 继续开发 往期快速传送门👆(在文章最后)&#xff1a…

大数据Doris(五十一):Colocation Join介绍

文章目录 Colocation Join介绍 一、原理 二、使用方式 1、建表 2、删表

【Java EE初阶七】多线程案例(生产者消费者模型)

1. 阻塞队列 队列是先进先出的一种数据结构; 阻塞队列,是基于队列,做了一些扩展,适用于多线程编程中; 阻塞队列特点如下: 1、是线程安全的 2、具有阻塞的特性 2.1、当队列满了时,就不能往队列里…

MATLAB插值函数

一、MATLAB插值函数概览 1)本节重点介绍的插值函数 MATLAB插值函数适用情况基础句式interp1 函数interp1 主要用于一维数据的插值interp1(x, y, x_interp, ‘linear’); 其中 x 和 y 是已知数据点,x_interp 是要插值的目标点。interp2 函数interp2 用于…

VS code的使用介绍

VS code的使用介绍 简介下载和安装常用的插件使用教程快捷键 集成Git未找到 Git。请安装 Git,或在 "git.path" 设置中配置。操作步骤打开文件夹初始化仓库文件版本控制状态提交文件到git打开git操作栏位 好用的插件ChineseDraw.io Integration实体关系 Gi…

SpringSecurity集成JWT实现后端认证授权保姆级教程-环境搭建篇

🍁 作者:知识浅谈,CSDN签约讲师,CSDN博客专家,华为云云享专家,阿里云专家博主 📌 擅长领域:全栈工程师、爬虫、ACM算法 💒 公众号:知识浅谈 🔥网站…

C++ UTF-8与GBK字符的转换 —基于Linux 虚拟机 (iconv_open iconv)

1、UTF-8 和 GBK 的区别 GBK:通常简称 GB (“国标”汉语拼音首字母),GBK 包含全部中文字符。 UTF-8 :是一种国际化的编码方式,包含了世界上大部分的语种文字(简体中文字、繁体中文字、英文、…