文章目录
- Apache kafka简介
- 开始使用Apache Kafka
- 构建生产者
- 构建消费者
- 总结
之前已经有两篇文章介绍过
Go
如何操作
kafka
28.windows安装kafka,Go操作kafka示例(sarama库)
51.Go操作kafka示例(kafka-go库)
Apache kafka简介
Apache Kafka
是一个开源分布式事件流平台,用于高性能数据管道、流式分析、数据集成和关键任务应用程序。它提供了诸如流数据管道、实时分析、数据集成、事件源和日志聚合等多种能力。
Kafka的用例和能力
-
流数据管道:
Kafka
提供了一个分布式发布-订阅消息系统,可以在系统或应用程序之间流式传输数据。它提供了具有数据复制和容错能力的强大队列。 -
实时分析:
Kafka
允许使用工具如Kafka Streams
和KSQL
处理实时数据流,用于构建流式分析和数据处理应用程序。 -
数据集成 :
Kafka
可以用来通过在不同数据源和格式之间流式传输数据来集成不同的系统。这使它对流式ETL
非常有用。 -
事件源 :
Kafka
提供了可以重放的事件时间日志,用于重构应用程序状态,适用于事件源和CQRS
模式。 -
日志聚合 :
Kafka
通常用于将不同服务器和应用程序的日志聚合到一个中央存储库中。这允许统一访问日志数据。
为什么将Golang与Apache Kafka结合使用
将Golang
这一高效并发的编程语言与Apache Kafka
这一分布式事件流平台结合起来,提供了一个在构建尖端现代应用程序方面表现出色的强大技术栈。这两种技术之间的协同作用源自几个关键优势:
-
性能 :
Golang
和Apache Kafka
都提供高性能。Golang
快速、高效和轻量级。Kafka
为速度而构建,具有高吞吐量和低延迟。它们一起可以处理苛刻的工作负载。 -
可扩展性 :
Golang
的goroutines
和Kafka
的分区允许应用程序水平扩展以处理大量数据。Kafka
可以轻松扩展生产者和消费者。 -
并发性 :
Golang
通过goroutines
和channels
提供了出色的并发编程能力。Kafka
并发传递消息并支持并行性。 -
可用性 :
Kafka
的分布式架构使其高度可用和容错。Golang
应用可以利用这一点来构建弹性系统。 -
互操作性 :
Kafka
有多种语言的客户端,允许Golang
应用与多语言环境互动。Kafka
还使用二进制TCP
协议以提高效率。 -
现代设计 :
Kafka
和Golang
都采用现代设计理念,使它们非常适合云原生和微服务架构。 -
开发人员体验 :
Kafka
的客户端库结合Goroutines、channels
和接口,使其易于使用。
Kafka和Golang将性能、可扩展性和并发与生产力结合在一起 - 使它们成为构建可扩展的服务、管道和流应用程序的绝佳选择。
开始使用Apache Kafka
在开始使用Golang
和Apache Kafka
之前,我们必须确保golang
和Kafka
已经安装并在我们的机器上运行。
安装Kafka
28.windows安装kafka,Go操作kafka示例(sarama库)
Apache Kafka的Golang包
您可以使用go get
安装confluent-kafka-go
包:
go get -u github.com/confluentinc/confluent-kafka-go/kafka
安装后,您可以在Go
代码中导入并使用confluent-kafka-go
。
package main
import (
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
func main() {
p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
if err != nil {
fmt.Printf("创建生产者失败: %s\n", err)
return
}
// 生产消息到主题,处理交付报告等。
// 使用后记得关闭生产者
defer p.Close()
}
构建生产者
Kafka
生产者是Apache Kafka
生态系统中的一个关键组成部分,作为一个客户端应用程序,负责向Kafka
集群发布(写入)事件。这一部分提供了关于Kafka
生产者的全面概述,以及针对调整其行为的配置设置的初步探讨。
下面是一个Golang
应用程序的示例,它生产数据并将其发布到Kafka
的具体topic
。它还说明了如何在Golang
中为Kafka
消息序列化数据,并演示了如何处理错误和重试。
package main
import (
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
const (
kafkaBroker = "localhost:9092"
topic = "test-topic"
)
type Message
struct {
Key string `json:"key"`
Value string `json:"value"`
}
func main() {
// 创建一个新的Kafka生产者
p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": kafkaBroker})
if err != nil {
fmt.Printf("创建生产者失败: %s\n", err)
return
}
defer p.Close()
// 定义要发送的消息
message := Message{
Key: "example_key",
Value: "Hello, Kafka!",
}
// 序列化消息
serializedMessage, err := serializeMessage(message)
if err != nil {
fmt.Printf("消息序列化失败: %s\n", err)
return
}
// 将消息生产到Kafka主题
err = produceMessage(p, topic, serializedMessage)
if err != nil {
fmt.Printf("消息生产失败: %s\n", err)
return
}
fmt.Println("消息成功生产!")
}
func serializeMessage(message Message) ([]byte, error) {
// 将消息结构体序列化为JSON
serialized, err := json.Marshal(message)
if err != nil {
return nil, fmt.Errorf("消息序列化失败: %w", err)
}
return serialized, nil
}
func produceMessage(p *kafka.Producer, topic string, message []byte) error {
// 创建一个新的要生产的Kafka消息
kafkaMessage := &kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: message,
}
// 生产Kafka消息
deliveryChan := make(chan kafka.Event)
err := p.Produce(kafkaMessage, deliveryChan)
if err != nil {
return fmt.Errorf("消息生产失败: %w", err)
}
// 等待交付报告或错误
e := <-deliveryChan
m := e.(*kafka.Message)
// 检查交付错误,即生成者方确保发送到Broker的消息不丢失
// 但可能重复发送,如实际发成功了,但是Broker的ACK返回给生产者时出现了网络Error
// 从而重试后,导致消息重复发送,这时候需要下游做好幂等处理
if m.TopicPartition.Error != nil {
return fmt.Errorf("交付失败: %s", m.TopicPartition.Error)
}
// 关闭交付频道
close(deliveryChan)
return nil
}
步骤解释:
-
创建一个
Kafka
生产者。 -
使用
json.Marshal
函数将自定义消息结构体(Message
)序列化为JSON
。 -
使用生产者将序列化的消息生产到
Kafka topic
。 -
使用交付报告和错误检查处理错误和重试。
确保将localhost:9092
替换为您的Kafka
代理地址,将test-topic
替换为所需的主题名称。此外,您可能需要处理更复杂的错误场景并根据您的具体需求实现重试逻辑
。
构建消费者
Kafka
消费者就像小型事件处理器,它们获取并消化数据流。它们订阅主题并消费任何新到达的消息,处理每一个消息。我们将探讨这些消费者的内部工作原理和调整其性能的配置。
下面是一个Golang
应用程序的示例,它从Kafka
主题消费消息。如下代码包括了如何处理消费到的消息的说明,以及对不同消费模式(如单个消费者和消费者组)的讨论。
package main
import (
"fmt"
"os"
"os/signal"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
const (
kafkaBroker = "localhost:9092"
topic = "test-topic"
groupID = "test-group"
)
func main() {
// 创建一个新的Kafka消费者
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": kafkaBroker,
"group.id": groupID, // 消费者组标识
"auto.offset.reset": "earliest", // 从头开始消费
})
if err != nil {
fmt.Printf("创建消费者失败: %s\n", err)
return
}
defer c.Close()
// 订阅Kafka主题
err = c.SubscribeTopics([]string{topic}, nil)
if err != nil {
fmt.Printf("订阅主题失败: %s\n", err)
return
}
// 设置一个通道来处理操作系统信号,以便优雅地关闭
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, os.Interrupt)
// 开始消费消息
run := true
for run == true {
select {
case sig := <-sigchan:
fmt.Printf("接收到信号 %v: 正在终止\n", sig)
run = false
default:
// 轮询Kafka消息,1次最多拉取100条消息
ev := c.Poll(100)
if ev == nil {
continue
}
switch e := ev.(type) {
case *kafka.Message:
// 处理消费的消息
fmt.Printf("从主题 %s 收到消息: %s\n", *e.TopicPartition.Topic, string(e.Value))
case kafka.Error:
// 处理Kafka错误
fmt.Printf("错误: %v\n", e)
}
}
}
}
步骤解释:
-
创建一个
Kafka
消费者。 -
订阅一个
Kafka
主题。 -
设置一个通道来处理操作系统信号(如
SIGINT
)以优雅地关闭。 -
开始从订阅的
Topic
消费消息。 -
处理消费的消息以及
Kafka
错误。
不同的消费模式:
-
单个消费者:在这种模式下,单个消费者实例从主题的一个或多个分区读取消息。当您只需要一个消费者应用程序实例来处理来自
Topic
的所有消息时,这很有用。 -
消费者组:消费者组允许您通过将消息处理分布到多个消费者实例来扩展消费,以实现扩展。每个消费者组可以有多个消费者,组内的每个消费者从一部分分区读取消息。这使得消息的并行处理成为可能,提供了容错能力和高吞吐量。
在提供的示例中,group.id
配置设置用于指定消费者组ID
。这允许消费者应用程序的多个实例在消费者组中一起工作,从Kafka Topic
消费消息。
总结
总之,Apache Kafka
作为构建实时数据管道和流应用程序的强大解决方案,得益于其分布式、可扩展和容错的架构。当与Golang
结合时,它形成了一个在性能、可扩展性和并发方面表现出色的强大技术栈,非常适合现代应用程序。通过利用Kafka
的功能和Golang
的优势,开发人员可以构建出具有弹性和高性能的服务、管道和流应用程序,这些应用程序可以轻松扩展以满足当今数据驱动世界的需求。无论是处理实时分析、集成不同的系统还是聚合日志,Kafka
和Golang
提供了一个强势组合,使开发人员能够轻松构建创新和可扩展的解决方案。