Flink+Kafka消费

引入jar

<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-java</artifactId>
	<version>1.8.0</version>
</dependency>
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-streaming-java_2.11</artifactId>
	<version>1.8.0</version>
</dependency>
<!-- flink整合kafka_2.11 -->
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-connector-kafka_2.11</artifactId>
	<version>1.10.0</version>
</dependency>

二、处理逻辑

//2、定义环境 => Env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(9);
env.enableCheckpointing(1000);

FlinkKafkaConsumer<String> consumer = this.getConsumer();//调用下面的方法获取数据源
consumer.setStartFromLatest();//消费最新数据

//2、绑定数据源=> resource
DataStream<String> stream = env.addSource(consumer);


//3、批量读取的方法=>
stream.timeWindowAll(Time.milliseconds(500)) //timeWindowAll:时间滚动窗口,滑动窗口会有数据元素重叠可能,而滚动窗口不存在元素重叠
		.apply(new ReadKafkaFlinkWindowFunction())//使用自己定义的apply来收集
		.addSink(new KafkaBatchSink());//批量的sink方法
env.execute();

2、定义消费者,并且将消费者consumer转成FlinkKafkaConsumer

public FlinkKafkaConsumer<String> getConsumer(){
	//定义消费者信息
	Properties properties = new Properties();
	properties.put("bootstrap.servers", "192.168.131.147:9092");
	properties.put("group.id", "flink-consumer-kafka-group");
	properties.put("auto.offset.reset", "latest");
	properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
	properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

	FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("demo", new SimpleStringSchema(), properties);
	return consumer;
}

3、收集数据ReadKafkaFlinkWindowFunction的实现类

4、KafkaBatchSink的实现逻辑

总结

分布式处理引擎Flink使用至少一个【job】调度和至少一个【task】实现分布式处理

有界:就是指flink【消费指定范围内】的数据。例如我定义某个作业间隔时间为0.5秒,则flink已0.5秒为界,进行数据处理。有界数据用在离线数据的处理场景较多

无界:就是指flink始终【监听数据源】里的数据,获取到就处理。无界数据往往用在【实时数据】处理下的场景较多。

我这里结合我们项目的场景来给各位说一下该选那种处理。我们的场景为:

1:尽量支持最多的数据落地
2:数据必须要准确。所以我们最终了有界处理,将flink的界限设置为0.5秒,0.5秒内收集的所有数据整体使用一个算
子消费。保证数据的准确和消费高效性。

1、一定要有抛出异常的机制

我们都知道抛出异常会终止消费,但是为什么要抛出异常呢?这注意是因为如果用户不抛出异常的话,flink会认为当前的数据时正常消费的,这就造成了我们的kafka数据误消费

2、关于并行度parallelism

并行度的配置都是setParallelism,对于env和stream来说,stream的优先级比env高

3、关于checkpoint

我们如果定义程序运行在SPring Boot时,一定要配置检查点这个是flink实现容错的核心配置!

4、关于并行度

我们在设置并行度的时候,将里边的数字设置为多少,最终就会有多少个线程来执行任务。
所以大家一定要清楚对于数据准确性高的数据来说,宁愿牺牲多线程带来的效率提升也要只设置一个线程来执行消费。
可能大家没有注意,如果你不设置flink的并行度为1时。它是以的是系统的线程数来作为并行度!这样顺序是会乱的。

5、saveBatch很好

