文章目录
- 为什么需要分布式锁?
- go语言分布式锁的实现
- Redis
- 自己的实现
- 红锁是什么
- 别人的带红锁的实现
- etcd
- zk的实现
- 面试问题
- 什么是分布式锁?你用过分布式锁吗?
- 你使用的分布式锁性能如何,可以优化吗?
- 怎么用Redis来实现一个分布式锁?
- 怎么确定分布式锁的过期时间?
- 如果分布式锁过期了,但是业务还没有执行完毕,怎么办?
- 加锁的时候得到了超时响应,怎么办?
- 加锁的时候如果锁被人持有了,这时候怎么办?
- 分布式锁为什么要续约?续约失败了怎么办?如果重试一直都失败,怎么办?
- 怎么减少分布式锁竞争?
- 你知道redlock是什么吗?
为什么需要分布式锁?
保证分布式系统并发请求或不同服务实例操作共享资源的安全性,确保在同一时间内,仅有一个进程能够修改共享资源,例如数据库记录或文件,主要用于解决分布式环境中的数据一致性和并发控制问题。 应用场景:用户下单,库存扣减,余额扣减。我们的场景:防止用户信息多写,防止重复发送邮件,防止重复设置定时任务。
- 使用分布式锁可能会对性能产生一定的影响,但这是为了确保数据的一致性和正确性所必需的;如果操作是操作是幂等的(即使多次执行也会产生相同的结果),可能不需要分布式锁
go语言分布式锁的实现
Redis
https://github.com/zeromicro/go-zero go-zero里已经实现了redislock,但没有续约机制
自己的实现
// 需要实现的能力
// 1.排他性、原子性
// 2.主动释放/自动释放
// 3.可重入
// 4.可续约
package r_lc
import (
"context"
"errors"
"github.com/go-redis/redis/v8"
"time"
"github.com/google/uuid"
)
// RLc 基于redis的分布式锁
type RLc struct {
rdb *redis.Client
// key 锁标识
key string
// lcTag 唯一标识,防止串锁
lcTag string
// expiresIn 过期时间
expiresIn time.Duration
// releaseCh 锁释放信号 (看门狗)
releaseCh chan struct{}
// RetryInterval LockWait重试锁的间隔。默认100ms
RetryInterval time.Duration
// RenewInterval 续约锁间隔,默认为expiresIn/2
RenewInterval time.Duration
// MaxRenewDur 自动续约最长时间。默认1小时,当expiresIn大于1小时,为expiresIn
MaxRenewDur time.Duration
}
type RlcOpt func(lc *RLc)
const (
retryIntervalDefault = 100 * time.Millisecond
maxRenewDurDefault = time.Hour
)
// LUA脚本
var (
// tryLockLua
// return 0. 加锁失败
// return >0. 加锁成功,当前锁的数量
tryLockLua = `
local key = KEYS[1]
local val = ARGV[1]
local expiresIn = ARGV[2]
-- 锁不存在,加锁
if redis.call('EXISTS', key) == 0 then
redis.call('HINCRBY', key, val, 1)
redis.call('PEXPIRE', key, expiresIn)
return 1
end
-- 锁存在,判断持有锁,增加加锁次数 (可重入)
if redis.call('HEXISTS', key, val) == 1 then
return redis.call('HINCRBY', key, val, 1)
end
-- 锁被其他进程占用
return 0
`
// unlockLua
// return > 0. 剩余待解锁次数
// return = 0. 解锁成功
// return = -1. 锁不存在 | 未持有锁
unlockLua = `
local key = KEYS[1]
local val = ARGV[1]
-- 锁不存在或未持有锁
if redis.call('HEXISTS', key, val) == 0 then
return -1
end
-- 按次数解锁
local count = redis.call('HINCRBY', key, val, -1)
if count <= 0 then
-- 全部解锁
redis.call("DEL",key)
return 0
end
-- 剩余待解锁次数
return count
`
// renewLua
// return 0. 续约失败
// return 1. 续约成功
renewLua = `
local key = KEYS[1]
local val = ARGV[1]
local expiresIn = ARGV[2]
-- 锁不存在或未持有锁
if redis.call('HEXISTS', key, val) == 0 then
return 0
end
-- 设置过期时间
return redis.call('PEXPIRE', key, expiresIn)
`
)
var (
ErrLostKey = errors.New("lost key") // 锁不存在或被其他进程占用
)
func NewRLc(rdb *redis.Client, key string, expiresIn time.Duration, opts ...RlcOpt) *RLc {
lc := &RLc{
rdb: rdb,
key: key,
lcTag: uuid.New().String(),
expiresIn: expiresIn,
releaseCh: make(chan struct{}),
}
for _, opt := range opts {
opt(lc)
}
if lc.RetryInterval == 0 {
lc.RetryInterval = retryIntervalDefault
}
if lc.RenewInterval == 0 {
lc.RenewInterval = lc.expiresIn / 2
}
if lc.MaxRenewDur == 0 {
lc.MaxRenewDur = maxRenewDurDefault
if lc.MaxRenewDur < lc.expiresIn {
lc.MaxRenewDur = lc.expiresIn
}
}
return lc
}
// TryLock 尝试锁
func (lc *RLc) TryLock(ctx context.Context) (lcNum int, res bool) {
lua := redis.NewScript(tryLockLua)
lcNum, _ = lua.Run(ctx, lc.rdb, []string{lc.key}, lc.lcTag, lc.expiresIn.Milliseconds()).Int()
if lcNum == 0 {
return 0, false
}
return lcNum, true
}
// LockWait 尝试锁并等待
func (lc *RLc) LockWait(ctx context.Context, wait time.Duration) (lcNum int, res bool) {
ctx, cancel := context.WithTimeout(ctx, wait)
defer cancel()
jumpEnd:
for {
select {
case <-ctx.Done():
break jumpEnd
case <-lc.releaseCh:
break jumpEnd
default:
lcNum, res = lc.TryLock(ctx)
if res {
return
}
time.Sleep(lc.RetryInterval)
}
}
return 0, false
}
// Unlock 解锁
func (lc *RLc) Unlock(ctx context.Context) (leftLcNum int, err error) {
lua := redis.NewScript(unlockLua)
leftLcNum, err = lua.Run(ctx, lc.rdb, []string{lc.key}, lc.lcTag).Int()
if err != nil {
return 0, err
}
if leftLcNum < 0 {
return 0, ErrLostKey
}
if leftLcNum == 0 {
close(lc.releaseCh)
}
return
}
// Renew 续约
// expiresIn=0时,会使用初始化时设定的expiresIn
func (lc *RLc) Renew(ctx context.Context) {
// 限制续约最大持续时间,减少协程泄露影响
ctx, cancel := context.WithTimeout(ctx, lc.MaxRenewDur)
// 续约
go func(lc *RLc) {
for {
select {
case <-ctx.Done():
return
case <-lc.releaseCh:
cancel()
return
default:
lua := redis.NewScript(renewLua)
res, _ := lua.Run(ctx, lc.rdb, []string{lc.key}, lc.lcTag, lc.expiresIn.Milliseconds()).Int()
if res == 0 {
cancel()
}
time.Sleep(lc.RenewInterval)
}
}
}(lc)
return
}
单元测试
package r_lc
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
"runtime"
"testing"
"time"
)
func getRdb() (rdb *redis.Client, err error) {
rdb = redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Password: "",
DB: 1,
})
_, err = rdb.Ping(context.TODO()).Result()
if err != nil {
return
}
return
}
func TestLockWait(t *testing.T) {
rdb, err := getRdb()
if err != nil {
t.Fatal(err)
}
t.Run("lock1", func(t1 *testing.T) {
go func() {
lockWaitFunc(t1, rdb, 10*time.Second)
}()
})
time.Sleep(10 * time.Millisecond)
t.Run("lock2", func(t2 *testing.T) {
go func() {
lockWaitFunc(t2, rdb, 5*time.Second)
}()
})
fmt.Println("0s NumGoroutine", runtime.NumGoroutine())
time.Sleep(3 * time.Second)
fmt.Println("3s NumGoroutine", runtime.NumGoroutine()) // 续约协程启动
time.Sleep(2 * time.Second)
fmt.Println("5s NumGoroutine", runtime.NumGoroutine()) // lock2 续约协程释放
time.Sleep(5 * time.Second)
fmt.Println("10s NumGoroutine", runtime.NumGoroutine()) // lock1 续约协程保持
time.Sleep(5 * time.Second)
fmt.Println("15s NumGoroutine", runtime.NumGoroutine()) // lock1 续约协程释放
time.Sleep(2 * time.Second)
}
func TestRetry(t *testing.T) {
rdb, err := getRdb()
if err != nil {
t.Fatal(err)
}
ctx := context.Background()
// 初始化锁信息
lc := NewRLc(rdb, "test-lock", 5*time.Second, func(lc *RLc) {
lc.MaxRenewDur = time.Second * 10
lc.RenewInterval = time.Second * 5
lc.RetryInterval = 10 * time.Millisecond
})
lc2 := NewRLc(rdb, "test-lock", 5*time.Second, func(lc *RLc) {
lc.MaxRenewDur = time.Second * 10
lc.RenewInterval = time.Second * 5
lc.RetryInterval = 10 * time.Millisecond
})
// 启动续约
lc.Renew(ctx)
fmt.Println(lc.TryLock(ctx))
fmt.Println(lc.TryLock(ctx))
fmt.Println(lc.TryLock(ctx))
time.Sleep(5 * time.Second)
fmt.Println("wwwww")
fmt.Println(lc2.LockWait(ctx, 10*time.Second))
fmt.Println(lc.Unlock(ctx))
fmt.Println(lc.Unlock(ctx))
fmt.Println(lc.Unlock(ctx))
fmt.Println(lc.Unlock(ctx))
fmt.Println(lc.Unlock(ctx))
fmt.Println(lc.Unlock(ctx))
return
}
func lockWaitFunc(t *testing.T, rdb *redis.Client, wait time.Duration) {
ctx := context.Background()
// 初始化锁信息
lc := NewRLc(rdb, "test-lock", 15*time.Second)
// 阻塞式获取锁
_, getLock := lc.LockWait(ctx, wait)
if getLock == false {
fmt.Println("获取锁超时")
t.Log("获取锁超时")
return
}
defer lc.Unlock(ctx)
// 启动续约
lc.Renew(ctx)
// 处理业务代码
for i := 0; i < 10; i++ {
time.Sleep(time.Second)
}
return
}
使用分布式锁
// 加分布式锁
contxt := ctx.GetSpanCtx()
lc := r_lc.NewRLc(server.GetRedisClient(), config.GetDistributedLockKey(fmt.Sprintf("user_info:%s", wsId)), 30*time.Second)
_, lcRes := lc.LockWait(contxt, 30*time.Second)
if lcRes == false {
err = my_err.WrapF(err, Err.ErrCodeSysRequestTimeout, "user_info lc.LockWait wsId:%s", wsId)
return
}
defer lc.Unlock(contxt)
lc.Renew(contxt)
测试结果,可以使用压测工具
红锁是什么
红锁算法(Redlock)是一种分布式锁的实现算法,由 Redis 的作者 Antirez 发布。它主要用于解决分布式环境下的资源争用问题,同时保证锁的可靠性和安全性。红锁算法通过在多个 Redis 节点上创建锁,要求获得锁的客户端必须在大多数节点上成功创建锁,从而确保在分布式环境中只有一个客户端可以获得锁。
红锁算法的基本步骤如下:
- 客户端获取当前系统时间。
- 客户端尝试在 N 个 Redis 节点上创建锁,设置锁的过期时间为过期时间加上一个小的时延。
- 如果客户端在大多数节点上成功创建了锁(N/2+1),则认为客户端获得了锁。客户端应将锁的有效期设置为从步骤1开始计算的实际过期时间。
- 如果客户端未能在大多数节点上创建锁,那么客户端需要删除在其他节点上创建的锁,并等待一段随机时间后重新尝试。
别人的带红锁的实现
https://juejin.cn/post/7148391514966589477
etcd
todo 待研究
库:https://github.com/etcd-io/etcd
https://juejin.cn/post/7148391514966589477
https://www.liwenzhou.com/posts/Go/etcd/
zk的实现
todo 待研究
库:https://github.com/samuel/go-zookeeper
zookeeper简称zk,zk是通过生成临时有序节点来实现分布式锁的,首先会在/lock目录下一个临时有序节点,后续请求会在节点后面继续创建临时节点。新的子节点后面,会添加一个次序编号,这个生成的编号,会在上一次的编号进行 +1 操作。
zk节点监听机制:每个线程抢占锁之前,先尝试创建自己的ZNode。同样,释放锁的时候,就需要删除创建的Znode。创建成功后,如果不是排号最小的节点,就处于等待通知的状态。等谁的通知呢?不需要其他人,只需要等前一个Znode的通知就可以了。前一个Znode删除的时候,会触发Znode事件,当前节点能监听到删除事件,就是轮到了自己占有锁的时候。第一个通知第二个、第二个通知第三个依次向后。
zk临时节点自动删除:当我们客户端断开连接之后,我们出创建的临时节点会进行自动删除操作,所以我们在使用分布式锁的时候,一般都是会去创建临时节点,这样可以避免因为网络异常等原因,造成的死锁。