golang—kafka架构原理快速入门以及自测环境搭建(docker单节点部署)

kafka

Apache Kafka 是一个分布式的流处理平台。它具有以下特点:

  • 支持消息的发布和订阅,类似于 RabbtMQ、ActiveMQ 等消息队列
  • 支持数据实时处理
  • 能保证消息的可靠性投递
  • 支持消息的持久化存储,并通过多副本分布式的存储方案来保证消息的容错
  • 高吞吐率,单 Broker 可以轻松处理数千个分区以及每秒百万级的消息量

架构简介

在这里插入图片描述

Messages and Batches

kafka基本数据单元为消息,为了提高网络使用效率,采用批写入方式

Topics and Partitions

topic为kafka消费主题,每个主题下有若干分区(partitions),Kafka 通过分区来实现数据的冗余和伸缩性,分区可以分布在不同的服务器上。由于多个partition的特性,kafka无法保证topic范围内的消息顺序,但是可以保证单个分区内消息的顺序

broker

broker 对应着一个 kafka 的进程;一个 kafka 集群会包含多个 broker;同时需要在这些 broker中选举出一个controller,选举是通过 zk 来实现;controller 负责协调管理集群状态,同时也负责 partition 的 leader 选举;

Producers And Consumers
  • 消息的生产者,负责将消息发送到不同的 partition 中;消息的生产需要考虑幂等性、正确性以及安全性;kafka 引入了 ack 机制;ack 为 0,则不需要 kafka 回复,此时可能造成数据丢失;ack为 1, 则需要等待 leader 回复,此时其他 replica 可能还没同步 leader 挂掉,数据安全性没法得到保证;ack 为 -1,则需要等待其他 replica 同步完成后,才回复,此时数据最健壮,但是效率最低;
  • 消息的消费者,负责消费消息;一个 partition 对应一个consumer, 而一个 consumer 可以对应多个 partition;消费同一类消息的高吞吐量,可以设置 consumer group;
副本同步策略

每个分区里有多个副本,这些副本有一个leader。只有副本全部同步完成才发送ack。这里指同步策略,是全量同步,而不是半数以上同步了就认为该数据已经commit。不过也可以设置最少同步副本数提高性能(min.insync.replicas)

ISR

Leader 维护了一个动态的 in-sync replica set (ISR),意为和 leader 保持同步的 follower 集合。当 ISR 中的 follower 完成数据的同步之后,leader 就会给 producer 发送 ack。如果 follower 长时间未向 leader 同步数据,则该 follower 将被踢出 ISR,该时间阈值由 replica.lag.time.max.ms 参数设定。Leader 发生故障之后,就会从 ISR 中选举新的 leader。

数据可见性

需要注意的是,并不是所有保存在分区首领上的数据都可以被客户端读取到,为了保证数据一致性,只有被所有同步副本 (ISR 中所有副本) 都保存了的数据才能被客户端读取到。
在这里插入图片描述

kafka读写机制

producer写流程

producer写入消息流程如下:

  • 连接ZK集群,从ZK中拿到对应topic的partition信息和partition的Leader的相关信息

  • 连接到对应Leader对应的broker

  • 将消息按批次发送到partition的Leader上

  • 其他Follower从Leader上复制数据

  • 依次返回ACK

  • 直到所有ISR中的数据写完成,才完成提交,整个写过程结束
    在这里插入图片描述

consumer 读流程
  • 连接ZK集群,从ZK中拿到对应topic的partition信息和partition的Leader的相关信息

  • 连接到对应Leader对应的broker

  • consumer将自己保存的offset发送给Leader

  • Leader根据offset等信息定位到segment(索引文件和日志文件)

  • 根据索引文件中的内容,定位到日志文件中该偏移量对应的开始位置读取相应长度的数据并返回给consumer

kafka集群选举

副本leader选举

只有完全追上Leader数据的follower才能进行选举,Leader发生故障之后,会从ISR中选出一个新的Leader

controller选举

这部分由ZK完成,不过高本版kafka引入kratf,就可以完成去ZK化了。 ratf是一种简单易理解并且严格复合数学归纳的共识算法。

自测环境搭建

zk

docker pull wurstmeister/zookeeper
docker run -itd --name zookeeper -p 2181:2181 wurstmeister/zookeeper

kafka

 docker pull wurstmeister/kafka
 docker run -itd --name kafka -p 9092:9092 -e HOST_IP=10.74.18.61 -e KAFKA_ADVERTISED_PORT=9092 -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_ADVERTISED_HOST_NAME=10.74.18.61 --link zookeeper:zookeeper wurstmeister/kafka

go链接kafka生产消费

go版本:1.21
生产者

package main

import (
	"fmt"

	"github.com/IBM/sarama"
)

