监听DB配置变更之go-broadcast简单实现

文章目录

  • 1. 前言
  • 2. 分析
  • 3. 实现
  • 4. 问题
  • 5. 小结
  • 6. 参考

1. 前言

之前遇到一个需求,因为配置的查找是基于db的,而db的更改却无法实时通知到具体利用到这条数据的使用方,为了实现db数据变动时,能够尽快让使用方知道这条数据发生了变更,从而进行后续数据变更等相关逻辑的运行,就需要实现db数据变动时的通知。

在观察者模式中,因为观察者模式是一种一对多的关系模式,即多个观察者观察同一个主题对象,当主题对象发生变化时,会通知所有的观察者对象。

2. 分析

使用观察者模式来实现的话,则需要实现如下四个部分的结构:

  1. 抽象主题
  2. 具体主题
  3. 抽象观察者
  4. 具体观察者

举个例子,在我们日常使用微信公众号中,当你关注了一个公众号,这个公众号如果有更新的话,则会推送给每一个关注过这个公众号的用户。此时我们可以将具体的部分的接收映射到微信公众号中,即:

  1. 抽象主题:公众号,具备订阅、取消订阅和发送消息的功能
  2. 具体主题:具体某一个公众号
  3. 抽象观察者:用户(泛指使用微信公众号的用户受众)
  4. 具体观察者:某一个具体的用户

分析了以上四个结构之后,我们需要实现的功能部分就清楚了。即我们需要实现一个抽象主题,这个主题需要有提供注册、取消注册以及提交信息的能力,当提交信息到抽象主题的时候,抽象主题需要将这个消息通知到所有已经注册过的具体观察者。

3. 实现

在明确了需求之后, 就开始进行功能的实现,因为使用的是go语言,则第一时间肯定是希望通过chan这样的功能来实现,因为chan天生具备监听的能力,我们可以通过监听注册到抽象主题的chan,从而实现抽象主题消息的实时监听。

但秉持着“你需要的功能,基本都有人实现过”的方针,第一时间还是上到了github,看看是否有现成的开源方案,经过一番查找,还真发现了一个开源库可以使用,这个库的名称是go-broadcast。

下面就来说下broadcaster是如何实现上面的功能逻辑的,broadcaster这个库的代码很简单,主体实现逻辑只有110行代码左右,但符合我们的功能逻辑实现需要。

type broadcaster struct {
	input chan interface{}
	reg   chan chan<- interface{}
	unreg chan chan<- interface{}

	outputs map[chan<- interface{}]bool
}

// The Broadcaster interface describes the main entry points to
// broadcasters.
type Broadcaster interface {
	// Register a new channel to receive broadcasts
	Register(chan<- interface{})
	// Unregister a channel so that it no longer receives broadcasts.
	Unregister(chan<- interface{})
	// Shut this broadcaster down.
	Close() error
	// Submit a new object to all subscribers
	Submit(interface{})
	// Try Submit a new object to all subscribers return false if input chan is fill
	TrySubmit(interface{}) bool
}

首先定义了一个接口叫做Broadcaster,然后定义了一个broadcaster实现了Broadcaster的所有方法逻辑。

func (b *broadcaster) Register(newch chan<- interface{}) {
	b.reg <- newch
}

func (b *broadcaster) Unregister(newch chan<- interface{}) {
	b.unreg <- newch
}

func (b *broadcaster) Close() error {
	close(b.reg)
	close(b.unreg)
	return nil
}

// Submit an item to be broadcast to all listeners.
func (b *broadcaster) Submit(m interface{}) {
	if b != nil {
		b.input <- m
	}
}
  • Register方法主要实现了将注册的chan直接放入到reg这个chan中,用于后续注册
  • Register方法主要实现了将注册的chan直接让如到ureg这个chan中,用于后续注销
  • Close方法主要是关闭reg和ureg两个chan
  • Submit方法主要实现对抽象主题broadcaster发送消息,将消息放入input这个chan中

