使用Apache Kafka的Golang实践指南

您是否在寻找构建可扩展、高性能应用程序的方法,这些应用程序可以实时处理流数据?如果是的话,结合使用Apache Kafka和Golang是一个很好的选择。Golang的轻量级线程非常适合编写类似Kafka生产者和消费者的并发网络应用程序。它的内置并发原语,如goroutines和channels,与Kafka的异步消息传递非常匹配。Golang还有一些出色的Kafka客户端库,如Sarama,它们为使用Kafka提供了惯用的API。

在这里插入图片描述

借助Kafka处理分布式消息传递和存储,以及Golang提供的并发和速度,您将获得构建响应式系统的强大技术栈。使用Kafka的发布/订阅语义和Golang的流畅并发,轻松高效地处理永无止境的数据流变得非常简单。通过将这两种技术结合起来,您可以快速构建下一代云原生世界的实时应用程序。所以,今天就开始用Golang和Kafka构建您的流处理管道吧!

Apache Kafka是一个开源分布式事件流平台,用于高性能数据管道、流式分析、数据集成和关键任务应用程序。它最初由LinkedIn开发,后在2011年成为Apache开源项目。

Kafka的用例和能力
  • 流数据管道 - Kafka提供了一个分布式发布-订阅消息系统,可以在系统或应用程序之间流式传输数据。它提供了具有数据复制和容错能力的强大队列。
  • 实时分析 - Kafka允许使用工具如Kafka Streams和KSQL处理实时数据流,用于构建流式分析和数据处理应用程序。
  • 数据集成 - Kafka可以用来通过在不同数据源和格式之间流式传输数据来集成不同的系统。这使它对流式ETL非常有用。
  • 事件源 - Kafka提供了可以重放的事件时间日志,用于重构应用程序状态,适用于事件源和CQRS模式。
  • 日志聚合 - Kafka通常用于将不同服务器和应用程序的日志聚合到一个中央存储库中。这允许统一访问日志数据。

凭借其分布式、可扩展和容错的架构,Kafka是构建大规模实时数据管道和流应用程序的受欢迎选择,被全球数千家公司使用。

总结

Apache Kafka是一个开源分布式事件流平台,用于高性能数据管道、流式分析、数据集成和关键任务应用程序。它提供了诸如流数据管道、实时分析、数据集成、事件源和日志聚合等多种能力。将Golang与Apache Kafka结合提供了一个强大的技术栈,用于构建现代应用程序,这得益于它们的性能、可扩展性、并发性、可用性、互操作性、现代设计和开发人员体验。开始使用Kafka和Golang涉及安装Golang,设置Kafka,并使用confluent-kafka-go包构建生产者和消费者。

为什么将Golang与Apache Kafka结合使用

将Golang这一高效并发的编程语言与Apache Kafka这一分布式事件流平台结合起来,提供了一个在构建尖端现代应用程序方面表现出色的强大技术栈。这两种技术之间的协同作用源自几个关键优势:

  • 性能 - Golang和Apache Kafka都提供高性能。Golang快速、高效和轻量级。Kafka为速度而构建,具有高吞吐量和低延迟。它们一起可以处理苛刻的工作负载。

  • 可扩展性 - Golang的goroutines和Kafka的分区允许应用程序水平扩展以处理大量数据。Kafka轻松处理扩展生产者和消费者。

  • 并发性 - Golang通过goroutines和channels提供了出色的并发编程能力。Kafka并发传递消息并支持并行性。

  • 可用性 - Kafka的分布式架构使其高度可用和容错。Golang应用可以利用这一点来构建弹性系统。

  • 互操作性 - Kafka有多种语言的客户端,允许Golang应用与多语言环境互动。Kafka还使用二进制TCP协议以提高效率。

  • 现代设计 - Kafka和Golang都采用现代设计理念,使它们非常适合云原生和微服务架构。

  • 开发人员体验 - Kafka的客户端库结合Goroutines、channels和接口,使其易于使用。

