2024.1.11 Kafka 消息队列,shell命令,核心原理

目录

 一 . 消息队列

二. Kafka

三 . 启动命令

 四 . Kafka的Shell 命令

五 . Kafka的核心原理

1. Topic的分区和副本机制

2 . 消息存储机制 和 查询机制     

3. Kafka中生产者数据分发策略

 六 . Kafka 之所以具有高速的读写性能,主要有以下几个原因

七. 笔记 


 一 . 消息队列

应用场景:

        应用解耦合:类似单点故障

        异步处理: 减少处理时间

        限流削峰 : 不管流量多大,放到消息队列中,都是按照一定的节奏进行处理

        消息驱动的系统: 消息队列,消息生产者,消费者(负责对消息进行处理)

        消息(message): 指的是数据,只不过这个数据存在一定流动状态
        队列(queue): 指的容器,可以存储数据,只不过这个容器具备FIFO(先进先出)特性

消息队列中两个角色:

        生产者producer:生产/发送消息到消息队列中

        消费者consumer: 从消息队列中获取消息

二. Kafka

1. 基本介绍

        kafka是一款消息队列的中间件产品;

        kafka特点:

                可靠性

                可扩展性

                耐用性

                性能

 2. kafka架构

        1. Kafka中集群节点叫broker ;

        2. 集群的节点与节点之间,没有主从之分 ; 

        3. 同一个分区的不同副本间中, 有主从关系 ,主是leader , 从是Follower

        4. 同一个Partitions分区可以设置多个副本 , 但是副本数量不能超过集群broker节点的个数

        5. zookeeper用来管理集群,以及管理元数据信息

        6. Topic 主题 ,是业务层面对消息进行分类的

三 . 启动命令

三台虚拟机启动Zookeeper

        cd /export/server/zookeeper/bin

        ./zkServer.sh start

node1脚本启动kafka

         cd /export/onekey

        ./start-kafka.sh

        ./stop-kafka.sh

 四 . Kafka的Shell 命令

1.  创建Topic

./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --create --topic test02 --partitions 4 --replication-factor 2  

参数: 

        -- bootstrap-server: Kafka集群中broker连接信息

        -- create : 指定操作类型 .这里是新建Topic

        -- topic: 指定要新建的Topic名称

        -- partitions :设置Topic的分区数

        -- relication-factor :设置Topic分区的副本数

 2.  查看Topic

./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --list

参数说明:
    --bootstrap-server: Kafka集群中broker连接信息
    --list: 指定操作类型。这里是查看Kafka集群上所有可用的Topic列表

 3. 查看具体Topic

./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --describe --topic test04
参数说明:
    --bootstrap-server: Kafka集群中broker连接信息
    --describe: 指定操作类型。这里是查看具体Topic信息

 4. 模拟生产者Producer

./kafka-console-producer.sh --broker-list node1.itcast.cn:9092,node2.itcast.cn:9092 --topic test04
参数说明:
    --broker-list: Kafka集群中broker连接信息
    --topic: 指定要将消息发送到哪个具体的Topic

5. 模拟消费者 Consumer

 

./kafka-console-consumer.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --topic test04

参数说明:
    --bootstrap-server: Kafka集群中broker连接信息
    --topic: 指定要从哪个Topic中消费消息
    --from-beginning: 指定该参数以后,会从最旧的地方开始消费
    latest: 消费者(默认)从最新的地方开始消费
    --max-messages: 最多消费的条数。满足条数后,就会自动结束
    --group: 指定消费组名称。一个消费者只能属于一个消费组;一个消费组里面可以有多个消费者。同一个Topic中的同一条数据,只能被同一个消费组中的一个消费者所消费
    

6. 修改Topic

 

./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --alter --topic test01 --partitions 10

分区: 只能增大,不能减小。而且没有数量限制
副本: 既不能增大,也不能减小

减小分区:
./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --alter --topic test01 --partitions 1

 修改副本数:
./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --alter --topic test01 --replication-factor 2 --partitions 11

7. 删除Topic

 

./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --delete --topic test01

参数说明:
    --bootstrap-server: Kafka集群中broker连接信息
    --delete: 指定操作类型。这里是删除Topic
    --topic: 指定要删除哪个Topic

 8. 查看消费组中有多少个消费者

