记一次对Codis的无知引起的逻辑变更

先提前说明,对Codis的无知是因为Codis不支持一些Redis的命令,而这次的逻辑变更,就是因为使用了PUBLISH,而Codis又不支持PUBLISH导致的。

1. 前言

前段时间的一次需求中,因为设计到多个服务的注册问题,在项目中没有Zookeeper的情况下,决定采用Redis替代Zookeeper的方案实现多个服务注册、动态扩缩容等逻辑。

最初的服务逻辑是:

基于Redis的注册服务主体逻辑

主要可以分为5个函数:

  1. ServiceRegister: 主要负责服务的注册
    1. 服务启动时,会通过Lua脚本将自己注册到Redis的SortedSet中(其中member代表服务名称(保证唯一),score代表服务的下标)
  2. ServiceUnregister: 主要负责服务的注销
    1. 当服务下线时,会通过Lua脚本将自己从Redis的SortedSet中移除,同时触发其他服务的Rebalance
  3. ReportHeartbeat: 主要负责心跳上报
    1. 当服务注册后,会启动心跳上报机制,主要为了防止服务异常下线,其他服务对异常下线的服务无感知
  4. EvictedExpiredService: 主要负责驱逐心跳数据过期的服务
    1. 这个与ReportHeartbeat相辅相成,如果ReportHeartbeat的心跳数据上报时间戳太老(过期),则会被驱逐,同时会告知其他服务进行Rebalance
  5. ServiceRebalace: 主要负责在服务扩缩容、上下线过程中的服务重平衡逻辑
    1. 如果有服务下线、服务上线都会触发服务重平衡,以使得所有注册的服务均处于正常状态。

介绍完上面的5个函数,下面就记录下为什么会引发对Codis无知的思考。

2. 对Codis的无知

为什么会引发对Codis的无知呢?

这个问题在代码合入Test分支以及自己在本地单测过程中均未发现,未发现的原因是自己本地的Redis是单节点,部署在Test环境的Redis是单节点的Redis Cluster,根本不具备高可用性。而我们的生产环境是采用了Codis proxy进行部署的分布式Redis的集群。

在不了解Codis的情况下,上面5个函数中的ServiceRebalance的逻辑我采用的是Redis Publish/Subscribe的命令来监听其他服务下线时发送到channel中的Rebalance消息,从而触发ServiceRegister的逻辑。

Rebalance逻辑流程示意图

问题?

看起来一切似乎都没有毛病,毕竟从Redis 2.x版本开始,Pub/Sub的逻辑就已经支持了。当一切都准备就绪之后,它们遇到了Codis,这个时候当下线服务Publish Rebalace消息的时候,报错了:

command PUBLISH is not allowed

在没有去了解Codis之前,看到这个报错,第一想法就是SRE从redis.conf中禁止了此命令的使用,想着如果放开,这个报错不就消失了吗?

于是带着这个问题去寻找SRE,SRE反馈”Codis不支持PUBLISH这个命令”,收到反馈后,第一时间就去github寻找答案了,于是发现Codis有一个文档,文档标注了Codis不支持的Redis Command:https://github.com/CodisLabs/codis/blob/release3.2/doc/unsupported_cmds.md,而PUBLISH就包含其中,而不支持的原因主要是因为在分布式环境下,为了确保消息在所有节点之间正确传播和同步,Codis为了简化分布式处理的复杂性,所以就没有实现PUBLISH命令:

var opTable = make(map[string]OpInfo, 256)

func init() {
	for _, i := range []OpInfo{
	  // ...
		{"PUBLISH", FlagNotAllow},
		// ...
	} {
		opTable[i.Name] = i
	}
}

func (s *Session) handleRequest(r *Request, d *Router) error {
	opstr, flag, err := getOpInfo(r.Multi)
	if err != nil {
		return err
	}
	r.OpStr = opstr
	r.OpFlag = flag
	r.Broken = &s.broken

	if flag.IsNotAllowed() {
		return fmt.Errorf("command '%s' is not allowed", opstr)
	}
// ignore...
}

