Zookeeper基础入门-2【ZooKeeper 分布式锁案例】

Zookeeper基础入门-2【ZooKeeper 分布式锁案例】

  • 四、ZooKeeper-IDEA环境搭建
    • 4.1.环境搭建
      • 4.1.1.创建maven工程:zookeeper
      • 4.1.2.在pom文件添加依赖
      • 4.1.3.在项目的src/main/resources 目录下,新建文件为“log4j.properties”
      • 4.1.4.创建包名com.orange.zk
    • 4.2.ZooKeeper 客户端API操作
      • 4.2.1.初始化ZooKeeper对象
      • 4.2.2.创建节点
      • 4.2.3.获取子节点并监听节点变化
      • 4.2.4.判断节点Node是否存在
    • 4.3.客户端向服务端写数据流程
      • 4.3.1.写流程之写入请求直接发送给Leader节点
      • 4.3.2.写流程之写入请求发送给follower节点
  • 五、服务器动态上下线监听案例
    • 5.1.需求
    • 5.2.需求分析--服务器动态上下线
    • 5.3.具体实现
      • 5.3.1.先在集群上创建/servers 节点
      • 5.3.2.创建包名:com.orange.zkcase1
      • 5.3.3.服务器端向Zookeeper注册代码
      • 5.3.4.客户端代码
    • 5.4.测试
      • 5.4.1.在Linux 命令行上操作增加减少服务器
        • 5.4.1.1.启动DistributeClient 客户端
        • 5.4.1.2.在host128 上zk 的客户端/servers 目录上创建临时带序号节点
        • 5.4.1.3.观察Idea 控制台变化
        • 5.4.1.4.执行删除操作
        • 5.4.1.5.观察Idea 控制台变化
      • 5.4.2.在Idea 上操作增加减少服务器
        • 5.4.2.1.启动DistributeClient 客户端(如果已经启动过,不需要重启)
        • 5.4.2.2.启动DistributeServer 服务
          • 5.4.2.2.1.点击Edit Configurations…
          • 5.4.2.2.2.在弹出的窗口中(Program arguments)输入想启动的主机,例如,host130
          • 5.4.2.2.3.回到DistributeServer的main方法,右 键,在弹出的窗口中点击 Run “DistributeServer.main()”
          • 5.4.2.2.4.观察DistributeServer 控制台
          • 5.4.2.2.5.观察DistributeClient 控制台
  • 六、ZooKeeper 分布式锁案例
    • 6.1.分布式锁
    • 6.2.ZooKeeper分布式锁原理
    • 6.3.分布式锁案例分析
    • 6.3.原生Zookeeper 实现分布式锁案例
      • 6.3.1.分布式锁实现
      • 6.3.2.分布式锁测试
        • 6.3.2.1.创建两个线程
        • 6.3.2.2.观察控制台变化
    • 6.4.Curator框架实现分布式锁案例
      • 6.4.1.Curator有五种锁方案:
      • 6.4.1.原生的Java API 开发存在的问题
      • 6.4.2.Curator是一个专门解决分布式锁的框架,解决了原生Java API开发分布式遇到的问题
      • 6.4.3.Curator 案例实操
        • 6.4.3.1.添加依赖
        • 6.4.3.2.代码实现
        • 6.4.3.3.控制台变化
  • 七、模拟12306售票案例
    • 7.1.代码实现
    • 7.2.测试
    • 7.3.控制台变化
  • 八、企业面试真题(面试重点)
    • 8.1.选举机制
    • 8.2.生产集群安装多少zk 合适
    • 8.3.常用命令
  • endl

四、ZooKeeper-IDEA环境搭建

保证三台Zookeeper 集群服务端启动

[root@host128 ~]# jpsall
显示集群的所有java进程状态
=============== host128 ===============
66496 Jps
2445 QuorumPeerMain
=============== host129 ===============
66162 Jps
2413 QuorumPeerMain
=============== host130 ===============
65947 Jps
2383 QuorumPeerMain
执行结束

4.1.环境搭建

4.1.1.创建maven工程:zookeeper

4.1.2.在pom文件添加依赖

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.example</groupId>
  <artifactId>zookeeper-test01</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <properties>
    <maven.complier.source>8</maven.complier.source>
    <maven.complier.target>8</maven.complier.target>
  </properties>

  <dependencies>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>RELEASE</version>
    </dependency>

    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-core</artifactId>
      <version>2.8.2</version>
    </dependency>

    <dependency>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
      <version>3.5.7</version>
    </dependency>

  </dependencies>
</project>

4.1.3.在项目的src/main/resources 目录下,新建文件为“log4j.properties”

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

4.1.4.创建包名com.orange.zk

在这里插入图片描述

4.2.ZooKeeper 客户端API操作

