消息队列kafka中间件详解:案例解析(第10天)

系列文章目录

  • 1- 消息队列(熟悉)
  • 2- Kafka的基本介绍(掌握架构,其他了解)
  • 3- Kafka的相关使用(掌握kafka常用shell命令)
  • 4- Kafka的Python API的操作(熟悉)

文章目录

    • 系列文章目录
    • 前言
    • 一、消息队列(熟悉)
      • 1、产生背景
      • 2、消息队列介绍
        • 2.1 常见的消息队列产品
        • 2.2 应用场景
        • 2.3 消息队列中两种消息模型
    • 二、Kafka的基本介绍
      • 1、Kafka基本介绍
      • 2、回顾zookeeper知识
      • 3、Kafka的架构(掌握)
    • 三、Kafka的shell命令使用(掌握)
      • 1、topics操作
      • 2、producer和consumer操作
      • 3、bootstrap-server和zookeeper以及broker-list的区别:
    • 四、kafka tools工具使用(熟悉)
      • 1、连接配置
      • 2、创建主题
      • 3、删除主题
      • 4、主题下的数据查看
      • 5、数据显示问题说明
      • 6、发送消息数据到kafka
    • 五、Kafka的Python API的操作(熟悉)
        • 1、模块安装
        • 2、模块使用
          • 2.1 完成生产者代码
          • 2.2 完成消费者代码


前言

本文主讲述了 消息队列,Kafka的架构,Kafka的相关使用和常用shell命令,Kafka的Python API的操作;


一、消息队列(熟悉)

1、产生背景

消息队列:指的数据在一个容器中,从容器中一端传递到另一端的过程

消息(message): 指的是数据,只不过这个数据存在一定流动状态
队列(queue): 指的容器,可以存储数据,只不过这个容器具备FIFO(先进先出)特性
思考: 公共容器需要具备什么特点?
1- 公共性: 各个程序都可以与之对接
2- FIFO特性: 先进先出
3- 具备高效的并发能力: 能够承载海量数据
4- 具备一定的容错能力: 比如支持重新读取消息方案

2、消息队列介绍

2.1 常见的消息队列产品

MQ:message queue消息队列

activeMQ: 出现时期比较早的一款消息队列的中间件产品,在早期使用人群是非常多,目前整个社区活跃度严重下降,使用人群很少了
rabbitMQ: 此款是目前使用人群比较多的一款消息队列的中间件的产品,社区活跃度比较高,主要是应用传统业务领域中
rocketMQ: 是阿里推出的一款消息队列的中间件的产品,目前主要是在阿里系环境中使用,目前支持的客户端比较少,主要是Java中应用较多
Kafka: Apache旗下的顶级开源项目,是一款消息队列的中间件产品项目来源于领英,是大数据体系中目前为止最为常用的一款消息队列的产品

在这里插入图片描述

2.2 应用场景
  • 应用解耦合
  • 异步处理
  • 限流削峰
  • 消息驱动系统
2.3 消息队列中两种消息模型
在Java中, 为了能够集成消息队列的产品, 专门提供了一个消息队列的协议: JMS(Java Message Server)  java消息服务

消息队列中两个角色: 生产者(producer) 和 消费者(consumer)
生产者: 生产/发送消息到消息队列中
消费者: 从消息队列中获取消息

在JMS规范中, 专门规定了两种消息消费模型: 
1- 点对点消费模型: 指的一条消息最终只能被一个消费者所消费。微信聊天的私聊
2- 发布订阅消费模型: 指的一条消息最终被多个消费者所消费。微信聊天的群聊

二、Kafka的基本介绍

1、Kafka基本介绍

​ Kafka是一款消息队列的中间件产品, 来源于领英公司, 后期贡献给了Apache, 目前是Aapche旗下的顶级开源项目, 采用语言是Scala

​ 官方地址: http://kafka.apache.org

kafka的特点:

  • 可靠性:Kafka集群是分布式的,并且有多副本的机制。数据可以自动复制
  • 可扩展性:Kafka集群可以灵活的调整,在线扩容
  • 耐用性:Kafka数据保存在磁盘上面,数据并且有多副本的机制。数据持久化,而且可以一定程度上防止数据丢失
  • 高性能:Kafka可以存储海量的数据,虽然是使用磁盘进行数据存储,但是Kafka有各种优化手段(例如:磁盘的顺序读写、零拷贝等)提高数据的读写速度(吞吐量)

2、回顾zookeeper知识

Kafka需要使用到zookeeper服务!

  • 启动zookeeper服务