func main() {
	config := sarama.NewConfig()
	// 等待服务器所有副本都保存成功后的响应,对应ack=-1
	config.Producer.RequiredAcks = sarama.WaitForAll
	// 随机的分区类型:返回一个分区器,该分区器每次选择一个随机分区
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	// 是否等待成功和失败后的响应
	config.Producer.Return.Successes = true

	// 使用给定代理地址和配置创建一个同步生产者
	producer, err := sarama.NewSyncProducer([]string{"10.74.18.61:9092"}, config)
	if err != nil {
		panic(err)
	}

	defer producer.Close()

	//构建发送的消息,
	msg := &sarama.ProducerMessage{
		//Topic: "test",//包含了消息的主题
		Partition: int32(10),                   //
		Key:       sarama.StringEncoder("key"), //
	}

	var value string
	var msgType string
	for {
		_, err := fmt.Scanf("%s", &value)
		if err != nil {
			break
		}
		fmt.Scanf("%s", &msgType)
		fmt.Println("msgType = ", msgType, ",value = ", value)
		msg.Topic = msgType
		//将字符串转换为字节数组
		msg.Value = sarama.ByteEncoder(value)
		//fmt.Println(value)
		//SendMessage:该方法是生产者生产给定的消息
		//生产成功的时候返回该消息的分区和所在的偏移量
		//生产失败的时候返回error
		partition, offset, err := producer.SendMessage(msg)

		if err != nil {
			fmt.Println("Send message Fail", err)
		}
		fmt.Printf("Partition = %d, offset=%d\n", partition, offset)
	}
}

消费者

package main

import (
	"fmt"
	"sync"

	"github.com/IBM/sarama"
)

var (
	wg sync.WaitGroup
)

