142. Go操作Kafka(confluent-kafka-go库)

文章目录

  • Apache kafka简介
  • 开始使用Apache Kafka
    • 构建生产者
    • 构建消费者
  • 总结

之前已经有两篇文章介绍过 Go如何操作 kafka
28.windows安装kafka,Go操作kafka示例(sarama库)
51.Go操作kafka示例(kafka-go库)

Apache kafka简介

在这里插入图片描述
Apache Kafka是一个开源分布式事件流平台,用于高性能数据管道、流式分析、数据集成和关键任务应用程序。它提供了诸如流数据管道、实时分析、数据集成、事件源和日志聚合等多种能力。

Kafka的用例和能力

  • 流数据管道: Kafka提供了一个分布式发布-订阅消息系统,可以在系统或应用程序之间流式传输数据。它提供了具有数据复制和容错能力的强大队列。

  • 实时分析:Kafka允许使用工具如Kafka StreamsKSQL处理实时数据流,用于构建流式分析和数据处理应用程序。

  • 数据集成 :Kafka可以用来通过在不同数据源和格式之间流式传输数据来集成不同的系统。这使它对流式ETL非常有用。

  • 事件源 : Kafka提供了可以重放的事件时间日志,用于重构应用程序状态,适用于事件源和CQRS模式。

  • 日志聚合 : Kafka通常用于将不同服务器和应用程序的日志聚合到一个中央存储库中。这允许统一访问日志数据。

为什么将Golang与Apache Kafka结合使用

Golang这一高效并发的编程语言与Apache Kafka这一分布式事件流平台结合起来,提供了一个在构建尖端现代应用程序方面表现出色的强大技术栈。这两种技术之间的协同作用源自几个关键优势:

  • 性能 : GolangApache Kafka都提供高性能。Golang快速、高效和轻量级。Kafka为速度而构建,具有高吞吐量和低延迟。它们一起可以处理苛刻的工作负载。

  • 可扩展性 : GolanggoroutinesKafka的分区允许应用程序水平扩展以处理大量数据。Kafka可以轻松扩展生产者和消费者。

  • 并发性 : Golang通过goroutineschannels提供了出色的并发编程能力。Kafka并发传递消息并支持并行性。

  • 可用性 : Kafka的分布式架构使其高度可用和容错。Golang应用可以利用这一点来构建弹性系统。

  • 互操作性 : Kafka有多种语言的客户端,允许Golang应用与多语言环境互动。Kafka还使用二进制TCP协议以提高效率。

  • 现代设计 : KafkaGolang都采用现代设计理念,使它们非常适合云原生和微服务架构。

  • 开发人员体验 : Kafka的客户端库结合Goroutines、channels和接口,使其易于使用。

Kafka和Golang将性能、可扩展性和并发与生产力结合在一起 - 使它们成为构建可扩展的服务、管道和流应用程序的绝佳选择。

开始使用Apache Kafka

在开始使用GolangApache Kafka之前,我们必须确保golangKafka已经安装并在我们的机器上运行。

安装Kafka
28.windows安装kafka,Go操作kafka示例(sarama库)

Apache Kafka的Golang包
您可以使用go get安装confluent-kafka-go包:

go get -u github.com/confluentinc/confluent-kafka-go/kafka

安装后,您可以在Go代码中导入并使用confluent-kafka-go

package main

import (
    "fmt"
    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
    if err != nil {
        fmt.Printf("创建生产者失败: %s\n", err)
        return
    }

    // 生产消息到主题,处理交付报告等。

    // 使用后记得关闭生产者
    defer p.Close()
}

构建生产者

Kafka生产者是Apache Kafka生态系统中的一个关键组成部分,作为一个客户端应用程序,负责向Kafka集群发布(写入)事件。这一部分提供了关于Kafka生产者的全面概述,以及针对调整其行为的配置设置的初步探讨。

下面是一个Golang应用程序的示例,它生产数据并将其发布到Kafka的具体topic。它还说明了如何在Golang中为Kafka消息序列化数据,并演示了如何处理错误和重试。

package main

import (
    "fmt"
    "github.com/confluentinc/confluent-kafka-go/kafka"
)

const (
    kafkaBroker = "localhost:9092"
    topic       = "test-topic"
)

type Message

 struct {
    Key   string `json:"key"`
    Value string `json:"value"`
}

func main() {
    // 创建一个新的Kafka生产者
    p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": kafkaBroker})
    if err != nil {
        fmt.Printf("创建生产者失败: %s\n", err)
        return
    }
    defer p.Close()

    // 定义要发送的消息
    message := Message{
        Key:   "example_key",
        Value: "Hello, Kafka!",
    }

    // 序列化消息
    serializedMessage, err := serializeMessage(message)
    if err != nil {
        fmt.Printf("消息序列化失败: %s\n", err)
        return
    }

    // 将消息生产到Kafka主题
    err = produceMessage(p, topic, serializedMessage)
    if err != nil {
        fmt.Printf("消息生产失败: %s\n", err)
        return
    }

    fmt.Println("消息成功生产!")
}