./kafka-consumer-groups.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --group g_01 --members --describe

 

五 . Kafka的核心原理

1. Topic的分区和副本机制

        分区的作用:

                1- 避免单台服务器容量的限制

                2- 提升Topic的吞吐量

                3 - 分区数量不要超过Kafka集群中的broker节点个数的3倍

        副本的作用:

                1 - 提升数据安全性,但也会导致冗余过多

                2- 副本个数不能超过集群的broker节点个数,推荐副本1-3个

2 . 消息存储机制 和 查询机制     

        消息存储机制

1-xx.log和xx.index它们的作用是什么?
        答:
        xx.log: 称之为segment片段文件,也就是一个Partition分区的数据,会被分成多个segment(log)片段文件进行存储。
        xx.index: 称之为索引文件,该文件的作用是用来加快对xx.log文件内容检索的速度


2-xx.log和xx.index文件名称的意义?
        答: 这个数字是xx.log文件中第一条消息的offset(偏移量)


3-为什么一个Partition分区的数据要分成多个xx.log(segment片段文件)文件进行存储?
        答:
      1- 如果一个文件的数据量过大,打开和关闭文件都非常消耗资源
      2- 在一个大的文件中,检索内容也会非常消耗资源
      3- Kafka只是用来临时存储消息数据。会定时将过期数据删除。如果数据放在一个文件中,删除的效率低;

        如果数据分成了多个segment片段文件进行存储,删除的时候只需要判断segment文件最后修改时间,如果超过了保留时间,就直接将整个segment文件删除。该保留时间是通过server.properties文件中的log.retention.hours=168进行设置,默认保留168小时(7天)

        

        查询机制

查询步骤:
1- 首先先确定要读取哪个xx.log(segment片段)文件。368776该offset的消息在368769.log文件中
2- 查询xx.log对应的xx.index,查询该条消息的物理偏移量范围
3- 根据消息的物理偏移量范围去读取xx.log文件(底层是基于磁盘的顺序读取)
4- 最终就获取到了具体的消息内容

3. Kafka中生产者数据分发策略

分发策略如下这些:

  • 1- 随机分发策略:将消息发到到随机的某个分区上。Python支持,Java不支持

  • 2- 指定分区策略:将消息发到指定的分区上面。Python支持,Java支持

  • 3- Hash取模策略:对消息的key先取Hash值,再和分区数取模。Python支持,Java支持

  • 4- 轮询策略:在Kafka的2.4及以上版本,已经更名成粘性分发策略。Python不支持,Java支持

  • 5- 自定义分发策略:Python支持,Java支持

 六 . Kafka 之所以具有高速的读写性能,主要有以下几个原因

Kafka之所以具有高速的读写性能,主要有以下几个原因:

  1. 分布式架构:Kafka采用分布式架构,可以通过水平扩展来处理大规模的数据流。它将数据分成多个分区,并将这些分区分布在不同的节点上,实现了数据的并行处理和负载均衡,从而提高了读写性能。

  2. 零拷贝技术:Kafka使用零拷贝技术来减少数据在内存和磁盘之间的拷贝次数。它通过直接内存访问(DMA)技术,将数据从磁盘读取到内存或者从内存写入到磁盘,避免了数据的多次复制,减少了IO操作的开销,提高了读写性能。

  3. 批量写入和压缩:Kafka支持批量写入消息和消息的压缩。它可以将多个消息一次性写入到磁盘,减少了磁盘IO的次数,提高了写入性能。同时,Kafka还支持对消息进行压缩,减小了消息的存储空间,降低了网络传输的开销,进一步提高了读写性能。

  4. 高效的消息索引和存储结构:Kafka使用高效的消息索引和存储结构,例如日志结构和位移索引,可以快速地定位和检索消息。它采用追加写入的方式,顺序写入磁盘,减少了随机写入的开销,提高了读写性能。

综上所述,Kafka通过分布式架构、零拷贝技术、批量写入和压缩、高效的消息索引和存储结构等手段,实现了高速的读写性能,使其成为处理大规模数据流的理想选择。

七. 笔记 

count(1)会记null,

count(0)会记null,

count(*)会记null

 count(字段)不会记null

count (null)得到null

