前言
我之前写了一篇快速上手ZK的文章:https://blog.csdn.net/qq_38974073/article/details/135293106
本篇最要是进一步加深学习ZK,算是一次简单的实践,巩固学习成果。
设计一个分布式锁
对锁的基本要求
- 可重入:允许同一个应用内的同一个线程重复调用同一个方法;
- 阻塞:没有拿到锁的线程将进入阻塞。
- 公平的:先来先得。
实现原理
使用zk作为发号器,每个线程申请锁时会创建一个临时有序节点:
- 节点编号最小的获得锁,完成业务操作之后删除临时节点;
- 如果不是最小编号的节点,就监听前一个节点的删除事件,并进入阻塞状态,当触发回调的事件时,唤醒阻塞线程,并重新进行获取锁操作。
锁要求实现的描述:
- 可重入:对同一个线程,不用重复获取锁,重入计数+1即可;
- 阻塞:利用
CountDownLatch
实现,当触发回调时唤醒线程; - 公平的:利用zk临时有序节点的特点进行排队,先到先申请锁。
问:申请到锁之后,网络中断怎么办?
- 临时节点随客户端关闭而被删除
问:如何避免羊群效应?
- 每个线程只监听前一个节点
关键流程
关键代码实现
锁的关键方法:
- 加锁:lock
- 解锁:unLock
- 尝试加锁:tryLock
public boolean lock() {
if(Thread.currentThread().equals(thread)) {
lockCount.incrementAndGet();
return true;
}
while (true) {
if (tryLock()) {
thread = Thread.currentThread();
lockCount.incrementAndGet();
return true;
}
try {
await();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
public synchronized boolean unlock() {
if (!thread.equals(Thread.currentThread())) {
return false;
}
int newLockCount = lockCount.decrementAndGet();
if (newLockCount < 0) {
throw new IllegalMonitorStateException("重入锁计数不可为负数" );
}
// 是否剩余重入次数
if (newLockCount != 0) {
return true;
}
// 到这一步,意味着lockCount已经为0,可以删除临时节点了
try{
if(client.isNodeExist(properties.getZkPath())) {
client.deleteNode(lockedPathMap.get(thread));
}
} catch (Exception e) {
return false;
} finally {
lockedPathMap.remove(thread);
priorPathMap.remove(thread);
}
return true;
}
protected boolean tryLock() {
String lockedPath = lockedPathMap.get(Thread.currentThread());
if (null == lockedPath || !client.isNodeExist(lockedPath)) {
lockedPathMap.put(Thread.currentThread(), lockedPath = client.createEphemeralSeqNode(getLockPrefix()));
}
// 取得加锁的排队编号
String lockedShortPath = getShorPath(lockedPath);
List<String> waiters = getWaiters();
// 如果自己是所有等待锁中的第一个,则获得锁
if (checkLocked(waiters, lockedShortPath)) {
return true;
}
// 当前线程节点是否在排队
int index = Collections.binarySearch(waiters, lockedShortPath);
if(index < 0) {
throw new NullPointerException("可能网络抖动,连接断开,临时节点失效");
}
// waiters最后面的节点写入map,用来监听
priorPathMap.put(Thread.currentThread(), getLockPrefix() + waiters.get(index - 1));
return false;
}
private boolean await() throws Exception {
String priorPath = priorPathMap.get(Thread.currentThread());
if (null == priorPath) {
throw new NullPointerException("prior_path error");
}
final CountDownLatch latch = new CountDownLatch(1);
// 删除事件
Watcher w = watchedEvent -> {
// 监测到前一个节点发生变化,接下来就可以唤起等待线程,重新尝试获取锁
latch.countDown();
};
try{
// 监听前一个节点的删除时间
client.watcher(w, priorPath);
} catch (KeeperException.NoNodeException e) {
e.printStackTrace();
return false;
}
return latch.await(properties.getTimeout(), TimeUnit.MILLISECONDS);
}
好了,如果你对这个感兴趣,不妨拉一下完整源码: https://gitee.com/liangshij/zk-lock-demo
源码简要说明
模块说明
- lsj-zk-lock:核心实现。
- lsj-zk-lock-spring-boot-starter:整合springboot
- lsj-zk-lock-test:使用demo
安装
经典三步走:导包、配置、使用
- 拉取代码,将lsj-zk-lock、lsj-zk-lock-spring-boot-starter通过 mvn install 命令安装到本地仓库。
- 引入依赖:
<dependency>
<groupId>cn.lsj</groupId>
<artifactId>lsj-zk-lock-spring-boot-starter</artifactId>
<version>2.4.2</version>
</dependency>
配置
- 配置locks和dataSource:
spring:
zk:
dataSource:
url: "localhost"
port: 2181
locks:
- zkPath: "/test/lock"
lockName: "countLock"
# 获取锁失败时,进入等待的时间,等待结束将重新尝试获取锁
timeout: 5000
- zkPath: "/test2/lock"
lockName: "lock"
timeout: 5000
使用
- 使用方式1:通过@GlobalLock注解,指定要使用那个lock
@GetMapping("test2")
@GlobalLock("countLock")
public String test2() {
// 业务代码
return "";
}
- 使用方式2:通过@Qualifier注解,指定要使用那个lock
@RestController
public class TestController {
int count = 0;
@Resource
@Qualifier("lock")
private ReentrantLock lock;
@Resource
@Qualifier("countLock")
private ReentrantLock countLock;
@GetMapping("test")
public String test() {
countLock.lock();
try{
for (int i = 0; i < 10000; i++) {
count++;
}
} finally {
countLock.unlock();
}
return String.valueOf(count);
}
}