go-zero(十二)消息队列

go zero 消息队列

在微服务架构中,消息队列主要通过异步通信实现服务间的解耦,使得各个服务可以独立发展和扩展。

go-zero中使用的队列组件go-queue,是gozero官方实现的基于Kafka和Beanstalkd 的消息队列框架,我们使用kafka作为演示。

一、kafka简单介绍

Kafka 是一个开源的分布式流处理平台,主要用于构建实时数据管道和流应用。

1.Kafka 的架构

Kafka 的架构通常由以下几个部分构成:

  • Broker(节点):Kafka 集群由多个 broker 实例组成,负责管理消息的存储和处理。
  • Topic(主题):消息以主题的形式组织,每个主题可以有多个分区(partition),以支持高并发和扩展性。
  • Produce(生产者):消息生产者将数据发送到特定的topic中。
  • Consumer(消费者):消费者从topic中读取数据,可以将多个消费者分组以进行负载均衡。

2.Kafka 的关键特性

  1. 高吞吐量
    Kafka 设计上能够处理大量的实时数据流,具备非常高的吞吐量。这使得它能够轻松应对大规模数据流量,适合做日志聚合、监控数据处理等。

  2. 持久性
    Kafka 将消息持久化到磁盘,并提供复制功能,以确保数据的安全性和可靠性。即使在节点出现故障的情况下,也能保证数据不会丢失。

  3. 可扩展性
    Kafka 能够水平扩展,通过增加更多的节点来处理更多的消费者和生产者,这使得它能够应对越来越多的业务需求。

  4. 实时处理
    Kafka 提供低延迟的数据传输,这使得实时处理和分析成为可能。您可以瞬时处理到来的数据流。

  5. 支持多种消息传递模式
    Kafka 支持发布-订阅和点对点的消息传递模式,能够灵活适应不同场景下的需求。

  6. 强大的生态系统
    Kafka 拥有丰富的生态系统,包括 Kafka Streams 和 Kafka Connect,这些工具可以帮助开发者更方便地进行流处理和数据集成。

3.常见应用场景

  1. 日志聚合
    Kafka 可以作为一个集中式的日志聚合器,将分布在不同服务的日志集中到一个地方,方便后续分析和监控。

  2. 实时数据流处理
    使用 Kafka,用户可以实时处理和分析流数据,例如检测异常、生成实时报告等。

  3. 系统监控和事件追踪
    Kafka 经常用于收集和跟踪系统事件(如用户行为、系统状态等),并通过流式处理进行实时监控。

  4. 数据集成
    Kafka 可以作为数据的桥梁,连接不同的数据源和目标系统,方便实现数据的流转和转换。

  5. 消息队列
    Kafka 可用作高效的消息队列,实现服务间的异步通信。例如,在微服务架构中,服务 A 可以将消息发送到 Kafka,而服务 B 可以异步地从 Kafka 中读取处理这些消息。

二、环境部署

1.Docker安装Kafka

配置文件:

version: '3'

######## 项目依赖的环境,启动项目之前要先启动此环境 #######

services:

  #zookeeper是kafka的依赖 - Zookeeper is the dependencies of Kafka
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    environment:
      # 时区上海 - Time zone Shanghai (Change if needed)
      TZ: Asia/Shanghai
    restart: always
    ports:
      - 2181:2181
    networks:
      - gozero_net

  #消息队列 - Message queue
  kafka:
    image: 'bitnami/kafka:3.6.2'
    container_name: kafka
    restart: always
    ulimits:
      nofile:
        soft: 65536
        hard: 65536
    environment:
      - TZ=Asia/Shanghai
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
    ports:
      - '9092:9092'
      - '9094:9094'
    volumes:
      - ./volumes/kafka:/bitnami/kafka


networks:
  gozero_net:
    driver: bridge
    ipam:
      config:
        - subnet: 172.16.0.0/16

这里的kafka 对外暴露的端口为9094