但是我建议你先封装一下或者改为批量的保存。可能大家都知道或者说都用过mybatis plus的saveBatch,它能将一个列表的inseert封装为一条sql(insert into a values(a1),(a2),(a3),但是我们一条sql的长度过长的话会存在性能问题。建议在批量处理的时候每隔1000条记录saveBatch一次

为什么flink消费kafka比官方的listener都要快

1、并行度和分区处理: Flink 具有高度的并行度支持

可以为每个 Kafka 分区创建独立的消费者实例,以便并行地处理多个分区。这使得 Flink能够更有效地利用资源,并提高整体的消费速度。相比之下,一些官方 Kafka Consumer 实现可能没有明确的并行度配置或并行处理策略。

2、事件时间处理

Flink 强调【事件时间】处理,支持按照事件的实际发生时间进行【有序处理】。这对于一些需要处理时间相关业务逻辑的应用来说很重要。Flink 可以轻松处理乱序事件,并确保事件按照正确的顺序进行处理。官方 Kafka Consumer 也提供了类似的功能,但 Flink在这方面的设计更加深入和专注。

状态管理

Flink 提供了强大的状态管理机制,对于需要在处理过程中保持状态的任务,这一点非常重要。在消费 Kafka消息时,可能需要追踪某些状态,例如记录已处理的偏移量。Flink 的状态管理可以更好地支持这种场景,而官方 Kafka Consumer可能没有提供类似的状态管理机制。

异步处理模型:

Flink 使用异步处理模型,这使得在处理一条消息时,可以同时进行其他处理而无需等待。这有助于提高整体的处理效率。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/248301.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

uniapp播放 m3u8格式视频 兼容pc和移动端

支持全自动播放、设置参数 自己摸索出来的,花了一天时间,给点订阅支持下,订阅后,不懂的地方可以私聊我。 代码实现 代码实现 1.安装dplayer组件 npm i dplayer2. static/index.html下引入 hls 引入hls.min.js 可以存放在static项目hls下面<script src="/static…

如何连接到 Azure SQL 数据库(下)

在《如何连接到 Azure SQL 数据库&#xff08;上&#xff09;》中&#xff0c;我们已经了解到了以下内容↓↓↓ 开始之前&#xff1a;Azure 连接凭据和防火墙 如何检索 Azure 连接凭据如何配置服务器防火墙使用 SQL Server Management Studio 连接到 Azure使用 dbForge Studio…

最大子数组和java实现【动态规划基础练习】

12.15 最大子数组和 给你一个整数数组 nums &#xff0c;请你找出一个具有最大和的连续子数组&#xff08;子数组最少包含一个元素&#xff09;&#xff0c;返回其最大和。 子数组 是数组中的一个连续部分。 示例 1&#xff1a; 输入&#xff1a;nums [-2,1,-3,4,-1,2,1,-5,4]…

在 Windows PC 上轻松下载并安装 FFmpeg

FFmpeg 是一种开源媒体工具&#xff0c;可用于将任何视频格式转换为您需要的格式。该工具只是命令行&#xff0c;因此它没有图形、可点击的界面。如果您习惯使用常规图形 Windows 程序&#xff0c;安装 FFmpeg 一开始可能看起来很复杂&#xff0c;但不用担心&#xff0c;它;很简…

八大排序(插入排序 | 选择排序 | 冒泡排序)

在我们内存中我们一般会有一些没有顺序的数据&#xff0c;我们成为内排序&#xff0c;而今天分享八大排序的是时间复杂度为O&#xff08;N^2&#xff09;的插入排序&#xff0c;选择排序和教学意义比较强的冒泡排序。 插入排序 这是插入排序的动图&#xff0c;通过动图我们也…

01-EEA电子电器架构

1.背景 汽车正在从传统的机械装置逐步电气化&#xff0c;汽车电子电气功能不断的丰富。越来越多的电气系统和功能被集成到汽车上&#xff0c;传统的原理及线束设计已经远远不能满足。为此&#xff0c;EEA(电子电气架构)应运而生。如何设计电子电气架构&#xff0c;满足日益增长…

jenkins学习19 - pipline 构建项目生成 allure报告并发送邮箱

前言 个人其实一直的不太喜欢用邮箱发送报告&#xff0c;测试报告用邮件通知这都是五六年前的事情了&#xff0c;但有部分小伙伴依然执着于发邮件报告通知。 这里整理了下发邮箱通知的教程。 配置你的邮箱 配置邮箱这一步最繁琐&#xff0c;由于每个人使用的邮箱不一样&…

十六、YARN和MapReduce配置

1、部署前提 &#xff08;1&#xff09;配置前提 已经配置好Hadoop集群。 配置内容&#xff1a; &#xff08;2&#xff09;部署说明 &#xff08;3&#xff09;集群规划 2、修改配置文件 MapReduce &#xff08;1&#xff09;修改mapred-env.sh配置文件 export JAVA_HOM…

Python多态原理及实现

对于弱类型的语言来说&#xff0c;变量并没有声明类型&#xff0c;因此同一个变量完全可以在不同的时间引用不同的对象。当同一个变量在调用同一个方法时&#xff0c;完全可能呈现出多种行为&#xff08;具体呈现出哪种行为由该变量所引用的对象来决定&#xff09;&#xff0c;…

Star 4.1k!Gitee GVP开源项目!新一代桌面应用开发框架 ElectronEgg!

前言 随着现代技术的快速升级迭代及发展&#xff0c;桌面应用开发已经变得越来越普及。然而对于非专业桌面应用开发工程师在面对这项任务时&#xff0c;可能会感到无从下手&#xff0c;甚至觉得这是一项困难的挑战。 本篇文章将分享一种新型桌面应用开发框架 ElectronEgg&…

机器学习支持向量机(SVM)

svm与logstic异同 svm支持向量机&#xff0c;因其英文名为support vector machine&#xff0c;故一般简称SVM&#xff0c;通俗来讲&#xff0c;它是一种二类分类模型&#xff0c;其基本模型定义为特征空间上的间隔最大的线性分类器&#xff0c;其学习策略便是间隔最大化&#x…

探索多功能SQL数据库编辑器 - Richardson Software RazorSQL

在当今数字化时代&#xff0c;SQL数据库的管理和编辑是许多企业和开发人员必不可少的任务。为了提高生产力和简化数据库操作&#xff0c;Richardson Software推出了一款强大而多功能的SQL数据库编辑器 - RazorSQL。 RazorSQL是一款功能全面的数据库管理工具&#xff0c;可适用…

ansible的基本使用

本章主要介绍在RHEL8中如何安装ansible 及 ansible 的基本使用。 ansible是如何工作的在 RHEL8中安装ansible编写ansible.cfg和清单文件ansible 的基本用法 如果管理的服务器很多&#xff0c;如几十台甚至几百台&#xff0c;那么就需要一个自动化管理工具了&#xff0c; ansi…

使用opencv的Laplacian算子实现图像边缘检测

1 边缘检测介绍 图像边缘检测技术是图像处理和计算机视觉等领域最基本的问题&#xff0c;也是经典的技术难题之一。如何快速、精确地提取图像边缘信息&#xff0c;一直是国内外的研究热点&#xff0c;同时边缘的检测也是图像处理中的一个难题。早期的经典算法包括边缘算子方法…

环境搭建及源码运行_java环境搭建_maven

1、介绍 1&#xff09;管理项目依赖和版本 统一的项目依赖和版本管理 2&#xff09;Maven支持多模块项目管理 通过定义父子模块的关系来管理多个子模块的构建和依赖关系。使用Maven可以实现多模块项目的统一管理和构建&#xff0c;从而提高项目的可维护性和可重用性。 3&#x…

初识Python解释器————解释器模式(后续更新...)

学习网页&#xff1a; Welcome to Python.orghttps://www.python.org/https://www.python.org/ Python解释器 Python解释器是用于执行Python代码的程序。Python解释器将Python代码转换为机器语言并执行它。 Python解释器有多种实现&#xff0c;包括CPython、IPython、Jython…

GPT 魔力涌现

GPT 二、Prompt 的典型构成 角色&#xff1a;给 AI 定义一个最匹配任务的角色&#xff0c;比如&#xff1a;「你是一位软件工程师」「你是一位小学老师」指示&#xff1a;对任务进行描述上下文&#xff1a;给出与任务相关的其它背景信息&#xff08;尤其在多轮交互中&#xff…

Feign调用服务报错:Load balancer does not have available server for client:xxx

1.说一下遇到的bug:,&#xff08;黑马程序员springcloud的第30集&#xff0c;基于Feign远程调用&#xff09; 3个服务正常启动&#xff1a; 访问http://localhost:8080/order/101 服务器报错日志&#xff1a;&#xff08;orderservice想调用userservice结果找不到userservice&…

【EMQX】通过EMQX webhook实现转发消息到Python web服务器

EMQX webhook消息转发Web服务器 一、前言二、实现1、EMQX服务器搭建EMQX下载、安装、启动 2、本地Web服务搭建创建Flask项目代码 3、EMQX中创建webhook数据桥接4、EMQX中创建数据转发规则 三、效果 一、前言 需求&#xff1a;获取设备通过mqtt协议发送过来的数据并将数据保存到…

cgal教程 3D Alpha Wrapping

文章目录 3D Alpha Wrapping (3D alpha 包裹)1 介绍2 方法2.1 算法2.2 保证 3 接口4 选择参数4.1 alpha4.2 Offset4.3 关于“双面”包裹的注意事项 5 性能6 例子 3D Alpha Wrapping (3D alpha 包裹) 原文地址: https://doc.cgal.org/latest/Alpha_wrap_3/index.html#Chapter_3D…