大数据学习之Kafka消息队列、Spark分布式计算框架一

Kafka消息队列

章节一.kafka入门

4.kafka入门_消息队列两种模式

5.kafka入门_架构相关名词

Kafka 入门 _ 架构相关名词
事件 记录了世界或您的业务中 发生了某事 的事实。在文档中
也称为记录或消息。当您向 Kafka 读取或写入数据时,您以事件的
形式执行此操作。从概念上讲,事件具有键、值、时间戳和可选的
元数据标头。这是一个示例事件:
事件键: 白富美
事件的值: 向土豪 支付了 520
事件时间戳: “yyyy 05 20 13:14”
生产者 是那些向 Kafka 发布(写入)事件的客户端应用程序。
消费者 是订阅(读取和处理)这些事件的那些客户端应用程
序。在 Kafka 中,生产者和消费者完全解耦并且彼此不可知,这是
实现 Kafka 众所周知的高可扩展性的关键设计元素。例如,生产者
永远不需要等待消费者。 Kafka 提供了各种 保证 ,例如一次性处理
事件的能力。
主题 :事件被组织并持久地存储在 主题 中。 Kafka 中的主题始
终是多生产者和多订阅者:一个主题可以 N(N>=0) 个向其写入事件
的生产者,以及订阅这些事件的 N(N>=0) 个消费者。主题中的事件
可以根据需要随时读取 —— 与传统的消息传递系统不同,事件在消
费后不会被删除。相反,您可以通过每个主题的配置设置来定义
Kafka 应该将您的事件保留多长时间,之后旧事件将被丢弃。 Kafka
的性能在数据大小方面实际上是恒定的,因此长时间存储数据是非
常好的

6.kafka入门_基础架构

7.kafka入门_下载安装一

8.kafka入门_下载安装二

9.kafka入门_集群启停脚本

Kafka 入门 _ 集群启停脚本
[root@node2 opt] # vim /etc/profile
# kafka 的环境变量
export KAFKA_HOME = /opt/kafka
export PATH = $PATH : $KAFKA_HOME /bin
[root@node2 opt] # source /etc/profile
[root@node2 ~] # kafka-topics.sh --version
3 .0.1 (Commit:8e30984f43e64d8b)
kafka-server-start.sh -daemon
/opt/kafka/config/server.properties
[root@node2 opt] # jps
3248 QuorumPeerMain
3761 Jps
3736 Kafka
kafka-server-stop.sh
[root@node2 opt] # cd /root/
11 [root@node2 ~] # mkdir bin/
[root@node2 ~] # cd bin/
[root@node2 bin] # vim kafka.sh
#!/bin/bash
if [ $# -lt 1 ]
then
echo "Please input arg:[start/stop]"
exit
fi
case $1 in
start )
for i in node2 node3 node4
do
   
echo "--------start $i 's kafka--------"
   
ssh $i /opt/kafka/bin/kafka-server-start.sh
-daemon /opt/kafka/config/server.properties
done
;;
stop )
for i in node2 node3 node4
do
   
echo "--------stop $i 's kafka--------"
   
ssh $i /opt/kafka/bin/kafka-server-stop.sh
done
;;
*)
echo "Arg Error Please input arg:
[start/stop]"
exit
;;  
esac

参数 描述

--bootstrap-server
node3:9092
连接的 Kafka Broker 主机名称和端口号
--topic
<String: topic> 比如: topicA
操作的 topic 名称
--list
查看所有主题
--create
创建主题
--delete
删除主题
--alter
修改主题
--describe
查看主题详细描述
--partitions
<Integer: # of partitions>
设置分区数
--replication-factor
<Integer: replication factor>
设置分区副本
--config
<String: name=value>
更新系统默认的配置
--version
查看当前系统 kafka 的版本
添加可执行权限: [root@node2 bin]# chmod +x kafka.sh
启动测试: kafka.sh start 注意:提前启动 zk 集群。
关闭测试: kafka.sh stop

10.kafka入门_Topic命令行操作

11.kafka入门_消息发送和接收

章节二.生产者

12.生产者_发送数据原理剖析一

13.生产者_发送数据原理剖析二

14.生产者_同步发送数据一

15.生产者_同步发送数据二

node2 上开启 Kafka 消费者进行消费
7
运行 SyncCustomProducer
     
prop . put ( ProducerConfig . KEY_SERIALIZER_CL
ASS_CONFIG ,
StringSerializer . class . getName ());
     
prop . put ( ProducerConfig . VALUE_SERIALIZER_
CLASS_CONFIG ,
StringSerializer . class . getName ());
       