上面的方法都是基于chan作为通信的,而chan中有了数据,后续需要消费数据。

// NewBroadcaster creates a new broadcaster with the given input
// channel buffer length.
func NewBroadcaster(buflen int) Broadcaster {
	b := &broadcaster{
		input:   make(chan interface{}, buflen),
		reg:     make(chan chan<- interface{}),
		unreg:   make(chan chan<- interface{}),
		outputs: make(map[chan<- interface{}]bool),
	}

	go b.run()

	return b
}

这里的run()方法则是消费所有chan数据的地方。

func (b *broadcaster) broadcast(m interface{}) {
	for ch := range b.outputs { // 遍历所有注册的chan,将消息发送到注册的chan中
		ch <- m
	}
}

func (b *broadcaster) run() {
	for {
		select {
		case m := <-b.input: // 如果有消息输入,则广播出去
			b.broadcast(m)
		case ch, ok := <-b.reg: // 如果有新注册的,则进行output的添加
			if ok {
				b.outputs[ch] = true
			} else {
				return
			}
		case ch := <-b.unreg: // 如果有注销的,则进行output的删除
			delete(b.outputs, ch)
		}
	}
}

整体的运行图如下:

在这里插入图片描述

  • 对应chan通过reg进行注册,注册后的chan记录在outputs中
  • 对应chan通过ureg进行注销,注销后的chan从output中移除
  • 对应的信息通过input输入,输入后的msg通过遍历outputs注册列表,从而通知到每一个注册者

4. 问题

在使用go-broadcast的过程中,看到之前有个pr加了一个TrySubmit的逻辑,这个逻辑主要是解决当input被装满了以后,broadcast会被阻塞,这个时候如果有新的消息进来,如何办呢?

// TrySubmit attempts to submit an item to be broadcast, returning
// true iff it the item was broadcast, else false.
func (b *broadcaster) TrySubmit(m interface{}) bool {
	if b == nil {
		return false
	}
	select {
	case b.input <- m:
		return true
	default:
		return false
	}
}

解决办法是采用select的方法尝试去塞入,塞入不成功则意味着消息提交失败,返回false,让使用者根据消息提交的结果进行后续的逻辑处理。

但这里还存在另外一个问题,库中给了一个样本case,这个样本case基于的条件都是消息传递给chan的时候没有阻塞。如下代码所示:

func (b *broadcaster) broadcast(m interface{}) {
	for ch := range b.outputs {
		ch <- m
	}
}

但一旦有注册的chan消费的时候阻塞了,这时候就会产生问题,会导致其它正常消费的chan因为一个异常chan而全部被阻塞住,导致其他chan都无法正常消费。

这个时候就会导致在input没有满的时候,即消息可以放入,但是消息无法被正常的消费,进而又反向导致input逐渐被塞满,最终导致input无法被塞入,消息也无法被发送到对应的chan中,导致run方法逻辑卡在broadcast中,导致整个运行出现问题。

解决办法:

func (b *broadcaster) broadcast(m interface{}) {
	for ch := range b.outputs {
		// if exist one output consume the chan message is too slow,
		// will block other output receive the msg.
		select {
		case ch <- m:
		default:
		}
	}
}

但这种虽然解决了一个chan满消费block其他chan的问题,随之也引入了丢消息的问题了,即有些消费慢的chan,由于chan消费慢导致无法接收新的消息,进而导致新消息丢失的问题。

5. 小结

因为需要实时监听db配置的变更,所以去探寻了一下方案,最终采用了go-broadcast的方案,但在使用go-broadcast的过程中,发现在broadcast消息的时候存在阻塞的行为,为了保证整个服务不被某个chan阻塞而停止运行,在broadcast消息的时候添加了select default条件来规避这个问题。

6. 参考

  • go-broadcast

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/696268.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

数仓建模中的一些问题

