文章目录
- 一、概述
- 二、导入依赖包
- 三、创建锁的过程
- 3.1 通过 create 创建节点信息
- 3.2 AsyncCallback.StringCallback 回调函数
- 3.3 AsyncCallback.Children2Callback 的回调函数
- 3.4 Watcher 的回调函数
- 四、完整示例
- 4.1 完整分布式锁代码
- 4.2 测试类
如果您还没有安装Zookeeper请看ZooKeeper 安装说明,Zookeeper 命令使用方法和数据说明,Zookeeper Java 开发入门。
一、概述
-
情景:假设有10个客户端(分散的10台主机)要执行一个任务,这个任务某些过程需要保持原子性。那么我们就需要一个分布式锁。
-
原理:通过在Zookeeper中创建序列节点来实现获得锁,删除节点来释放锁。其实质是一个按先来后到的排序过程,实现过程如下:
-
客户端发起请求,创建锁序列节点(/lock/xxxxxxxx)
-
获取所有锁节点,判断自己是否为最小节点
- 如果自己是最小序列节点,则立即获得锁。
- 否则不能获得锁,但要监控前一个序列节点的状态。
-
获得锁的客户端开始执行任务。
-
执行完任务后释放锁。
-
由于后一个节点监控了前一个节点,当前一个节点删除时,后一个客户端会收到回调。
-
在这个回调中再获取所有锁节点,判断自己是否为最小节点。
-
以此类推,直到全部结束。
-
-
-
流程如下
- 如果您对没有做过 Zookeeper 开发,强烈建立先看 Zookeeper Java 开发入门。
二、导入依赖包
-
在 pom.xml 文件中导入 Zookeeper 包,注意一般这个包的版本要和您安装的 Zookeeper 服务端版本一致。
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.8.2</version> </dependency>
三、创建锁的过程
3.1 通过 create 创建节点信息
- 通过 create 创建序列节点信息。他是异步方式,创建成功后会调用 AsyncCallback.StringCallback.processResult 回调函数。
public void lock() throws InterruptedException, LockException {
zooKeeper.create("/lock", "xxx".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this, context);
countDownLatch.await();
if(StringUtils.isEmpty(this.lockNodePath)){
throw new LockException("创建锁失败");
}
System.out.println(this.appName + " 获得锁");
}
3.2 AsyncCallback.StringCallback 回调函数
- 在 AsyncCallback.StringCallback 的回调函数中通过 getChildren 方法获取 ZooKeeper 锁节点下的所有节点信息。这个方法是异步的,调用成功后会调用 AsyncCallback.Children2Callback.processResult 回调函数。
// AsyncCallback.StringCallback
@Override
public void processResult(int i, String s, Object o, String s1) {
if(StringUtils.isEmpty(s1)){
// 这里是创建锁失败的情况。
this.countDownLatch.countDown();
return;
}
System.out.println(this.appName + " create lock node="+s1);
this.lockNodePath = s1;
// 获取 ZooKeeper 锁节点下的所有节点信息,以此来判断我是不是第一个创建节点,如果就获得锁,否则监控前一个节点信息。
zooKeeper.getChildren("/", false, this, context);
}
3.3 AsyncCallback.Children2Callback 的回调函数
- 在 AsyncCallback.Children2Callback 的回调函数判断我是不是第一个创建节点,如果就获得锁,否则监控前一个节点信息。监控前一个节点信息使用 exists 方法,这个方法设置了 Watcher 的 processResult 回调函数
// AsyncCallback.Children2Callback
@Override
public void processResult(int i, String s, Object o, List<String> list, Stat stat) {
Collections.sort(list);
// for (String s1 : list) {
// System.out.println("\t "+this.lockNodePath+" previous lock node="+s1);
// }
int index = list.indexOf(lockNodePath.substring(1));
if(0 == index){
// 如果我现在是第一个节点,则获得锁
try {
zooKeeper.setData("/", this.lockNodePath.getBytes(), -1);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
this.countDownLatch.countDown();
}
else {
// 我不是第一个节点,监控前一个节点信息(等他删除后我就是可能是第一个了)
String watchNodePath = "/" + list.get(index - 1);
System.out.println("\t "+this.lockNodePath+" watch node:"+ watchNodePath);
zooKeeper.exists(watchNodePath, this, new StatCallback() {
@Override
public void processResult(int i, String s, Object o, Stat stat) {
}
}, context);
}
}
3.4 Watcher 的回调函数
- 在 Watcher 的回调函数,我们通过判断 watchedEvent.getType() 为 NodeDeleted 类型时,重新获取 ZooKeeper 锁节点下的所有节点信息,这使得消息回到了 “3.3”步,判断谁是第一个节点,然后获得得,完成整个流程。
// Watcher
@Override
public void process(WatchedEvent watchedEvent) {
switch (watchedEvent.getType()) {
case None:
break;
case NodeCreated:
break;
case NodeDeleted:
zooKeeper.getChildren("/", false, this, context);
break;
case NodeDataChanged:
break;
case NodeChildrenChanged:
break;
case DataWatchRemoved:
break;
case ChildWatchRemoved:
break;
case PersistentWatchRemoved:
break;
}
}
四、完整示例
4.1 完整分布式锁代码
package top.yiqifu.study.p131;
import org.apache.commons.lang3.StringUtils;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class ZookeeperLock implements Watcher, AsyncCallback.StringCallback,
AsyncCallback.Children2Callback {
private String appName;
private ZooKeeper zooKeeper;
private Object context;
private String lockNodePath;
private CountDownLatch countDownLatch = new CountDownLatch(1);
public ZookeeperLock(String name, ZooKeeper zk){
this.appName = name;
this.zooKeeper = zk;
this.context = this;
}
public void lock() throws InterruptedException, LockException {
zooKeeper.create("/lock", "xxx".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this, context);
countDownLatch.await();
if(StringUtils.isEmpty(this.lockNodePath)){
throw new LockException("创建锁失败");
}
System.out.println(this.appName + " 获得锁");
}
public void unlock() throws KeeperException, InterruptedException, LockException {
if(StringUtils.isEmpty(this.lockNodePath)){
throw new LockException("没有获得锁,无法释放锁");
}
zooKeeper.delete(lockNodePath, -1);
System.out.println(this.appName + " 释放锁");
}
// AsyncCallback.StringCallback
@Override
public void processResult(int i, String s, Object o, String s1) {
if(StringUtils.isEmpty(s1)){
// 这里是创建锁失败的情况。
this.countDownLatch.countDown();
return;
}
System.out.println(this.appName + " create lock node="+s1);
this.lockNodePath = s1;
// 获取 ZooKeeper 锁节点下的所有节点信息,以此来判断我是不是第一个创建节点,如果就获得锁,否则监控前一个节点信息。
zooKeeper.getChildren("/", false, this, context);
}
// AsyncCallback.Children2Callback
@Override
public void processResult(int i, String s, Object o, List<String> list, Stat stat) {
Collections.sort(list);
// for (String s1 : list) {
// System.out.println("\t "+this.lockNodePath+" previous lock node="+s1);
// }
int index = list.indexOf(lockNodePath.substring(1));
if(0 == index){
// 如果我现在是第一个节点,则获得锁
try {
zooKeeper.setData("/", this.lockNodePath.getBytes(), -1);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
this.countDownLatch.countDown();
}
else {
// 我不是第一个节点,监控前一个节点信息(等他删除后我就是可能是第一个了)
String watchNodePath = "/" + list.get(index - 1);
System.out.println("\t "+this.lockNodePath+" watch node:"+ watchNodePath);
zooKeeper.exists(watchNodePath, this, new StatCallback() {
@Override
public void processResult(int i, String s, Object o, Stat stat) {
}
}, context);
}
}
// Watcher
@Override
public void process(WatchedEvent watchedEvent) {
switch (watchedEvent.getType()) {
case None:
break;
case NodeCreated:
break;
case NodeDeleted:
zooKeeper.getChildren("/", false, this, context);
break;
case NodeDataChanged:
break;
case NodeChildrenChanged:
break;
case DataWatchRemoved:
break;
case ChildWatchRemoved:
break;
case PersistentWatchRemoved:
break;
}
}
public class LockException extends Exception
{
public LockException(String message){
super(message);
}
}
}
4.2 测试类
package top.yiqifu.study.p131;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class Test06_ZookeeperLock {
public static void main(String[] args) {
try {
// 创建 ZooKeeper 对象
final ZooKeeper zooKeeper = testCreateZookeeper();
int clientCount = 10;
final CountDownLatch countDownLatch = new CountDownLatch(clientCount);
for (int i = 0; i < clientCount; i++) {
new Thread(new Runnable(){
@Override
public void run() {
TestLock(zooKeeper);
countDownLatch.countDown();
}
}).start();
}
countDownLatch.await();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
private static void TestLock(ZooKeeper zooKeeper){
try {
String appName = Thread.currentThread().getName();
ZookeeperLock zookeeperLock = new ZookeeperLock(appName, zooKeeper);
// 加锁(获得分布式锁)
zookeeperLock.lock();
System.out.println(appName + " 执行任务");
Thread.sleep(1000);
// 释放锁
zookeeperLock.unlock();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
} catch (ZookeeperLock.LockException e) {
e.printStackTrace();
}
}
private static ZooKeeper testCreateZookeeper() throws IOException, InterruptedException, KeeperException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
// ZooKeeper 集群地址(没连接池的概念,是Session的概念)
//String connectionString = "192.168.8.51:2181,192.168.8.52:2181,192.168.8.53:2181";
String connectionString = "192.168.8.51:2181,192.168.8.52:2181,192.168.8.53:2181/aaa";
// ZooKeeper Session 数据超时时间(也就是这个Session关闭后,与这个Session相关的数据在ZooKeeper中保存多信)。
Integer sessionTimeout = 3000;
// ZooKeeper Session 级别 Watcher(Watch只发生在读方法上,如 get、exists)
final ZooKeeper zooKeeper = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
try {
Event.KeeperState state = watchedEvent.getState();
Event.EventType type = watchedEvent.getType();
String path = watchedEvent.getPath();
switch (state) {
case Unknown:
break;
case Disconnected:
break;
case NoSyncConnected:
break;
case SyncConnected:
countDownLatch.countDown();
break;
case AuthFailed:
break;
case ConnectedReadOnly:
break;
case SaslAuthenticated:
break;
case Expired:
break;
case Closed:
break;
}
switch (type) {
case None:
break;
case NodeCreated:
break;
case NodeDeleted:
break;
case NodeDataChanged:
break;
case NodeChildrenChanged:
break;
case DataWatchRemoved:
break;
case ChildWatchRemoved:
break;
case PersistentWatchRemoved:
break;
}
System.out.println("Session watch state=" + state);
System.out.println("Session watch type=" + type);
System.out.println("Session watch path=" + path);
} catch (Exception e) {
e.printStackTrace();
}
}
});
countDownLatch.await();
ZooKeeper.States state = zooKeeper.getState();
switch (state) {
case CONNECTING:
break;
case ASSOCIATING:
break;
case CONNECTED:
break;
case CONNECTEDREADONLY:
break;
case CLOSED:
break;
case AUTH_FAILED:
break;
case NOT_CONNECTED:
break;
}
System.out.println("ZooKeeper state=" + state);
return zooKeeper;
}
}