Kafka 快速实战及基本原理详解解析-01

一、Kafka 介绍

1. MQ 的作用

消息队列(Message Queue,简称 MQ)是一种用于跨进程通信的技术,核心功能是通过异步消息的方式实现系统之间的解耦。它在现代分布式系统中有着广泛的应用,主要作用体现在以下三个方面:

异步处理

在传统的同步调用中,生产者和消费者需要同时在线,并且生产者在完成任务后才能继续执行其他工作。这种模式限制了系统的性能。而引入消息队列后,生产者可以将任务提交到队列中,消费者按需消费任务,从而提升系统的吞吐量。

  • 示例:快递员送快递到客户家,效率低下。而菜鸟驿站的出现让快递员只需将包裹放置在驿站,客户可以根据自己的时间安排取件。这种方式大大提高了效率。
解耦

解耦是消息队列最重要的功能之一。服务之间通过消息队列传递数据,而不是直接调用对方的服务接口,这样可以有效降低系统的耦合度。

  • 示例:《Thinking in JAVA》原书是英文版,但通过翻译社将内容翻译成多种语言,满足不同读者的需求。翻译社起到了桥梁作用,不同语言之间的沟通不再直接依赖于作者和读者。
削峰填谷

在高并发场景下,系统往往会遇到流量高峰,导致系统负载过重。通过消息队列,可以将流量暂存并按固定速率处理,从而避免系统崩溃。

  • 示例:长江每年都会涨水,但通过三峡大坝的调节,下游的出水速度保持稳定,避免了洪水泛滥。

2. 为什么要用 Kafka

Kafka 是一种高吞吐量、低延迟、分布式的消息队列系统,适合在大规模数据处理场景中使用。以下是 Kafka 的典型使用场景和优势:

日志聚合场景

在大规模分布式系统中,各个服务都会产生大量的日志信息。传统的日志收集方式往往存在以下问题:

  • 数据量大:需要快速收集和处理来自各个渠道的海量日志。
  • 容错性要求高:集群中允许少量节点出现故障而不影响整体服务。
  • 功能专注:Kafka 专注于高吞吐量、低延迟的消息传递,不追求复杂的消息处理功能。
核心优势
  • 高吞吐量:Kafka 能够处理数百万 TPS(每秒事务处理量)。
  • 低延迟:通常在毫秒级别的延迟时间内完成消息传递。
  • 可扩展性:通过增加节点和分区数量,可以线性扩展处理能力。
  • 容错性:通过副本机制保证消息的高可用性。
  • 持久化:Kafka 使用磁盘存储消息,保证消息的持久性。

二、Kafka 快速上手

1. 实验环境准备

要快速上手 Kafka,首先需要搭建实验环境。以下是推荐的实验环境配置:

  • 虚拟机数量:3 台
  • 操作系统:CentOS 7
  • Java 版本:Java 8
环境配置步骤
  1. 下载 Kafka 和 Zookeeper。
  2. 将 Kafka 解压到 /app/kafka 目录,将 Zookeeper 解压到 /app/zookeeper 目录。
  3. 配置环境变量,确保系统能够识别 Kafka 和 Zookeeper 的命令。
  4. 关闭防火墙,以避免端口阻塞:
    systemctl stop firewalld.service
    

2. 单机服务体验

为了更直观地理解 Kafka 的工作原理,我们可以先体验单机版 Kafka 服务。

步骤 1:启动 Zookeeper

Kafka 依赖 Zookeeper 进行元数据管理和选举机制。在实际部署中,通常使用独立的 Zookeeper 集群。

启动 Zookeeper 服务:

nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

检查 Zookeeper 是否正常启动:

jps

确认输出中有 QuorumPeerMain 进程。

步骤 2:启动 Kafka

启动 Kafka 服务前,需要确保 Zookeeper 服务正常运行。

启动 Kafka 服务:

nohup bin/kafka-server-start.sh config/server.properties &

确认 Kafka 是否正常启动:

jps

检查输出中是否包含 Kafka 进程。

步骤 3:创建和使用 Topic

Kafka 的基础工作机制是通过 Topic 进行消息的传递。

  1. 创建 Topic

    bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
    
  2. 发送消息 启动生产者端并发送消息:

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    > 这是一条测试消息
    
  3. 消费消息 启动消费者端并接收消息:

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    