//TODO 3. 声明并实例化生产者对象
       
KafkaProducer < String , String >
producer =
           
new KafkaProducer < String ,
String > ( prop );
       
//TODO 4. 发送消息
       
for ( int i = 0 ; i < 5 ; i ++ ){
           
// 同步发送消息
           
producer . send ( new
ProducerRecord <>
( "topicA" , "sync_msg" + i )). get ();
      }
       
//TODO 5. 关闭生产者
       
producer . close ();
  }
}
[root@node2 ~] # kafka-console-consumer.sh
--bootstrap-server node2:9092 --topic
topicA
22 8
观察 node2 Kafka 消费者消费消息的情况
生产者 _ 异步发送数据
代码实现
1
创建类 UnSyncCustomProducer
2
编写代码
[root@node2 ~]# kafka-console-consumer.sh
--bootstrap-server node2:9092 --topic
topicA
sync_msg0
sync_msg1
sync_msg2
sync_msg3
sync_msg4

16.生产者_异步发送数据

17.生产者_异步回调发送数据

代码实现
1
创建类 UnSyncCallBackCustomProducer
2
编写代码
[root@node2 ~] # kafka-console-consumer.sh
--bootstrap-server node2:9092 --topic
topicA
unsync_msg0
unsync_msg1
unsync_msg2
unsync_msg3
unsync_msg4
package com . itbaizhan . kafka . producer ;
26 import
org . apache . kafka . clients . producer . * ;
import
org . apache . kafka . common . serialization . Stri
ngSerializer ;
import java . util . Properties ;
import
java . util . concurrent . ExecutionException ;
public class UnSyncCallBackCustomProducer
{
   
public static void main ( String [] args )
throws ExecutionException ,
InterruptedException {
       
//TODO 1. 声明并实例化 Kafka Producer
配置文件对象
       
Properties prop = new
Properties ();
       
//TODO 2. 为配置文件对象设置参数
       
// 2.1 配置 bootstrap_servers
     
prop . put ( ProducerConfig . BOOTSTRAP_SERVERS
_CONFIG , "node2:9092,node3:9092,node4:9092"
);
       
// 2.2 配置 key value 的序列化类
     
prop . put ( ProducerConfig . KEY_SERIALIZER_CL
ASS_CONFIG ,
StringSerializer . class . getName ());
27      
prop . put ( ProducerConfig . VALUE_SERIALIZER_
CLASS_CONFIG ,
StringSerializer . class . getName ());
       
//TODO 3. 声明并实例化生产者对象
       
KafkaProducer < String , String >
producer = new KafkaProducer < String ,
String > ( prop );
       
//TODO 4. 发送消息
       
for ( int i = 0 ; i < 5 ; i ++ ){
           
// 异步发送消息 不调用 get() 方法
           
producer . send ( new
ProducerRecord <> ( "topicA" , "unsync_msg" +
i ),
               
new Callback () {
                   
// 如下方法在生产者收到 acks
确认时异步调用
                   
@Override
                   
public void
onCompletion ( RecordMetadata
recordMetadata , Exception e ) {
                       
if ( e == null ){
                           
// 无异常信息,输
出主题和分区信息到控制台
                         
System . out . println ( "topic:" + recordMetadat
a . topic ()
                                 
+ ",partition:" + recordMetadata . partition ()
);
                      } else { // 打印异常信息
28 3
node2 上开启 Kafka 消费者进行消费
4
运行 UnSyncCallBackCustomProducer
5
观察 node2 Kafka 消费者消费消息的情况
6
控制台输出信息
                         
System . out . println ( e . getMessage ());
                      }
                  }
              });
           
Thread . sleep ( 5 );
      }
       
//TODO 5. 关闭生产者
       
producer . close ();
  }
}
[root@node2 ~] # kafka-console-consumer.sh
--bootstrap-server node2:9092 --topic
topicA
[root@node2 ~] # kafka-console-consumer.sh
--bootstrap-server node2:9092 --topic
topicA
unsync_msg0
unsync_msg1
unsync_msg2
unsync_msg3
unsync_msg4
29 生产者 _ 拦截器
拦截器 (Interceptor) kafka0.10.0.0 版本中引入的新功能,主
要用于实现 clients 端的定制化控制逻辑。它可以使得用户在消息发
送前以及 producer 回调逻辑前有机会对消息做一些定制化需求,比
如修改消息等。同时允许指定多个 Interceptor 按序作用于同一条消
息从而形成一个拦截器链( Interceptor Chain )。
自定义拦截器需要实现
org.apache.kafka.clients.producer.ProducerInterceptor 接口。
topic:topicA,partition:1
topic:topicA,partition:1
topic:topicA,partition:0
topic:topicA,partition:0
topic:topicA,partition:0

