先提前说明,对Codis的无知是因为Codis不支持一些Redis的命令,而这次的逻辑变更,就是因为使用了PUBLISH,而Codis又不支持PUBLISH导致的。
1. 前言
前段时间的一次需求中,因为设计到多个服务的注册问题,在项目中没有Zookeeper的情况下,决定采用Redis替代Zookeeper的方案实现多个服务注册、动态扩缩容等逻辑。
最初的服务逻辑是:
主要可以分为5个函数:
- ServiceRegister: 主要负责服务的注册
- 服务启动时,会通过Lua脚本将自己注册到Redis的SortedSet中(其中member代表服务名称(保证唯一),score代表服务的下标)
- ServiceUnregister: 主要负责服务的注销
- 当服务下线时,会通过Lua脚本将自己从Redis的SortedSet中移除,同时触发其他服务的Rebalance
- ReportHeartbeat: 主要负责心跳上报
- 当服务注册后,会启动心跳上报机制,主要为了防止服务异常下线,其他服务对异常下线的服务无感知
- EvictedExpiredService: 主要负责驱逐心跳数据过期的服务
- 这个与ReportHeartbeat相辅相成,如果ReportHeartbeat的心跳数据上报时间戳太老(过期),则会被驱逐,同时会告知其他服务进行Rebalance
- ServiceRebalace: 主要负责在服务扩缩容、上下线过程中的服务重平衡逻辑
- 如果有服务下线、服务上线都会触发服务重平衡,以使得所有注册的服务均处于正常状态。
介绍完上面的5个函数,下面就记录下为什么会引发对Codis无知的思考。
2. 对Codis的无知
为什么会引发对Codis的无知呢?
这个问题在代码合入Test分支以及自己在本地单测过程中均未发现,未发现的原因是自己本地的Redis是单节点,部署在Test环境的Redis是单节点的Redis Cluster,根本不具备高可用性。而我们的生产环境是采用了Codis proxy进行部署的分布式Redis的集群。
在不了解Codis的情况下,上面5个函数中的ServiceRebalance的逻辑我采用的是Redis Publish/Subscribe的命令来监听其他服务下线时发送到channel中的Rebalance消息,从而触发ServiceRegister的逻辑。
问题?
看起来一切似乎都没有毛病,毕竟从Redis 2.x版本开始,Pub/Sub的逻辑就已经支持了。当一切都准备就绪之后,它们遇到了Codis,这个时候当下线服务Publish Rebalace消息的时候,报错了:
command PUBLISH is not allowed
在没有去了解Codis之前,看到这个报错,第一想法就是SRE从redis.conf中禁止了此命令的使用,想着如果放开,这个报错不就消失了吗?
于是带着这个问题去寻找SRE,SRE反馈”Codis不支持PUBLISH这个命令”,收到反馈后,第一时间就去github寻找答案了,于是发现Codis有一个文档,文档标注了Codis不支持的Redis Command:https://github.com/CodisLabs/codis/blob/release3.2/doc/unsupported_cmds.md,而PUBLISH就包含其中,而不支持的原因主要是因为在分布式环境下,为了确保消息在所有节点之间正确传播和同步,Codis为了简化分布式处理的复杂性,所以就没有实现PUBLISH命令:
var opTable = make(map[string]OpInfo, 256)
func init() {
for _, i := range []OpInfo{
// ...
{"PUBLISH", FlagNotAllow},
// ...
} {
opTable[i.Name] = i
}
}
func (s *Session) handleRequest(r *Request, d *Router) error {
opstr, flag, err := getOpInfo(r.Multi)
if err != nil {
return err
}
r.OpStr = opstr
r.OpFlag = flag
r.Broken = &s.broken
if flag.IsNotAllowed() {
return fmt.Errorf("command '%s' is not allowed", opstr)
}
// ignore...
}
3. 解决办法
因为无法使用Pub/Sub的逻辑,就需要移除基于Pub/Sub机制实现的代码逻辑,从而采用其他的方式来替代,因为Publish是嵌入在Lua脚本中的,所以导致我们没有办法使用Kafka这种消息队列来实现这个服务的注册,再加上我们希望这个服务注册的逻辑本身只依赖Redis,所以采用Kafka消息队列来替代Publish/Subscribe的逻辑就被抛弃掉了。
后面采用了Get/Set来替代,因为从Pub/Sub的逻辑来看,对于Publish和Subscribe来看,在go-redis/v8的实现中,subscribe的实现如下列代码所示:
func (c *PubSub) Channel(opts ...ChannelOption) <-chan *Message {
c.chOnce.Do(func() {
c.msgCh = newChannel(c, opts...) // new出新的channel
c.msgCh.initMsgChan() // 初始化msgChan
})
if c.msgCh == nil {
err := fmt.Errorf("redis: Channel can't be called after ChannelWithSubscriptions")
panic(err)
}
return c.msgCh.msgCh
}
func newChannel(pubSub *PubSub, opts ...ChannelOption) *channel {
c := &channel{
pubSub: pubSub,
chanSize: 100,
chanSendTimeout: time.Minute,
checkInterval: 3 * time.Second,
}
for _, opt := range opts {
opt(c)
}
if c.checkInterval > 0 {
c.initHealthCheck() // 这里默认是3s发送一次ping, 保证subscribe与redis的连通性
}
return c
}
func (c *channel) initHealthCheck() {
ctx := context.TODO()
c.ping = make(chan struct{}, 1)
go func() {
timer := time.NewTimer(time.Minute) // 这里默认是1分钟
timer.Stop()
for {
timer.Reset(c.checkInterval) // 重置为3s
select {
case <-c.ping: // 这里是与下面的函数(initMsgChan)结合使用的
// 一旦接收到c.ping, 实际就说明网络是联通的,就没必要再去发送ping消息验证健康状态了
if !timer.Stop() {
<-timer.C
}
case <-timer.C:
if pingErr := c.pubSub.Ping(ctx); pingErr != nil {
c.pubSub.mu.Lock()
c.pubSub.reconnect(ctx, pingErr)
c.pubSub.mu.Unlock()
}
case <-c.pubSub.exit:
return
}
}
}()
}
func (c *channel) initMsgChan() {
ctx := context.TODO()
c.msgCh = make(chan *Message, c.chanSize)
go func() {
timer := time.NewTimer(time.Minute)
timer.Stop()
var errCount int
for {
// 这里的连接如果往底层追的,主要是一个延迟阻塞读取的逻辑
msg, err := c.pubSub.Receive(ctx)
if err != nil {
if err == pool.ErrClosed {
close(c.msgCh)
return
}
if errCount > 0 {
time.Sleep(100 * time.Millisecond)
}
errCount++
continue
}
errCount = 0
// Any message is as good as a ping.
// 这里只要获取到了消息,就意味着subscribe的连接是健康的
select {
case c.ping <- struct{}{}:
default:
}
switch msg := msg.(type) {
case *Subscription:
// Ignore.
case *Pong:
// Ignore.
case *Message:
// 如果获取到消息之后,直接重置定时器
timer.Reset(c.chanSendTimeout)
select {
case c.msgCh <- msg:
if !timer.Stop() { // 这一步主要是因为Reset只能重置timer已经stopped或者已经过了timer expired的timer,可以看其方法comment
// 这里做这一步因为如果timer.C在timer.Reset之前就到期了,这里不释放,会导致下次出现命中 <-timer.C的误操作(select多个条件命中会随机选择一个)
<-timer.C // 如果定时器没有stop,说明可能到期了,先把chan释放
}
case <-timer.C:
internal.Logger.Printf(
ctx, "redis: %s channel is full for %s (message is dropped)",
c, c.chanSendTimeout)
}
default:
internal.Logger.Printf(ctx, "redis: unknown message type: %T", msg)
}
}
}()
}
关于c.pubSub.Receive(ctx)
方法,继续向下看,会一直看到
func (c *PubSub) Receive(ctx context.Context) (interface{}, error) {
return c.ReceiveTimeout(ctx, 0)
}
func (cn *Conn) WithReader(ctx context.Context, timeout time.Duration, fn func(rd *proto.Reader) error) error {
if err := cn.netConn.SetReadDeadline(cn.deadline(ctx, timeout)); err != nil {
return err
}
return fn(cn.rd)
}
func (c *conn) SetReadDeadline(t time.Time) error {
if !c.ok() {
return syscall.EINVAL
}
if err := c.fd.SetReadDeadline(t); err != nil {
return &OpError{Op: "set", Net: c.fd.net, Source: nil, Addr: c.fd.laddr, Err: err}
}
return nil
}
func (fd *netFD) SetReadDeadline(t time.Time) error {
return fd.pfd.SetReadDeadline(t)
}
func (fd *FD) SetReadDeadline(t time.Time) error {
return setDeadlineImpl(fd, t, 'r')
}
func runtime_pollSetDeadline(ctx uintptr, d int64, mode int)
这里主要是为读取fd数据设置了一个延迟时间,而我们的timeout传递为0,表示的是一直在监听,而我们的initHealthCheck
函数会每3s发送一个ping的命令给Redis,以保证这里3s一定会读取到一个”pong”的回复。
关于runtime_pollSetDeadline的进一步的实现原理,可以参考【5-4 Golang】实战—Go服务502总结这篇博客。
3.1 实现
有了上面对subscribe获取数据的分析,对于Get/Set的实现方案,则采用如下的逻辑:
- 当服务下线时,之前利用PUBLISH给所有SUBSCRIBE发送消息的行为被改写成了获取所有心跳数据中的服务key,然后为每一个key写一个需要rebalance的数据到这个key中
- 其他的服务则按时轮询这个key是否存在,如果存在则相当于收到了类似之前PUBLISH的消息,则触发Rebalance的行为。
上面的实现方案,在某种程度上是可行的,但是相比于go-redis的实现方案,自己写的代码差距还是有些大,因为这样就导致了轮询必须不断地发送Redis的请求,而不是像go-redis一样利用healthcheck每3s发送一个命令,在receive方法中也是采用读到了才返回数据的机制(timeout=0意味着没有任何的deadline)实现的,则保证了服务在没有收到新的请求下每3s才会有一个请求发送给redis。
4. 小结
因为使用了Redis,引入了PUBLISH,因为Codis,PUBLISH不允许使用,导致了需求逻辑的变更。同时也暴露出自己对于服务中一些基础中间件的不了解,作为一个后端服务开发,在对中间件不了解的情况下使用一些中间件的技术来实现服务逻辑,这个行为本身其实就是需求分析不到位导致的。虽然在刚开始自己还在为Redis不允许使用PUBLISH命令感觉到自己也很难去知道命令不可用,但考虑到自己对系统服务使用的是Codis proxy搭建的分布式Redis集群,就让自己对这次的问题产生了一些反思。
希望后续遇到类似的需求,可以先调查清楚这些基础中间件的支持情况,然后再决定业务逻辑代码的编写,而不是先实现了业务逻辑代码,因为遇到基础中间件不支持的情况,再反过来修改业务逻辑代码!
参考
- Codis:https://github.com/CodisLabs/codis
- go-redis: https://github.com/redis/go-redis/blob/f3fe61148b2b8fe0a669dc23620690407f5f92af/pubsub.go#L564