nsqd的架构及源码分析

文章目录

一  nsq的整体代码结构

二  回顾nsq的整体架构图

三  nsqd进程的作用

四  nsqd启动流程的源码分析

五  本篇博客总结


在博客 nsq整体架构及各个部件作用详解_YZF_Kevin的博客-CSDN博客 中我们讲了nsq的整体框架,各个部件的大致作用。如果没看过的,建议大家去学习下,不然理解后续的内容会有难度

这篇博客开始我们来看下每个部件的详细功能,从源码入手分析其内部实现原理

一  nsq的整体代码结构

建议大家也下载nsq的代码,一边看博客一边看代码印象更深刻。nsq的官方git代码地址:GitHub - nsqio/nsq: A realtime distributed messaging platform

nsq代码结构如下,图中有注释,大家先有个整体印象,知道各个模块的代码在哪就行

二  回顾nsq的整体架构图

 图中最上面的四个节点就是nsqd进程,至少要有1个,可以多开。我们画了4个,分别是nsq1,nsq2,nsq3,nsq4

注意看nsqd的连接关系,每个nsqd节点和所有客户端都有连接(tcp+http),且每个nsqd节点和所有的nsqlookupd节点也有连接(tcp)

三  nsqd进程的作用

1. topic的创建,清空,暂停,重新激活,删除,持久化(保存到文件,从文件加载),同步给nsqlookupd进程

2. channel的创建,清空,暂停,重新激活,删除,持久化(保存到文件,从文件加载),同步给nsqlookupd进程

3. message的监听,中转,持久化(保存到文件,从文件加载),主动推送消息给各个客户端,超时重发,消息计数

4. 配置修改,运行状态(协程、内存)统计

5. 抽检channel的延迟队列,飞行队列,消息超时的重新入队

6. 统计和上报工作,主要统计topic,channel,消息,各种队列,客户端连接,GC等信息,通过UDP协议上报给指定地址的进程

可以说,nsqd进程是整个nsq平台的核心,消息队列架构简单的话,只有一个nsqd进程就够了。

我画了一个图来概括一个nsqd进程的工作内容,如下

 

四  nsqd启动流程的源码分析

nsqd的代码主要在两块