18.生产者_拦截器

19.生产者_拦截器二

20.生产者_消息序列化一

21.生产者_消息序列化二

添加依赖
  }
   
public void setName ( String name ) {
       
this . name = name ;
  }
   
public int getAge () {
       
return age ;
  }
   
public void setAge ( int age ) {
       
this . age = age ;
  }
   
public String getAddress () {
       
return address ;
  }
   
public void setAddress ( String address )
{
       
this . address = address ;
  }
}
39 3
编写自定义序列化类
<dependency>
<groupId> org.codehaus.jackson </groupId>
   
<artifactId> jackson-mapper
asl </artifactId>
   
<version> 1.9.13 </version>
</dependency>
package com . itbaizhan . kafka . producer ;
import
org . apache . kafka . common . serialization . Seri
alizer ;
import
org . codehaus . jackson . map . ObjectMapper ;
import java . io . IOException ;
import java . nio . charset . StandardCharsets ;
import java . util . Map ;
public class UserSerializer implements
Serializer < UserVo > {
   
private ObjectMapper objectMapper ;
   
@Override
   
public void configure ( Map < String , ?>
configs , boolean isKey ) {
       
objectMapper = new ObjectMapper ();
     
//Serializer.super.configure(configs,
isKey);
  }
40 4
编写生产者程序
   
@Override
   
public byte [] serialize ( String topic ,
UserVo data ) {
       
byte [] ret = null ;
       
try {
           
ret =
objectMapper . writeValueAsString ( data )
                 
. getBytes ( StandardCharsets . UTF_8 );
      } catch ( IOException e ) {
           
throw new
SerializationException ( "Error when
serializing UserVo to byte[],exception is
" + e . getMessage ());
      }
       
return ret ;
  }
   
@Override
   
public void close () {
       
objectMapper = null ;
       
//Serializer.super.close();
  }
}
package com . itbaizhan . kafka . producer ;
import
org . apache . kafka . clients . producer . * ;
41 import
org . apache . kafka . common . serialization . Stri
ngSerializer ;
import java . util . Properties ;
import
java . util . concurrent . ExecutionException ;
public class UserSerProducer {
   
public static void main ( String [] args )
throws ExecutionException ,
InterruptedException {
       
//TODO 1. 声明并实例化 Kafka Producer
配置文件对象
       
Properties prop = new
Properties ();
       
//TODO 2. 为配置文件对象设置参数
       
// 2.1 配置 bootstrap_servers
     
prop . put ( ProducerConfig . BOOTSTRAP_SERVERS
_CONFIG , "node2:9092,node3:9092,node4:9092"
);
       
// 2.2 配置 key value 的序列化类
     
prop . put ( ProducerConfig . KEY_SERIALIZER_CL
ASS_CONFIG ,
StringSerializer . class . getName ());
     
prop . put ( ProducerConfig . VALUE_SERIALIZER_
CLASS_CONFIG ,
UserSerializer . class . getName ());
42        
//TODO 3. 声明并实例化生产者对象 注意
value 的泛型类型
       
KafkaProducer < String , UserVo >
producer = new KafkaProducer < String ,
UserVo > ( prop );
       
//TODO 4. 发送消息
       
UserVo userVo = new
UserVo ( "tuhao" , 18 , " 北京 " );
       
producer . send ( new
ProducerRecord < String , UserVo > ( "topicA" ,
userVo ),
           
new Callback () {
               
// 如下方法在生产者收到 acks 确认
时异步调用
               
@Override
               
public void
onCompletion ( RecordMetadata
recordMetadata , Exception e ) {
                   
if ( e == null ){
                       
// 无异常信息,输出主题
和分区信息到控制台
                     
System . out . println ( "topic:" + recordMetadat
a . topic ()
                             
+ ",partition:" + recordMetadata . partition ()
);
                  } else { // 打印异常信息
                     
System . out . println ( e . getMessage ());
                  }
              }
43 5
node2 上开启 Kafka 消费者进行消费
6
运行 UserSerProducer
7
观察 node2 Kafka 消费者消费消息的情况
实时效果反馈
1. 关于 Kafka 生产者消息序列化的描述,正确的是:
A
默认提供了序列化类,如 BytesSerializer
IntegerSerializer StringSerializer 等。
B
自定义序列化类需要实现
org.apache.kafka.common.serialization.Serializer
C
生产者序列化机制使用起来比较简单,需要在构造 producer
对象之前指定参数 key.serializer value.serializer
          });
       
Thread . sleep ( 50 );
       
//TODO 5. 关闭生产者
       
producer . close ();
  }
}