import os
from pyspark import SparkConf, SparkContext, StorageLevel
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import Window as win
from pyspark.sql.types import StructType, IntegerType, StringType, StructField, FloatType

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 创建SparkSession对象
    spark = SparkSession.builder \
        .config('spark.sql.shuffle.partitions', 1) \
        .appName('new_sale') \
        .master('local[*]') \
        .getOrCreate()
    # 使用框架
    spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', True)

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/311247.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

8年经验之谈 —— 服务端性能瓶颈定位思路总结!

01、软件性能测试目标 软件性能测试的目的主要有以下三点: 评价系统当前性能,判断系统是否满足预期的性能需求。 寻找软件系统可能存在的性能问题,定位性能瓶颈并解决问题。 判定软件系统的性能表现,预见系统负载压力&#xff…

Asynchronous FIFO and synchronous FIFO-翻译自外网

Synchronous FIFO 先进先出 (FIFO) 是一种非常流行且有用的设计块,用于模块之间的同步和握手机制。 FIFO 的深度: FIFO 中的槽数或行数称为 FIFO 的深度。 FIFO 的宽度:每个槽或行中可以存储的位数称为 FIFO 的宽度。 在同步 FIFO 中&…

特征工程-特征清洗

特征清洗 在进行玩特征理解后,我们大致理解了面对的数据中包含哪些内容。下一阶段,我么需要对数据中的内容进行进一步分析处理,针对不同数据进行清洗。数据清洗是对数据进行重新审查和校验的过程,目的在于删除重复信息、纠正存在…

基于css实现动画效果

介绍 本文将会基于css&#xff0c;实现各种动画效果&#xff0c;接下来会从简单几个例子入手。 案例 三颗球 <!DOCTYPE html> <html lang"en"><head><meta charset"utf-8" /><title>React App</title><style>…

软件测试|Python requests库的安装和使用指南

简介 requests库是Python中一款流行的HTTP请求库&#xff0c;用于简化HTTP请求的发送和处理&#xff0c;也是我们在使用Python做接口自动化测试时&#xff0c;最常用的第三方库。本文将介绍如何安装和使用requests库&#xff0c;以及一些常见的用例示例。 安装requests库 首…

Fenwick Tree——树状数组

问题陈述&#xff1a; 你得到一个长度为 N 的数组为 a0,a1,a2……an-1。处理以下类型的查询&#xff0c;一共有 Q 次查询。 0 p x : ap⬅ap x 1 l r : 打印 ai ( il 到 ir-1 的 ai 之和) 约束&#xff1a; 1 ≤ N,Q ≤ 500000 0 ≤ ai,x ≤ 1e9 0 ≤ p < N 0 ≤ li <…

算法训练营Day43(完全背包[组合排列])

完全背包理论 正序遍历&#xff0c;先背包先物品都可以&#xff0c; 正序遍历的话&#xff0c;之前的物品价值还在&#xff0c;可以用上。 物品和背包都是有前面推出来&#xff0c;都可以。 但是其他的非纯理论的完全背包问题就要看场景&#xff0c;确定先背包还是先物品了 //先…

Google Pixel 与 iPhone手机:哪个更好?

iPhone稳定可靠&#xff0c;Pixel性价比高且创新。两者各有千秋&#xff0c;满足不同需求 谷歌的 Pixel 手机是 Android 最接近 iPhone 的手机&#xff0c;也是真正原生的Android手机。在iPhone 15 Pro Max 与华为 Mate 60 Pro的比较中不难看出&#xff0c;iPhone依然有着极强…

SAP 获取物料/批次/订单的特性值(学习一)

1、事务码 MSC1N、MSC2N、MSC3N 2、常用表 MCH1、MCHA、AUSP、MCH*开头的几个 3、批次 1、创建批次 BAPI&#xff1a;BAPI_BATCH_CREATE 2、修改批次 BAPI&#xff1a;BAPI_BATCH_CHANGE 3、删除批次 BAPI&#xff1a;BAPI_BATCH_DELETE 4、获取批次明细 BAPI&…

vpp node 及 vpp 多线程

node 注册 node注册&#xff0c;即宏VLIB_REGISTER_NODE(x, ...)流程&#xff1a; 创建vlib_node_registration_t x&#xff1b;vlib_node_registration_t结构只是存放了用户提供的node相关信息。把x添加到全局变量vlib_global_main中的node_registrations链表中&#xff08;…