使用命令执行文件

docker compose up

2.创建topic

进入kafka容器

docker exec -it {{容器ID}} /bin/bash
#或者直接使用容器名
docker exec -it kafka /bin/bash

进入kafka执行命令目录
进入kafka执行命令目录

cd /opt/bitnami/kafka/bin

创建topic

创建名为topic-test的topic

./kafka-topics.sh --create --topic topic-test --bootstrap-server localhost:9092

查看topic信息

./kafka-topics.sh --describe --topic topic-test --bootstrap-server localhost:9092

3.测试topic

使用两个终端,进入kafka执行命令目录,执行下面两个命令:

生产消息

./kafka-console-producer.sh --topic topic-test --bootstrap-server localhost:9092

消费消息

./kafka-console-consumer.sh --topic topic-test --bootstrap-server localhost:9092

测试

在生产者输入消息,会自动同步到消费者
在这里插入图片描述

4. 拉取依赖

项目中首先要拉取 go-queue 的依赖

go get github.com/zeromicro/go-queue@latest

三、项目演示

1.配置说明

type KqConf struct {
   service.ServiceConf

   // Brokers: Kafka 的多个 Broker 节点
   Brokers []string

   // Group: 消费者组
   Group string

   // Topic: 订阅的 Topic 主题
   Topic string

   // Offset: 如果新的 topic Kafka 没有对应的 offset 信息,或者当前的 offset 无效(历史数据被删除),
   // 需要指定从头(first)消费还是从尾(last)消费
   Offset string `json:",options=first|last,default=last"`

   // Conns: 一个 Kafka queue 对应可对应多个 consumer,Conns 对应 Kafka queue 数量,
   // 可以同时初始化多个 Kafka queue,默认只启动一个
   Conns int `json:",default=1"`

   // Consumers: go-queue 内部起多个 goroutine 从 Kafka 中获取信息写入进程内的 channel,
   // 此参数控制 goroutine 数量(⚠️ 并不是真正消费时的并发 goroutine 数量)
   Consumers int `json:",default=8"`

   // Processors: 当 Consumers 中的多个 goroutine 拉取到 Kafka 消息后,
   // 通过此参数控制当前消费逻辑的并发 goroutine 数量
   Processors int `json:",default=8"`

   // MinBytes: fetch 一次返回的最小字节数,若不够该字节数则会等待
   MinBytes int `json:",default=10240"`    // 10K

   // MaxBytes: fetch 一次返回的最大字节数,若第一条消息大小超过该限制仍会继续拉取,
   // 以确保 consumer 的正常运行。并非绝对配置,消息大小也受 broker 的 message.max.bytes 限制,
   // 以及 topic 的 max.message.bytes 限制
   MaxBytes int `json:",default=10485760"` // 10M

   // Username: Kafka 的账号(可选)
   Username string `json:",optional"`

   // Password: Kafka 的密码(可选)
   Password string `json:",optional"`
}

2.配置

配置文件

在yaml 配置文件中添加当前的 kafka 配置信息,我这里为了省事就都放在一个配置文件下了:

#....

#生产者
KqPusherConf:
  Name: log-producer
  Brokers:
    - 127.0.0.1:9094
  Group: logs-group
  Topic: topic-test
#消费者
KqConsumerConf:
    Name: log-consumer
    Brokers:
      - 127.0.0.1:9094
    Group: logs-group
    Topic: topic-test
    Offset: last
    Consumers: 8
    Processors: 8

config.go

在 internal/config 下的 config.go 中定义 go 映射的配置


type Config struct {

	/*
	.....
	*/
	KqPusherConf   kq.KqConf
	KqConsumerConf kq.KqConf
}

svc注入
在 svc/serviceContext.go 中初始化 pusher 的 kq client

type ServiceContext struct {
	Config  config.Config
	KqPusherClient *kq.Pusher
	
}