3. 理解 Kafka 的消息传递机制

Kafka 的消息传递机制可以通过以下核心组件来理解:

  • 生产者(Producer):将消息发送到指定的 Topic。
  • 消费者(Consumer):从指定的 Topic 消费消息。
  • Topic:逻辑概念,表示一类业务消息的集合。
  • Partition:物理概念,实际存储消息的分区。
  • Broker:Kafka 服务器实例,存储和管理 Partition。

Kafka 的设计目标是通过这些组件实现高效、可靠的消息传递,满足企业级数据管道的需求。


四、Kafka 集群服务

1. 为什么要使用集群

单机部署的 Kafka 在性能上虽然已经非常出色,但在实际生产环境中通常需要使用 Kafka 集群来进一步提升数据存储能力和系统的高可用性。集群可以解决以下问题:

1.1 解决海量数据存储问题

单个 Broker 服务器的存储能力有限,当数据量增长到一定程度时,单机难以承载。通过集群部署,可以将数据分散存储在多个 Broker 中,从而提升整体存储能力。

1.2 提高系统容错能力

单机环境中,如果 Broker 崩溃,所有数据都会丢失。而集群环境下,每个 Partition 都有多个副本,即使部分 Broker 节点宕机,系统依然可以正常运行,保证数据的高可用性。


五、理解服务端的 Topic、Partition 和 Broker

Kafka 的核心架构由 Topic、Partition 和 Broker 组成,这三者之间的关系至关重要:

  • Topic:一个逻辑的消息分类,每个 Topic 包含多条消息。
  • Partition:每个 Topic 可以分成多个 Partition,每个 Partition 是一个消息队列。
  • Broker:Kafka 的服务器实例,负责存储 Partition 数据,并处理客户端请求。

5.1 创建分布式 Topic 示例

bin/kafka-topics.sh --bootstrap-server worker1:9092 --create --replication-factor 2 --partitions 4 --topic distributedTopic

5.2 查看 Topic 信息

bin/kafka-topics.sh --bootstrap-server worker1:9092 --describe --topic distributedTopic

六、章节总结:Kafka 集群的整体结构

通过前面的学习,我们可以总结 Kafka 集群的整体结构:

  1. Topic 是逻辑概念,Producer 和 Consumer 通过 Topic 进行消息传递。
  2. Partition 是实际存储单元,保证数据分散存储和负载均衡。
  3. Broker 是 Kafka 的服务器实例,存储 Partition 数据并处理客户端请求。
  4. Zookeeper 管理 Kafka 集群的元数据和选举过程。
  5. Controller 是 Kafka 集群的核心管理节点,负责管理 Topic 和 Partition 的分配。

七、Spring Boot 实现 Kafka 消息有序性

为了保证 Kafka 的消息有序性,可以使用 Spring Boot 和 Kafka 的整合来实现。在 Java 的 Spring Boot 项目中,我们通过指定消息的 Key 和自定义分区器来确保消息发送到相同的 Partition,从而实现有序性。

7.1 依赖配置

在 Maven 项目中,引入 Kafka 的依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-kafka</artifactId>
</dependency>

7.2 配置 KafkaProducer

创建 Kafka 的生产者配置类:

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

7.3 发送有序消息

创建一个消息发送服务,确保消息使用相同的 Key 发送到同一个 Partition:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {

    private static final String TOPIC = "test_topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String key, String message) {
        kafkaTemplate.send(TOPIC, key, message);
    }
}

7.4 自定义分区器(可选)

如果有更复杂的分区逻辑,可以自定义分区器:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class CustomPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 自定义分区逻辑
        return Math.abs(key.hashCode()) % cluster.partitionCountForTopic(topic);
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}

7.5 设置一个 Topic 对应一个 Partition 的方法

如果业务需求是保证某个 Topic 的消息全局有序,可以在创建 Topic 时将 Partition 数量设置为 1,从而保证所有消息存储在同一个 Partition 中,实现全局有序。

创建一个 Partition 的 Topic
bin/kafka-topics.sh --create --topic singlePartitionTopic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
在 Spring Boot 中发送消息到该 Topic
@Service
public class KafkaSinglePartitionProducerService {

    private static final String TOPIC = "singlePartitionTopic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        kafkaTemplate.send(TOPIC, message);
    }
}