3. 解决办法

因为无法使用Pub/Sub的逻辑,就需要移除基于Pub/Sub机制实现的代码逻辑,从而采用其他的方式来替代,因为Publish是嵌入在Lua脚本中的,所以导致我们没有办法使用Kafka这种消息队列来实现这个服务的注册,再加上我们希望这个服务注册的逻辑本身只依赖Redis,所以采用Kafka消息队列来替代Publish/Subscribe的逻辑就被抛弃掉了。

后面采用了Get/Set来替代,因为从Pub/Sub的逻辑来看,对于Publish和Subscribe来看,在go-redis/v8的实现中,subscribe的实现如下列代码所示:

func (c *PubSub) Channel(opts ...ChannelOption) <-chan *Message {
	c.chOnce.Do(func() {
		c.msgCh = newChannel(c, opts...) // new出新的channel
		c.msgCh.initMsgChan() // 初始化msgChan
	})
	if c.msgCh == nil {
		err := fmt.Errorf("redis: Channel can't be called after ChannelWithSubscriptions")
		panic(err)
	}
	return c.msgCh.msgCh
}

func newChannel(pubSub *PubSub, opts ...ChannelOption) *channel {
	c := &channel{
		pubSub: pubSub,

		chanSize:        100,
		chanSendTimeout: time.Minute,
		checkInterval:   3 * time.Second,
	}
	for _, opt := range opts {
		opt(c)
	}
	if c.checkInterval > 0 {
		c.initHealthCheck() // 这里默认是3s发送一次ping, 保证subscribe与redis的连通性
	}
	return c
}

func (c *channel) initHealthCheck() {
	ctx := context.TODO()
	c.ping = make(chan struct{}, 1)

	go func() {
		timer := time.NewTimer(time.Minute) // 这里默认是1分钟
		timer.Stop()

		for {
			timer.Reset(c.checkInterval) // 重置为3s
			select {
			case <-c.ping: // 这里是与下面的函数(initMsgChan)结合使用的
			// 一旦接收到c.ping, 实际就说明网络是联通的,就没必要再去发送ping消息验证健康状态了
				if !timer.Stop() {
					<-timer.C
				}
			case <-timer.C:
				if pingErr := c.pubSub.Ping(ctx); pingErr != nil {
					c.pubSub.mu.Lock()
					c.pubSub.reconnect(ctx, pingErr)
					c.pubSub.mu.Unlock()
				}
			case <-c.pubSub.exit:
				return
			}
		}
	}()
}

func (c *channel) initMsgChan() {
	ctx := context.TODO()
	c.msgCh = make(chan *Message, c.chanSize)

	go func() {
		timer := time.NewTimer(time.Minute)
		timer.Stop()

		var errCount int
		for {
			// 这里的连接如果往底层追的,主要是一个延迟阻塞读取的逻辑
			msg, err := c.pubSub.Receive(ctx)
			if err != nil {
				if err == pool.ErrClosed {
					close(c.msgCh)
					return
				}
				if errCount > 0 {
					time.Sleep(100 * time.Millisecond)
				}
				errCount++
				continue
			}

			errCount = 0

			// Any message is as good as a ping.
			// 这里只要获取到了消息,就意味着subscribe的连接是健康的
			select {
			case c.ping <- struct{}{}:
			default:
			}

			switch msg := msg.(type) {
			case *Subscription:
				// Ignore.
			case *Pong:
				// Ignore.
			case *Message:
				// 如果获取到消息之后,直接重置定时器
				timer.Reset(c.chanSendTimeout)
				select {
				case c.msgCh <- msg:
					if !timer.Stop() { // 这一步主要是因为Reset只能重置timer已经stopped或者已经过了timer expired的timer,可以看其方法comment
					// 这里做这一步因为如果timer.C在timer.Reset之前就到期了,这里不释放,会导致下次出现命中 <-timer.C的误操作(select多个条件命中会随机选择一个)
						<-timer.C // 如果定时器没有stop,说明可能到期了,先把chan释放
					}
				case <-timer.C:
					internal.Logger.Printf(
						ctx, "redis: %s channel is full for %s (message is dropped)",
						c, c.chanSendTimeout)
				}
			default:
				internal.Logger.Printf(ctx, "redis: unknown message type: %T", msg)
			}
		}
	}()
}