func NewServiceContext(c config.Config) *ServiceContext {
	return &ServiceContext{
		Config:  c,
		KqPusherClient: kq.NewPusher(c.KqPusherConf.Brokers, c.KqPusherConf.Topic),
	}
}

3. 生产者

在 logic 中写业务逻辑使用 go-queue 的 kq client 发送消息到 kafka,这里我们用登录作为演示,当登录成功后,发送用户信息:


func (l *LoginLogic) Login(req *types.LoginRequest) (resp *types.LoginResponse, err error) {
	// todo: add your logic here and delete this line
	
	/*
	....省略其他代码
	*/
	
	//生产者需要异步执行,threading.GoSafe() 是go zero官方对 go func() 的安全封装
	threading.GoSafe(func() {
		
		logData := map[string]any{
			"user":   user.Username,
			"mobile": user.Mobile,
		}
		logs, _ := json.Marshal(logData)
		// 使用Push推送消息,消息为json
		err := l.svcCtx.KqPusherClient.Push(l.ctx, string(logs))
		if err != nil {
			logx.Errorf("KqPusherClient Push Error , err :%v", err)
		}
	})

	// 如果既没有验证码也没有密码
	return nil, errors.New(10010, "未提供有效的登录凭证")	
}

生产者需要异步执行,threading.GoSafe() 是go zero官方对 go func() 的安全封装,如果出现panics 会自动恢复。

4. 消费者

internal 下新建一个 mq 文件夹,在 mq 文件夹下新建一个消费者 consumer.go:

package mqs

import (
	"beyond/user/api/internal/svc"
	"context"
	"fmt"
	"github.com/zeromicro/go-zero/core/logc"
	"github.com/zeromicro/go-zero/core/logx"
)

//定义日志消费者
type LogsConsumer struct {
	ctx    context.Context
	svcCtx *svc.ServiceContext
}

// 定义构造方法
func NewLogsConsumer(ctx context.Context, svcCtx *svc.ServiceContext) *LogsConsumer {
	return &LogsConsumer{
		ctx:    ctx,
		svcCtx: svcCtx,
	}
}

// Consume为go zero内置接口, 实现Consume接口方法
func (l *LogsConsumer) Consume(ctx context.Context, key, val string) error {
	
	//logx.Infof("Consumer key :%s , val :%s", key, val)
	logc.Infof(ctx, "Consumer key :%s, val :%s", key, val)
	return nil
}

Consume 为go queue内置接口
在这里插入图片描述

因为消费者可能有多个,在 mq 文件夹下新建一个文件mqs.go用来监听多个消费者,mqs.go 代码如下:

package mqs

import (
	"beyond/user/api/internal/config"
	"beyond/user/api/internal/svc"
	"context"

	"github.com/zeromicro/go-queue/kq"
	"github.com/zeromicro/go-zero/core/service"
)

func Consumers(c config.Config, ctx context.Context, svcCtx *svc.ServiceContext) []service.Service {
	// 监听消费者状态变化
	return []service.Service{
		//创建消息队列
		kq.MustNewQueue(c.KqConsumerConf, NewLogsConsumer(ctx, svcCtx)),
	}

}

在 main.go 中启动 consumers 等待消费


func main() {
	flag.Parse()

	var c config.Config
	conf.MustLoad(*configFile, &c)

	server := rest.MustNewServer(c.RestConf)
	defer server.Stop()

	svcCtx := svc.NewServiceContext(c)
	handler.RegisterHandlers(server, svcCtx)

	fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port)
	// 因为现在添加了mq,属于多服务状态,所以需要启动mq服务
	//server.Start()  
	//创建新的服务组
	serviceGroup := service.NewServiceGroup()
	defer serviceGroup.Stop()
	// 从mq中获取消费者服务,并添加到服务组中
	for _, mq := range mqs.Consumers(c, context.Background(), svcCtx) {
		serviceGroup.Add(mq)
	}
	//添加原来的server服务
	serviceGroup.Add(server)
	// 启动服务组
	serviceGroup.Start()

}

