分布式锁实现
一、概述
分布式锁是分布式系统中的一个重要组件,用于协调分布式环境下的资源访问和并发控制。我们将从锁设计、死锁预防、性能优化和容错处理四个维度深入学习。
学习目标
维度 | 重点内容 | 掌握程度 |
---|---|---|
锁设计 | 基于Redis/etcd的锁实现原理 | 必须掌握 |
死锁预防 | 超时机制、重入机制 | 必须掌握 |
性能优化 | 锁粒度控制、读写分离 | 重点掌握 |
容错处理 | 节点故障、网络分区 | 重点掌握 |
二、实现流程图
三、基础锁实现
让我们首先实现一个基于Redis的分布式锁基础版本:
package distlock
import (
"context"
"crypto/rand"
"encoding/base64"
"errors"
"time"
"github.com/go-redis/redis/v8"
)
type DistributedLock struct {
client *redis.Client
key string
value string
expiration time.Duration
}
// NewDistributedLock 创建一个新的分布式锁实例
func NewDistributedLock(client *redis.Client, key string, expiration time.Duration) (*DistributedLock, error) {
// 生成随机值作为锁的标识
b := make([]byte, 16)
_, err := rand.Read(b)
if err != nil {
return nil, err
}
value := base64.StdEncoding.EncodeToString(b)
return &DistributedLock{
client: client,
key: key,
value: value,
expiration: expiration,
}, nil
}
// TryLock 尝试获取锁
func (dl *DistributedLock) TryLock(ctx context.Context) (bool, error) {
return dl.client.SetNX(ctx, dl.key, dl.value, dl.expiration).Result()
}
// Unlock 释放锁
func (dl *DistributedLock) Unlock(ctx context.Context) error {
script := `
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
`
result, err := dl.client.Eval(ctx, script, []string{dl.key}, dl.value).Result()
if err != nil {
return err
}
if result == 0 {
return errors.New("lock not held")
}
return nil
}
// RefreshLock 刷新锁的过期时间
func (dl *DistributedLock) RefreshLock(ctx context.Context) error {
script := `
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("pexpire", KEYS[1], ARGV[2])
else
return 0
end
`
result, err := dl.client.Eval(
ctx,
script,
[]string{dl.key},
dl.value,
dl.expiration.Milliseconds(),
).Result()
if err != nil {
return err
}
if result == 0 {
return errors.New("lock not held")
}
return nil
}
// IsLocked 检查锁是否被持有
func (dl *DistributedLock) IsLocked(ctx context.Context) (bool, error) {
exists, err := dl.client.Exists(ctx, dl.key).Result()
if err != nil {
return false, err
}
return exists == 1, nil
}
四、增强版锁实现(带可重入特性)
下面是一个支持可重入的分布式锁实现:
package distlock
import (
"context"
"encoding/json"
"errors"
"sync"
"time"
"github.com/go-redis/redis/v8"
)
type LockInfo struct {
Owner string `json:"owner"`
Count int `json:"count"`
Timestamp int64 `json:"timestamp"`
}
type ReentrantLock struct {
client *redis.Client
key string
owner string
expiration time.Duration
mu sync.Mutex
}
// NewReentrantLock 创建可重入锁
func NewReentrantLock(client *redis.Client, key string, owner string, expiration time.Duration) *ReentrantLock {
return &ReentrantLock{
client: client,
key: key,
owner: owner,
expiration: expiration,
}
}
// Lock 获取可重入锁
func (rl *ReentrantLock) Lock(ctx context.Context) error {
rl.mu.Lock()
defer rl.mu.Unlock()
script := `
local lockInfo = redis.call('get', KEYS[1])
if not lockInfo then
-- 锁不存在,创建新锁
redis.call('set', KEYS[1], ARGV[1], 'PX', ARGV[2])
return 1
end
local info = cjson.decode(lockInfo)
if info.owner == ARGV[3] then
-- 重入锁
info.count = info.count + 1
info.timestamp = tonumber(ARGV[4])
redis.call('set', KEYS[1], cjson.encode(info), 'PX', ARGV[2])
return 1
end
return 0
`
lockInfo := LockInfo{
Owner: rl.owner,
Count: 1,
Timestamp: time.Now().UnixNano(),
}
lockInfoJSON, err := json.Marshal(lockInfo)
if err != nil {
return err
}
result, err := rl.client.Eval(
ctx,
script,
[]string{rl.key},
string(lockInfoJSON),
rl.expiration.Milliseconds(),
rl.owner,
time.Now().UnixNano(),
).Result()
if err != nil {
return err
}
if result.(int64) == 0 {
return errors.New("failed to acquire lock")
}
return nil
}
// Unlock 释放可重入锁
func (rl *ReentrantLock) Unlock(ctx context.Context) error {
rl.mu.Lock()
defer rl.mu.Unlock()
script := `
local lockInfo = redis.call('get', KEYS[1])
if not lockInfo then
return 0
end
local info = cjson.decode(lockInfo)
if info.owner ~= ARGV[1] then
return -1
end
info.count = info.count - 1
if info.count <= 0 then
redis.call('del', KEYS[1])
return 1
else
redis.call('set', KEYS[1], cjson.encode(info), 'PX', ARGV[2])
return 1
end
`
result, err := rl.client.Eval(
ctx,
script,
[]string{rl.key},
rl.owner,
rl.expiration.Milliseconds(),
).Result()
if err != nil {
return err
}
switch result.(int64) {
case -1:
return errors.New("lock held by another owner")
case 0:
return errors.New("lock not held")
default:
return nil
}
}
// RefreshLock 刷新锁的过期时间
func (rl *ReentrantLock) RefreshLock(ctx context.Context) error {
script := `
local lockInfo = redis.call('get', KEYS[1])
if not lockInfo then
return 0
end
local info = cjson.decode(lockInfo)
if info.owner ~= ARGV[1] then
return 0
end
info.timestamp = tonumber(ARGV[3])
redis.call('set', KEYS[1], cjson.encode(info), 'PX', ARGV[2])
return 1
`
result, err := rl.client.Eval(
ctx,
script,
[]string{rl.key},
rl.owner,
rl.expiration.Milliseconds(),
time.Now().UnixNano(),
).Result()
if err != nil {
return err
}
if result.(int64) == 0 {
return errors.New("lock not held")
}
return nil
}
五、死锁预防机制
1. 超时机制
- 所有锁操作都设置了过期时间
- 使用看门狗机制自动续期
- 防止客户端崩溃导致的死锁
2. 死锁检测
检测项 | 处理方式 | 实现难度 |
---|---|---|
循环等待 | 资源有序分配 | 中等 |
持有等待 | 一次性申请所有资源 | 简单 |
不可剥夺 | 超时自动释放 | 简单 |
互斥访问 | 读写分离 | 较难 |
六、性能优化策略
1. 锁粒度优化
- 降低锁粒度,提高并发度
- 使用多粒度锁机制
- 实现分段锁
2. 读写分离
package distlock
import (
"context"
"fmt"
"time"
"github.com/go-redis/redis/v8"
)
type RWLock struct {
client *redis.Client
key string
owner string
expiration time.Duration
}
func NewRWLock(client *redis.Client, key string, owner string, expiration time.Duration) *RWLock {
return &RWLock{
client: client,
key: key,
owner: owner,
expiration: expiration,
}
}
// RLock 获取读锁
func (rwl *RWLock) RLock(ctx context.Context) error {
script := `
-- 检查是否存在写锁
if redis.call('exists', KEYS[1] .. ':write') == 1 then
return 0
end
-- 增加读锁计数
local count = redis.call('incr', KEYS[1] .. ':read')
redis.call('pexpire', KEYS[1] .. ':read', ARGV[1])
-- 记录读锁持有者
redis.call('hset', KEYS[1] .. ':readers', ARGV[2], '1')
redis.call('pexpire', KEYS[1] .. ':readers', ARGV[1])
return 1
`
result, err := rwl.client.Eval(
ctx,
script,
[]string{rwl.key},
rwl.expiration.Milliseconds(),
rwl.owner,
).Result()
if err != nil {
return fmt.Errorf("failed to acquire read lock: %v", err)
}
if result.(int64) == 0 {
return fmt.Errorf("write lock exists")
}
return nil
}
// RUnlock 释放读锁
func (rwl *RWLock) RUnlock(ctx context.Context) error {
script := `
-- 检查读锁是否存在
if redis.call('exists', KEYS[1] .. ':read') == 0 then
return 0
end
-- 检查当前客户端是否持有读锁
if redis.call('hexists', KEYS[1] .. ':readers', ARGV[1]) == 0 then
return -1
end
-- 移除读锁持有者记录
redis.call('hdel', KEYS[1] .. ':readers', ARGV[1])
-- 减少读锁计数
local count = redis.call('decr', KEYS[1] .. ':read')
if count <= 0 then
redis.call('del', KEYS[1] .. ':read')
redis.call('del', KEYS[1] .. ':readers')
end
return 1
`
result, err := rwl.client.Eval(
ctx,
script,
[]string{rwl.key},
rwl.owner,
).Result()
if err != nil {
return fmt.Errorf("failed to release read lock: %v", err)
}
switch result.(int64) {
case -1:
return fmt.Errorf("read lock not held by this client")
case 0:
return fmt.Errorf("read lock not exists")
default:
return nil
}
}
// Lock 获取写锁
func (rwl *RWLock) Lock(ctx context.Context) error {
script := `
-- 检查是否存在读锁或写锁
if redis.call('exists', KEYS[1] .. ':read') == 1 or
redis.call('exists', KEYS[1] .. ':write') == 1 then
return 0
end
-- 设置写锁
redis.call('set', KEYS[1] .. ':write', ARGV[1], 'PX', ARGV[2])
return 1
`
result, err := rwl.client.Eval(
ctx,
script,
[]string{rwl.key},
rwl.owner,
rwl.expiration.Milliseconds(),
).Result()
if err != nil {
return fmt.Errorf("failed to acquire write lock: %v", err)
}
if result.(int64) == 0 {
return fmt.Errorf("lock exists")
}
return nil
}
// Unlock 释放写锁
func (rwl *RWLock) Unlock(ctx context.Context) error {
script := `
-- 检查写锁是否存在且属于当前客户端
local value = redis.call('get', KEYS[1] .. ':write')
if not value then
return 0
end
if value ~= ARGV[1] then
return -1
end
-- 删除写锁
redis.call('del', KEYS[1] .. ':write')
return 1
`
result, err := rwl.client.Eval(
ctx,
script,
[]string{rwl.key},
rwl.owner,
).Result()
if err != nil {
return fmt.Errorf("failed to release write lock: %v", err)
}
switch result.(int64) {
case -1:
return fmt.Errorf("write lock not held by this client")
case 0:
return fmt.Errorf("write lock not exists")
default:
return nil
}
}
七、容错处理
1. 容错机制设计
2. 故障处理实现
package distlock
import (
"context"
"errors"
"sync"
"time"
"github.com/go-redis/redis/v8"
)
type FaultTolerantLock struct {
master *redis.Client
slaves []*redis.Client
localLock sync.Mutex
key string
owner string
expiration time.Duration
}
func NewFaultTolerantLock(
master *redis.Client,
slaves []*redis.Client,
key string,
owner string,
expiration time.Duration,
) *FaultTolerantLock {
return &FaultTolerantLock{
master: master,
slaves: slaves,
key: key,
owner: owner,
expiration: expiration,
}
}
// Lock 获取容错锁
func (ftl *FaultTolerantLock) Lock(ctx context.Context) error {
// 1. 尝试在主节点获取锁
if err := ftl.tryLockOnMaster(ctx); err == nil {
return nil
}
// 2. 主节点失败,尝试在从节点获取锁
if err := ftl.tryLockOnSlaves(ctx); err == nil {
return nil
}
// 3. 所有Redis节点都失败,降级使用本地锁
ftl.localLock.Lock()
// 4. 启动后台协程尝试恢复到Redis锁
go ftl.tryRecoverToRedis(context.Background())
return nil
}
func (ftl *FaultTolerantLock) tryLockOnMaster(ctx context.Context) error {
script := `
if redis.call('exists', KEYS[1]) == 0 then
redis.call('set', KEYS[1], ARGV[1], 'PX', ARGV[2])
return 1
end
return 0
`
result, err := ftl.master.Eval(
ctx,
script,
[]string{ftl.key},
ftl.owner,
ftl.expiration.Milliseconds(),
).Result()
if err != nil {
return err
}
if result.(int64) == 0 {
return errors.New("lock exists")
}
return nil
}
func (ftl *FaultTolerantLock) tryLockOnSlaves(ctx context.Context) error {
// 需要在多数从节点上获取锁才算成功
successCount := 0
majorityCount := (len(ftl.slaves) / 2) + 1
for _, slave := range ftl.slaves {
if err := ftl.tryLockOnNode(ctx, slave); err == nil {
successCount++
if successCount >= majorityCount {
return nil
}
}
}
return errors.New("failed to acquire lock on majority of slaves")
}
func (ftl *FaultTolerantLock) tryLockOnNode(ctx context.Context, node *redis.Client) error {
script := `
if redis.call('exists', KEYS[1]) == 0 then
redis.call('set', KEYS[1], ARGV[1], 'PX', ARGV[2])
return 1
end
return 0
`
result, err := node.Eval(
ctx,
script,
[]string{ftl.key},
ftl.owner,
ftl.expiration.Milliseconds(),
).Result()
if err != nil {
return err
}
if result.(int64) == 0 {
return errors.New("lock exists")
}
return nil
}
func (ftl *FaultTolerantLock) tryRecoverToRedis(ctx context.Context) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// 尝试恢复到Redis主节点
if err := ftl.tryLockOnMaster(ctx); err == nil {
ftl.localLock.Unlock()
return
}
// 尝试恢复到Redis从节点
if err := ftl.tryLockOnSlaves(ctx); err == nil {
ftl.localLock.Unlock()
return
}
}
}
}
// Unlock 释放锁
func (ftl *FaultTolerantLock) Unlock(ctx context.Context) error {
// 尝试释放Redis锁
if err := ftl.unlockRedis(ctx); err == nil {
return nil
}
// Redis释放失败,释放本地锁
ftl.localLock.Unlock()
return nil
}
func (ftl *FaultTolerantLock) unlockRedis(ctx context.Context) error {
script := `
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
end
return 0
`
// 先尝试在主节点释放
result, err := ftl.master.Eval(
ctx,
script,
[]string{ftl.key},
ftl.owner,
).Result()
if err == nil && result.(int64) == 1 {
return nil
}
// 主节点释放失败,尝试在从节点释放
for _, slave := range ftl.slaves {
result, err = slave.Eval(
ctx,
script,
[]string{ftl.key},
ftl.owner,
).Result()
if err == nil && result.(int64) == 1 {
return nil
}
}
return errors.New("failed to release lock on all nodes")
}
八、性能测试与监控
1. 性能指标
指标 | 说明 | 目标值 |
---|---|---|
获取锁延迟 | 从发起请求到获取锁的时间 | <50ms |
释放锁延迟 | 从发起释放到完成的时间 | <30ms |
锁冲突率 | 获取锁失败的比例 | <10% |
QPS | 每秒处理的锁请求数 | >1000 |
2. 监控指标
-
系统监控
- CPU使用率
- 内存使用
- 网络延迟
- 磁盘IO
-
业务监控
- 锁获取成功率
- 锁超时次数
- 死锁检测次数
- 降级次数
九、最佳实践总结
-
锁设计
- 使用唯一标识确保锁的归属
- 合理设置超时时间
- 实现可重入机制
- 使用Lua脚本保证原子性
-
死锁预防
- 实现超时自动释放
- 避免循环等待
- 实现锁的重入
- 定期检测死锁
-
性能优化
- 使用读写锁分离
- 控制锁粒度
- 批量处理
- 使用本地缓存
-
容错处理
- 实现主从切换
- 支持优雅降级
- 异步恢复机制
- 多副本数据同步
怎么样今天的内容还满意吗?再次感谢观众老爷的观看,关注GZH:凡人的AI工具箱,回复666,送您价值199的AI大礼包。最后,祝您早日实现财务自由,还请给个赞,谢谢!