通过这种方式,所有发送到 singlePartitionTopic 的消息都会进入同一个 Partition,确保消息顺序性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/949701.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

开源数据集成平台白皮书重磅发布《Apache SeaTunnel 2024用户案例合集》!

2025年新年临近&#xff0c;Apache SeaTunnel 社区用户案例精选&#x1f4d8;也跟大家见面啦&#xff01;在过去的时间里&#xff0c;SeaTunnel 社区持续成长&#xff0c;吸引了众多开发者的关注与支持。 为了致谢一路同行的伙伴&#xff0c;也为了激励更多人加入技术共创&…

Milvus×合邦电力:向量数据库如何提升15%电价预测精度

01. 全球能源市场化改革下的合邦电力 在全球能源转型和市场化改革的大背景下&#xff0c;电力交易市场正逐渐成为优化资源配置、提升系统效率的关键平台。电力交易通过市场化手段&#xff0c;促进了电力资源的有效分配&#xff0c;为电力行业的可持续发展提供了动力。 合邦电力…

Day21补代码随想录_20241231_669.修剪二叉搜索树|108.将有序数组转换为二叉搜索树|538.把二叉搜索树转换为累加树

669.修剪二叉搜索树 题目 【比增加和删除节点难的多】 给你二叉搜索树的根节点 root &#xff0c;同时给定最小边界 low 和最大边界 high。通过修剪二叉搜索树&#xff0c;使得所有节点的值在 [low, high]中。修剪树 不应该 改变保留在树中的元素的相对结构 (即&#xff0c;…

机场安全项目|基于改进 YOLOv8 的机场飞鸟实时目标检测方法

目录 论文信息 背景 摘要 YOLOv8模型结构 模型改进 FFC3 模块 CSPPF 模块 数据集增强策略 实验结果 消融实验 对比实验 结论 论文信息 《科学技术与工程》2024年第24卷第32期刊载了中国民用航空飞行学院空中交通管理学院孔建国, 张向伟, 赵志伟, 梁海军的论文——…

【USRP】教程:在Macos M1(Apple芯片)上安装UHD驱动(最正确的安装方法)

Apple芯片 前言安装Homebrew安装uhd安装gnuradio使用b200mini安装好的路径下载固件后续启动频谱仪功能启动 gnu radio关于博主 前言 请参考本文进行安装&#xff0c;好多人买了Apple芯片的电脑&#xff0c;这种情况下&#xff0c;可以使用UHD吗&#xff1f;答案是肯定的&#…

【C++数据结构——内排序】希尔排序(头歌实践教学平台习题)【合集】

目录&#x1f60b; 任务描述 相关知识 1. 排序算法基础概念 2.插入排序知识 3. 间隔序列&#xff08;增量序列&#xff09;的概念 4. 算法的时间复杂度和空间复杂度分析 5. 代码实现技巧&#xff08;如循环嵌套、索引计算&#xff09; 测试说明 我的通关代码: 测试结…

每天看一个Fortran文件(9)

最后的输出变量是f 这里面调用了一个关键的子程序&#xff0c;spectral_nudging_filter_fft_2d_ncar 这是一个谱逼近的二维快速傅里叶变换过滤的程序。 二维的滤波这个还不是很清楚&#xff0c;找找技术文件看下 超详细易懂FFT&#xff08;快速傅里叶变换&#xff09;及代码…

Centos源码安装MariaDB 基于GTID主从部署(一遍过)

MariaDB安装 安装依赖 yum install cmake ncurses ncurses-devel bison 下载源码 // 下载源码 wget https://downloads.mariadb.org/interstitial/mariadb-10.6.20/source/mariadb-10.6.20.tar.gz // 解压源码 tar xzvf mariadb-10.5.9.tar.gz 编译安装 cmake -DCMAKE_INSTA…

【通俗理解】AI的两次寒冬:从感知机困局到深度学习前夜

AI的两次寒冬&#xff1a;从感知机困局到深度学习前夜 引用&#xff08;中英双语&#xff09; 中文&#xff1a; “第一次AI寒冬&#xff0c;是因为感知机局限性被揭示&#xff0c;让人们失去了对算法可行性的信心。” “第二次AI寒冬&#xff0c;则是因为专家系统的局限性和硬…

数据结构9.3 - 文件基础(C++)