5.启动项目

我们这里就是简单的输出日志,你也可以拓展成发送邮件或者短信给用户,提示用户注册成功。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/936155.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

vue绕过rules自定义编写动态校验

今天犯了个低级错误,虽然走了很多弯路,但这个过程还是值得记录一下 例子如下,有两个输入框: 第一个是套餐选择下拉框,可以下拉选择三个内容 第二个要根据上面的套餐选择三个选项来决定怎么显示,使用v-if&…

ABAQUS进行焊接仿真分析(含子程序)

0 前言 焊接技术作为现代制造业中的重要连接工艺,广泛应用于汽车、船舶、航空航天、能源等多个行业。焊接接头的质量和性能直接影响到结构件的安全性、可靠性和使用寿命。因此,在焊接过程中如何有效预测和优化焊接过程中的热效应、应力变化以及材料变形等问题,成为了焊接研…

【efinance一个2k星的库】

efinance 是一个可以快速获取基金、股票、债券、期货数据的 Python 库,回测以及量化交易的好帮手 但没有等比复权的,不用。 import efinance as ef ef.stock.get_quote_history(510880,fqt2)

【考前预习】3.计算机网络—数据链路层

往期推荐 【考前预习】2.计算机网络—物理层-CSDN博客 【考前预习】1.计算机网络概述-CSDN博客 浅谈云原生--微服务、CICD、Serverless、服务网格_云原生cicd-CSDN博客 子网掩码、网络地址、广播地址、子网划分及计算_子网广播地址-CSDN博客 浅学React和JSX-CSDN博客 目录 1.数…

Microsemi Libero SoC免费许可证申请指南(Microchip官网2024最新方法)

点击如下链接: https://www.microchip.com/en-us/products/fpgas-and-plds/fpga-and-soc-design-tools/fpga/licensing 点击右侧,请求免费的License 如果提示登录,请先登录Microchip账号。 点击Request Free License。 选项一年免费的Li…

嵌入式Linux应用层开发——调试专篇(关于使用GDB调试远程下位机开发板的应用层程序办法 + VSCode更好的界面调试体验提升)

环境预备——调试 虽说有正点原子的代码带着,但是,如果我们只是打着printf这种方式进行手动的检查代码错误,还是不太方便的,笔者这里整理了两个上位机调试路线。 路线1:使用GCC7.5,这个路线比较保守&#…

深度学习训练参数之学习率介绍

学习率 1. 什么是学习率 学习率是训练神经网络的重要超参数之一,它代表在每一次迭代中梯度向损失函数最优解移动的步长,通常用 η \eta η 表示。它的大小决定网络学习速度的快慢。在网络训练过程中,模型通过样本数据给出预测值&#xff0…

蒙特卡洛模拟(Monte Carlo Simulation)详解

简介:个人学习分享,如有错误,欢迎批评指正。 历史背景 蒙特卡洛模拟的名称来源于摩纳哥的蒙特卡洛赌场,因其依赖于随机性和概率,与赌博中的随机过程有相似之处。该方法的雏形可以追溯到20世纪40年代,二战期…

Git-分支(branch)常用命令

分支 我们在做项目开发的时候,无论是软件项目还是其他机械工程项目,我们为了提高效率以及合理的节省时间等等原因,现在都不再是线性进行,而是将一个项目抽离出诸进行线,每一条线在git中我们就叫做分支,bran…

《数据结构之美-- 单链表》

引言: 首先由上次我们实现的顺序表聊起,我们在实现顺序表的时候会发现,在每次插入数据时当空间不够时就会涉及到扩容,而顺序表的扩容一般都是呈二倍的形式来进行,因此这就有可能造成空间的浪费,那该如何解…

NVR小程序接入平台/设备EasyNVR深度解析H.265与H.264编码视频接入的区别

