Kafka-快速实战

Kafka介绍

ChatGPT对于Apache Kafka的介绍:

Apache Kafka是一个分布式流处理平台,最初由LinkedIn开发并于2011年开源。它主要用于解决大规模数据的实时流式处理和数据管道问题。

Kafka是一个分布式的发布-订阅消息系统,可以快速地处理高吞吐量的数据流,并将数据实时地分发到多个消费者中。Kafka消息系统由多个broker(服务器)组成,这些broker可以在多个数据中心之间分布式部署,以提供高可用性和容错性。

Kafka的基本架构由生产者、消费者和主题(topic)组成。生产者可以将数据发布到指定的主题,而消费者可以订阅这些主题并消费其中的数据。同时,Kafka还支持数据流的处理和转换,可以在管道中通过Kafka Streams API进行流式计算,例如过滤、转换、聚合等。

Kafka使用高效的数据存储和管理技术,能够轻松地处理TB级别的数据量。其优点包括高吞吐量、低延迟、可扩展性、持久性和容错性等。

Kafka在企业级应用中被广泛应用,包括实时流处理、日志聚合、监控和数据分析等方面。同时,Kafka还可以与其他大数据工具集成,如Hadoop、Spark和Storm等,构建一个完整的数据处理生态系统。

MQ的作用

MQ:MessageQueue,消息队列。 队列,是一种FIFO 先进先出的数据结构。消息则是跨进程传递的数据。一个典型的MQ系统,会将消息消息由生产者发送到MQ进行排队,然后根据一定的顺序交由消息的消费者进行处理。

主要作用:

  • 异步

    异步能提高系统的响应速度、吞吐量。

  • 解耦

    1、服务之间进行解耦,才可以减少服务之间的影响。提高系统整体的稳定性以及可扩展性。

    2、另外,解耦后可以实现数据分发。生产者发送一个消息后,可以由一个或者多个消费者进行消费,并且消费者的增加或者减少对生产者没有影响。

  • 削峰

    作用:以稳定的系统资源应对突发的流量冲击。

为什么要用Kafka

典型日志聚合的应用场景:

业务场景决定了产品的特点。

1、数据吞吐量很大: 需要能够快速收集各个渠道的海量日志

2、集群容错性高:允许集群中少量节点崩溃

3、功能不需要太复杂:Kafka的设计目标是高吞吐、低延迟和可扩展,主要关注消息传递而不是消息处理。所以,Kafka并没有支持死信队列、顺序消息等高级功能。

4、允许少量数据丢失:Kafka本身也在不断优化数据安全问题,目前基本上可以认为Kafka可以做到不会丢数据。

Kafka快速上手

实验环境

准备三台CentOS7的虚拟机,预备搭建三台机器的集群。分别配置机器名 worker1,worker2,worker3。

vi /etc/hosts

192.168.146.128 worker1
192.168.146.129 worker2
192.168.146.130 worker3

关闭防火墙(实验环境建议关闭)

firewall-cmd --state   查看防火墙状态
systemctl stop firewalld.service   关闭防火墙

补充:虚拟机centos7遇到问题,bash: jps: 未找到命令... 的解决方案

yum list *openjdk-devel*
#安装适合自己的版本
yum install java-1.8.0-openjdk-devel.x86_64
#安装过程有几个同意步骤,输入y,安装完成测试jps ok!

下载kafka地址:Apache Kafka ,选择kafka_2.13-3.4.0.tgz进行下载。

下载Zookeeper地址 Apache ZooKeeper ,这里选择比较新的3.6.1版本。

下载完成后,将这两个工具包上传到服务器上,解压后,分别放到/app/kafka和/app/zk目录下。并将部署目录下的bin目录路径配置到path环境变量中。

环境变量/etc/profile最终配置:

export ZK_HOME=/app/zk/apache-zookeeper-3.6.4-bin
export KAFKA_HOME=/app/kafka/kafka_2.13-3.4.0
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk
export PATH=$KAFKA_HOME/bin:$ZK_HOME/bin:$JAVA_HOME/bin:$PATH
export CLASSPATH=:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