Kafka和Golang将性能、可扩展性和并发与生产力结合在一起 - 使它们成为构建可扩展的服务、管道和流应用程序的绝佳选择。

开始使用Apache Kafka

在开始使用Golang和Apache Kafka之前,我们必须确保golang已经安装并在我们的机器上运行。如果没有,请查看以下教程来设置golang。

安装Kafka

另一个重要的事情是在我们的本地实例上安装Kafka,对此我发现了官方指南来开始使用Apache Kafka。

您也可以跟随YouTube教程在Windows机器上安装apache kafka。

Apache Kafka的Golang包

您可以使用go get安装confluent-kafka-go包:

go get -u github.com/confluentinc/confluent-kafka-go/kafka

安装后,您可以在Go代码中导入并使用confluent-kafka-go。

package main

import (
    "fmt"
    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
    if err != nil {
        fmt.Printf("创建生产者失败: %s\n", err)
        return
    }

    // 生产消息到主题,处理交付报告等。

    // 使用后记得关闭生产者
    defer p.Close()
}

构建生产者

Kafka生产者是Apache Kafka生态系统中的一个关键组成部分,作为一个客户端应用程序,负责向Kafka集群发布(写入)事件。这一部分提供了关于Kafka生产者的全面概述,以及针对调整其行为的配置设置的初步探讨。

下面是一个Golang应用程序的示例,它生产数据并将其发布到Kafka主题。它还说明了如何在Golang中为Kafka消息序列化数据,并演示了如何处理错误和重试。

package main

import (
    "fmt"
    "github.com/confluentinc/confluent-kafka-go/kafka"
)

const (
    kafkaBroker = "localhost:9092"
    topic       = "test-topic"
)

type Message

 struct {
    Key   string `json:"key"`
    Value string `json:"value"`
}

func main() {
    // 创建一个新的Kafka生产者
    p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": kafkaBroker})
    if err != nil {
        fmt.Printf("创建生产者失败: %s\n", err)
        return
    }
    defer p.Close()

    // 定义要发送的消息
    message := Message{
        Key:   "example_key",
        Value: "Hello, Kafka!",
    }

    // 序列化消息
    serializedMessage, err := serializeMessage(message)
    if err != nil {
        fmt.Printf("消息序列化失败: %s\n", err)
        return
    }

    // 将消息生产到Kafka主题
    err = produceMessage(p, topic, serializedMessage)
    if err != nil {
        fmt.Printf("消息生产失败: %s\n", err)
        return
    }

    fmt.Println("消息成功生产!")
}

func serializeMessage(message Message) ([]byte, error) {
    // 将消息结构体序列化为JSON
    serialized, err := json.Marshal(message)
    if err != nil {
        return nil, fmt.Errorf("消息序列化失败: %w", err)
    }
    return serialized, nil
}

func produceMessage(p *kafka.Producer, topic string, message []byte) error {
    // 创建一个新的要生产的Kafka消息
    kafkaMessage := &kafka.Message{
        TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
        Value:          message,
    }

    // 生产Kafka消息
    deliveryChan := make(chan kafka.Event)
    err := p.Produce(kafkaMessage, deliveryChan)
    if err != nil {
        return fmt.Errorf("消息生产失败: %w", err)
    }

    // 等待交付报告或错误
    e := <-deliveryChan
    m := e.(*kafka.Message)

    // 检查交付错误
    if m.TopicPartition.Error != nil {
        return fmt.Errorf("交付失败: %s", m.TopicPartition.Error)
    }

    // 关闭交付频道
    close(deliveryChan)

    return nil
}

这个示例演示了如何:

  1. 创建一个Kafka生产者。
  2. 使用json.Marshal函数将自定义消息结构体(Message)序列化为JSON。
  3. 使用生产者将序列化的消息生产到Kafka主题。
  4. 使用交付报告和错误检查处理错误和重试。

确保将localhost:9092替换为您的Kafka代理地址,将test-topic替换为所需的主题名称。此外,您可能需要处理更复杂的错误场景并根据您的具体需求实现重试逻辑。