1. 代码框架及main函数,目录在 nsq/apps/nsqd/*

2. 实现代码,目录在 nsq/nsqd/*

值得一提的是nsqd,nsqlookupd,nsqadmin这三个进程的框架都使用了go-svc包,这个包很简单,使用者只需实现它的三个函数即可

Init()           配置,初始化等操作

Start()        真正启动

Stop()        结束时的关闭操作

好了,我们看nsqd的入口,也就是main函数,代码在nsq/apps/nsqd/main.go,代码如下(已加注释)

type program struct {
	once 		sync.Once
	nsqd 		*nsqd.NSQD	// nsqd对象
}

// nsqd的启动入口
func main() {
	prg := &program{}
	// Run内部会调用Init(),Start(),监听到这两个系统信号时会调用Stop()
	if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
		logFatal("%s", err)
	}
}

main()函数内部只有一个对象program,也只有一处调用svc.Run(),这个函数内部会调用program.Init()program.Start()

其中program.Init()函数,主要是创建并检测nsqd的配置,然后根据配置创建出一个nsqd实例

重点在program.Start()函数,代码如下(已加注释)

// nsqd的启动,重点在调用的Main()函数
func (p *program) Start() error {
	// 加载元数据,并创建初始化出所有的topic对象,所有的channel对象
	err := p.nsqd.LoadMetadata()
	if err != nil {
		logFatal("failed to load metadata - %s", err)
	}
	// 再持久化元数据到文件(不要觉得奇怪,因为上面的LoadMetadata()函数可能会过滤掉一些无效的topic,channel,这里再重写算是刷新了元数据)
	err = p.nsqd.PersistMetadata()
	if err != nil {
		logFatal("failed to persist metadata - %s", err)
	}
	// 启动一个新协程,专门运行nsqd的Main()循环,注意这个Main()是永不退出的(除非出错)
	go func() {
		err := p.nsqd.Main()
		if err != nil {
			p.Stop()
			os.Exit(1)
		}
	}()
	return nil
}

对上面的代码解释下,program.Start()函数一共干了3件事

1. nsqd.LoadMetadata(), 这个函数根据配置加载旧nsqd元数据。这些元数据包含版本号,topic,channel,过滤掉不合法的topic和channel,合法的topic和channel都创建出对象,并且为每个topic建立处理循环

2. nsqd.PersistMetadata(), 把过滤后的topic和channel再保存到文件nsqd.dat,算是把旧数据过滤了一遍

3. 新启动一个协程,调用nsqd.Main(),这个Main()是nsqd的核心,启动了nsqd的全部服务。除非遇到错误,否则永不退出

接下来看nsqd.Main()的内部实现,代码在nsq/nsqd/nsqd.go,代码如下(已加注释)

// nsqd主协程(内部启动tcp循环,http循环,https循环, 扫描队列池,和nsqlookupd循环),永不退出,除非严重错误
func (n *NSQD) Main() error {
	exitCh := make(chan error)
	var once sync.Once

	// 退出函数(独立协程运行,一直监听,遇到错误
	exitFunc := func(err error) {
		once.Do(func() {
			if err != nil {
				n.logf(LOG_FATAL, "%s", err)
			}
			exitCh <- err
		})
	}

	// TCP服务,独立协程
	n.waitGroup.Wrap(func() {
		exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf))
	})

	// HTTP服务,独立协程
	if n.httpListener != nil {
		httpServer := newHTTPServer(n, false, n.getOpts().TLSRequired == TLSRequired)
		n.waitGroup.Wrap(func() {
			exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf))
		})
	}

	// HTTPS服务,独立协程
	if n.httpsListener != nil {
		httpsServer := newHTTPServer(n, true, true)
		n.waitGroup.Wrap(func() {
			exitFunc(http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf))
		})
	}

	// 独立协程,抽检扫描各个队列
	n.waitGroup.Wrap(n.queueScanLoop)

	// 独立协程,和nsqlookupd的循环(连接和重连,心跳维持,topic,channel变化通知等)
	n.waitGroup.Wrap(n.lookupLoop)

	if n.getOpts().StatsdAddress != "" {
		n.waitGroup.Wrap(n.statsdLoop)
	}

	err := <-exitCh
	return err
}

对上面的代码解释下,nsqd.Main()主要干了6件事

1. 开一个新协程,启动tcp服务并一直监听,为客户端提供tcp服务。我们的客户端最常用,因为生产消息,中转消息,处理消息都是这里实现的

2. 开一个新协程,启动http服务并一直监听,为客户端提供htttp服务

3. 开一个新协程,启动https服务并一直监听,为客户端提供htttps服务

4. 开一个新协程,建立并维持扫描池,这些扫描协程会扫描所有channel的延迟队列,飞行队列,如果消息超时了就重新入队。很有意思的是,nsqd作者很大方地承认他抄袭了redis的抽检策略,内部实现也确实是类redis操作,这个我们后面再讲,todo

5. 开一个新协程,和nsqlookupd建立循环,主要是连接和重连,心跳维持,实时报告自己的topic和channel变化

6. 如果你配置了信息上报的地址,nsqd会再开一个新协程,做统计上报操作,统计topic,channel,消息,内存,GC等信息,通过UDP协议上报过去

五  本篇博客总结

1. 给大家看了nsq平台下代码整体结构,建议大家下载源码自己看下,加强印象

2. 讲了nsqd进程提供的功能实现

3. 跟踪了nsqd进程启动流程,最核心的nsqd.Main()建议大家仔细看,后面讲的nsqd内容也都是这几个协程里面干的活

下一篇博客我们开始详解分析nsqd内部各个协程的具体工作

todo

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/62674.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【websocket - Tornado】简易聊天应用

1、背景 项目测试的过程中需要自己搭建一个webscoket站点,确保此类服务接入后台系统后访问不受影响。python的服务框架常用的有Flask、Django、Tornado,每个框架的侧重点不同,导致使用的场景就会有所差异。 Flask轻量级,采用常规的同步编程方式,需要安装其他模块辅助,主…

Pytest测试框架2

目录&#xff1a; pytest参数化用例pytest标记测试用例pytest设置跳过、预期失败用例pytest运行用例pytest测试用例调度与运行pytest命令行常用参数python执行pytestpytest异常处理 1.pytest参数化用例 参数化 通过参数的方式传递数据&#xff0c;从而实现数据和脚本分离。…

并网逆变器学习笔记6---三电平SVPWM下的连续和不连续调制

之前在学习中总结过一次DPWM策略选择&#xff1a;并网逆变器学习笔记5---三电平DPWM 但是对于三电平逆变器而言&#xff0c;如何从连续调制切换到不连续调制&#xff0c;存在一些疑惑点&#xff0c;下午闲来无事&#xff0c;把SVPWM下的连续调制和不连续调制的开关状态选择&am…

MyCat核心概念、需求案例讲解、环境准备及分片配置

1.MyCat概念介绍 2.MyCat入门需求 2.1 需求分析 2.2 环境准备 输入以下命令检查服务器防火墙状态 dead代表关闭状态&#xff0c;如果不关闭也可以需要开放特定的端口号&#xff01;&#xff01; systemctl status firewalld接着需要在三台服务器上的MySQL上创建三个数据库db0…

(树) 剑指 Offer 36. 二叉搜索树与双向链表 ——【Leetcode每日一题】

❓ 剑指 Offer 36. 二叉搜索树与双向链表 难度&#xff1a;中等 输入一棵二叉搜索树&#xff0c;将该二叉搜索树转换成一个 排序的循环双向链表。要求不能创建任何新的节点&#xff0c;只能调整树中节点指针的指向。 为了让您更好地理解问题&#xff0c;以下面的二叉搜索树为…

相机传感器格式与镜头光圈参数

相机靶面大小 CCD/CMOS图像传感器尺寸&#xff08;sensor format&#xff09;1/2’‘、1/3’‘、1/4’实际是多大 1英寸——靶面尺寸为宽12.7mm*高9.6mm&#xff0c;对角线16mm。 2/3英寸——靶面尺寸为宽8.8mm*高6.6mm&#xff0c;对角线11mm。 1/2英寸——靶面尺寸为宽6.…

SSE技术和WebSocket技术实现即时通讯

文章目录 一、SSE1.1 什么是SSE1.2 工作原理1.3 特点和适用场景1.4 API用法1.5 代码实现 二、WebSocket2.1 什么是WebSocket2.2 工作原理2.3 特点和适用场景2.4 API用法2.5 代码实现 三、SSE与WebSocket的比较 当涉及到实现实时通信的Web应用程序时&#xff0c;两种常见的技术选…

网络安全【黑客技术】自学

1.网络安全是什么 网络安全可以基于攻击和防御视角来分类&#xff0c;我们经常听到的 “红队”、“渗透测试” 等就是研究攻击技术&#xff0c;而“蓝队”、“安全运营”、“安全运维”则研究防御技术。 2.网络安全市场 一、是市场需求量高&#xff1b; 二、则是发展相对成…

每天五分钟机器学习:梯度下降算法和正规方程的比较

本文重点 梯度下降算法和正规方程是两种常用的机器学习算法,用于求解线性回归问题。它们各自有一些优点和缺点,下面将分别对它们进行详细的讨论。 区别 1. 梯度下降算法是一种迭代的优化算法,通过不断迭代调整参数来逼近最优解。它的基本思想是根据目标函数的梯度方向,沿…

openGauss学习笔记-32 openGauss 高级数据管理-批处理模式

文章目录 openGauss学习笔记-32 openGauss 高级数据管理-批处理模式32.1 语法格式32.2 参数说明32.3 示例 openGauss学习笔记-32 openGauss 高级数据管理-批处理模式 openGauss支持从文本文件执行SQL语句。openGauss提供了gsql工具实现SQL语句的批量处理。 以下场景建议使用批…

测试人员简单使用Jenkins

一、测试人员使用jenkins干什么&#xff1f; 部署测试环境 二、相关配置说明 一般由开发人员进行具体配置 1.Repository URL&#xff1a;填写git地址 2.填写开发分支&#xff0c;测试人员可通过相应分支进行测试环境的构建部署 当多个版本并行时&#xff0c;开发人员可以通过…

【各个突破】Echart的象柱形图数值为0时,图像发生严重偏移,一招即可解决

【各个突破】Echart的象柱形图数值为0时&#xff0c;图像发生严重偏移&#xff0c;一招即可解决 1&#xff0c;问题描述2&#xff0c;解决方法3&#xff0c;最终结果 1&#xff0c;问题描述 当数值是0亩时&#xff0c;圆形图标发生位置偏移&#xff0c;据悉&#xff0c;该bug是…

掌握 JVM 调优命令

点击下方关注我&#xff0c;然后右上角点击...“设为星标”&#xff0c;就能第一时间收到更新推送啦~~~ JVM 日常调优总结起来就是&#xff1a;首先通过 jps 命令查看当前进程&#xff0c;然后根据 pid 通过 jinfo 命令查看和修改 jvm 参数&#xff0c;通过 jstat 命令查看 cla…

漫画 | TCP/IP之大明邮差

后记&#xff1a; 1973年&#xff0c;卡恩与瑟夫开发出了网络中最核心的两个协议&#xff1a;TCP协议和IP协议&#xff0c;随后为了验证两个协议的可用性&#xff0c;他们做了一个实验&#xff0c;在多个异构网络中进行数据传输&#xff0c;数据包在经过近10万公里的旅程后到达…

git删除已经提交的大文件

当你不小心把一个巨大的二进制文件提交到git仓库的时候&#xff0c;此时删除再提交也没有用了&#xff0c;大文件已经在仓库中留底了。另外比如需要删除某个需要保密的文件&#xff0c;都是相同的解决办法。 我本来想着把dll放在三方库里面提交到仓库里&#xff0c;省得在不同…

STM32 低功耗-待机模式

STM32 待机模式 文章目录 STM32 待机模式第1章 低功耗模式简介第2章 待机模式简介2.1 进入待机模式2.1 退出待机模式 第3章 待机模式代码部分总结 第1章 低功耗模式简介 在 STM32 的正常工作中&#xff0c;具有四种工作模式&#xff1a;运行、睡眠、停止和待机模式。 在系统或…

【九】mybatis 缓存模块设计

mybatis 缓存模块设计 简介&#xff1a;MyBatis提供了一级缓存和二级缓存&#xff0c;其中一级缓存基于SqlSession实现&#xff0c;而二级缓存基于Mapper实现。这里我们就来学习一下MyBatis缓存的使用&#xff0c;并分析MyBatis缓存的实现原理。 首先我们找到缓存模块的源码&a…

EVE-NG MPLS L2VPN static lsp

目录 1 拓扑 2 配置步骤 2.1 配置接口IP 和路由协议 2.2 配置MPLS LDP 2.3 配置L2VPN PW 2.4 验证L2VPN 1 拓扑 2 配置步骤 2.1 配置接口IP 和路由协议 PE1 interface LoopBack 0ip address 1.1.1.9 32 quitinterface GigabitEthernet1/0ip address 10.1.1.1 255.255…

【腾讯云 Cloud Studio 实战训练营】基于Cloud Studio构建React完成点餐H5页面

前言 【腾讯云 Cloud Studio 实战训练营】基于Cloud Studio 构建React完成点餐H5页面一、Cloud Studio介绍1.1 Cloud Studio 是什么1.2 相关链接1.3 登录注册 二、实战练习2.1 初始化工作空间2.2 开发一个简版的点餐系统页面1. 安装 antd-mobile2. 安装 less 和 less-loader3. …