# 三台都需要启动zookeeper服务
[root@node1 ~]# /export/server/zookeeper/bin/zkServer.sh start
[root@node2 ~]# /export/server/zookeeper/bin/zkServer.sh start
[root@node3 ~]# /export/server/zookeeper/bin/zkServer.sh start
  • zookeeper工具连接

把ZooInspector.rar解压然后进入ZooInspector\build双击zookeeper-dev-ZooInspector.jar(资源已经上传博客)

在这里插入图片描述

3、Kafka的架构(掌握)

HDFS写入过程回顾:

在这里插入图片描述

Kafka架构:

在这里插入图片描述

1- Kafka中集群节点叫broker,节点和节点之间没有主从之分,地位是完全一样
2- Topic:主题/话题,是业务层面对消息进行分类的。
3- 一个Topic可以设置多个Partition分区。
4- 同一个Partition分区可以设置多个副本,但是副本数不能超过(>)集群broker节点的个数
5- 虽然broker节点间没有主从之分,但是同一个Partition分区的不同副本间有主从之分,分为了Leader主副本和Follower从副本
6- 生产者将数据首先发送给到Leader主副本,接着是Leader主副本主动的往Follower从副本上同步消息
7- Zookeeper用来管理集群,以及管理元数据信息
8- ISR同步列表。该列表中存放的是与Leader主副本消息同步程度最接近的Follower从副本,也就是消息最小的一个列表。该列表作用,当Leader主副本无法对外提供服务的时候,会从该ISR列表中选择一个Follower从副本变成Leader主副本,对外提供服务


相关名词:
Kafka Cluster: Kafka集群
Topic: 主题/话题
Broker: Kafka中的节点
Producer: 生产者,负责生产/发送消息到Kafka中
Consumer: 消费者,负责从Kafka中获取消息
Partition: 分区。一个Topic可以设置多个分区,没有数量限制

三、Kafka的shell命令使用(掌握)

​ Kafka本质上就是一个消息队列的中间件的产品,主要负责消息数据的传递。也就说学习Kafka 也就是学习如何使用Kafka生产数据,以及如何使用Kafka来消费数据

1、topics操作

注意:

创建topic不指定分区数和副本数,默认都是1个

分区数可以后期通过alter增大,但是不能减小

副本数一旦确定,不能修改!

参数如下:

cd /export/server/kafka/bin

./kafka-topics.sh 参数说明:
	--bootstrap-server: Kafka集群中broker服务器
	--topic: 指定Topic名称
	--partitions: 设置Topic的分区数,可以省略不写
	--replication-factor: 设置Topic分区的副本数,可以省略不写
	
	--create: 指定操作类型。这里是新建Topic
	--delete: 指定操作类型。这里是删除Topic
	--alter: 指定操作类型。这里是修改Topic
	--list: 指定操作类型。这里是查看所有Topic列表
	--describe: 指定操作类型。这里是查看详细且具体的Topic信息
	
  • 1- 创建Topic
# 创建topic,默认1个分区,1个副本
/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --create --topic itcast 
# 注意: 如果副本数超过了集群broker节点个数,就会报错
/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --create --topic itheima --partitions 4 --replication-factor 4

在这里插入图片描述

# 把replication-factor改成3以内就能创建成功了
/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --create --topic itheima --partitions 4 --replication-factor 3
  • 2- 查看Topic
# --list查看所有topic
/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --list
# --describe 可以查看详细Topic信息
/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --describe 

# --describe 可以查看具体Topic信息
/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --describe --topic itheima

在这里插入图片描述

当然也可使用zookeeper客户端查看

在这里插入图片描述

  • 3- 修改Topic
# 增大topic分区
/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --alter --topic itcast --partitions 4
# 注意: partitions分区,只能增大,不能减小。而且没有数量限制
/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --alter --topic itcast --partitions 1

在这里插入图片描述

# 注意: 副本既不能增大,也不能减小
/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --alter --topic itcast --partitions 4 --replication-factor 2

在这里插入图片描述

  • 4- 删除Topic
# 再创建一个spark主题
/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --create --topic spark

/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --list

# 删除spark主题

/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --delete --topic spark

/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --list

2、producer和consumer操作

消费者要和生产者指定是同一个topic主题,才能接收到消息

参数如下:

cd /export/server/kafka/bin

./kafka-console-producer.sh 参数说明
	--broker-list: Kafka集群中broker服务器
	--topic: 指定Topic
	
./kafka-console-consumer.sh 参数说明
	--bootstrap-server: Kafka集群中broker连接信息
	--topic: 指定Topic
	latest: 消费者(默认)从最新的地方开始消费
	--from-beginning: 指定该参数以后,会从最旧的地方开始消费
	--max-messages: 最多消费的条数。
  • 1- 模拟生产者Producer