目录 1 打开文件字符读写关闭文件 上图源自&#xff1a;https://blog.csdn.net/LG1259156776/article/details/47035583 1 打开文件 法 1法 2ofstream file(path);ofstream file;file.open(path); #include<bits/stdc.h> using namespace std;int main() {char path[]…

下载ffmpeg执行文件

打开网址&#xff1a;Download FFmpeg 按下面步骤操作 解压文件就可以看到ffmpeg的执行文件了&#xff0c;需要通过命令行进行使用&#xff1a; ffmpeg命令行使用参考&#xff1a; ffmpeg 常用命令-CSDN博客

网络安全抓包

#知识点&#xff1a; 1、抓包技术应用意义 //有些应用或者目标是看不到的&#xff0c;这时候就要进行抓包 2、抓包技术应用对象 //app,小程序 3、抓包技术应用协议 //http&#xff0c;socket 4、抓包技术应用支持 5、封包技术应用意义 总结点&#xff1a;学会不同对象采用…

国产编辑器EverEdit - 两种删除空白行的方法

1 使用技巧&#xff1a;删除空白行 1.1 应用场景 用户在编辑文档时&#xff0c;可能会遇到很多空白行需要删除的情况&#xff0c;比如从网页上拷贝文字&#xff0c;可能就会存在大量的空白行要删除。 1.2 使用方法 1.2.1 方法1&#xff1a; 使用编辑主菜单 选择主菜单编辑 …

可以输入的下拉框(下拉框数据过大,页面卡死)

项目场景&#xff1a; 提示&#xff1a;这里简述项目相关背景&#xff1a; 在项目中&#xff0c;有些下拉框的数据过于庞大&#xff0c;这样页面有时候会卡死&#xff0c;在vue3中常用的组件库element-puls中有个组件可以避免 在项目中&#xff0c;有些需求要求下拉框选择的同…

基于Python的音乐播放器 毕业设计-附源码73733

摘 要 本项目基于Python开发了一款简单而功能强大的音乐播放器。通过该音乐播放器&#xff0c;用户可以轻松管理自己的音乐库&#xff0c;播放喜爱的音乐&#xff0c;并享受音乐带来的愉悦体验。 首先&#xff0c;我们使用Python语言结合相关库开发了这款音乐播放器。利用Tkin…

谷粒商城-高级篇完结-Sleuth+Zipkin 服务链路追踪

1、基本概念和整合 1.1、为什么用 微服务架构是一个分布式架构&#xff0c;它按业务划分服务单元&#xff0c;一个分布式系统往往有很多个服务单元。由于服务单元数量众多&#xff0c;业务的复杂性&#xff0c;如果出现了错误和异常&#xff0c;很难去定位 。主要体现在&#…

ollama+FastAPI部署后端大模型调用接口

ollamaFastAPI部署后端大模型调用接口 记录一下开源大模型的后端调用接口过程 一、ollama下载及运行 1. ollama安装 ollama是一个本地部署开源大模型的软件&#xff0c;可以运行llama、gemma、qwen等国内外开源大模型&#xff0c;也可以部署自己训练的大模型 ollama国内地…

pandas系列----DataFrame简介

DataFrame是Pandas库中最常用的数据结构之一&#xff0c;它是一个类似于二维数组或表格的数据结构。DataFrame由多个列组成&#xff0c;每个列可以是不同的数据类型&#xff08;如整数、浮点数、字符串等&#xff09;。每列都有一个列标签&#xff08;column label&#xff09;…

Unity【Colliders碰撞器】和【Rigibody刚体】的应用——小球反弹效果

目录 Collider 2D 定义&#xff1a; 类型&#xff1a; Rigidbody 2D 定义&#xff1a; 属性和行为&#xff1a; 运动控制&#xff1a; 碰撞检测&#xff1a; 结合使用 实用检测 延伸拓展 1、在Unity中优化Collider 2D和Rigidbody 2D的性能 2、Unity中Collider 2D…

Java实现UDP与TCP应用程序

三、Java实现UDP应用程序 3.1 InetAddress类 java.net.InteAddress类是用于描述IP地址和域名的一个Java类&#xff1b; 常用方法如下&#xff1a; public static InetAddress getByName(String host)&#xff1a;根据主机名获取InetAddress对象public String getHostName()…