22.生产者_分区的优势

23.生产者_分区策略

24.生产者_分区实战一

25.生产者_分区实战二

26.生产者_自定义分区机制一(250129更新)

27.生产者_自定义分区机制二

28.生产者_

29.生产者_

30.生产者_

31.生产者_

32.生产者_

章节三.BROKER

章节四.消费者

Spark分布式计算框架一

章节一.概述

章节二.运行模式

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/960966.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

爱书爱考平台说明

最近我开发了一个综合性的考试平台&#xff0c;内容包括但不限于职业资格证考试、成人教育、国家公务员考试等内容。目前1.0版本已经开发完成&#xff0c;其他的功能陆续完善中。 微信小程序搜索"爱书爱考" 微信小程序图标如下图: 目前维护了java相关的面试题的考题…

Docker/K8S

文章目录 项目地址一、Docker1.1 创建一个Node服务image1.2 volume1.3 网络1.4 docker compose 二、K8S2.1 集群组成2.2 Pod1. 如何使用Pod(1) 运行一个pod(2) 运行多个pod 2.3 pod的生命周期2.4 pod中的容器1. 容器的生命周期2. 生命周期的回调3. 容器重启策略4. 自定义容器启…

2023年吉林省职业院校技能大赛网络系统管理样题-网络配置(华三代码)

目录 附录1:拓扑图 附录2:地址规划表 1.S1 2.S3 3.S4 4.S5 5.S7 6.S8 7.S9 8.R1 9.R2 10.R3 11.EG1 12.EG2 13.AC1 14.AC2 附录1:拓扑图 编号 型号

HTML-新浪新闻-实现标题-排版

标题排版 图片标签&#xff1a;<img> src&#xff1a;指定图片的url&#xff08;绝对路径/相对路径&#xff09; width&#xff1a;图片的宽度&#xff08;像素/相对于父元素的百分比&#xff09; heigth&#xff1a;图片的高度&#xff08;像素/相对于父元素的百分比&a…

基于物联网的智能环境监测系统(论文+源码)

1系统的功能及方案设计 本课题为基于物联网的智能环境监测系统的设计与实现&#xff0c;整个系统采用stm32f103单片机作为主控制器&#xff0c;通过DHT11传感器实现智能环境监测系统温度和湿度的检测&#xff0c;通过MQ传感器实现CO2浓度检测&#xff0c;通过光照传感器实现光照…

全面解析文件上传下载删除漏洞:风险与应对

在数字化转型的时代&#xff0c;文件上传、下载与删除功能已经成为各类应用程序的标准配置&#xff0c;从日常办公使用的协同平台&#xff0c;到云端存储服务&#xff0c;再到社交网络应用&#xff0c;这些功能在给用户带来便捷体验、显著提升工作效率的同时&#xff0c;也隐藏…

1.2第1章DC/DC变换器的动态建模-1.2Buck-Boost 变换器的交流模型--电力电子系统建模及控制 (徐德鸿)--读书笔记

1.2 Buck-Boost 变换器的交流模型 Buck- Boost变换器是一种典型的DC/DC变换器&#xff0c;具有升压和降压功能其输出电压的极性与输入电压相反&#xff0c;见图1-4a。当电感L的电流i(t)连续时一个开关周期可以分为两个阶段。在阶段1&#xff0c;开关在位置1时&#xff0c;即&am…

06-AD向导自动创建P封装(以STM32-LQFP48格式为例)

自动向导创建封装 自动向导创建封装STM32-LQFP48Pin封装1.选则4排-LCC或者QUAD格式2.计算焊盘相定位长度3.设置默认引脚位置(芯片逆时针)4.特殊情况下:加额外的标记 其他问题测量距离:Ctrl M测量 && Ctrl C清除如何区分一脚和其他脚?芯片引脚是逆时针看的? 自动向导…

若依基本使用及改造记录

若依框架想必大家都了解得不少&#xff0c;不可否认这是一款及其简便易用的框架。 在某种情况下&#xff08;比如私活&#xff09;使用起来可谓是快得一匹。 在这里小兵结合自身实际使用情况&#xff0c;记录一下我对若依框架的使用和改造情况。 一、源码下载 前往码云进行…

面试经典150题——图