func main() {
	// 根据给定的代理地址和配置创建一个消费者
	consumer, err := sarama.NewConsumer([]string{"10.74.18.61:9092"}, nil)
	if err != nil {
		panic(err)
	}
	//Partitions(topic):该方法返回了该topic的所有分区id
	partitionList, err := consumer.Partitions("test")
	if err != nil {
		panic(err)
	}

	for partition := range partitionList {
		//ConsumePartition方法根据主题,分区和给定的偏移量创建创建了相应的分区消费者
		//如果该分区消费者已经消费了该信息将会返回error
		//sarama.OffsetNewest:表明了为最新消息
		pc, err := consumer.ConsumePartition("test", int32(partition), sarama.OffsetNewest)
		if err != nil {
			panic(err)
		}
		defer pc.AsyncClose()
		wg.Add(1)
		go func(sarama.PartitionConsumer) {
			defer wg.Done()
			//Messages()该方法返回一个消费消息类型的只读通道,由代理产生
			for msg := range pc.Messages() {
				fmt.Printf("%s---Partition:%d, Offset:%d, Key:%s, Value:%s\n", msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
			}
		}(pc)
	}
	wg.Wait()
	consumer.Close()
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/202770.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

SaaS模式C/S检验科LIS系统源码

适用于医院检验科实际需要的管理系统, 实现检验业务全流程的计算机管理。从检验申请、标本编号、联机采集、中文报告单的生成与打印、质控图的绘制和数据的检索与备份。通过将所有仪器自身提供的端口与科室LIS系统中的工作站点连接,实现与医院HIS系统的对接。 通过门诊医生和住…

TEMU和SHEIN平台的卖家了解测评吗?

最近越来越多的商家都入驻了temu和shein平台,为了让自己的产品能更具有优势,就会用到测评来做一些产品销量和评论,那么测评应该怎么做呢? 测评就是卖家通过测评平台,社区,红人等等这些联系国外的买家&…

el-select多选框,数据拼接

将多选框数据 按照逗号拼接为字符串 getTagIds(data, type) {if (type "array") {let array data.join(",")return array} else {let string data.split(",");return string}}, 在调用这个方法时需要,另外传一个字符串type,以此来…

面试篇之微服务(一)

目录 概览 1.什么是微服务? 2.微服务带来了哪些挑战? 3.现在有哪些流行的微服务解决方案? 这三种方案有什么区别吗? 4.说下微服务有哪些组件? 注册中心 5.注册中心是用来干什么的? 6.SpringCloud可…

Embassy 库下载代码示例

解决方案: swift import Embassy let downloader Downloader() // 使用代理主机和端口 downloader.useProxy(proxyHost: ") // 下载 URL 的内容 let content downloader.download(from: "") // 输出下载的内容 print(content) 这个程序首先…

Spring Cloud Stream如何屏蔽不同MQ带来的差异性?

引言 在当前的微服务架构下,使用消息队列(MQ)技术是实现服务解耦和削峰填谷的重要策略。为了保证系统的灵活性和可替换性,我们需要避免对单一开源技术的依赖。 市面上有多种消息队列技术,如 Kafka、RocketMQ、Rabbit…

从零构建属于自己的GPT系列1:文本数据预处理、文本数据tokenizer、逐行代码解读

🚩🚩🚩Hugging Face 实战系列 总目录 有任何问题欢迎在下面留言 本篇文章的代码运行界面均在PyCharm中进行 本篇文章配套的代码资源已经上传 从零构建属于自己的GPT系列1:文本数据预处理 从零构建属于自己的GPT系列2:语…

【精选】Spring整合MyBatis,Junit 及Spring 事务Spring AOP面向切面详解

Spring整合MyBatis 搭建环境 我们知道使用MyBatis时需要写大量创建SqlSessionFactoryBuilder、SqlSessionFactory、SqlSession等对象的代码,而Spring的作用是帮助我们创建和管理对象,所以我们可以使用Spring整合MyBatis,简化MyBatis开发。 …

【Web端CAD/CAE文字标注】webgl+canvas 2d实现文字标注功能

一、需求背景 在CAD/CAE领域经常会遇到显示节点编号这种需求,效果如下图: 本文介绍如何在WebGL中实现文字的显示,对于如何在OpenGL中实现请绕路。 二、实现原理 Canvas是HTML5提供的元素,用于在网页上绘制图形,其支…

elasticsearch DSL语句

目录 一、DSL查询文档1.1 DSL查询分类1.2 全文检索查询1.3 精确查询1.4 地理坐标查询1.5 复合查询1.5.1 相关性算分1.5.2 算分函数查询1.5.3 布尔查询 二、搜索结果处理2.1 排序2.2 分页2.3 高亮2.4 总结 三、RestClient查询文档3.1 查询所有3.2 match查询3.3 精确查询3.4 布尔…

PyMuPDF---Python处理PDF的宝藏库详解

1、PyMuPDF简介 1.1 介绍 在介绍PyMuPDF之前,先来了解一下MuPDF,从命名形式中就可以看出,PyMuPDF是MuPDF的Python接口形式。 MuPDF MuPDF 是一个轻量级的 PDF、XPS和电子书查看器。MuPDF 由软件库、命令行工具和各种平台的查看器组成。 …

C语言进阶之笔试题详解(2)

前言 这里的内容包括二维数组笔试题和指针笔试题,供给读者对这部分知识进行加深和巩固。 ✨ 猪巴戒:个人主页✨ 所属专栏:《C语言进阶》 🎈跟着猪巴戒,一起学习C语言🎈 目录 前言 笔试题 二维数组 题目…

借助文档控件Aspose.Words,在 C# 中比较两个 PDF 文件

在当今的数字世界中,管理和比较文档是一项至关重要的任务,尤其是在商业和法律领域。在 C# 中处理 PDF 文档时,Aspose.Words for .NET 提供了用于比较 PDF 文档的强大解决方案。在这篇博文中,我们将探讨如何在 C# 应用程序中比较 P…

MySQL进阶-读写分离

✨作者:猫十二懿 ❤️‍🔥账号:CSDN 、掘金 、语雀 、Github 🎉公众号:猫十二懿 一、MySQL 读写分离介绍 读写分离,简单地说是把对数据库的读和写操作分开,以对应不同的数据库服务器。主数据库提供写操作&…

从零开始的c语言日记day38——数组参数,指针参数

一维数组传参 要把数组或者指针传给函数,那函数参数如何设计? 上面各写法有问题嘛? 第一个没问题 第二个没问题 第三个没问题 第四个没问题 第五个解析:定义int*arr2[20]为20个int*类型的数组,test2之后用的是ar…

Kubernetes(K8s)资源管理-03

资源管理 资源管理介绍 在kubernetes中,所有的内容都抽象为资源,用户需要通过操作资源来管理kubernetes。 kubernetes的本质上就是一个集群系统,用户可以在集群中部署各种服务,所谓的部署服务,其实就是在kubernetes集…

NoSQL大数据存储技术思考题及参考答案

思考题及参考答案 第1章 绪论 1. NoSQL和关系型数据库在设计目标上有何主要区别? (1)关系数据库 优势:以完善的关系代数理论作为基础,具有数据模型、完整性约束和事务的强一致性等特点,借助索引机制可以实现高效的查询&#xf…

Clickhouse Join

ClickHouse中的Hash Join, Parallel Hash Join, Grace Hash Join https://www.cnblogs.com/abclife/p/17579883.html 总结 本文描述并比较了ClickHouse中基于内存哈希表的3种连接算法。 哈希连接算法速度快,是最通用的算法,支持所有连接类型和严格性设…

TCP/IP封装

数据如何通过网络发送?为什么 OSI 模型需要这么多层? 下图显示了数据在网络传输时如何封装和解封装。 步骤1:当设备A通过HTTP协议通过网络向设备B发送数据时,首先在应用层添加HTTP头。 步骤2:然后将TCP或UDP标头添加…

Hadoop入门学习笔记

视频课程地址:https://www.bilibili.com/video/BV1WY4y197g7 课程资料链接:https://pan.baidu.com/s/15KpnWeKpvExpKmOC8xjmtQ?pwd5ay8 这里写目录标题 一、VMware准备Linux虚拟机1.1. VMware安装Linux虚拟机1.1.1. 修改虚拟机子网IP和网关1.1.2. 安装…