​​​在数仓建设的过程中&#xff0c;由于未能完全按照规范操作&#xff0c; 从而导致数据仓库建设比较混乱&#xff0c;常见有以下问题&#xff1a; 数仓常见问题 ● 数仓分层不清晰&#xff1a;数仓的分层没有明确的逻辑&#xff0c;难以管理和维护。 ● 数据域划分不明确…

排序题+贪心

排序力扣题 一&#xff1a;合并区间 56. 合并区间 方法一&#xff1a;先排序再合并 如图&#xff0c;把区间按照起点从小到达排序&#xff0c;如果起点相同那么按照终点小的优先排序 然后每次记录一个区间&#xff0c;访问下一个区间&#xff1a; 如果下一个区间的起点<前…

使用NetAssist网络调试助手在单台计算机上配置TCP服务器和客户端

要使用NetAssist网络调试助手在同一台计算机上配置一个实例作为服务器&#xff08;server&#xff09;和另一个实例作为客户端&#xff08;client&#xff09;&#xff0c;可以按照以下步骤进行操作&#xff1a; 前提条件 确保已经安装NetAssist网络调试助手&#xff0c;并了…

【十大排序算法】归并排序

归并排序&#xff0c;如同秋日落叶&#xff0c;分散而细碎&#xff0c; 然而风吹叶动&#xff0c;自然而有序&#xff0c; 彼此相遇&#xff0c;轻轻合拢&#xff0c; 最终成就&#xff0c;秩序之谧。 文章目录 一、归并排序二、发展历史三、处理流程四、算法实现五、算法特性…

LLVM Cpu0 新后端4

想好好熟悉一下llvm开发一个新后端都要干什么&#xff0c;于是参考了老师的系列文章&#xff1a; LLVM 后端实践笔记 代码在这里&#xff08;还没来得及准备&#xff0c;先用网盘暂存一下&#xff09;&#xff1a; 链接: https://pan.baidu.com/s/1yLAtXs9XwtyEzYSlDCSlqw?…

数据结构和算法之数组和链表

一、数组 数组是一种线性数据结构&#xff0c;它是由一组连续的内存单元组成的&#xff0c;用于存储相同类型的数据。在JavaScript中&#xff0c;数组可以包含任意类型的数据&#xff0c;不只限于基本数据类型。 1.存储方式 在内存中&#xff0c;数组的元素是连续存储的&…

芒果YOLOv10改进38:写作篇:一文了解YOLOv10如何打印FPS指标

只需订阅这一个专栏即可阅读:芒果YOLOv10所有改进内容 💡🚀🚀🚀本博客内含改进源代码,按步骤操作运行改进后的代码即可 💡更方便的统计更多实验数据,方便写作 新增YOLOv10打印FPS指标 完善(一键YOLOv10打印FPS指标) 文章目录 完善(一键YOLOv10打印FPS指标)YOLO…

欧美北美南美国外媒体投稿和东南亚中东亚洲媒体海外新闻发稿软文推广营销策略有哪些?

在当今全球化的浪潮中&#xff0c;中国品牌正积极拓展海外市场&#xff0c;寻求更广阔的发展空间。面对国际竞争&#xff0c;有效的海外媒体发稿营销策略对于品牌国际化至关重要。以下是一些关键点和建议&#xff0c;以帮助品牌在海外市场取得成功。 深入了解目标市场&#xf…

吴恩达神经网络学习笔记1

代码解释 并不是全部代码&#xff0c;思路的流程 import numpy as np# 如何判断咖啡豆是烤好了 # 假设此神经网络由2层构成###### 这部分代码只是如何建立2层网络&#xff0c; ###### 并不包含如何加载神经网络中的参数 w 和 b######################## 第1层网络# x 是…

运维小妙招:如何让系统信息随登录自动展现?