文章目录 1、岛屿数量1.1 题目链接1.2 题目描述1.3 解题代码1.4 解题思路 2、被围绕的区域2.1 题目链接2.2 题目描述2.3 解题代码2.4 解题思路 3、克隆图3.1 题目链接3.2 题目描述3.3 解题代码3.4 解题思路 4、除法求值4.1 题目链接4.2 题目描述4.3 解题代码4.4 解题思路 5、课…

基于SpringBoot电脑组装系统平台系统功能实现六

一、前言介绍&#xff1a; 1.1 项目摘要 随着科技的进步&#xff0c;计算机硬件技术日新月异&#xff0c;包括处理器&#xff08;CPU&#xff09;、主板、内存、显卡等关键部件的性能不断提升&#xff0c;为电脑组装提供了更多的选择和可能性。不同的硬件组合可以构建出不同类…

万字长文总结前端开发知识---JavaScriptVue3Axios

JavaScript学习目录 一、JavaScript1. 引入方式1.1 内部脚本 (Inline Script)1.2 外部脚本 (External Script) 2. 基础语法2.1 声明变量2.2 声明常量2.3 输出信息 3. 数据类型3.1 基本数据类型3.2 模板字符串 4. 函数4.1 具名函数 (Named Function)4.2 匿名函数 (Anonymous Fun…

MySQL(单表访问)

今天是新年&#xff0c;祝大家新年快乐&#xff0c;但是生活还是得继续。 后面也会持续更新&#xff0c;学到新东西会在其中补充。 建议按顺序食用&#xff0c;欢迎批评或者交流&#xff01; 缺什么东西欢迎评论&#xff01;我都会及时修改的&#xff01; 大部分截图和文章采…

HarmonyOS简介:应用开发的机遇、挑战和趋势

问题 更多的智能设备并没有带来更好的全场景体验 连接步骤复杂数据难以互通生态无法共享能力难以协同 主要挑战 针对不同设备上的不同操作系统&#xff0c;重复开发&#xff0c;维护多套版本 多种语言栈&#xff0c;对人员技能要求高 多种开发框架&#xff0c;不同的编程…

【Elasticsearch】Elasticsearch的查询

Elasticsearch的查询 DSL查询基础语句叶子查询全文检索查询matchmulti_match 精确查询termrange 复合查询算分函数查询bool查询 排序分页基础分页深度分页 高亮高亮原理实现高亮 RestClient查询基础查询叶子查询复合查询排序和分页高亮 数据聚合DSL实现聚合Bucket聚合带条件聚合…

使用 KNN 搜索和 CLIP 嵌入构建多模态图像检索系统

作者&#xff1a;来自 Elastic James Gallagher 了解如何使用 Roboflow Inference 和 Elasticsearch 构建强大的语义图像搜索引擎。 在本指南中&#xff0c;我们将介绍如何使用 Elasticsearch 中的 KNN 聚类和使用计算机视觉推理服务器 Roboflow Inference 计算的 CLIP 嵌入构建…

LogicFlow 一款流程图编辑框架

LogicFlow是什么 LogicFlow是一款流程图编辑框架&#xff0c;提供了一系列流程图交互、编辑所必需的功能和灵活的节点自定义、插件等拓展机制。LogicFlow支持前端自定义开发各种逻辑编排场景&#xff0c;如流程图、ER图、BPMN流程等。在工作审批流配置、机器人逻辑编排、无代码…

几种K8s运维管理平台对比说明

目录 深入体验**结论**对比分析表格**1. 功能对比****2. 用户界面****3. 多租户支持****4. DevOps支持** 细对比分析1. **Kuboard**2. **xkube**3. **KubeSphere**4. **Dashboard****对比总结** 深入体验 KuboardxkubeKubeSphereDashboard 结论 如果您需要一个功能全面且适合…

《HelloGitHub》第 106 期

兴趣是最好的老师&#xff0c;HelloGitHub 让你对编程感兴趣&#xff01; 简介 HelloGitHub 分享 GitHub 上有趣、入门级的开源项目。 github.com/521xueweihan/HelloGitHub 这里有实战项目、入门教程、黑科技、开源书籍、大厂开源项目等&#xff0c;涵盖多种编程语言 Python、…

R语言学习笔记之高效数据操作

一、概要 数据操作是R语言的一大优势&#xff0c;用户可以利用基本包或者拓展包在R语言中进行复杂的数据操作&#xff0c;包括排序、更新、分组汇总等。R数据操作包&#xff1a;data.table和tidyfst两个扩展包。 data.table是当前R中处理数据最快的工具&#xff0c;可以实现快…