func serializeMessage(message Message) ([]byte, error) {
    // 将消息结构体序列化为JSON
    serialized, err := json.Marshal(message)
    if err != nil {
        return nil, fmt.Errorf("消息序列化失败: %w", err)
    }
    return serialized, nil
}

func produceMessage(p *kafka.Producer, topic string, message []byte) error {
    // 创建一个新的要生产的Kafka消息
    kafkaMessage := &kafka.Message{
        TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
        Value:          message,
    }

    // 生产Kafka消息
    deliveryChan := make(chan kafka.Event)
    err := p.Produce(kafkaMessage, deliveryChan)
    if err != nil {
        return fmt.Errorf("消息生产失败: %w", err)
    }

    // 等待交付报告或错误
    e := <-deliveryChan
    m := e.(*kafka.Message)

    // 检查交付错误,即生成者方确保发送到Broker的消息不丢失
    // 但可能重复发送,如实际发成功了,但是Broker的ACK返回给生产者时出现了网络Error
    // 从而重试后,导致消息重复发送,这时候需要下游做好幂等处理
    if m.TopicPartition.Error != nil {
        return fmt.Errorf("交付失败: %s", m.TopicPartition.Error)
    }

    // 关闭交付频道
    close(deliveryChan)

    return nil
}

步骤解释:

  1. 创建一个Kafka生产者。

  2. 使用json.Marshal函数将自定义消息结构体(Message)序列化为JSON

  3. 使用生产者将序列化的消息生产到Kafka topic

  4. 使用交付报告和错误检查处理错误和重试。

确保将localhost:9092替换为您的Kafka代理地址,将test-topic替换为所需的主题名称。此外,您可能需要处理更复杂的错误场景并根据您的具体需求实现重试逻辑

构建消费者

Kafka消费者就像小型事件处理器,它们获取并消化数据流。它们订阅主题并消费任何新到达的消息,处理每一个消息。我们将探讨这些消费者的内部工作原理和调整其性能的配置。

下面是一个Golang应用程序的示例,它从Kafka主题消费消息。如下代码包括了如何处理消费到的消息的说明,以及对不同消费模式(如单个消费者和消费者组)的讨论。

package main

import (
    "fmt"
    "os"
    "os/signal"
    "github.com/confluentinc/confluent-kafka-go/kafka"
)

const (
    kafkaBroker = "localhost:9092"
    topic       = "test-topic"
    groupID     = "test-group"
)

func main() {
    // 创建一个新的Kafka消费者
    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers":  kafkaBroker,
        "group.id":           groupID, // 消费者组标识
        "auto.offset.reset":  "earliest", // 从头开始消费
    })
    if err != nil {
        fmt.Printf("创建消费者失败: %s\n", err)
        return
    }
    defer c.Close()

    // 订阅Kafka主题
    err = c.SubscribeTopics([]string{topic}, nil)
    if err != nil {
        fmt.Printf("订阅主题失败: %s\n", err)
        return
    }

    // 设置一个通道来处理操作系统信号,以便优雅地关闭
    sigchan := make(chan os.Signal, 1)
    signal.Notify(sigchan, os.Interrupt)

    // 开始消费消息
    run := true
    for run == true {
        select {
        case sig := <-sigchan:
            fmt.Printf("接收到信号 %v: 正在终止\n", sig)
            run = false
        default:
            // 轮询Kafka消息,1次最多拉取100条消息
            ev := c.Poll(100) 
            if ev == nil {
                continue
            }

            switch e := ev.(type) {
            case *kafka.Message:
                // 处理消费的消息
                fmt.Printf("从主题 %s 收到消息: %s\n", *e.TopicPartition.Topic, string(e.Value))
            case kafka.Error:
                // 处理Kafka错误
                fmt.Printf("错误: %v\n", e)
            }
        }
    }
}

步骤解释

  1. 创建一个Kafka消费者。

  2. 订阅一个Kafka主题。

  3. 设置一个通道来处理操作系统信号(如SIGINT)以优雅地关闭。

  4. 开始从订阅的Topic消费消息。

  5. 处理消费的消息以及Kafka错误。

不同的消费模式:

  • 单个消费者:在这种模式下,单个消费者实例从主题的一个或多个分区读取消息。当您只需要一个消费者应用程序实例来处理来自Topic的所有消息时,这很有用。

  • 消费者组:消费者组允许您通过将消息处理分布到多个消费者实例来扩展消费,以实现扩展。每个消费者组可以有多个消费者,组内的每个消费者从一部分分区读取消息。这使得消息的并行处理成为可能,提供了容错能力和高吞吐量。

