分布式锁
分布式锁是在分布式系统中用于协调多个节点之间对共享资源的访问的一种机制。个人认为实现分布式锁,需要一个中间件例如数据库,redis等等这样的存储锁即可实现分布式锁。
分布式锁实现方案
-
基于数据库(唯一索引)
-
基于内存(redis,memcache)
-
zookeeper
-
…
数据库
使用数据库的事务特性来实现分布式锁。
通过在数据库中创建一个表,将锁状态存储在表的行中,使用数据库的事务来确保对该行的操作是原子的。当一个节点想要获取锁时,尝试插入一行记录,如果插入成功则获取到锁,否则表示锁已被其他节点持有。
创建数据库表用于存储锁的状态:
CREATE TABLE distributed_lock (
lock_name VARCHAR(255) PRIMARY KEY,
is_locked BOOLEAN NOT NULL DEFAULT FALSE
);
Java代码实现获取和释放分布式锁
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
public class DistributedLock {
private static final String JDBC_URL = "jdbc:mysql://localhost:3306/your_database";
private static final String JDBC_USER = "your_username";
private static final String JDBC_PASSWORD = "your_password";
private String lockName;
private Connection connection;
public DistributedLock(String lockName) {
this.lockName = lockName;
try {
this.connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD);
} catch (SQLException e) {
throw new RuntimeException("Failed to initialize database connection", e);
}
}
public boolean acquireLock() {
try {
// Start a transaction
connection.setAutoCommit(false);
// Try to insert a new lock record
try (PreparedStatement insertStatement = connection.prepareStatement(
"INSERT INTO distributed_lock (lock_name, is_locked) VALUES (?, TRUE)"
)) {
insertStatement.setString(1, lockName);
int affectedRows = insertStatement.executeUpdate();
// If affectedRows is 1, the lock is acquired
if (affectedRows == 1) {
connection.commit();
return true;
}
}
// If a lock record already exists, it means the lock is held by another process
connection.rollback();
return false;
} catch (SQLException e) {
try {
connection.rollback();
} catch (SQLException rollbackException) {
// Handle rollback exception
}
throw new RuntimeException("Error while acquiring lock", e);
} finally {
try {
connection.setAutoCommit(true);
} catch (SQLException e) {
// Handle setAutoCommit exception
}
}
}
public void releaseLock() {
try (PreparedStatement deleteStatement = connection.prepareStatement(
"DELETE FROM distributed_lock WHERE lock_name = ?"
)) {
deleteStatement.setString(1, lockName);
deleteStatement.executeUpdate();
} catch (SQLException e) {
throw new RuntimeException("Error while releasing lock", e);
}
}
public void closeConnection() {
try {
if (connection != null && !connection.isClosed()) {
connection.close();
}
} catch (SQLException e) {
// Handle close connection exception
}
}
}
基于缓存
这里主要讲基于Redis的分布式锁。
setNX + Lua脚本
在Redis中,你可以使用Lua脚本将SETNX和EXPIRE两个操作打包在一起,从而实现原子性。
local key = KEYS[1]
local value = ARGV[1]
local ttl = ARGV[2]
local lockSet = redis.call('setnx', key, value)
if lockSet == 1 then
redis.call('expire', key, ttl)
end
return lockSet
- 尝试使用SETNX命令设置一个锁。如果锁已经存在,那么SETNX命令会返回0,如果锁不存在,那么SETNX命令会返回1,并且设置锁的值为
value
。 - 如果上一步设置锁成功(即SETNX命令返回1),那么使用EXPIRE命令设置锁的过期时间。过期时间是
ttl
。 - 最后,返回SETNX命令的结果。如果结果为1,那么表示成功获取到了锁,如果结果为0,那么表示获取锁失败。
客户端调用脚本:
EVAL <lua_script> 1 lock_key lock_value lock_ttl
Redisson + RLock可重入锁
Redisson是一个分布式协调Redis客服端,实现了大部分java环境下分布式对象。
代码实现:
Config config = new Config();
config.useSingleServer().setAddress("redis://ip:6378").setPassword("root");
Redisson redisson = Redisson.create(config);
// 使用redisson自带的分布式锁
RLock redisLock = redissonClient.getLock("junfeng");
if (!redisLock.tryLock()) {
return Result.fail("禁止重复参与!");
}
try {
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} finally {
redisLock.unlock();
}
redissonLock 比setnx有哪些优势
- 实现了可重入
- 锁续约机制
- 实现多节点保存同一把锁,防止主从不一致问题
基于ZooKeeper
- 使用ZooKeeper分布式协调服务。
- 利用ZooKeeper的顺序临时节点(Sequential Ephemeral Node)创建有序节点,确保节点顺序代表获取锁的顺序。
- 最小的节点代表获取了锁,其他节点监听前一个节点的删除事件。