随着科技的飞速发展和社会的不断进步,视频压缩编码技术已经成为视频传输和存储中不可或缺的一部分。在众多编码标准中,H.265和H.264是最为重要的两种。今天我们来将深入分析H.265与H.264编码的区别。 一、H.265与H.264编码的区别 1、比特率与分辨率 H.…

JPG 转 PDF:免费好用的在线图片转 PDF 工具

JPG 转 PDF:免费好用的在线图片转 PDF 工具 在日常工作和生活中,我们经常需要将图片转换为 PDF 格式。无论是制作电子文档、准备演示材料,还是整理照片集,将图片转换为 PDF 都是一个常见的需求。今天为大家介绍一款完全免费、无需…

RabbitMQ 整合 SpringBoot

概述 大多应用中,可通过消息服务中间件来提升系统异步通信、扩展解耦能力、流量削峰消息服务中两个重要概念: 消息代理(`message broker`)和目的地(`destination`) 当消息发送者发送消息以后,将由消息代理接管,消息代理保证消息传递到指定目的地。消息队列主要有两种形…

Docker的初识

目录 1. 容器技术发展史1.1 Jail 时代1.2 云时代1.3 云原生时代1.3.1 Google & Docker 竞争1.3.2 k8s 成为云原生事实标准 2. 虚拟化和容器化的概念2.1 什么是虚拟化、容器化2.2 为什么要虚拟化、容器化?2.3 虚拟化实现方式2.3.1 应用程序执行环境分层2.3.2 虚拟…

MySQL 索引解析:让查询速度飙升

1.前言 之前几篇文章,小编和大家分享了mysql innodb的内存结构,这次小编准备用两篇文章来和大家分享下mysql innodb的索引: mysql的基础知识 和 基于索引的sql优化 。 2. 什么是索引? 定义:索引是数据库中用于快速查找数据的机…

记录 idea 启动 tomcat 控制台输出乱码问题解决

文章目录 问题现象解决排查过程1. **检查 idea 编码设置**2. **检查 tomcat 配置**3.检查 idea 配置文件4.在 Help 菜单栏中,修改Custom VM Options完成后保存,并重启 idea 问题现象 运行 tomcat 后,控制台输出乱码 解决排查过程 1. 检查 id…

MySQL有哪些高可用方案?

大家好,我是锋哥。今天分享关于【MySQL有哪些高可用方案?】面试题。希望对大家有帮助; MySQL有哪些高可用方案? 1000道 互联网大厂Java工程师 精选面试题-Java资源分享网 MySQL 高可用方案旨在确保数据库系统的高可靠性、低宕机时间、以及在硬件故障…

基于STM32的火灾烟雾报警器设计开题报告

开题报告 题目:基于STM32的火灾烟雾报警器Proteus仿真设计 一、研究背景与意义 随着现代城市化进程的加快,火灾安全问题日益凸显,火灾的早期预警对于减少人员伤亡和财产损失至关重要。传统的火灾报警系统往往依赖于烟雾或温度的单一检测&a…

《机器学习》3.7-4.3end if 启发式 uci数据集klda方法——非线性可分的分类器

目录 uci数据集 klda方法——非线性可分的分类器 计算 步骤 1: 选择核函数 步骤 2: 计算核矩阵 步骤 4: 解广义特征值问题 と支持向量机(svm) 目标: 方法: 核技巧的应用: 区别: 使用 OvR MvM 将…

【蓝桥杯选拔赛真题93】Scratch青蛙过河 第十五届蓝桥杯scratch图形化编程 少儿编程创意编程选拔赛真题解析

目录 Scratch青蛙过河 一、题目要求 编程实现 二、案例分析 1、角色分析 2、背景分析 3、前期准备 三、解题思路 1、思路分析 2、详细过程 四、程序编写 五、考点分析 六、推荐资料 1、入门基础 2、蓝桥杯比赛 3、考级资料 4、视频课程 5、python资料 Scratc…