# 为了方便演示再创建一个spark
/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --create --topic spark

# 模拟生产者给spark发送消息
/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic spark
  • 2- 模拟消费者Consumer
# 模拟消费者从spark获取消息,默认每次拿最新的
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic spark 

# --from-beginning 会从最旧的地方开始消费
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic spark --from-beginning

# --max-messages x 可以设置从最旧的地方最大消费次数x
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic spark --from-beginning --max-messages 5

注意:

我们有时候发现消费者打印出来的消息和生产者生产的顺序不一致,是乱序的。原因如下:

topic有多个分区,底层是多线程来读取数据并进行打印输出。因此会存在乱序现象

3、bootstrap-server和zookeeper以及broker-list的区别:

旧版(<v2.2): kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181/kafka --create --topic ..
注意: 旧版用--zookeeper参数,主机名(或IP)和端口用ZooKeeper的2181,也就是server.properties文件中zookeeper.connect属性的配置值.

新版(>v2.2): kafka-topics.sh --bootstrap-server node1:9092 --create --topic ..
注意: 新版用--bootstrap-server参数,主机名(或IP)和端口用某个节点的即可,即主机名(或主机IP):9092。9092是Kafka的监听端口



broker-list:broker指的是kafka的服务端,可以是一个服务器也可以是一个集群。producer和consumer都相当于这个服务端的客户端。一般我们再使用console producer的时候,broker-list参数是必备参数,另外一个必备的参数是topic

bootstrap-servers: 指的是kafka集群的服务器地址,这个和broker-list功能是一样的,只不过我们在console producer要求用broker-list,其他地方都采用bootstrap-servers。

四、kafka tools工具使用(熟悉)

可以在可视化的工具通过点击来操作kafka完成主题的创建,分区等操作,资源包已经上传到博客第10天内

在这里插入图片描述

注意: 安装完后桌面不会有快捷方式,需要去电脑上搜索,或者去自己选的安装位置找到发送快捷方式到桌面!

在这里插入图片描述

在这里插入图片描述

1、连接配置

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

2、创建主题

在这里插入图片描述

在这里插入图片描述

3、删除主题

在这里插入图片描述

在这里插入图片描述

4、主题下的数据查看

在这里插入图片描述

5、数据显示问题说明

在这里插入图片描述

  • 修改工具的数据显示类型

    在这里插入图片描述

    在这里插入图片描述

在这里插入图片描述

6、发送消息数据到kafka

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

五、Kafka的Python API的操作(熟悉)

1、模块安装

纯Python的方式操作Kafka。

准备工作:在node1的节点上安装一个python用于操作Kafka的库

安装kafka-python 模模块 ,模块中提供了操作kafka的方法

在线安装

在node1上安装就可以,需要保证服务器能够连接网络

安装命令: python -m pip install kafka-python -i https://pypi.tuna.tsinghua.edu.cn/simple

在这里插入图片描述

离线安装

将kafka_python-2.0.2-py2.py3-none-any.whl安装包上传服务器software目录下进行安装

安装命令: pip install kafka_python-2.0.2-py2.py3-none-any.whl
2、模块使用

API使用的参考文档: https://kafka-python.readthedocs.io/en/master/usage.html#kafkaproducer

模块中封装了两个类,

一个是生成者类KafkaProducer,提供了向kafka写数据的方法

另一个是消费者类KafkaConsumer,提供了读取kafka数据的方法

2.1 完成生产者代码

生成者类KafkaProducer,提供了向kafka写数据的方法

send(topic,valu)方法: 发送消息
topic参数:指定向哪个主题发送消息
value参数:指定发送的消息数据 ,数据类型要求是bytes类型

示例:

# 导包
from kafka import KafkaProducer

# 编写代码
if __name__ == '__main__':
    # 创建生产者对象并指定对应服务器
    producer = KafkaProducer(bootstrap_servers=['node1:9092'])
    # 发送消息
    for i in range(1,101):
        future = producer.send('kafka', f'hi_kafka_{i}'.encode())
        # 获取元数据
        record_metadata = future.get()
        # 从元数据中获取主题,分区,偏移
        print(record_metadata.topic)
        print(record_metadata.partition)
        print(record_metadata.offset)
2.2 完成消费者代码

消费者类KafkaConsumer,提供了读取kafka数据的方法

KafkaConsumer(topic,bootstrap_servers)
第一个参数:指定消费者连接的主题,
第二个参数:指定消费者连接的kafka服务器