单机服务体验

1、启动Kafka之前需要先启动Zookeeper。

#解压命令
tar zxvf apache-zookeeper-3.6.4-bin.tar.gz
tar zxvf kafka_2.13-3.4.0.tgz

#这里用Kafka自带的Zookeeper启动脚本
cd kafka_2.13-3.4.0/
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

通过jps指令看到一个QuorumPeerMain进程,确定服务启动成功。zk默认启动在2181端口

启动遇到问题可以查看nohup.out日志文件,注意脚本的执行权限

2、启动Kafka

nohup bin/kafka-server-start.sh config/server.properties &

启动完成后,使用jps指令,看到一个kafka进程,确定服务启动成功。服务默认9092端口

3、简单收发消息

Kafka的基础工作机制:消息发送者将消息发送到kafka上指定的topic,消息消费者从指定的topic上消费消息。

简单收到命令:

#创建Topic
bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
#查看Topic
bin/kafka-topics.sh --describe --topic test --bootstrap-server localhost:9092
#启动一个消息发送者端,往一个名为test的Topic发送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
#启动一个消息接收者端,接收名为test的Topic消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

生产者端示例:

消费者端示例:

注意:消费者启动命令执行后有几秒钟延迟(启动中接收不到消息),默认处理启动成功后接收到的消息

4、其他消费模式

指定消费进度

#通过--from-beginning消费之前发的消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test
#指定从哪一条消息开始消费,offset表示索引/偏移量,索引4也就是第五条消息开始,0号partition
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --partition 0 --offset 4 --topic test

分组消费

kafka中的同一条消息,只能被同一个消费者组下的某一个消费者消费。而不属于同一个消费者组的其他消费者,也可以消费到这一条消息。通过--consumer-property group.id=testGroup指定消费者组

#两个消费者实例属于同一个消费者组
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup --topic test
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup --topic test
#这个消费者实例属于不同的消费者组
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup2 --topic test

查看消费者组的偏移量

#查看消费者组的情况,包括消费进度。
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group testGroup

···命令输出结果示例
Consumer group 'testGroup' has no active members. #没有活跃消费者
    
GROUP       TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
testGroup  test   0          20              20              0    ...			...	  ...
···

描述:
PARTITION		分区
CURRENT-OFFSET	当前消费进度
LOG-END-OFFSET	日志种最大消息进度
LAG				未消费消息数

虽然业务上是通过Topic来分发消息的,但是实际上,消息是保存在Partition这样一个数据结构上

理解Kakfa的消息传递机制

Kafka的消息发送者和消息消费者通过Topic这样一个逻辑概念来进行业务沟通。但是实际上,所有的消息是存在服务端的Partition这样一个数据结构当中的。

  • 客户端Client: 包括消息生产者和消息消费者。

  • 消费者组:每个消费者可以指定一个所属的消费者组,相同消费者组的消费者共同构成一个逻辑消费者组。每一个消息会被多个感兴趣的消费者组消费,但是在每一个消费者组内部,一个消息只会被消费一次。

  • 服务端Broker:一个Kafka服务器就是一个Broker。

  • 话题Topic:这是一个逻辑概念,一个Topic被认为是业务含义相同的一组消息。客户端都通过绑定Topic来生产或者消费自己感兴趣的话题。

  • 分区Partition:Topic只是一个逻辑概念,而Partition就是实际存储消息的组件。每个Partiton就是一个queue队列结构。所有消息以FIFO先进先出的顺序保存在这些Partition分区中。

Kafka集群服务

为什么要用集群?

单机服务下,Kafka已经具备了非常高的性能。TPS能够达到百万级别。但是,在实际工作中使用时,单机搭建的Kafka会有很大的局限性。

一方面:消息太多,需要分开保存。Kafka是面向海量消息设计的,一个Topic下的消息会非常多,单机服务很难存得下来。这些消息就需要分成不同的Partition,分布到多个不同的Broker上。这样每个Broker就只需要保存一部分数据。这些分区的个数就称为分区数。