在提供的示例中,group.id配置设置用于指定消费者组ID。这允许消费者应用程序的多个实例在消费者组中一起工作,从Kafka Topic消费消息。

总结

总之,Apache Kafka作为构建实时数据管道和流应用程序的强大解决方案,得益于其分布式、可扩展和容错的架构。当与Golang结合时,它形成了一个在性能、可扩展性和并发方面表现出色的强大技术栈,非常适合现代应用程序。通过利用Kafka的功能和Golang的优势,开发人员可以构建出具有弹性和高性能的服务、管道和流应用程序,这些应用程序可以轻松扩展以满足当今数据驱动世界的需求。无论是处理实时分析、集成不同的系统还是聚合日志,KafkaGolang提供了一个强势组合,使开发人员能够轻松构建创新和可扩展的解决方案。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/872189.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

vscode +STM32 VS CODE EXTENSION

stm32 vs code extersion 1.0.0版本可以直接导入cubeide的工程&#xff0c;之后版本不可以&#xff0c;所以为了省事&#xff0c;使用stm32 vs code extersion 1.0.0插件。 安装完stm32 vs code extersion插件&#xff0c;会默认把相关插件一起安装。但是需要手动安装Ninja&am…

借老系统重构机会我写了个groovy规则引擎

公司老系统的重构计划早就有了&#xff0c;为了对Java硬编码的各种校验规则进行重构&#xff0c;特地参考了相关技术&#xff0c;最终选择了groovy进行了系统的学习&#xff0c;并编写了一个即插即用的轻量级规则引擎。 文章目录 项目背景技术选型groovy的性能groovy脚本执行线…

AI科学家:自动化科研的未来之路

随着人工智能&#xff08;AI&#xff09;技术的不断进步&#xff0c;AI已经在众多领域中展现了强大的潜力&#xff0c;尤其是在科研方面的应用正在引起广泛关注。最近&#xff0c;Sakana AI与牛津大学和不列颠哥伦比亚大学联合推出了一款被称为“AI科学家”的自动化科研工具&am…

提升LLM结果:何时使用知识图谱RAG

通过知识图谱增强 RAG 可以帮助检索&#xff0c;使系统能够更深入地挖掘数据集以提供详细的响应。 有时&#xff0c;检索增强生成 (RAG) 系统无法深入文档集以找到所需的答案。我们可能会得到泛泛的或肤浅的回复&#xff0c;或者我们可能会得到回复&#xff0c;其中 RAG 系统检…

IA——网络操作设备VRP简介

一&#xff0c;VRP简介 二&#xff0c;网络设备的管理 &#xff08;1&#xff09;console口&#xff1a; &#xff08;2&#xff09;talnet: &#xff08;3&#xff09;SSH: 安全的远程登陆 &#xff08;4&#xff09;通过WEB页面登录&#xff1a; 三&#xff0c;命令行常见…

周星驰《大话大话西游》开机,“悟空热”席卷短剧市场?

继《黑神话&#xff1a;悟空》掀起文娱行业的“悟空热”之后&#xff0c;对市场变化最为敏感的短剧行业也赶上了这波热潮。 9月2日&#xff0c;由丽泽影业承制、周星驰九五二七剧场的第二部短剧《大话大话西游》正式开机。 从剧名就不难看出&#xff0c;《大话大话西游》是周星…

MySQL数据库安装(详细)—>Mariadb的安装(day21)

该网盘链接有效期为7天&#xff0c;有需要评论区扣我&#xff1a; 通过网盘分享的文件&#xff1a;mariadb-10.3.7-winx64.msi 链接: https://pan.baidu.com/s/1-r_w3NuP8amhIEedmTkWsQ?pwd2ua7 提取码: 2ua7 1 双击打开安装软件 本次安装的是mariaDB&#xff0c;双击打开mar…

OPC DA

默认端口号: TCP 135 参考https://wenku.baidu.com/view/8f2b18a229f90242a8956bec0975f46527d3a7e4.html?_wkts_1725526157944&bdQuery%E6%9F%A5%E7%9C%8B%E8%A5%BF%E9%97%A8%E5%AD%90opcDA%E7%AB%AF%E5%8F%A3%E5%8F%B7 OPC DA ,OPC UA简介https://www.cnblogs.com/mi…

Vue3 + Ts + Vite项目 websoket封装使用

