概述
- 我们的 redis 一般都是集群来给我们程序提供服务的,单体的redis现在也不多见
- 看到上面是主节点redis和下面是6个重节点redis,主节点和重节点的通讯都是畅通没问题的
- 这个时候,我们有 gorouting 写我们的数据,那它就会用到我们的setNX
- 写完数据内部是自动同步的,就是你的这个数据通过主节点同步到这些从节点了
- 下面又有我们的 gorouting 去读我们的从节点,但是,我们是在高并发和网络不确定的情况下可能会遇到一些问题
可能会遇到的一些问题
1 )集群方面
- 如果,我们上面是一个gorouting,在主节点上,它用setNX写数据,如果主节点挂了
- 集群就从我们的所有子节点中抽取一个节点,当成主节点顶上去,集群又可以正常工作了
- 这个时候,有一个gorouting在右下角,它又来读数据了,由于我们上面刚写了数据
- 还没有来得及同步到最后一个 redis 这个节点上, 但是面临着新的gorouting读取数据或操作
- 这个时候最后这一台redis它是拿不到那个锁的,是没有同步到的
- 最后来的 gorouting,就认为你没有锁, 或者说我要的资源,你没锁住
- 那其他 gorouting 就认为它是无主的, 就可以锁, 这个时候就会造成一些问题
2 )网络方面
- 正常的时候,写数据还有下面的从节点去获取数据,获取锁,都是没问题的
- 如果是网络抖动或不通会有一些问题,由于redis它是网络传输的
- 如果说我们右边的这网络络不通了,相当于右边的 redis 没有和主节点通讯
- 这个时候我们的一个gorouting就来获取锁进行数据的操作
- 如果这个时候,我要操作的资源没有上锁,那这个gorouting就认为它是还没有被加锁,就把这个锁锁上了
- 所以这个地方也是有可能出问题的风险
解决方案
1 )使用 redLock
- 锁不住资源,有可能因为节点挂了或网络抖动, 我们现在尝试使用 redLock 来解决这一个问题
- redLock它没有master节点,也没有这个slave从节点,都是独立的
- 每一个redis,都是有一个 SetNx 这么一个锁, 现在有两个协程来申请锁
- redis集群的一般是7个,而不是说双数的, 如果双数的那我左边的 gorouting 获得3个
- 右边的 gorouting 获得3个,他就要重新再做选举投票之类的东西
- 基于redLock, 当左边的 gorouting 抢到了4个,那右边的只有3个就应该释放掉
- 为下一次再运行做准备,右边这个锁就消失了
2 ) 源码
- redlock把原来的master,slave这种模式改成了平等的模式,最终解决了问题
2.1 ) NewMutex
- 在源码的
NewMutex
函数中// NewMutex returns a new distributed mutex with given name. func (r *Redsync) NewMutex(name string, options ...Option) *Mutex { m := &Mutex{ name: name, expiry: 8 * time.Second, tries: 32, // 重试 32 次 delayFunc: func(tries int) time.Duration { return time.Duration(rand.Intn(maxRetryDelayMilliSec-minRetryDelayMilliSec)+minRetryDelayMilliSec) * time.Millisecond }, genValueFunc: genValue, driftFactor: 0.01, // 漂移因子 timeoutFactor: 0.05, // 超时因子 quorum: len(r.pools)/2 + 1, // 法定数,找一半+1,大多数 pools: r.pools, } for _, o := range options { o.Apply(m) } if m.shuffle { randomPools(m.pools) } return m }
- 上面
driftFactor
是说我们的服务器的时钟漂移- 比如说我们的A服务器是中午12点,但是B服务器是中午11点59分30秒
- C服务器是中午的12点0分30秒,相当于它们每台服务器相差30秒
- 这就是服务器的时间漂移,它不准,那这会导致什么呢?
- 假如说我们的这个过期时间是8秒,那你差了30秒,肯定就是有的服务器会先释放锁
- 那先释放锁,其他人就可以拿到锁,所以他就设置了一个因子
- 关于
quorum
就如同上面的例子,7台服务器拿到了4台就是成功的
2.2 ) Lock
- 回到锁定的函数
Lock
中,进入其lockContext
// lockContext locks m. In case it returns an error on failure, you may retry to acquire the lock by calling this method again. func (m *Mutex) lockContext(ctx context.Context, tries int) error { if ctx == nil { ctx = context.Background() } // 获取 base64 的值 value, err := m.genValueFunc() if err != nil { return err } var timer *time.Timer // 对默认32次循环的操作 for i := 0; i < tries; i++ { if i != 0 { if timer == nil { timer = time.NewTimer(m.delayFunc(i)) } else { timer.Reset(m.delayFunc(i)) } select { // 如果 完成 状态,则返回 ErrFailed case <-ctx.Done(): timer.Stop() // Exit early if the context is done. return ErrFailed // 没有完成,则不动 case <-timer.C: // Fall-through when the delay timer completes. } } start := time.Now() // 拿到计数器和错误信息 n, err := func() (int, error) { ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor))) defer cancel() /// 注意这里,最终的函数就是执行的这里 return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) { return m.acquire(ctx, pool, value) }) }() now := time.Now() // 下面是核心算法: 过期时间 - 拿锁的时间 - 漂移因子可能的时间 until := now.Add(m.expiry - now.Sub(start) - time.Duration(int64(float64(m.expiry)*m.driftFactor))) // 判断是否是大多数并且没有过期,则直接进行赋值 if n >= m.quorum && now.Before(until) { m.value = value m.until = until return nil } // 否则 release 进行释放 _, _ = func() (int, error) { ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor))) defer cancel() return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) { return m.release(ctx, pool, value) }) }() if i == tries-1 && err != nil { return err } } return ErrFailed }
- 进入
actOnPoolsAsync
这里参数是一个函数func (m *Mutex) actOnPoolsAsync(actFn func(redis.Pool) (bool, error)) (int, error) { type result struct { node int statusOK bool err error } // 创建 channel ch := make(chan result, len(m.pools)) // 循环 pools for node, pool := range m.pools { // 开协程提速 go func(node int, pool redis.Pool) { r := result{node: node} r.statusOK, r.err = actFn(pool) ch <- r }(node, pool) } var ( n = 0 // 计数器 taken []int err error // 错误 ) // 循环 pools for range m.pools { // 从 channel 中拿到结果 r := <-ch if r.statusOK { n++ // 计数器加加 } else if r.err == ErrLockAlreadyExpired { err = multierror.Append(err, ErrLockAlreadyExpired) } else if r.err != nil { err = multierror.Append(err, &RedisError{Node: r.node, Err: r.err}) } else { taken = append(taken, r.node) err = multierror.Append(err, &ErrNodeTaken{Node: r.node}) } if m.failFast { // fast retrun if n >= m.quorum { return n, err } // fail fast if len(taken) >= m.quorum { return n, &ErrTaken{Nodes: taken} } } } if len(taken) >= m.quorum { return n, &ErrTaken{Nodes: taken} } return n, err }
- 以上就是 redLock 源码锁的机制,通过源代码可以更好的理解框架
- 即使上面一些细节点看不懂,跳过即可,前期可以先看大的实现脉络帮助我们理解