关于c.pubSub.Receive(ctx)方法,继续向下看,会一直看到

func (c *PubSub) Receive(ctx context.Context) (interface{}, error) {
	return c.ReceiveTimeout(ctx, 0)
}

func (cn *Conn) WithReader(ctx context.Context, timeout time.Duration, fn func(rd *proto.Reader) error) error {
	if err := cn.netConn.SetReadDeadline(cn.deadline(ctx, timeout)); err != nil {
		return err
	}
	return fn(cn.rd)
}

func (c *conn) SetReadDeadline(t time.Time) error {
	if !c.ok() {
		return syscall.EINVAL
	}
	if err := c.fd.SetReadDeadline(t); err != nil {
		return &OpError{Op: "set", Net: c.fd.net, Source: nil, Addr: c.fd.laddr, Err: err}
	}
	return nil
}

func (fd *netFD) SetReadDeadline(t time.Time) error {
	return fd.pfd.SetReadDeadline(t)
}

func (fd *FD) SetReadDeadline(t time.Time) error {
	return setDeadlineImpl(fd, t, 'r')
}

func runtime_pollSetDeadline(ctx uintptr, d int64, mode int)

这里主要是为读取fd数据设置了一个延迟时间,而我们的timeout传递为0,表示的是一直在监听,而我们的initHealthCheck函数会每3s发送一个ping的命令给Redis,以保证这里3s一定会读取到一个”pong”的回复。

关于runtime_pollSetDeadline的进一步的实现原理,可以参考【5-4 Golang】实战—Go服务502总结这篇博客。

3.1 实现

有了上面对subscribe获取数据的分析,对于Get/Set的实现方案,则采用如下的逻辑:

  1. 当服务下线时,之前利用PUBLISH给所有SUBSCRIBE发送消息的行为被改写成了获取所有心跳数据中的服务key,然后为每一个key写一个需要rebalance的数据到这个key中
  2. 其他的服务则按时轮询这个key是否存在,如果存在则相当于收到了类似之前PUBLISH的消息,则触发Rebalance的行为。

上面的实现方案,在某种程度上是可行的,但是相比于go-redis的实现方案,自己写的代码差距还是有些大,因为这样就导致了轮询必须不断地发送Redis的请求,而不是像go-redis一样利用healthcheck每3s发送一个命令,在receive方法中也是采用读到了才返回数据的机制(timeout=0意味着没有任何的deadline)实现的,则保证了服务在没有收到新的请求下每3s才会有一个请求发送给redis。

4. 小结

因为使用了Redis,引入了PUBLISH,因为Codis,PUBLISH不允许使用,导致了需求逻辑的变更。同时也暴露出自己对于服务中一些基础中间件的不了解,作为一个后端服务开发,在对中间件不了解的情况下使用一些中间件的技术来实现服务逻辑,这个行为本身其实就是需求分析不到位导致的。虽然在刚开始自己还在为Redis不允许使用PUBLISH命令感觉到自己也很难去知道命令不可用,但考虑到自己对系统服务使用的是Codis proxy搭建的分布式Redis集群,就让自己对这次的问题产生了一些反思。

希望后续遇到类似的需求,可以先调查清楚这些基础中间件的支持情况,然后再决定业务逻辑代码的编写,而不是先实现了业务逻辑代码,因为遇到基础中间件不支持的情况,再反过来修改业务逻辑代码!

参考

  • Codis:https://github.com/CodisLabs/codis
  • go-redis: https://github.com/redis/go-redis/blob/f3fe61148b2b8fe0a669dc23620690407f5f92af/pubsub.go#L564

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/504907.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

算法整理:排序