另一方面:服务不稳定,数据容易丢失。单机服务下,如果服务崩溃,数据就丢失了。为了保证数据安全,就需要给每个Partition配置一个或多个备份,保证数据不丢失。Kafka的集群模式下,每个Partition都有一个或多个备份。Kafka会通过一个统一的Zookeeper集群作为选举中心,给每个Partition选举出一个主节点Leader,其他节点就是从节点Follower。主节点负责响应客户端的具体业务请求,并保存消息。而从节点则负责同步主节点的数据。当主节点发生故障时,Kafka会选举出一个从节点成为新的主节点。

最后:Kafka集群中的这些Broker信息,包括Partition的选举信息,都会保存在额外部署的Zookeeper集群当中,这样,kafka集群就不会因为某一些Broker服务崩溃而中断。

Kafka集群架构:

​​​​​​​

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/233437.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

案例课4——智齿客服

1.公司介绍 智齿科技,一体化客户联络中心解决方案提供商。提供基于「客户联络中心」场景的一体化解决方案,包括公域私域、营销服务、软件BPO的三维一体化。 智齿科技不断整合前沿的人工智能及大数据技术,已构建形成呼叫中心、机器人「在线语音…

46. 全排列

全排列 描述 : 给定一个不含重复数字的数组 nums ,返回其 所有可能的全排列 。你可以 按任意顺序 返回答案。 题目 : LeetCode 46.全排列 : 46. 全排列 分析 : 这里给个非常好的视频 : LeetCode力扣 46. 全排列Permutations_哔哩哔哩_bilibili 解析: class S…

双水平呼吸机算法怎么写?(其实是记录自己写呼吸的心得)

双水平正压呼吸机是什么? 市面上的双水平呼吸机,就是包含有双水平模式的呼吸机,其中一般也会包含单水平模式。其中正压的意思,就是抬高呼吸的压力基线,使吸气顺畅一些。 呼吸机硬件参考 不能给太详细,就给…

机械中常用的一些术语

目录 一、OEMSOP:SOP编写指南 WI(标准作业指导书):标准作业程序 (SOP):SOP和WI的区别:一、PFC、FMEA、PCP、WIPPAP、PSW: 一、OEM 1.OEM: 原始设备制造商OEM(Original Equipment Manufacturer)…

从零开始的C++(二十一)

C11 1.列表初始化: //允许以下代码正确运行int a[]{1,2,3};//效果与int a[]{1,2,3}一致 即允许省略等于号。同时,允许用花括号对所有自定义类型和内置类型进行初始化,而非以前花括号只能对数组进行初始化。利用花括号对自定义类型初始化时…

数据结构和算法-单链表

数据结构和算法-单链表 1. 链表介绍 链表是有序的列表,但是它在内存中是存储如下 图1 单链表示意图 小结: 链表是以节点的方式存储每个节点包含data域,next域,指向下一个节点。如图:发现链表的各个节点不一定是连续存储。比如地…

C语言函数详解

# 函数的概念 对于函数,我想大家应该并不陌生,在数学中就存在函数的概念,比如:一次函数 ykxb ,k和b都是常数,给⼀个任意的x,就能得到⼀个y值。 在C语言中也有函数的概念,函数也被称为…

unity 模型生成PNG图片并导出(可以任意控制方向和大小,本文提供三种方案)

提示:文章有错误的地方,还望诸位大神不吝指教! 文章目录 前言一、插件RuntimePreviewGenerator(方案一)二、unity 官方提供的接口(方案二)三、方法三,可以处理单个模型,也…

STM32基于USB串口通信应用开发

✅作者简介:热爱科研的嵌入式开发者,修心和技术同步精进, 代码获取、问题探讨及文章转载可私信。 ☁ 愿你的生命中有够多的云翳,来造就一个美丽的黄昏。 🍎获取更多嵌入式资料可点击链接进群领取,谢谢支持!…