示例:

# 导包
from kafka import KafkaConsumer

# 编写代码
if __name__ == '__main__':

    # 创建消费者对象
    consumer = KafkaConsumer('kafka',bootstrap_servers=['node1:9092'])
    # 遍历对象
    for message in consumer:

        # 格式化打印,设置相关参数
        # 因为value是二进制,需要decode解码
        print ("主题:%s,分区:%d,偏移:%d : key=%s value=%s"
               % (message.topic, message.partition,message.offset, message.key, message.value.decode('utf8')))

可能遇到的错误:

在这里插入图片描述

原因: 服务器环境有问题。是因为服务器上既安装了kafka-python的第三方依赖,同时还安装kafka的第三方依赖。可以通过pip list | grep kafka进行确定
解决办法: 先将这两个第三方依赖全部卸载,然后再重新执行如下命令
python -m pip install kafka-python -i https://pypi.tuna.tsinghua.edu.cn/simple

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/730393.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

拒绝吸烟,远离慢阻肺——朗格力复合营养素助力守护肺部健康

#肺科营养#朗格力#班古营养#复合营养素#肺部营养#肺部健康# 你知道吗?慢阻肺这一疾病在我国的患者数量已突破亿级大关,尤其在40岁以上的成年人中,平均每7个人中就有1位可能受其困扰。然而,很多人对慢阻肺的严重性认识不足,常常将其视为一种普通的咳嗽或喘息,忽视了它潜在的危…

ChatGPT 提示词技巧一本速通

目录 一、基本术语 二、提示词设计的基本原则 三、书写技巧 2.1 赋予角色 2.2 使用分隔符 2.2 结构化输出 2.3 指定步骤 2.4 提供示例 2.5 指定长度 2.6 使用或引用参考文本 2.7 提示模型进行自我判断 2.8 思考问题的解决过程 ​编辑 2.10 询问是否有遗漏 2.11 …

Consul 如何删除不需要的服务

一、找到需要删除的id 二、打开postman 使用put请求 http://ip:port/v1/agent/service/deregister/mc-admin-192-168-0-182-8084三、区域如果要验证输入验证

倍思突破氮化镓快充技术,为用户带来安全舒适体验

氮化镓,这个化学式为GaN的化合物,其高热稳定性和化学稳定性使其在多种极端环境中都能保持优良的性能,从而为其在电子器件领域的应用奠定了坚实的基础。 2018年前后开始,氮化镓快充充电器进入国内市场。作为第三代半导体材料的代表,氮化镓具有宽禁带的特性,其禁带宽度远大于传统…

芋道源码 yudao-cloud 、Boot 文档,开发指南 看全部,破解[芋道快速开发平台 Boot + Cloud]

1、文档全部保存本地部署查看&#xff0c;真香 文档已抓取最新版本&#xff0c;2024.06.21。【唯一遗憾&#xff0c;表结构到2024.04月&#xff0c;已被限制放到知识星球】会员中心&#xff0c;支付中心&#xff0c;CRM&#xff0c;ERP&#xff0c;商城&#xff0c;公众号运行…

代理网络基础设施 101:增强安全性、速度和可扩展性

编辑代理网络在现代网络架构中发挥着重要作用&#xff0c;充当管理和重新路由数据流的中介。它们处理的数据可以是各种类型&#xff0c;包括搜索查询和潜在的敏感客户信息&#xff0c;这凸显了它们在数据安全方面的作用。 然而&#xff0c;代理的好处不仅限于安全性。它们为用…

二分练习题(C. Earning on Bets)

二分练习题&#xff08;C. Earning on Bets&#xff09; 原题链接&#xff1a;点击此处 Earning on Bets 题面翻译 有人提议让您玩一个游戏。在这个游戏中&#xff0c;有 n n n 种可能的结果&#xff0c;对于每一种结果&#xff0c;您都必须下注一定整数的硬币。如果 i …

微信小程序UI组件库合集

文章目录 前言参考地址推荐组件库1.官方WeUI&#xff08;建议使用☆☆☆☆&#xff09;2.ColorUI&#xff08;广告很多&#xff0c;不建议使用&#xff09;3.vantUI又名&#xff1a;ZanUI&#xff08;操作简单&#xff0c;建议使用☆☆☆☆&#xff09;4.MinUI&#xff08;比较…

摊牌了,我不装了~各种Amazon Bedrock小样儿、试用装,今天免费!