构建消费者

Kafka消费者就像小型事件处理器,它们获取并消化数据流。它们订阅主题并消费任何新到达的消息,处理每一个消息。我们将探讨这些消费者的内部工作原理和调整其性能的配置旋钮。准备好提升构建可扩展数据驱动应用程序的技能了吗!

下面是一个Golang应用程序的示例,它从Kafka主题消费消息。它包括了如何处理和处理消费的消息的说明,以及对不同消费模式(如单个消费者和消费者组)的讨论。

package main

import (
    "fmt"
    "os"
    "os/signal"
    "github.com/confluentinc/confluent-kafka-go/kafka"
)

const (
    kafkaBroker = "localhost:9092"
    topic       = "test-topic"
    groupID     = "test-group"
)

func main() {
    // 创建一个新的Kafka消费者
    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers":  kafkaBroker,
        "group.id":           groupID,
        "auto.offset.reset":  "earliest",
    })
    if err != nil {
        fmt.Printf("创建消费者失败: %s\n", err)
        return
    }
    defer c.Close()

    // 订阅Kafka主题
    err = c.SubscribeTopics([]string{topic}, nil)
    if err != nil {
        fmt.Printf("订阅主题失败: %s\n", err)
        return
    }

    // 设置一个通道来处理操作系统信号,以便优雅地关闭
    sigchan := make(chan os.Signal, 1)
    signal.Notify(sigchan, os.Interrupt)

    // 开始消费消息
    run := true
    for run == true {
        select {
        case sig := <-sigchan:
            fmt.Printf("接收到信号 %v: 正在终止\n", sig)
            run = false
        default:
            // 轮询Kafka消息
            ev := c.Poll(100)
            if ev == nil {
                continue
            }

            switch e := ev.(type) {
            case *kafka.Message:
                // 处理消费的消息
                fmt.Printf("从主题 %s 收到消息: %s\n", *e.TopicPartition.Topic, string(e.Value))
            case kafka.Error:
                // 处理Kafka错误
                fmt.Printf("错误: %v\n", e)
            }
        }
    }
}

这个示例演示了如何:

  1. 创建一个Kafka消费者。
  2. 订阅一个Kafka主题。
  3. 设置一个通道来处理操作系统信号(如SIGINT)以优雅地关闭。
  4. 开始从订阅的主题消费消息。
  5. 处理和处理消费的消息以及Kafka错误。

不同的消费模式:

  • 单个消费者:在这种模式下,单个消费者实例从主题的一个或多个分区读取消息。当您只需要一个消费者应用程序实例来处理来自主题的所有消息时,这很有用。
  • 消费者组:消费者组允许您通过将消息处理分布到多个消费者实例来扩展消费,以实现扩展。每个消费者组可以有多个消费者,组内的每个消费者从一部分分区读取消息。这使得消息的并行处理成为可能,提供了容错能力和高吞吐量。

在提供的示例中,group.id配置设置用于指定消费者组ID。这允许消费者应用程序的多个实例在消费者组中一起工作,从Kafka主题消费消息。

结论:

总之,Apache Kafka作为构建实时数据管道和流应用程序的强大解决方案,得益于其分布式、可扩展和容错的架构。当与Golang结合时,它形成了一个在性能、可扩展性和并发方面表现出色的强大技术栈,非常适合现代应用程序。通过利用Kafka的功能和Golang的优势,开发人员可以构建出具有弹性和高性能的服务、管道和流应用程序,这些应用程序可以轻松扩展以满足当今数据驱动世界的需求。无论是处理实时分析、集成不同的系统还是聚合日志,Kafka和Golang提供了一个赢得组合,使开发人员能够轻松构建创新和可扩展的解决方案。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/440753.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

动态规划DP之背包问题3---多重背包问题

目录 DP分析&#xff1a; 优化&#xff1a; 二进制优化 例题&#xff1a; 01背包是每个物品只有一个&#xff0c;完全背包问题是每个物品有无限个。 那么多重背包问题就是 每个物品有有限个。 有 N 种物品和一个容量是 V 的背包。 第 i 种物品最多有 si 件&#xff0c;每件体…