本地开发环境请求服务器接口跨域的问题(vue的问题)

上面的这个报错大家都不会陌生&#xff0c;报错是说没有访问权限&#xff08;跨域问题&#xff09;。本地开发项目请求服务器接口的时候&#xff0c;因为客户端的同源策略&#xff0c;导致了跨域的问题。下面先演示一个没有配置允许本地跨域的的情况&#xff1a; 可以看到&…

如何在数学建模竞赛中稳定拿奖

✅作者简介&#xff1a;人工智能专业本科在读&#xff0c;喜欢计算机与编程&#xff0c;写博客记录自己的学习历程。 &#x1f34e;个人主页&#xff1a;小嗷犬的个人主页 &#x1f34a;个人网站&#xff1a;小嗷犬的技术小站 &#x1f96d;个人信条&#xff1a;为天地立心&…

[三星电子]算法题--两种颜色涂无向图(bfs)

题目 题目描述&#xff1a; 给一无向图中各个节点绘色&#xff0c;一共只有两种颜色&#xff0c;使其满足相邻节点颜色不同&#xff0c;并输出其中一种颜色的节点个数及序号&#xff1b;如果不满足&#xff0c;则输出-1。 示例&#xff1a; 第一行输入节点个数V和边数E&…

数字信号处理实验---Z变换及系统的零极点分析 Matlab代码

一&#xff0e;各种函数的用法 1.tf2zp函数&#xff1a;通常用于将传递函数&#xff08;Transfer Function&#xff09;转换为零极增益形式&#xff08;ZPK form&#xff09;&#xff0c;转换前G(s) num(s) / den(s)&#xff0c;转换后G(s) K * (s - z1) * (s - z2) * ... *…

freeRTOS总结(四)中断管理

1、什么是中断 打断CPU正常运行程序&#xff0c;转而处理紧急的事件&#xff08;中断服务函数&#xff09;。 中断执行机制3步 1、中断请求 2、响应中断 3、退出中断 2 中断优先级 cortex-M使用8位寄存器配置中断优先级 stm32只用到高4位 stm32优先级分为抢占优先级和子优先…

如何测量电源芯片的电压调整率?电源芯片检测系统助力测试

电源芯片电压调整率的测试方法 测试环境&#xff1a; 温度&#xff1a;252℃ 湿度&#xff1a;60%~70% 大气压强&#xff1a;86kPa~106kPa 测试工具&#xff1a;可调电源、可调电子负载、万用表 测试步骤&#xff1a; 1. 设置电子负载&#xff0c;使电源满载输出; 2. 调节电源芯…

LORA的基本原理

本文将介绍如下内容&#xff1a; 什么是Lora高效微调的基本原理LORA的实现方式LORA为何有效&#xff1f; 一、什么是LoRA LoRA 通常是指低秩分解&#xff08;Low-Rank Decomposition&#xff09;算法&#xff0c;是一种低资源微调大模型方法&#xff0c;论文如下: LoRA: Low…

深入理解计算机系统(1):开始

计算机系统是由硬件和系统软件组成的&#xff0c;它们共同工作来运行应用程序。虽然系统的具体实现方式随着时间不断变化&#xff0c;但是系统内在的概念却没有改变。所有计算机系统都有相似的硬件和软件组件&#xff0c;它们又执行着相似的功能。 计算机系统 信息就是位上下…

C++I/O流——(1)I/O流的概念

归纳编程学习的感悟&#xff0c; 记录奋斗路上的点滴&#xff0c; 希望能帮到一样刻苦的你&#xff01; 如有不足欢迎指正&#xff01; 共同学习交流&#xff01; &#x1f30e;欢迎各位→点赞 &#x1f44d; 收藏⭐ 留言​&#x1f4dd; 勤奋&#xff0c;机会&#xff0c;乐观…

Nginx配置反向代理实例二

Mac 安装Nginx教程 Nginx配置反向代理实例一 提醒一下&#xff1a;下面实例讲解是在Mac系统演示的&#xff1b; 反向代理实例二实现的效果 使用nginx 反向代理&#xff0c;根据访问的地址跳转到不同端口的服务中 nginx 监听端口为81&#xff1b; 访问地址1&#xff1a;http:/…