探索世界顶级的大模型、智能体、文生图、对话机器人……新手&#xff1f;还是专家&#xff1f;加入我们&#xff0c;解锁精彩内容&#xff1a; l 初体验&#xff1a;在 Amazon Bedrock Playground 直接调用强大的大模型&#xff0c;点亮生成式AI技能树。 l 文生图&#xff1a…

(vue3)基于vite+vue3+element-plus项目创建

(vue3)基于vitevue3element-plus项目创建 vue.js官方中文文档&#xff1a;https://cn.vuejs.org/guide/quick-start.html vite官方中文文档&#xff1a;https://cn.vitejs.dev/guide/ element-plus官网&#xff1a;https://element-plus.org/zh-CN/guide/installation.html 第…

python学习笔记-10

面向对象编程-下 1.私有化属性 语法&#xff1a;两个下划线开头&#xff0c;声明该属性为私有&#xff0c;不能在类的外部被使用或直接访问。 使用私有化属性的场景&#xff1a; 1.把特定的一个属性隐藏起来&#xff0c;不让类的外部进行直接调用。 2.不让属性的值随意改变。…

微服务开发与实战Day08 - Elasticsearch

一、初始Elasticsearch 高性能分布式搜索引擎 1. 认识和安装 1.1 认识 Lucene是一个Java语言的搜索引擎类库&#xff0c;是Apache公司的顶级项目&#xff0c;由DougCutting于1999年研发。官网地址&#xff1a;Apache Lucene - Welcome to Apache Lucene Lucene的优势&…

RabbitMQ(七)Shovel插件对比Federation插件

文章目录 Shovel和Federation的主要区别&#xff08;重点&#xff09;一、启用Shovel插件二、配置Shovel三、测试1、测试计划2、测试效果发布消息源节点目标节点 Shovel和Federation的主要区别&#xff08;重点&#xff09; • Shovel更简洁一些 • Federation更倾向于跨集群使…

基于JSP技术的个性化影片推荐系统

开头语&#xff1a;你好呀&#xff0c;我是计算机学长猫哥&#xff01;如果有相关需求&#xff0c;文末可以找到我的联系方式。 开发语言&#xff1a;Java 数据库&#xff1a;MySQL 技术&#xff1a;JSPServlet 工具&#xff1a;MyEclipse、Tomcat、MySQL 系统展示 首页 …

Golang——channel

channel是Go在语言层面提供的协程间的通信方式。通过channel我们可以实现多个协程之间的通信&#xff0c;并对协程进行并发控制。 使用注意&#xff1a; 管道没有缓冲区时&#xff0c;从管道中读取数据会阻塞&#xff0c;直到有协程向管道中写入数据。类似地&#xff0c;向管道…

内网安全[3]-代理Socks协议路由不出网后渗透通讯CS-MSF控制上线

1.环境 隧道技术: 隧道技术是一类网络协议&#xff0c;它是一种数据包封装技术&#xff0c;它将原始IP包&#xff08;其报头包含原始发送者和最终目的地&#xff09;封装在另外一个数据包&#xff08;称为封装的IP包&#xff09;的数据净荷中进行传输&#xff0c;使用隧道的原…

V4和V6双栈处理

现进行双栈 对R1 对R2 对R3 对R4 路由地址配完&#xff0c;起协议 然后起ripng&#xff0c;在R2&#xff0c;R3&#xff0c;R4上都宣告一下 然后在PC1和PC2上都手动配置一下就可以了

第1章 MySQL数据库概述

1.1 基本概念 数据库是什么&#xff1f; 存储数据的地方 DB&#xff1a;数据库&#xff08;Database&#xff09; 为什么要用数据库&#xff1f; 因为应用程序产生的数据是在内存中的&#xff0c;如果程序退出或者是断电了&#xff0c;则数据就会消失。使用数据库是为了…

CVPR上新 | 从新视角合成、视频编解码器、人体姿态估计,到文本布局分析,微软亚洲研究院精选论文

编者按&#xff1a;欢迎阅读“科研上新”栏目&#xff01;“科研上新”汇聚了微软亚洲研究院最新的创新成果与科研动态。在这里&#xff0c;你可以快速浏览研究院的亮点资讯&#xff0c;保持对前沿领域的敏锐嗅觉&#xff0c;同时也能找到先进实用的开源工具。 本周&#xff0…

DLS平台:美联储松绑预期升温,金价飙升至2365美元

摘要 美联储鹰派官员古尔斯比对降息态度有所松动&#xff0c;导致金价一度升至2365美元。市场对美联储未来的货币政策预期有所改变&#xff0c;黄金作为避险资产的吸引力增强。本文将详细分析美联储官员态度变化对金价的影响、当前市场对黄金的预期及其未来走势。 美联储官员态…