go zero 消息队列
在微服务架构中,消息队列主要通过异步通信实现服务间的解耦,使得各个服务可以独立发展和扩展。
go-zero中使用的队列组件go-queue
,是gozero官方实现的基于Kafka和Beanstalkd 的消息队列框架,我们使用kafka作为演示。
一、kafka简单介绍
Kafka 是一个开源的分布式流处理平台,主要用于构建实时数据管道和流应用。
1.Kafka 的架构
Kafka 的架构通常由以下几个部分构成:
- Broker(节点):Kafka 集群由多个 broker 实例组成,负责管理消息的存储和处理。
- Topic(主题):消息以主题的形式组织,每个主题可以有多个分区(partition),以支持高并发和扩展性。
- Produce(生产者):消息生产者将数据发送到特定的topic中。
- Consumer(消费者):消费者从topic中读取数据,可以将多个消费者分组以进行负载均衡。
2.Kafka 的关键特性
-
高吞吐量:
Kafka 设计上能够处理大量的实时数据流,具备非常高的吞吐量。这使得它能够轻松应对大规模数据流量,适合做日志聚合、监控数据处理等。 -
持久性:
Kafka 将消息持久化到磁盘,并提供复制功能,以确保数据的安全性和可靠性。即使在节点出现故障的情况下,也能保证数据不会丢失。 -
可扩展性:
Kafka 能够水平扩展,通过增加更多的节点来处理更多的消费者和生产者,这使得它能够应对越来越多的业务需求。 -
实时处理:
Kafka 提供低延迟的数据传输,这使得实时处理和分析成为可能。您可以瞬时处理到来的数据流。 -
支持多种消息传递模式:
Kafka 支持发布-订阅和点对点的消息传递模式,能够灵活适应不同场景下的需求。 -
强大的生态系统:
Kafka 拥有丰富的生态系统,包括 Kafka Streams 和 Kafka Connect,这些工具可以帮助开发者更方便地进行流处理和数据集成。
3.常见应用场景
-
日志聚合:
Kafka 可以作为一个集中式的日志聚合器,将分布在不同服务的日志集中到一个地方,方便后续分析和监控。 -
实时数据流处理:
使用 Kafka,用户可以实时处理和分析流数据,例如检测异常、生成实时报告等。 -
系统监控和事件追踪:
Kafka 经常用于收集和跟踪系统事件(如用户行为、系统状态等),并通过流式处理进行实时监控。 -
数据集成:
Kafka 可以作为数据的桥梁,连接不同的数据源和目标系统,方便实现数据的流转和转换。 -
消息队列:
Kafka 可用作高效的消息队列,实现服务间的异步通信。例如,在微服务架构中,服务 A 可以将消息发送到 Kafka,而服务 B 可以异步地从 Kafka 中读取处理这些消息。
二、环境部署
1.Docker安装Kafka
配置文件:
version: '3'
######## 项目依赖的环境,启动项目之前要先启动此环境 #######
services:
#zookeeper是kafka的依赖 - Zookeeper is the dependencies of Kafka
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
environment:
# 时区上海 - Time zone Shanghai (Change if needed)
TZ: Asia/Shanghai
restart: always
ports:
- 2181:2181
networks:
- gozero_net
#消息队列 - Message queue
kafka:
image: 'bitnami/kafka:3.6.2'
container_name: kafka
restart: always
ulimits:
nofile:
soft: 65536
hard: 65536
environment:
- TZ=Asia/Shanghai
- KAFKA_CFG_NODE_ID=0
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
ports:
- '9092:9092'
- '9094:9094'
volumes:
- ./volumes/kafka:/bitnami/kafka
networks:
gozero_net:
driver: bridge
ipam:
config:
- subnet: 172.16.0.0/16
这里的kafka 对外暴露的端口为9094
使用命令执行文件
docker compose up
2.创建topic
进入kafka容器
docker exec -it {{容器ID}} /bin/bash
#或者直接使用容器名
docker exec -it kafka /bin/bash
进入kafka执行命令目录
进入kafka执行命令目录
cd /opt/bitnami/kafka/bin
创建topic
创建名为topic-test
的topic
./kafka-topics.sh --create --topic topic-test --bootstrap-server localhost:9092
查看topic信息
./kafka-topics.sh --describe --topic topic-test --bootstrap-server localhost:9092
3.测试topic
使用两个终端,进入kafka执行命令目录,执行下面两个命令:
生产消息
./kafka-console-producer.sh --topic topic-test --bootstrap-server localhost:9092
消费消息
./kafka-console-consumer.sh --topic topic-test --bootstrap-server localhost:9092
测试
在生产者输入消息,会自动同步到消费者
4. 拉取依赖
项目中首先要拉取 go-queue 的依赖
go get github.com/zeromicro/go-queue@latest
三、项目演示
1.配置说明
type KqConf struct {
service.ServiceConf
// Brokers: Kafka 的多个 Broker 节点
Brokers []string
// Group: 消费者组
Group string
// Topic: 订阅的 Topic 主题
Topic string
// Offset: 如果新的 topic Kafka 没有对应的 offset 信息,或者当前的 offset 无效(历史数据被删除),
// 需要指定从头(first)消费还是从尾(last)消费
Offset string `json:",options=first|last,default=last"`
// Conns: 一个 Kafka queue 对应可对应多个 consumer,Conns 对应 Kafka queue 数量,
// 可以同时初始化多个 Kafka queue,默认只启动一个
Conns int `json:",default=1"`
// Consumers: go-queue 内部起多个 goroutine 从 Kafka 中获取信息写入进程内的 channel,
// 此参数控制 goroutine 数量(⚠️ 并不是真正消费时的并发 goroutine 数量)
Consumers int `json:",default=8"`
// Processors: 当 Consumers 中的多个 goroutine 拉取到 Kafka 消息后,
// 通过此参数控制当前消费逻辑的并发 goroutine 数量
Processors int `json:",default=8"`
// MinBytes: fetch 一次返回的最小字节数,若不够该字节数则会等待
MinBytes int `json:",default=10240"` // 10K
// MaxBytes: fetch 一次返回的最大字节数,若第一条消息大小超过该限制仍会继续拉取,
// 以确保 consumer 的正常运行。并非绝对配置,消息大小也受 broker 的 message.max.bytes 限制,
// 以及 topic 的 max.message.bytes 限制
MaxBytes int `json:",default=10485760"` // 10M
// Username: Kafka 的账号(可选)
Username string `json:",optional"`
// Password: Kafka 的密码(可选)
Password string `json:",optional"`
}
2.配置
配置文件
在yaml 配置文件中添加当前的 kafka 配置信息,我这里为了省事就都放在一个配置文件下了:
#....
#生产者
KqPusherConf:
Name: log-producer
Brokers:
- 127.0.0.1:9094
Group: logs-group
Topic: topic-test
#消费者
KqConsumerConf:
Name: log-consumer
Brokers:
- 127.0.0.1:9094
Group: logs-group
Topic: topic-test
Offset: last
Consumers: 8
Processors: 8
config.go
在 internal/config 下的 config.go 中定义 go 映射的配置
type Config struct {
/*
.....
*/
KqPusherConf kq.KqConf
KqConsumerConf kq.KqConf
}
svc注入
在 svc/serviceContext.go 中初始化 pusher 的 kq client
type ServiceContext struct {
Config config.Config
KqPusherClient *kq.Pusher
}
func NewServiceContext(c config.Config) *ServiceContext {
return &ServiceContext{
Config: c,
KqPusherClient: kq.NewPusher(c.KqPusherConf.Brokers, c.KqPusherConf.Topic),
}
}
3. 生产者
在 logic 中写业务逻辑使用 go-queue 的 kq client 发送消息到 kafka,这里我们用登录作为演示,当登录成功后,发送用户信息:
func (l *LoginLogic) Login(req *types.LoginRequest) (resp *types.LoginResponse, err error) {
// todo: add your logic here and delete this line
/*
....省略其他代码
*/
//生产者需要异步执行,threading.GoSafe() 是go zero官方对 go func() 的安全封装
threading.GoSafe(func() {
logData := map[string]any{
"user": user.Username,
"mobile": user.Mobile,
}
logs, _ := json.Marshal(logData)
// 使用Push推送消息,消息为json
err := l.svcCtx.KqPusherClient.Push(l.ctx, string(logs))
if err != nil {
logx.Errorf("KqPusherClient Push Error , err :%v", err)
}
})
// 如果既没有验证码也没有密码
return nil, errors.New(10010, "未提供有效的登录凭证")
}
生产者需要异步执行,threading.GoSafe()
是go zero官方对 go func()
的安全封装,如果出现panics
会自动恢复。
4. 消费者
在 internal
下新建一个 mq
文件夹,在 mq
文件夹下新建一个消费者 consumer.go
:
package mqs
import (
"beyond/user/api/internal/svc"
"context"
"fmt"
"github.com/zeromicro/go-zero/core/logc"
"github.com/zeromicro/go-zero/core/logx"
)
//定义日志消费者
type LogsConsumer struct {
ctx context.Context
svcCtx *svc.ServiceContext
}
// 定义构造方法
func NewLogsConsumer(ctx context.Context, svcCtx *svc.ServiceContext) *LogsConsumer {
return &LogsConsumer{
ctx: ctx,
svcCtx: svcCtx,
}
}
// Consume为go zero内置接口, 实现Consume接口方法
func (l *LogsConsumer) Consume(ctx context.Context, key, val string) error {
//logx.Infof("Consumer key :%s , val :%s", key, val)
logc.Infof(ctx, "Consumer key :%s, val :%s", key, val)
return nil
}
Consume
为go queue内置接口
因为消费者可能有多个,在 mq
文件夹下新建一个文件mqs.go
用来监听多个消费者,mqs.go
代码如下:
package mqs
import (
"beyond/user/api/internal/config"
"beyond/user/api/internal/svc"
"context"
"github.com/zeromicro/go-queue/kq"
"github.com/zeromicro/go-zero/core/service"
)
func Consumers(c config.Config, ctx context.Context, svcCtx *svc.ServiceContext) []service.Service {
// 监听消费者状态变化
return []service.Service{
//创建消息队列
kq.MustNewQueue(c.KqConsumerConf, NewLogsConsumer(ctx, svcCtx)),
}
}
在 main.go 中启动 consumers 等待消费
func main() {
flag.Parse()
var c config.Config
conf.MustLoad(*configFile, &c)
server := rest.MustNewServer(c.RestConf)
defer server.Stop()
svcCtx := svc.NewServiceContext(c)
handler.RegisterHandlers(server, svcCtx)
fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port)
// 因为现在添加了mq,属于多服务状态,所以需要启动mq服务
//server.Start()
//创建新的服务组
serviceGroup := service.NewServiceGroup()
defer serviceGroup.Stop()
// 从mq中获取消费者服务,并添加到服务组中
for _, mq := range mqs.Consumers(c, context.Background(), svcCtx) {
serviceGroup.Add(mq)
}
//添加原来的server服务
serviceGroup.Add(server)
// 启动服务组
serviceGroup.Start()
}
5.启动项目
我们这里就是简单的输出日志,你也可以拓展成发送邮件或者短信给用户,提示用户注册成功。