在日常运维工作中&#xff0c;及时获取系统的基本信息对于维护系统的稳定性和安全性至关重要。通过一个简单的登录脚本&#xff0c;我们可以在用户每次登录时自动显示系统的关键信息&#xff0c;这不仅提高了工作效率&#xff0c;还能快速定位问题。本文将介绍如何编写这样一个…

ELK组件

资源列表 操作系统 IP 主机名 Centos7 192.168.10.51 node1 Centos7 192.168.10.52 node2 部署ELK日志分析系统 时间同步 chronyc sources -v 添加hosts解析 cat >> /etc/hosts << EOF 192.168.10.51 node1 192.168.10.52 node2 EOF 部署Elasticsea…

双Token方案实现Token自动续期(基于springboot+vue前后端分离项目)

文章目录 前言一、双Token方案介绍1. 令牌类型与功能2.双Token方案的优点3.实现流程 二、具体实现1.后端实现1.1 jwt工具类1.2 响应工具类1.3 实体类1.4 过滤器1.5 controller1.6 启动类 2、前端实现2.1 登录页面2.2 index页面2.3 请求拦截器和响应拦截器 效果展示 前言 更多j…

rce漏洞试试看 buuctf的pingpingping 试试ctf的rce怎么样

打开靶机开始操作 然后我们先知道一些知识点&#xff1a;下面这些是常用的 |这个管道符也就是上一条的命令的输出结果作为下一条命令的输入&#xff1b;这个是跟sql的堆叠注入是一样的|| || 当前面的执行出错时&#xff08;为假&#xff09;执行后面的 & 将任务置于后台执…

基于pytoch卷积神经网络水质图像分类实战

具体怎么学习pytorch&#xff0c;看b站刘二大人的视频。 完整代码&#xff1a; import numpy as np import os from PIL import Image import torch import torch.nn as nn import torch.optim as optim from torchvision import datasets, transforms from torch.utils.data…

模板显式、隐式实例化和(偏)特化、具体化的详细分析

最近看了<The C Programing Language>看到了模板的特化&#xff0c;突然想起来<C Primer>上说的显式具体化、隐式具体化、特化、偏特化、具体化等概念弄得头晕脑胀&#xff0c;我在网上了找了好多帖子&#xff0c;才把概念给理清楚。 看着这么多叫法&#xff0c;其…

晨控CK-UR12-E01与欧姆龙NX/NJ系列EtherNet/IP通讯手册

晨控CK-UR12-E01与欧姆龙NX/NJ系列EtherNet/IP通讯手册 晨控CK-UR12-E01 是天线一体式超高频读写器头&#xff0c;工作频率默认为902MHz&#xff5e;928MHz&#xff0c;符合EPC Global Class l Gen 2&#xff0f;IS0-18000-6C 标准&#xff0c;最大输出功率 33dBm。读卡器同时…

C语言怎样初始化图形模式?

一、问题 在C语⾔中&#xff0c;initgraph( ) 函数⽤于初始化图形模式。初始化时&#xff0c;那么多参数都是⼲什么的&#xff1f;怎样设置&#xff1f; 二、解答 initgraph( ) 函数⽤于初始化图形模式&#xff0c;其语法格式如下。 void far initgraph(int far * gdriver, i…

0基础学习区块链技术——入门

大纲 区块链构成区块链相关技术Hash算法区块链区块链交易 参考资料 本文力求简单&#xff0c;不讨论任何技术细节&#xff0c;只是从简单的组成来介绍区块链技术&#xff0c;以方便大家快速入门。同时借助一些可视化工具&#xff0c;辅助大家有直观的认识。 区块链构成 顾名思…

python导入非当前目录(如:父目录)下的内容

在开发python项目时&#xff0c;通常会划分不同的目录&#xff0c;甚至不同层级的目录&#xff0c;这时如果直接导入不在当前目录下的内容时&#xff0c;会报如下的错误&#xff1a;ModuleNotFoundError: No module named miniai其实这里跟操作系统的环境变量很类似的&#xff…