Mysql学习笔记之事务详解(读未提交、读以提交、可重复读、串行化读)

在这个博主的基础上&#xff0c;增加两种情况的对比&#xff1a;https://blog.csdn.net/llllllkkkkkooooo/article/details/108068919 可重复读中幻读现象&#xff08;未使用MVCC&#xff09; 设置可重复读的隔离级别 set global transaction isolation level repeatable read…

代码随想录算法训练营第day40|343. 整数拆分 、 96.不同的二叉搜索树

a.343. 整数拆分 题目链接 给定一个正整数 n &#xff0c;将其拆分为 k 个 正整数 的和&#xff08; k > 2 &#xff09;&#xff0c;并使这些整数的乘积最大化。 返回 你可以获得的最大乘积 。 示例 1: 输入: n 2 输出: 1 解释: 2 1 1, 1 1 1。 示例 2: 输入: …

掼蛋的牌型与规律(上篇)

掼蛋是一项配合类的棋牌竞技游戏&#xff0c;掼蛋的最大魅力以及最集中的特点在于变化&#xff0c;在于组牌的变数。有的掼蛋新手往往先把牌配死&#xff0c;并且直接决定好出牌计划&#xff0c;然后守株待兔。掼蛋的取胜之道在于静态组合加上动态变化。本文主要介绍一下掼蛋的…

python基础篇--学习记录2

1.深浅拷贝 l1 ["张大仙","徐凤年",["李淳刚","邓太阿"]] # 变量名对应的就是内存地址,这里就是将l1的内存地址给了l2 # 现在两个变量指向同一个内存地址,l1变化l2也会变化 l2 l1 现在的需求是l2是l1的拷贝版本,但是两者是完全分割…

七彩虹@电脑cpu频率上不去问题@控制中心性能模式cpu频率上不去@代理服务器超时@账户同步设置失败

文章目录 windows电脑cpu频率上不去新电脑的系统时间问题系统时间不准造成的具体问题举例代理超时vscode同步请求失败自动校准时间 windows电脑cpu频率上不去 问题描述,标压处理器的笔记本,cpu频率上不去 如果cpu没问题的话,就应该是系统限制了功耗导致的有的笔记本有控制中心…

本鲸:打造科技招商新引擎、实现政企资源高效对接

在当今这个快速变化的时代&#xff0c;科技创新已成为推动社会进步和经济发展的核心动力。本鲸&#xff0c;作为科技创新创业服务的平台&#xff0c;正以其独特的视角和专业服务&#xff0c;为政府和企业提供一站式科技招商解决方案&#xff0c;助力构建创新驱动的经济发展新模…

b站小土堆pytorch学习记录—— P27-P29 完整的模型训练套路

文章目录 一、定义模型&#xff08;放在model.py文件中&#xff09;二、训练三、测试四、完整的训练和测试代码 一、定义模型&#xff08;放在model.py文件中&#xff09; import torch from torch import nnclass Guodong(nn.Module):def __init__(self):super(Guodong,self)…

在vue2中使用tailwindcss(完整教程)

如果你看过好多教程之后&#xff0c;还是报错&#xff0c;无法使用tailwindcss&#xff0c;我希望本教程可以让你成功上岸。 环境要求 node&#xff1a;>v14.17.0 安装tailwindcss 由于最新的tailwind css使用post css 8版本&#xff0c;vue2框架暂时还不支持&#xff0…

Springboot + Vue用户管理系统

Springboot Vue用户管理系统 主要实现了管理员的登录&#xff0c;用户管理&#xff0c;用户的增删改查等操作&#xff0c; 技术实现&#xff0c;前端采用Vue 后端采用Springboot ,前后端分离系统&#xff0c;数据库使用mysql 还用到了redis,mybatis-plus。。。。。。。。。…

10大主流压力/负载/性能测试工具推荐