4.2.1.初始化ZooKeeper对象

/**
 * Description: zookeeper客户端
 */
public class Client {

    //注意:connectString逗号左右不能有空格,否则连接不上
    private String connectString = "192.168.147.128:2181,192.168.147.129:2181,192.168.147.130:2181";
    //tickTime为2000,initLimit为10
    //LF初次连接时的超时时间应起码大于延迟时间tickTime*initLimit的值否则会因为超时而连接失败。
    private int sessionTimeout = 200000;
    private ZooKeeper zkClient;

    /**
     * 初始化ZooKeeper对象,必须要先创建,再进行创建节点,否则报空指针异常
     *
     * @throws IOException
     */
    @Before
    public void init() throws IOException {
        zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                
            }
        });
    }

}

4.2.2.创建节点

    @Test
    public void create() throws InterruptedException, KeeperException {
        //"/tang":创建的节点的路径;
        //"t.avi".getBytes():节点里面的值,需要转化为字节传输;
        //ZooDefs.Ids.OPEN_ACL_UNSAFE:权限控制,设置访问权限;
        //CreateMode.PERSISTENT:创建的节点类型,如这个是永久不带序号节点。
        String nodeCreated = zkClient.create("/tang", "t.avi".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }
# 指定启动host128的客户端,而不是localhost的
/opt/module/zookeeper-3.5.7/bin/zkCli.sh -server host128:2181
[zk: host128:2181(CONNECTED) 0] ls /
[tang, zookeeper]
[zk: host128:2181(CONNECTED) 1] get -s /tang
t.avi
cZxid = 0x100000002
ctime = Wed Feb 28 12:35:13 CST 2024
mZxid = 0x100000002
mtime = Wed Feb 28 12:35:13 CST 2024
pZxid = 0x100000002
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 5
numChildren = 0

4.2.3.获取子节点并监听节点变化

/**
 * Description: zookeeper客户端
 */
public class Client {

    //注意:connectString逗号左右不能有空格,否则连接不上
    private String connectString = "192.168.147.128:2181,192.168.147.129:2181,192.168.147.130:2181";
    //tickTime为2000,initLimit为10
    //LF初次连接时的超时时间应起码大于延迟时间tickTime*initLimit的值否则会因为超时而连接失败。
    private int sessionTimeout = 200000;
    private ZooKeeper zkClient;

    /**
     * 初始化ZooKeeper对象,必须要先创建,再进行创建节点,否则报空指针异常
     *
     * @throws IOException
     */
    @Before
    public void init() throws IOException {
        zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                //收到时间通知后的回调函数(用户的业务逻辑)
                System.out.println(watchedEvent.getType() + "--" + watchedEvent.getPath());
                //再次启动监听
                try {
                    System.out.println("===============================");
                    List<String> children = zkClient.getChildren("/", true);

                    for (String child : children) {
                        System.out.println(child);
                    }
                    System.out.println("===============================");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }

    @Test
    public void create() throws InterruptedException, KeeperException {
        //"/tang":创建的节点的路径;
        //"t.avi".getBytes():节点里面的值,需要转化为字节传输;
        //ZooDefs.Ids.OPEN_ACL_UNSAFE:权限控制,设置访问权限;
        //CreateMode.PERSISTENT:创建的节点类型,如这个是永久不带序号节点。
        String nodeCreated = zkClient.create("/tang", "t.avi".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }

    /**
     * 监听节点变化信息
     */
    @Test
    public void getChildren() throws KeeperException, InterruptedException {
        System.out.println("-----------------------------");
        List<String> children = zkClient.getChildren("/", true);

        for (String child : children){
            System.out.println(child);
        }
        System.out.println("-----------------------------");
        //延时阻塞
        Thread.sleep(Long.MAX_VALUE);
    }

}
None--null
===============================
tang
zookeeper
-----------------------------
tang
zookeeper
===============================

监听器只能监听一次,如果再发生变化需要重新注册监听器,要想每次节点发生变化都能检测到并且在控制台打印,就在初始化监听器里面再注册一个监听器,每次监听完又马上注册一个新的监听器。

# 指定启动host128的客户端,而不是localhost的
/opt/module/zookeeper-3.5.7/bin/zkCli.sh -server host128:2181
[zk: host128:2181(CONNECTED) 8] create /test01 "test01"
Created /test01
[zk: host128:2181(CONNECTED) 9] create /test02 "test02"
Created /test02
NodeChildrenChanged--/
===============================
tang
zookeeper
test01
===============================
NodeChildrenChanged--/
===============================
tang
test02
zookeeper
test01
===============================

4.2.4.判断节点Node是否存在

    /**
     * 判断节点是否存在
     */
    @Test
    public void exist() throws InterruptedException, KeeperException {
        Stat stat = zkClient.exists("/tang", false);
        System.out.println(stat == null ? "not data" : "exist");
    }

4.3.客户端向服务端写数据流程

4.3.1.写流程之写入请求直接发送给Leader节点

在这里插入图片描述
1.当client向zookeeper的leader上写数据,发送一个写请求

2.这个Leader会把写请求广播给各个server,当各个server写成功后就会通知Leader.

3.当Leader收到半数以上server写成功应答,此时认为写成功,Client会收到Leader写成功应答。

4.3.2.写流程之写入请求发送给follower节点

在这里插入图片描述
1.当client向zookeeper集群的某个server上写数据,发送一个写请求

2.如果接收到请求的不是Leader,那么server会把请求转发给Leader,因为zookeeper的集群中只有一个是Leader,这个Leader会把写请求广播给各个server,当各个server写成功后就会通知Leader.

3.当Leader收到半数以上server写成功应答,此时认为写成功,Leader会告知向他提交申请的server

4.Server会进一步将通知Client写成功, 此时就认为写成功了。

五、服务器动态上下线监听案例

5.1.需求

某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。

5.2.需求分析–服务器动态上下线

在这里插入图片描述

5.3.具体实现

5.3.1.先在集群上创建/servers 节点

[zk: host128:2181(CONNECTED) 10] create /servers "servers"
Created /servers

5.3.2.创建包名:com.orange.zkcase1

在这里插入图片描述

5.3.3.服务器端向Zookeeper注册代码

/**
 * Description: 服务端和zookeeper集群创建连接
 */
public class DistributeServer {

    //注意:connectString逗号左右不能有空格,否则连接不上
    private String connectString = "192.168.147.128:2181,192.168.147.129:2181,192.168.147.130:2181";
    //tickTime为2000,initLimit为10
    //LF初次连接时的超时时间应起码大于延迟时间tickTime*initLimit的值否则会因为超时而连接失败。
    private int sessionTimeout = 200000;
    private ZooKeeper zkClient;

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {

        DistributeServer server = new DistributeServer();
        //1.连接zookeeper集群,获取zk连接,创建zk
        server.getConnect();

        //2.注册服务器到zk集群
        server.regist(args[0]);

        //3.启动业务逻辑
        server.business();
    }

    private void business() throws InterruptedException {
        //延时阻塞
        Thread.sleep(Long.MAX_VALUE);
    }

    /**
     * 注册服务器,创建节点
     */
    private void regist(String hostname) throws InterruptedException, KeeperException {
        String create = zkClient.create("/servers/" + hostname, hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println(hostname + " is online");
    }

    /**
     * 连接上zookeeper集群
     */
    private void getConnect() throws IOException {
        zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {

            }
        });
    }
}

5.3.4.客户端代码

/**
 * Description: 客户端监听集群节点的动态变化
 */
public class DistributeClient {

    //注意:connectString逗号左右不能有空格,否则连接不上
    private String connectString = "192.168.147.128:2181,192.168.147.129:2181,192.168.147.130:2181";
    //tickTime为2000,initLimit为10
    //LF初次连接时的超时时间应起码大于延迟时间tickTime*initLimit的值否则会因为超时而连接失败。
    private int sessionTimeout = 200000;
    private ZooKeeper zkClient;

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        DistributeClient client = new DistributeClient();
        //1.获取zk连接
        client.getConnect();

        //2.监听服务器 /servers 下面子节点的增加和删除
        client.getServerList(); //获取servers上的所有节点的上线和下线

        //3.业务逻辑
        client.business();
    }

    private void business() throws InterruptedException {
        //延时阻塞
        Thread.sleep(Long.MAX_VALUE);
    }

    private void getServerList() throws InterruptedException, KeeperException {
        //获取servers下的所有节点信息
        List<String> children = zkClient.getChildren("/servers", true);//对父节点监听

        ArrayList<String> servers = new ArrayList<String>(); //集合用来存所有的服务器节点
        //遍历所有节点  获取节点中的主机名称信息
        for (String child : children) {
            byte[] data = zkClient.getData("/servers/" + child, false, null);
            servers.add(new String(data));
        }
        //打印服务器列表信息
        System.out.println(servers);
    }

    // 创建zookeeper客户端
    private void getConnect() throws IOException {
        zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                //收到事件通知后的回调函数(用户的业务逻辑)
                try {
                    //再次启动监听,避免只监听一次
                    getServerList();
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

5.4.测试

5.4.1.在Linux 命令行上操作增加减少服务器

5.4.1.1.启动DistributeClient 客户端
5.4.1.2.在host128 上zk 的客户端/servers 目录上创建临时带序号节点
[zk: host128:2181(CONNECTED) 0] ls /
[zookeeper]
[zk: host128:2181(CONNECTED) 1] create /servers "servers"
Created /servers
[zk: host128:2181(CONNECTED) 2] create -e -s /servers/host128 "hsot128"
Created /servers/host1280000000000
[zk: host128:2181(CONNECTED) 3] create -e -s /servers/host129 "hsot129"
Created /servers/host1290000000001
[zk: host128:2181(CONNECTED) 4] create -e -s /servers/host130 "hsot130"
Created /servers/host1300000000002
5.4.1.3.观察Idea 控制台变化
[]
[]
[hsot128]
[hsot129, hsot128]
[hsot130, hsot129, hsot128]
5.4.1.4.执行删除操作
[zk: host128:2181(CONNECTED) 6] ls /servers
[host1280000000000, host1290000000001, host1300000000002]
[zk: host128:2181(CONNECTED) 7] delete /servers/host1280000000000
[zk: host128:2181(CONNECTED) 8] delete /servers/host1290000000001
5.4.1.5.观察Idea 控制台变化
[hsot130, hsot129]
[hsot130]

5.4.2.在Idea 上操作增加减少服务器

5.4.2.1.启动DistributeClient 客户端(如果已经启动过,不需要重启)
5.4.2.2.启动DistributeServer 服务
5.4.2.2.1.点击Edit Configurations…

在这里插入图片描述

5.4.2.2.2.在弹出的窗口中(Program arguments)输入想启动的主机,例如,host130

在这里插入图片描述

5.4.2.2.3.回到DistributeServer的main方法,右 键,在弹出的窗口中点击 Run “DistributeServer.main()”

在这里插入图片描述

5.4.2.2.4.观察DistributeServer 控制台
host130 is online
5.4.2.2.5.观察DistributeClient 控制台
#host130 已经上线
[hsot130]

六、ZooKeeper 分布式锁案例

6.1.分布式锁

比如说"进程 1"在使用该资源的时候,会先去获得锁,"进程 1"获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,"进程1"用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁

在这里插入图片描述

6.2.ZooKeeper分布式锁原理

核心思想:当客户端要获取锁,则创建节点,使用完锁,则删除该节点。
    1. 客户端获取锁时,在lock节点下创建临时顺序节点
    1. 然后获取lock下面的所有子节点,客户端获取到所有的子节点之后,如果发现自己创建的子节点序号最小,那么就认为该客户端获取到了锁。使用完锁后,将该节点删除
    1. 如果发现自己创建的节点并非lock所有子节点中最小的,说明自己还没有获取到锁,此时客户端需要找到比自己小的那个节点,同时对其注册事件监听器监听删除事件
    1. 如果发现比自己小的那个节点被删除,则客户端的Watcher会收到相应通知,此时再次判断自己创建的节点是否是lock子节点中序号最小的,如果是则获取到了锁,如果不是则重复以上步骤继续获取到比自己小的一个节点并注册监听。

在这里插入图片描述

6.3.分布式锁案例分析

在这里插入图片描述

1)接收到请求后,在/locks节点下创建一个临时顺序节点

2)判断自己是不是当前节点下最小的节点:是,获取到锁;不是,对前一个节点进行监听

3)获取到锁,处理完业务后,delete节点释放锁,然后下面的节点将收到通知,重复第二步判断

6.3.原生Zookeeper 实现分布式锁案例

6.3.1.分布式锁实现

package com.orange.zkcase2;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * Description: zookeeper分布式锁案例
 */
public class DistributedLock {
    //注意:connectString逗号左右不能有空格,否则连接不上
    private final String connectString = "192.168.147.128:2181,192.168.147.129:2181,192.168.147.130:2181";
    //tickTime为2000,initLimit为10
    //LF初次连接时的超时时间应起码大于延迟时间tickTime*initLimit的值否则会因为超时而连接失败。
    private final int sessionTimeout = 200000;
    private final ZooKeeper zkClient;

    //增加代码健壮性
    //zookeeper连接
    private CountDownLatch connectLatch = new CountDownLatch(1);
    //zookeeper等待
    private CountDownLatch waitLatch = new CountDownLatch(1);

    //当前client等待的子节点的路径
    private String waitPath;
    //当前client创建的子节点
    private String currentNode;

    /**
     * 和zk创建连接,并创建根节点
     */
    public DistributedLock() throws IOException, InterruptedException, KeeperException {
        //1.获取连接 建立服务端与客户端连接
        zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("-----process-------");
                // connectLatch 如果连接上zk  可以释放
                // 连接建立时, 打开latch, 唤醒wait在该latch上的线程
                if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                    connectLatch.countDown();
                }

                // 发生了waitPath的删除事件 需要释放
                if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) {
                    waitLatch.countDown();
                }
            }
        });

        //等待zookeeper正常连接后,代码才往下继续执行
        connectLatch.await();

        //2.判断根节点 /locks 是否存在
        Stat stat = zkClient.exists("/locks", false);
        //如果根节点不存在,则创建根节点,根节点类型为永久节点
        if (stat == null) {
            System.out.println("根节点不存在");
            // 创建根节点,根节点必须是永久节点
            zkClient.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    }

    //对zk 加锁
    public void zkLock() {
        try {
            //创建对应的临时带序号临时节点,返回值为创建的节点路径
            currentNode = zkClient.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println(Thread.currentThread().getName() + "当前节点为:" + currentNode);

            //注意, 没有必要监听"/locks"的子节点的变化情况
            //判断创建的节点是否是最小的序号节点,如果是获取到锁;如果不是,监听它序号前一个节点
            List<String> children = zkClient.getChildren("/locks", false);
            //如果children只要一个子节点,那就直接获取锁; 如果有多个节点,需要判断,谁最小
            if (children.size() == 1) {
                System.out.println(Thread.currentThread().getName() + "对zk 加锁, 当前节点:" + currentNode);
                return;
            } else {
                ///对根节点下的所有临时顺序节点进行从小到大排序,有序递增
                Collections.sort(children);
                //获取当前节点名称 seq-00000000
                String thisNode = currentNode.substring("/locks/".length());
                System.out.println(Thread.currentThread().getName() + "当前节点名称为:" + thisNode);
                // 通过seq-00000000 获取该节点在children集合的位置
                int index = children.indexOf(thisNode);
                System.out.println(Thread.currentThread().getName() + "当前节点在集合的位置为:" + index);

                //判断
                if (index == -1) {
                    System.out.println(Thread.currentThread().getName() + "数据异常");
                } else if (index == 0) {
                    //只有一个节点,就可以获取锁了
                    System.out.println(Thread.currentThread().getName() + "对zk 加锁, 当前节点:" + currentNode);
                    return;
                } else {
                    //获得排名比 currentNode 前 1 位的节点
                    waitPath = "/locks/" + children.get(index - 1);
                    System.out.println(Thread.currentThread().getName() + "前一个节点为:" + waitPath);
                    //在 waitPath 上注册监听器, 当 waitPath 被删除时, zookeeper会回调监听器的 process方法
                    //需要监听 它前一个节点变化
                    zkClient.getData(waitPath, true, null);

                    //入等待锁状态,等待监听
                    waitLatch.await();

                    return;
                }
            }
        } catch (KeeperException e) {
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    //对zk 解锁
    public void unZkLock() {
        try {
            System.out.println(Thread.currentThread().getName() + "解锁,删除当前节点:" + currentNode);
            //删除节点
            zkClient.delete(currentNode, -1);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (KeeperException e) {
            throw new RuntimeException(e);
        }
    }
}

6.3.2.分布式锁测试

6.3.2.1.创建两个线程
package com.orange.zkcase2;

import org.apache.zookeeper.KeeperException;

import java.io.IOException;

/**
 * Description: 测试分布式锁
 */
public class DistributedLockTest {
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        // 创建分布式锁
        // final修饰的对象必须被初始化,不能被修改。
        // 非final的对象可以被重新赋值,锁对象就不受管控了。
        // 当一个锁被其他对象占有时,当前线程可以对锁对象重新赋值(相当于从新创建了一个锁对象),从而也拿到了运行的权利。

        //创建分布式锁 1
        final DistributedLock lock1 = new DistributedLock();
        //创建分布式锁 2
        final DistributedLock lock2 = new DistributedLock();

        new Thread(new Runnable() {
            @Override
            public void run() {
                //获取锁对象
                try {
                    lock1.zkLock();
                    System.out.println("线程0 启动,获取到锁");
                    Thread.sleep(5*1000);//延迟5秒

                    lock1.unZkLock();
                    System.out.println("线程0 释放锁");
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                //获取锁对象
                try {
                    lock2.zkLock();
                    System.out.println("线程1 启动,获取到锁");
                    Thread.sleep(5 * 1000);//延迟5秒

                    lock2.unZkLock();
                    System.out.println("线程1 释放锁");
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }).start();
    }
}

6.3.2.2.观察控制台变化
-----process-------
根节点不存在

-----process-------
Thread-1当前节点为:/locks/seq-0000000000
Thread-0当前节点为:/locks/seq-0000000001
Thread-1当前节点名称为:seq-0000000000
Thread-1当前节点在集合的位置为:0
Thread-1对zk 加锁, 当前节点:/locks/seq-0000000000
线程1 启动,获取到锁
Thread-0当前节点名称为:seq-0000000001
Thread-0当前节点在集合的位置为:1
Thread-0前一个节点为:/locks/seq-0000000000
Thread-1解锁,删除当前节点:/locks/seq-0000000000
-----process-------
线程0 启动,获取到锁
线程1 释放锁
Thread-0解锁,删除当前节点:/locks/seq-0000000001
线程0 释放锁

6.4.Curator框架实现分布式锁案例

6.4.1.Curator有五种锁方案:

  • InterProcessSemaphoreMutex:分布式排它锁(非可重入锁)
  • InterProcessMutex:分布式可重入排它锁
  • InterProcessReadWriteLock:分布式读写锁
  • InterProcessMultiLock:将多个锁作为单个实体管理的容器
  • InterProcessSemaphoreV2:共享信号量

6.4.1.原生的Java API 开发存在的问题

(1)会话连接是异步的,需要自己去处理。比如使用 CountDownLatch

(2)Watch 需要重复注册,不然就不能生效

(3)开发的复杂性还是比较高的

(4)不支持多节点删除和创建。需要自己去递归

6.4.2.Curator是一个专门解决分布式锁的框架,解决了原生Java API开发分布式遇到的问题

官方文档:https://curator.apache.org/index.html

6.4.3.Curator 案例实操

6.4.3.1.添加依赖
    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-framework</artifactId>
      <version>4.3.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-recipes</artifactId>
      <version>4.3.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-client</artifactId>
      <version>4.3.0</version>
    </dependency>
6.4.3.2.代码实现
package com.orange.zkcase3;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * Description: Curator 框架实现分布式锁案例
 */
public class CuratorLockTest {
    public static void main(String[] args) {
        //创建分布式锁1
        InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");

        //创建分布式锁2
        InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    //获取到锁
                    lock1.acquire();
                    System.out.println("线程1 获取到锁");
                    //测试锁重入
                    lock1.acquire();
                    System.out.println("线程1 再次获取到锁");

                    Thread.sleep(3 * 1000);

                    //释放锁
                    lock1.release();
                    System.out.println("线程1 释放锁");

                    lock1.release();
                    System.out.println("线程1 再次释放锁");
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    //获取到锁
                    lock2.acquire();
                    System.out.println("线程2 获取到锁");
                    //测试锁重入
                    lock2.acquire();
                    System.out.println("线程2 再次获取到锁");

                    Thread.sleep(3 * 1000);

                    //释放锁
                    lock2.release();
                    System.out.println("线程2 释放锁");

                    lock2.release();
                    System.out.println("线程2 再次释放锁");
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }).start();
    }

    /**
     * 分布式锁初始化
     */
    private static CuratorFramework getCuratorFramework() {
        //重试策略,初试时间 3秒,重试3次
        ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);
        //通过工厂创建Curator
        CuratorFramework client = CuratorFrameworkFactory
                .builder()
                //zookeeper server列表
                .connectString("192.168.147.128:2181,192.168.147.129:2181,192.168.147.130:2181")
                //connection超时时间
                .connectionTimeoutMs(20000)
                //session超时时间
                .sessionTimeoutMs(20000)
                .retryPolicy(policy).build();
        //启动客户端
        client.start();

        System.out.println("zookeeper 初始化完成...");
        return client;
    }
}
6.4.3.3.控制台变化
zookeeper 初始化完成...

zookeeper 初始化完成...

线程1 获取到锁
线程1 再次获取到锁
线程1 释放锁
线程1 再次释放锁
线程2 获取到锁
线程2 再次获取到锁
线程2 释放锁
线程2 再次释放锁

七、模拟12306售票案例

7.1.代码实现

/**
 * Description: 模拟12306售票案例
 */
public class LockTicket implements Runnable {

    private int tickets = 20;//数据库的票数

    private InterProcessMutex lock;

    public LockTicket() {
        //重试策略,初试时间 3秒,重试3次
        ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);
        //通过工厂创建client客户端对象
        CuratorFramework client = CuratorFrameworkFactory
                .builder()
                //zookeeper server列表
                .connectString("192.168.147.128:2181,192.168.147.129:2181,192.168.147.130:2181")
                //connection超时时间
                .connectionTimeoutMs(20000)
                //session超时时间
                .sessionTimeoutMs(20000)
                .retryPolicy(policy)
                .build();
        //启动客户端
        client.start();

        lock = new InterProcessMutex(client, "/locks");
    }

    @Override
    public void run() {
        while (true) {
            try {
                //获取锁
                lock.acquire(3, TimeUnit.SECONDS);
                if (tickets > 0) {
                    Thread.sleep(1000);
                    System.out.println(Thread.currentThread().getName() + ":" + tickets);
                    tickets--;
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //释放锁
                try {
                    lock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

        }
    }
}

7.2.测试

/**
 * Description: 模拟12306售票案例
 */
public class LockTicketTest {
    public static void main(String[] args) {
        LockTicket lockTicket=new LockTicket();
        //创建客户端
        Thread t1 = new Thread(lockTicket, "携程");
        Thread t2 = new Thread(lockTicket, "飞猪");

        t1.start();
        t2.start();
    }
}

7.3.控制台变化

飞猪:20
携程:19
飞猪:18
携程:17
飞猪:16
携程:15
飞猪:14
携程:13
飞猪:12
携程:11
飞猪:10
携程:9
飞猪:8
携程:7
飞猪:6
携程:5
飞猪:4
携程:3
飞猪:2
携程:1

八、企业面试真题(面试重点)

8.1.选举机制

半数机制,超过半数的投票通过,即通过。

(1)第一次启动选举规则: 投票过半数时,服务器 id 大的胜出
(2)第二次启动选举规则:

  • EPOCH 大的直接胜出
  • EPOCH 相同,事务 id 大的胜出
  • 事务 id 相同,服务器 id 大的胜出

8.2.生产集群安装多少zk 合适

安装奇数台
生产经验

  • 10 台服务器:3 台zk
  • 20 台服务器:5 台zk
  • 100 台服务器:11 台zk
  • 200 台服务器:11 台zk

服务器台数多:好处,提高可靠性;坏处:提高通信延时

8.3.常用命令

ls、get、create、delete 

endl

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/416354.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

springboot 注解属性转换字典

1.注解相关功能实现 定义属性注解 import com.fasterxml.jackson.annotation.JacksonAnnotationsInside; import com.fasterxml.jackson.databind.annotation.JsonSerialize; import com.vehicle.manager.core.serializer.DicSerializer;import java.lang.annotation.*;/*** a…

c++学习记录 deque容器—数据存取

函数原型&#xff1a; at(int idx); //返回索引idx所指的数据operator[]; //返回索引idx所指的数据front(); //返回容器中第一个数据元素back(); //返回容器中最后一个数据元素 #include<iostream> using nam…

websocket在django中的运用

14-2 聊天室实现思路&#xff1a;轮训、长轮训、websocket_哔哩哔哩_bilibili 参考大佬的B站学习笔记 https://www.cnblogs.com/wupeiqi/p/6558766.html 参考博客 https://www.cnblogs.com/wupeiqi/articles/9593858.html 参考博客 http协议: 是短连接&#xff0c;无状态…

【LeetCode】每日一题:使二叉树所有路径值相等的最小代价

该题采用自底向上的思路的话&#xff0c;很容易想到使用贪心的思想&#xff0c;但是如何进行具体操作却有些难度。 这里补充一个重要的结论&#xff1a;二叉树的数组形式中&#xff0c;第i个节点的父节点是i/2&#xff1b;接下来只需要让自底向上让每个路径上的代价保持最低限…

Python实现自动检测设备连通性并发送告警到企业微信

背景&#xff1a;门禁机器使用的WiFi连接&#xff0c;因为某些原因会不定期自动断开连接&#xff0c;需要人工及时干预&#xff0c;以免影响门禁数据同步&#xff0c;故写此脚本&#xff0c;定时检测门禁网络联通性。 #首次使用要安装tcping模块 pip install tcpingfrom tcpin…

Jupyter Notebook 下载+简单设置

这里写目录标题 1. Jupyter Notebook安装2.切换打开别的盘3. 创建代码文件4.为jupyter notebook添加目录 (Jupyter安装拓展nbextensions)step1&#xff1a;安装命令step2&#xff1a;用户配置step3&#xff1a;上述过程均完成后&#xff0c;打开jupyter notebook就会发现界面多…

静态方式部署集中式网关

在静态方式部署集中式网关的场景中&#xff0c;控制平面的流程包括VXLAN隧道建立、MAC地址动态学习&#xff1b;转发平面的流程包括同子网已知单播报文转发、同子网BUM&#xff08;Broadcast&Unknown-unicast&Multicast&#xff09;报文转发、跨子网报文转发。 静态方…

qml 项目依赖

文章目录 出现的问题最终对比下一步 把 apptestQml3_6.exe 放到一个单独目录下&#xff0c;执行 windeployqt.exe ./apptestQml3_6.exe但是出了很多问题&#xff0c;根本运行不起来。 但是在release目录下执行下&#xff0c;程序能跑起来。 根据错误提示&#xff0c;进行添加。…

vue3+vite+ts配置多个代理并解决报404问题

之前配置接口代理总是报404,明明接口地址是对的但还是报是因数写法不对;用了vue2中的写法 pathRewrite改为rewrite 根路径下创建env文件根据自己需要名命 .env.development文件内容 # just a flag ENVdevelopment# static前缀 VITE_APP_PUBLIC_PREFIX"" # 基础模块…

php 支持mssqlserver

系统不支持:sqlsrv 需要一下几个环节 1.准备检测php版本 查看 VC 版本 查看操作系统位数&#xff1a;X86(32位) 和X64 2.下载php的sqlserver库 extensionphp_sqlsrv_74_nts_x64.dll extensionphp_pdo_sqlsrv_74_nts_x64.dll extensionphp_sqlsrv_74_nts_x64 extensionphp_…

基于Vue的高校课程考勤成绩管理系统SpringBoot+nodejs+python

设计目标&#xff1a; 课程管理系统开发的目的是管理全校开设课程的基本信息&#xff0c;安排各班级的课程以及上课时间和教室。系统的使用对象包括教务,学生、教师、管理员等。通过对日常课程管理工作的分析&#xff0c;可以将课程管理系统的功能分为下面几个方面&#xff1a…

C++ //练习10.3 用accumulate求一个vector<int>中的元素之和。

C Primer&#xff08;第5版&#xff09; 练习 10.3 练习10.3 用accumulate求一个vector中的元素之和。 环境&#xff1a;Linux Ubuntu&#xff08;云服务器&#xff09; 工具&#xff1a;vim 代码块 /*******************************************************************…

DataGrip2023配置连接Mssqlserver、Mysql、Oracle若干问题解决方案

1、Mssqlserver连接 本人连的是Sql2008&#xff0c;默认添加时&#xff0c;地址、端口、实例、账号、密码后&#xff0c;测试连接出现错误。 Use SSL&#xff1a;不要勾选 VM option&#xff1a;填写&#xff0c;"-Djdk.tls.disabledAlgorithmsSSLv3, RC4, DES, MD5withR…

(Linux学习二)文件管理基础操作命令笔记

Linux目录结构&#xff1a; bin 二进制文件 boot 启动目录 home 普通用户 root 超管 tmp 临时文件 run 临时运行数据 var 日志 usr 应用程序、文件 etc 配置文件 dev 文件系统 一、基础操作 在 Linux 终端中&#xff0c;你可以使用以下命令来清屏&#xff1a; clear 命令&am…

HW高水位问题及解决办法

一、问题描述及分析 应用业务反馈应用响应缓慢。登录数据库检查&#xff0c;发现数据库响应慢&#xff0c;有大量enq:HW–contention等待事件。结合awr报告和ash报告&#xff0c;发现整体等待时间消耗在推高水位线征用上&#xff0c;如下awr top事件&#xff1a;Ash消耗也是en…

数据恢复软件有哪些?分享10款好用的数据恢复软件

在数字化时代&#xff0c;数据的安全性和可恢复性变得至关重要。由于各种原因&#xff0c;如设备故障、误删、病毒攻击等&#xff0c;我们可能会面临数据丢失的风险。为了应对这种情况&#xff0c;市场上涌现出许多数据恢复软件。下面给大家分享10个好用的数据恢复软件&#xf…

[golang] 25 图片操作

用 “github.com/fogleman/gg” 可以画线, 框 用 “github.com/disintegration/imaging” 可以变换颜色 一 渲染 1.1 框和字 import "github.com/fogleman/gg"func DrawRectangles(inPath string, cRects []ColorTextRect, fnImgNameChange FnImgNameChange) (str…

C语言数据结构基础-单链表

1.链表概念 在前面的学习中&#xff0c;我们知道了线性表&#xff0c;其中逻辑结构与物理结构都连续的叫顺序表&#xff0c;那么&#xff1a; 链表是⼀种物理存储结构上⾮连续、⾮顺序的存储结构&#xff0c;数据元素的逻辑顺序是通过链表 中的指针链接次序实现的 。 2.链表组…

线性稳压器电路,用于各种电视机、收录机、电子仪器、设备的稳压电源上,内置短路保护电路,热保护电路——78MXX

78MXX系列是用于各种电视机、收录机、电子仪器、设备的稳压电源电路。包括78M05、78M06、 78M08、 78M09、 78M10、 78M12、 78M15。 主要特点&#xff1a; ● 极限输出电流: 0.5A ● 固定输出电压: 5V、6V、8V、9V、10V、 12V、 15V ● 内置短路保护电路 ● 内置热保护电路 …

云原生精品资料合集(附下载)

云计算是产业数字化转型的关键基础设施,以基础设施资源为中心的云搬迁时代接近尾声&#xff0c;以应用价值为中心的云原生时代已经到&#xff0c;所以IT人员学习云原生正当时&#xff01;最近跟各位大神征集了云原生的教程&#xff0c;行业报告和最佳实践&#xff0c;总有一款适…