git自动更新功能

确认权限 因为一般Linux系统网页用的www 或 www-data用户和用户组,所以要实现自动来去,首先要在www用户权限下生成ssh密钥,不然没有权限,其次就是,要把用root用户拉去的代码,批量改成www用户 1. 给www权限 vi /etc/sudoers www ALL=(ALL) NOPASSWD:/bin/chow…

整体式雨水收集pp模块可根据需求承重可达30到60吨每平方米

整体式雨水收集pp模块的承重能力主要取决于其设计和制造工艺,以及所使用的材料。一般来说,模块的尺寸越大,承重能力也越大。同时,模块的设计和制造工艺也会影响其承重能力。 在设计和制造整体式雨水收集pp模块时,需要…

unity 2d 入门 飞翔小鸟 小鸟碰撞 及死亡(九)

1、给地面,柱体这种添加2d盒装碰撞器,小鸟移动碰到就不会动了 2、修改小鸟的脚本(脚本命名不规范,不要在意) using System.Collections; using System.Collections.Generic; using UnityEngine;public class Fly : Mo…

CompletableFuture异步多任务最佳实践

简介 CompletableFuture相比于Java8的并行流,对于处理并发的IO密集型任务有着得天独厚的优势: 在流式编程下,支持构建任务流时即可执行任务。CompletableFuture任务支持提交到自定义线程池,调优方便。 本文所有案例都会基于这样…

向日葵远程控制鼠标异常的问题

​ 在通过向日葵进行远程控制的时候,可能会遇到鼠标位置异常的问题。此时,不管怎么移动鼠标,都会停留在屏幕最上方,而无法点击到正确的位置。如图: 此时,如果启用了“被控端鼠标”功能,可以正…

ChatGLM3-6B和langchain阿里云部署

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、ChatGLM3-6B部署搭建环境部署GLM3 二、Chatglm2-6blangchain部署三、Tips四、总结 前言 提示:这里可以添加本文要记录的大概内容: …

回顾2023 亚马逊云科技 re_Invent,创新AI,一路同行

作为全球云计算龙头企业的亚马逊云科技于2023年11月27日至12月1日在美国拉斯维加斯举办了2023 亚马逊云科技 re:Invent,从2012年开始举办的亚马逊云科技 re:Invent 全球大会,到现如今2023 亚马逊云科技 re:Invent,回顾历届re:Invent大会,亚马…

imutils库介绍及安装学习

目录 本机环境 安装 函数及属性 列举imutils库信息 属性和函数介绍及使用 属性 常用函数 方法使用 图像平移 图像缩放 图像旋转 骨架提取 通道转换 OPenCV版本的检测 综合测试 介绍 imutils 是一个用于图像处理和计算机视觉任务的 Python 工具包。它提供了一系…

Python爬虫-实现批量抓取王者荣耀皮肤图片并保存到本地

前言 本文是该专栏的第12篇,后面会持续分享python爬虫案例干货,记得关注。 本文以王者荣耀的英雄皮肤为例,用python实现批量抓取“全部英雄”的皮肤图片,并将图片“批量保存”到本地。具体实现思路和详细逻辑,笔者将在正文结合完整代码进行详细介绍。注意,这里抓取的图片…

centos7中的计划任务

一次调度执行-----at 安装&#xff1a; [rootzaotounan ~]# yum -y install at ​ 启动&#xff1a; [rootzaotounan ~]# systemctl start atd ​ 开机自启动&#xff1a; [rootzaotounan ~]# systemctl enbale atd ​ 语法&#xff1a; at <时间规格> 时间规格参数&…

Linux下C++动态链接库的生成以及使用

目录 一.前言二.生成动态链接库三.使用动态链接库 一.前言 这篇文章简单讨论一下Linux下如何使用gcc/g生成和使用C动态链接库&#xff08;.so文件&#xff09;。 二.生成动态链接库 先看下目录结构 然后看下代码 //demo.h#ifndef DEMO_H #define DEMO_H#include<string&…