在移动应用和Web服务正式发布之前&#xff0c;除了进行必要的功能测试和安全测试&#xff0c;为了保证互联网产品的服务交付质量&#xff0c;往往还需要做压力/负载/性能测试。然而很多传统企业在试水互联网的过程中&#xff0c;往往由于资源或产品迭代速度等原因忽视了这一块工…

【Python】科研代码学习:三 PreTrainedModel, PretrainedConfig, PreTrainedTokenizer

【Python】科研代码学习&#xff1a;三 PreTrainedModel, PretrainedConfig, PreTrainedTokenizer 前言Models : PreTrainedModelPreTrainedModel 中重要的方法 tensorflow & pytorch 简单对比Configuration : PretrainedConfigPretrainedConfig 中重要的方法 Tokenizer : …

influxdb2.0插入数据字段类型出现冲突问题解决

一、问题出现 一个学校换热站自控系统&#xff0c;会定时从换热站获取测点数据&#xff0c;并插入到influxdb数据库中。influxdb插入数据时&#xff0c;报错提示&#xff1a; com.influxdb.exceptions.UnprocessableEntityException: failure writing points to database: par…

组合逻辑电路(二)(译码器和编码器)

目录 译码器 简单逻辑门译码器 二进制译码器 2线-4线译码器 3线-8线译码器 二-十进制译码器 4线-10线译码器 七段显示译码器 编码器 二进制普通编码器 二-十进制普通编码器&#xff08;8421BCD码编码器&#xff09; 优先编码器&#xff08;Priority Encoder&#xff09; 译…

《解密云计算:企业之选》

前言 在当今数字化时代&#xff0c;企业面临着巨大的数据处理压力和信息化需求&#xff0c;传统的IT架构已经无法满足日益增长的业务需求。在这样的背景下&#xff0c;越来越多的企业开始转向云计算&#xff0c;以实现灵活、高效和可扩展的IT资源管理和利用。 云计算 云计算是…

【QT中如何生成导出.exe可执行文件并打包给其他人使用】

1、将QT的部署设置改成Release编译模式。 2、运行项目生成release文件夹&#xff0c;其中包含.exe文件。 3、新建空文件夹&#xff0c;将release文件夹中的.exe文件复制到里面去。&#xff08;此处新建了hellofile空文件夹来存放hello.exe文件&#xff09; 4、在QT终端里&#…

SpringBoot学习之自定义注解和AOP 切面统一保存操作日志(二十九)

一、定义一个注解 这个注解是用来控制是否需要保存操作日志的自定义注解(这个类似标记或者开关) package com.xu.demo.common.anotation;import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; i…

Filter过滤器+JWT令牌实现登陆验证

一、背景 我们需要在客户端访问服务器的时候给定用户一定的操作权限&#xff0c;比如没有登陆时就不能进行其他操作。如果他需要进行其他操作&#xff0c;而在这之前他没有登陆过&#xff0c;服务端则需要将该请求拦截下来&#xff0c;这就需要用到过滤器&#xff0c;过滤器可以…

【YOLO v5 v7 v8 v9小目标改进】AFPN 渐进式特征金字塔网络:解决多尺度特征融合中,信息在传递过程丢失

AFPN 渐进式特征金字塔网络&#xff1a;解决多尺度特征融合中&#xff0c;信息在传递过程丢失 提出背景AFPN 多尺度特征金字塔 非邻近层次的直接特征融合 自适应空间融合操作 小目标涨点YOLO v5 魔改YOLO v7 魔改YOLO v8 魔改YOLO v9 魔改 提出背景 论文&#xff1a;https:…

复试人工智能前沿概念总结

1.大模型相关概念&#xff08;了解即可&#xff09; 1.1 GPT GPT&#xff0c;全称为Generative Pre-training Transformer&#xff0c;是OpenAI开发的一种基于Transformer的大规模自然语言生成模型。GPT模型采用了自监督学习的方式&#xff0c;首先在大量的无标签文本数据上进…