文章目录 一、安装二、封装三、请求地址配置3.1 将接口地址放到 public3.2 引入 ipconfig.js 文件3.3 全局类型声明 四、页面使用4.1 引用4.2 注册 五、说明 一、安装 npm npm install websocket --save-devpnpm pnpm install websocket --save-dev二、封装 在 /src/utils …

类的加载过程与初始化小记

//部分内容来自“狂神说java” 代码验证 解释 1.加载类的信息&#xff0c;加载到内存中&#xff0c;如例子&#xff0c;将Test05和A类的信息加载到方法区&#xff0c; 2.加载完成后&#xff0c;立马生成一个class对象&#xff0c;如例 java.lang.class对象代表Test05类..., 3…

软件测试-Selenium+python自动化测试

目录 会用到谷歌浏览器Chrome测试,需要下载一个Chromedriver(Chrome for Testing availability)对应自己的浏览器版本号选择。 一、元素定位 对html网页中的元素进行定位,同时进行部分操作。 1.1一个简单的模板 from selenium import webdriver from selenium.webdrive…

Elastic Stack-ES集群常用的API

前言&#xff1a;本博客仅作记录学习使用&#xff0c;部分图片出自网络&#xff0c;如有侵犯您的权益&#xff0c;请联系删除 学习B站博主教程笔记&#xff1a; 最新版适合自学的ElasticStack全套视频&#xff08;Elk零基础入门到精通教程&#xff09;Linux运维必备—Elastic…

Origin2024中如何添加误差带?直观查看数据的变化范围

误差线是通常用于统计或科学绘图中&#xff0c;本期给大家分享Origin中绘制带填充区的误差带图&#xff0c;可以直观显示数据的变化范围&#xff0c;填充区域也可以增加视觉效果和美观性 操作步骤&#xff1a; 1、打开Origin2024软件&#xff0c;然后在Book1中输入如下示例数…

街机 SNK NeoGeo 中英文名字与驱动对照表

Part.I 简介 本文列举了街机 NeoGeo 中游戏的中英文名字与其驱动的对照&#xff0c;以帮助诸位更快地找到自己想玩的游戏。 注意&#xff1a;汉化版的街机模拟器 Kawaks 中游戏的中文名字是根据英文直译的&#xff0c;并不是习惯性的中文叫法。比如『三国志』英文名为『Warrio…

深度学习5从0到1理解RNN(包括LTSM,GRU等):内容丰富(上)

循环神经网络&#xff08;Recurrent Neural Network, RNN&#xff09; 是一种经典的深度学习网络结构&#xff0c;具有广泛的应用。其中&#xff0c;槽填充&#xff08;Slot Filling&#xff09;&#xff08;即识别自然语言中的特定信息&#xff09; 是其中一个应用场景&#x…

OpenSCAD 基础教程

OpenSCAD 基础教程 文章目录 OpenSCAD 基础教程1. 引言2. 安装与设置3. OpenSCAD 基本概念与语法3.1 基础形状3.2 变换操作3.4 布尔运算3.4 控制流3.5 特殊功能 4. 实践案例&#xff1a;创建一个简单的机械部件5. 高级技巧6. 导出与3D打印7. 常见问题与解决方案8. 结语 1. 引言…

虚拟机的安装-详细教程

目录 新建虚拟机 选择典型 安装操作系统 选择CentOS7 64位版本 虚拟机存放位置 磁盘容量 完成 编辑虚拟机 修改内存大小 设置处理器个数 选择镜像 开启虚拟机 进入界面&#xff0c;回车 选择语言 安装类型 磁盘分区 开启网络 设置密码和用户 重启 接受许可…

python进阶篇-day07-进程与线程

day06进程与线程 一. 进程 每个软件都可以看作是一个进程(数据隔离) 软件内的多个任务可以看作是多个线程(数据共享) 单核CPU: 宏观并行, 微观并发 真正的并行必须有多核CPU 多任务介绍 概述 多任务指的是, 多个任务"同时"执行 目的 节约资源, 充分利用CPU资源, …

unreal engine 5.4.4 runtime 使用PCG

Unreal PCG Runtime runtime环境下控制PCG PCG Graph 这里简单的在landscape上Spawn Static Mesh 和 Spawn Actor GraphSetting 自定义的参数&#xff0c;方便修改 场景 这里新建了一个蓝图Actor PCG_Ctrl, 用来runtime的时候控制PCG生成 Construct 获取场景中的PCGVolum…

开源还是封闭?人工智能的两难选择

这篇文章于 2024 年 7 月 29 日首次出现在 The New Stack 上。人工智能正处于软件行业的完美风暴中&#xff0c;现在马克扎克伯格 &#xff08;Mark Zuckerberg&#xff09; 正在呼吁开源 AI。 关于如何控制 AI 的三个强大观点正在发生碰撞&#xff1a; 1 . 所有 AI 都应该是开…