快速排序 首先不妨以第一个数为基准数&#xff0c;在一轮遍历后&#xff0c;使基准数左边的数都小于基准数&#xff0c;基准数右边的数都大于基准数。 当然也可以取中间的数为基准数。 void quick_sort(int q[],int l,int r){if(l>r)return;int i l;int j r;int xq[(lr)/…

类的函数成员(二):析构函数

一.定义 析构函数(destructor) 与构造函数相反&#xff0c;当对象结束其生命周期&#xff0c;如对象所在的函数已调用完毕时&#xff0c;系统自动执行析构函数。析构函数往往用来做“清理善后” 的工作。 例如&#xff0c;在建立对象时用new开辟了一片内存空间&#xff0c;dele…

【LeetCode】三月题解

文章目录 [2369. 检查数组是否存在有效划分](https://leetcode.cn/problems/check-if-there-is-a-valid-partition-for-the-array/)思路&#xff1a;代码&#xff1a; [1976. 到达目的地的方案数](https://leetcode.cn/problems/number-of-ways-to-arrive-at-destination/) 思路…

005 高并发内存池_CentralCache设计

​&#x1f308;个人主页&#xff1a;Fan_558 &#x1f525; 系列专栏&#xff1a;高并发内存池 &#x1f339;关注我&#x1f4aa;&#x1f3fb;带你学更多知识 文章目录 前言本文重点一、构建CentralCache结构二、运用慢开始反馈调节算法三、完成向CentralCache中心缓存申请四…

【讲解下Gitea】

&#x1f308;个人主页:程序员不想敲代码啊 &#x1f3c6;CSDN优质创作者&#xff0c;CSDN实力新星&#xff0c;CSDN博客专家 &#x1f44d;点赞⭐评论⭐收藏 &#x1f91d;希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出指正&#xff0c;让我们共…

2024.3.30学习笔记

今日学习韩顺平java0200_韩顺平Java_对象机制练习_哔哩哔哩_bilibili 今日学习p295-p314 super关键字 super代表父类的引用&#xff0c;用于访问父类的属性、方法、构造器 super细节和语法 访问父类的属性&#xff0c;但不能访问父类的private属性 super.属性名 访问父类的…

STM32学习笔记(10_2)- I2C通信协议MPU6050简介

无人问津也好&#xff0c;技不如人也罢&#xff0c;都应静下心来&#xff0c;去做该做的事。 最近在学STM32&#xff0c;所以也开贴记录一下主要内容&#xff0c;省的过目即忘。视频教程为江科大&#xff08;改名江协科技&#xff09;&#xff0c;网站jiangxiekeji.com 本期开…

【Java八股学习】Redis持久化 思维导图

说明 文章内容通过学习小林Coding内的优质文章后整理而来&#xff0c;整理成思维导图的方式是为了帮助自己理解、记忆和复习。如若侵权请联系删除&#xff0c;再次对小林Coding内的优质文章表示感谢。参考文章如下&#xff1a; AOF 持久化是怎么实现的&#xff1f;RDB 快照是…

Leaflet使用多面(MultiPolygon)进行遥感影像掩膜报错解决之道

目录 前言 一、问题初诊断 1、山重水复 2、柳暗花明 3、庖丁解牛 4、问题定位 二、解决多面掩膜问题 1、尝试数据修复 2、实际修复 3、最终效果 三、总结 前言 之前一篇讲解遥感影像掩膜实现&#xff1a;基于SpringBoot和Leaflet的行政区划地图掩膜效果实战&#xff0…

CleanMyMac X中文---让Mac焕发新生,Mac优化与清理的终极利器

CleanMyMac X是一款专为Mac用户设计的综合性系统优化工具。通过智能扫描&#xff0c;它能够快速识别并清理Mac磁盘上的垃圾文件、重复文件、无用语言安装包、iTunes缓存、邮件附件等&#xff0c;有效释放磁盘空间&#xff0c;提升Mac电脑的运行速度。此外&#xff0c;CleanMyMa…

【初阶数据结构】——160. 相交链表

文章目录 1. 题目介绍2. 思路1&#xff1a;暴力求解算法思想代码实现 3. 思路2&#xff1a;快慢指针算法思想代码实现 1. 题目介绍 链接: link 给你两个单链表的头节点 headA 和 headB &#xff0c;请你找出并返回两个单链表相交的起始节点。如果两个链表不存在相交节点&…

Flutter 全局控制底部导航栏和自定义导航栏的方法

1. 介绍 导航栏在移动应用中扮演着至关重要的角色&#xff0c;它是用户与应用之间进行导航和交互的核心组件之一。无论是简单的页面切换&#xff0c;还是复杂的应用导航&#xff0c;导航栏都能够帮助用户快速找到所需内容&#xff0c;提升用户体验和应用的易用性。 在移动应用…

chatgpt用pygame根据重心坐标 填充三角形

pygame.Surface.set_at(screen, (int(w), int(h)), (int(255zhongxina),int(255zhongxinb),int(255zhongxinc))) 颜色是由三个重心坐标权重abc255求出的 import pygame from pygame.locals import * import sys import mathpygame.init()width, height 800, 600 screen pyga…

关于 C/C++ 1Z(17)开源项目 openppp2 协同程式切换工作流

下述为开源项目 openppp2&#xff08;github&#xff09;构建工作在 C/C 17 的 stackful 有栈协同程式的工作流切换示意图&#xff1a; 在 openppp2 之中采用人工手动方式管理协同程式之间的切换&#xff0c;每个中断过程只是保存线程栈信息&#xff08;如寄存器、当前#PC EIP&…

魔改一个过游戏保护的CE

csdn审核不通过 网易云课堂有配套的免费视频 int0x3 - 主页 文章都传到github了 Notes/外挂/魔改CE at master MrXiao7/Notes GitHub 为什么要编译自己的CE 在游戏逆向的过程中&#xff0c;很多游戏有保护&#xff0c;我们运行原版CE的时候会被检测到 比如我们开着CE运…

C语言——内存函数

前言&#xff1a; C语言中除了字符串函数和字符函数外&#xff0c;还有一些函数可以直接对内存进行操作&#xff0c;这些函数被称为内存函数&#xff0c;这些函数与字符串函数都属于<string.h>这个头文件中。 一.memcpy&#xff08;&#xff09;函数 memcpy是C语言中的…

【OpenGL】(1) 环境搭建:运行简单的 OpenGL 教学示例程序

&#x1f4ad; 写在前面&#xff1a;我们尽可能地让大家以 最简单粗暴且无脑的方式&#xff0c;带大家配置好 OpenGL 环境&#xff0c;并跑出我们第一个示例程序。再次声明&#xff0c;本专栏所有教学都是基于 Windows上使用 VS2022 (X64) 的。本专栏主要内容是关于 3D 计算机图…

用数组模拟单链表、双链表、栈、单调栈、队列、循环队列、单调队列

本文用于记录个人算法竞赛学习&#xff0c;仅供参考 目录 一.模拟单链表 二.双链表 三.栈 四.单调栈 五.队列 六.循环队列 七.单调队列 为什么要用数组模拟而不用现成的STL&#xff0c;因为用数组模拟效率会快一点&#xff0c;比如用结构体指针的方式创建链表&#xff0…

C++ 二叉树OJ题

&#x1f493;博主CSDN主页:麻辣韭菜-CSDN博客&#x1f493;   ⏩专栏分类&#xff1a;C知识分享⏪   &#x1f69a;代码仓库:C高阶&#x1f69a;   &#x1f339;关注我&#x1faf5;带你学习更多C知识   &#x1f51d;&#x1f51d; 前言 C二叉搜索树 这篇讲解了搜索二叉…

动态规划1

动态规划问题的五步操作&#xff1a; 动态规划就是把dp表填满&#xff0c;则最终一定有一个数据是我们所需要的数据 下面以一道简单的题目进行讲解 本题其实就是斐波那契数列的一个plus版 &#xff0c;就是利用递推关系求值的过程 1.前期准备&#xff1a;创建dp表(使用一维…