flink+kafka实现流数据处理学习

在应用系统的建设过程中,通常都会遇到需要实时处理数据的场景,处理实时数据的框架有很多,本文将以一个示例来介绍flink+kafka在流数据处理中的应用。

1、概念介绍

  • flink:是一个分布式、高可用、高可靠的大数据处理引擎,提供了一种高效、可靠、可扩展的方式来处理和分析实时数据。

  • kafka:是用于构建实时数据管道和流应用程序并具有横向扩展,容错,wicked fast(变态快)等优点的一种消息中间件。

  • flink-connector-kafka:是flink内置的kafka连接器,它允许Flink应用轻松地从Kafka中读取数据流(Source)或将数据流写入到Kafka(Sink)。

2、实现目标

本文主要从下面3个步骤完成流数据的处理:

  • flink作为kafka消费者,从kafka中消费数据并将消费到的数据转换为flink数据流;

  • flink对获取到的数据流进行计算、聚合等操作;

  • flink对处理之后的数据再次写入到kafka中,实现数据的流动。

3、实现步骤

  • 新建maven工程,将依赖添加到环境中

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.20.0</flink.version>
    <flink-kafka.version>3.3.0-1.20</flink-kafka.version>
  </properties>
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>21</java.version>
    <flink.version>1.20.0</flink.version>
    <flink-kafka.version>3.3.0-1.20</flink-kafka.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka</artifactId>
      <version>${flink-kafka.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-base</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!--    json处理    -->
    <dependency>
      <groupId>com.alibaba.fastjson2</groupId>
      <artifactId>fastjson2</artifactId>
      <version>2.0.53</version>
    </dependency>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <artifactSet>
                <excludes>
                  <exclude>com.google.code.findbugs:jsr305</exclude>
                </excludes>
              </artifactSet>
              <filters>
                <filter>
                  <!-- Do not copy the signatures in the META-INF folder.
                  Otherwise, this might cause SecurityExceptions when using the JAR. -->
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <!-- Replace this with the main class of your job -->
                  <mainClass>org.example.App</mainClass>
                </transformer>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
  • kafka生产者负责模拟数据流生成

System.out.println("kafka生产者启动....当前时间为:" + LocalDateTime.now());
KafkaProducerStudy kafkaProducerStudy = new KafkaProducerStudy();
KafkaProducer<String, Object> kafkaProducer = kafkaProducerStudy.createKfkaProducer();
kafkaProducer.initTransactions();
kafkaProducer.beginTransaction();
for (int i = 0; i < 10; i++) {
ProducerRecord<String, Object> record = new ProducerRecord<>(topic, key, kafkaProducerStudy.setKafkaUserValue(i));
kafkaProducer.send(record);
Thread.sleep(1000);
}
kafkaProducer.commitTransaction();
kafkaProducer.close();
System.out.println("kafkaProducer关闭当前时间为:" + LocalDateTime.now());
  • flink从kafka中获取数据流

//构建kafkaSource数据源
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers(kafka_server) //指定kafka服务
.setTopics(pub_topic)    //指定topic
.setGroupId(groupId)   //指定groupID
.setStartingOffsets(OffsetsInitializer.latest())    //指定消费数据起始的位置
.setValueOnlyDeserializer(new SimpleStringSchema())     //指定反序列化器
.build();
//kafkaSource能够通过指定不同策略的偏移量
//1、OffsetsInitializer.latest():一定从最早的位置开始消费
//2、OffsetsInitializer.latest():一定从最新的位置开始消费
//3、OffsetsInitializer.timestamp(long timestamp):从指定时间开始消费
//4、OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST):获取每个分区已提交的偏移量,从提交的偏移量开始消费,如果没有已提交的偏移量会从起始位置开始读取
//5、OffsetsInitializer.committedOffsets():获取每个分区已提交的偏移量,从提交的偏移量开始消费,如果没有已提交的偏移量会抛出异常
  • 基于flink基本算子对数据进行加工

map算子:对数据流一对一的加载计算,并返回一个新的对象

sou.map(new MapFunction<String, String>() {
@Override
public String map(String s) throws Exception {
JSONObject jsonObject = JSONObject.parseObject(s);
jsonObject.put("source", "flink");
return jsonObject.toString();
}
}).print();
//output
//{"id":0,"value":5,"ts":1734832964534,"source":"flink"}
//{"id":1,"value":3,"ts":1734832965640,"source":"flink"}
//{"id":2,"value":7,"ts":1734832966643,"source":"flink"}
//{"id":3,"value":10,"ts":1734832967645,"source":"flink"}
//{"id":4,"value":2,"ts":1734832968648,"source":"flink"}
//{"id":5,"value":2,"ts":1734832969653,"source":"flink"}
//{"id":6,"value":1,"ts":1734832970654,"source":"flink"}
//{"id":7,"value":6,"ts":1734832971657,"source":"flink"}
//{"id":8,"value":6,"ts":1734832972660,"source":"flink"}
//{"id":9,"value":6,"ts":1734832973662,"source":"flink"}

filter算子:对数据流进行过滤,只返回为true的数据

sou.map(new MapFunction<String, String>() {
@Override
public String map(String s) throws Exception {
JSONObject jsonObject = JSONObject.parseObject(s);
jsonObject.put("source", "flink");
return jsonObject.toString();
}
}).filter(new FilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
JSONObject jsonObject = JSONObject.parseObject(value);
Integer id = jsonObject.getInteger("id");
return id % 2 == 0;
}
}).print();
//output
//{"id":0,"value":5,"ts":1734832964534,"source":"flink"}
//{"id":2,"value":7,"ts":1734832966643,"source":"flink"}
//{"id":4,"value":2,"ts":1734832968648,"source":"flink"}
//{"id":6,"value":1,"ts":1734832970654,"source":"flink"}
//{"id":8,"value":6,"ts":1734832972660,"source":"flink"}

flink将处理之后的数据再次写到kafka中,实现数据的流动

KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers(kafka_server)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(sub_topic)
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
processResult.sinkTo(sink);
  • kafka消费者订阅对应的topic

Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "study02-ubuntu:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "iot1");
//        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
//        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String,Object> kafkaConsumer = new KafkaConsumer<>(properties);
TopicPartition p0 = new TopicPartition(topic, 0);
TopicPartition p1 = new TopicPartition(topic, 1);
kafkaConsumer.assign(Arrays.asList(p0,p1));
while (true) {
ConsumerRecords<String,Object> records = kafkaConsumer.poll(100);
for (ConsumerRecord<String, Object> record : records) {
//todo 处理消息
System.out.println(record.value());
}
}
//output
//{"id":0,"value":5,"ts":1734832964534,"source":"flink"}
//{"id":2,"value":7,"ts":1734832966643,"source":"flink"}
//{"id":4,"value":2,"ts":1734832968648,"source":"flink"}
//{"id":6,"value":1,"ts":1734832970654,"source":"flink"}
//{"id":8,"value":6,"ts":1734832972660,"source":"flink"}
flink接收kafka数据通过算子计算之后再次转发到kafka中完整代码示例:
package com.yanboot.flink.connector;

import com.alibaba.fastjson2.JSONObject;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaStreamDataProcess {
private final static String kafka_server = "study02-ubuntu:9092";
private final static String pub_topic = "sunlei";
private final static String sub_topic = "sub_sunlei";
private final static String groupId = "kafka-demo";

public static void main(String[] args) throws Exception {
//设置执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//指定并行度
env.setParallelism(1);
//构建kafkaSource
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers(kafka_server) //指定kafka服务
.setTopics(pub_topic)    //指定topic
.setGroupId(groupId)   //指定groupID
//OffsetsInitializer.latest():一定从最早的位置开始消费
//OffsetsInitializer.latest():一定从最新的位置开始消费
//OffsetsInitializer.timestamp(long timestamp):从指定时间开始消费
//OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST):获取每个分区已提交的偏移量,从提交的偏移量开始消费,如果没有已提交的偏移量会从起始位置开始读取
//OffsetsInitializer.committedOffsets():获取每个分区已提交的偏移量,从提交的偏移量开始消费,如果没有已提交的偏移量会抛出异常
.setStartingOffsets(OffsetsInitializer.latest())    //指定offset的位置
.setValueOnlyDeserializer(new SimpleStringSchema())     //指定反序列化器
.build();
DataStreamSource<String> sou = env.fromSource(kafkaSource, //指定数据源
WatermarkStrategy.noWatermarks(), //指定水位线
"flink kafka source");
SingleOutputStreamOperator<String> processResult = sou.map(new MapFunction<String, String>() {
@Override
public String map(String s) throws Exception {
JSONObject jsonObject = JSONObject.parseObject(s);
jsonObject.put("source", "flink");
return jsonObject.toString();
}
}).filter(new FilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
JSONObject jsonObject = JSONObject.parseObject(value);
Integer id = jsonObject.getInteger("id");
return id % 2 == 0;
}
});
processResult.print();

KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers(kafka_server)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(sub_topic)
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();

processResult.sinkTo(sink);

//启动作业
env.execute();

}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/943701.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

快速掌握Elasticsearch检索之二:滚动查询获取全量数据(golang)

Elasticsearch8.17.0在mac上的安装 Kibana8.17.0在mac上的安装 Elasticsearch检索方案之一&#xff1a;使用fromsize实现分页 1、滚动查询的使用场景 滚动查询区别于上一篇文章介绍的使用from、size分页检索&#xff0c;最大的特点是&#xff0c;它能够检索超过10000条外的…

StableAnimator模型的部署:复旦微软提出可实现高质量和高保真的ID一致性人类视频生成

文章目录 一、项目介绍二、项目部署模型的权重下载提取目标图像的关节点图像&#xff08;这个可以先不看先用官方提供的数据集进行生成&#xff09;提取人脸&#xff08;这个也可以先不看&#xff09;进行图片的生成 三、模型部署报错 一、项目介绍 由复旦、微软、虎牙、CMU的…

【深度学习】Java DL4J基于 CNN 构建车辆识别与跟踪模型

&#x1f9d1; 博主简介&#xff1a;CSDN博客专家&#xff0c;历代文学网&#xff08;PC端可以访问&#xff1a;https://literature.sinhy.com/#/?__c1000&#xff0c;移动端可微信小程序搜索“历代文学”&#xff09;总架构师&#xff0c;15年工作经验&#xff0c;精通Java编…

如何在短时间内读懂复杂的英文文献?

当我们拿起一篇文献开始阅读时&#xff0c;就像是打开了一扇通往未知世界的大门。但别急着一头扎进去&#xff0c;咱们得像个侦探一样&#xff0c;带着疑问去探险。毕竟&#xff0c;知识的海洋深不可测&#xff0c;不带点“装备”怎么行&#xff1f;今天就聊聊&#xff0c;平时…

uniapp中Nvue白屏问题 ReferenceError: require is not defined

uniapp控制台输出如下 exception function:createInstanceContext, exception:white screen cause create instanceContext failed,check js stack ->Uncaught ReferenceError: require is not defined 或者 exception function:createInstanceContext, exception:white s…

Elasticsearch:使用 Ollama 和 Go 开发 RAG 应用程序

作者&#xff1a;来自 Elastic Gustavo Llermaly 使用 Ollama 通过 Go 创建 RAG 应用程序来利用本地模型。 关于各种开放模型&#xff0c;有很多话要说。其中一些被称为 Mixtral 系列&#xff0c;各种规模都有&#xff0c;而一种可能不太为人所知的是 openbiollm&#xff0c;这…

SpringBoot(Ⅱ)——@SpringBootApplication注解+自动装配原理+约定大于配置

1. SpringBootApplication注解 SpringBootApplication标注在某个类上说明这个类是SpringBoot的主配置类&#xff0c;SpringBoot就通过运行这个类的main方法来启动SpringBoot应用&#xff1b; 并且Configuration注解中也有Component注解&#xff0c;所以这个主启动类/主配置类…

指针与数组:深入C语言的内存操作艺术

数组名的理解 在上⼀个章节我们在使⽤指针访问数组的内容时&#xff0c;有这样的代码&#xff1a; int arr[10] {1,2,3,4,5,6,7,8,9,10}; int *p &arr[0]; 这⾥我们使⽤ &arr[0] 的⽅式拿到了数组…

Python的数字类型

python的数字类型包括&#xff1a;整数&#xff0c;浮点数&#xff0c;复数。 整数 python的整数没有长度限制&#xff0c;无限大&#xff0c;有无限的精度 python的整数除法&#xff0c;即便能整除&#xff0c;结果也是小数&#xff0c;小数 在python中用float类型表示&…

【连续学习之SS-IL算法】2021年CPVR会议论文Ss-il:Separated softmax for incremental learning

1 介绍 年份&#xff1a;2021 期刊&#xff1a; 2021CPVR Ahn H, Kwak J, Lim S, et al. Ss-il: Separated softmax for incremental learning[C]//Proceedings of the IEEE/CVF International conference on computer vision. 2021: 844-853. 本文提出的SS-IL&#xff08…

3.BMS系统原理图解读

一、BMS电池板 (1)电池的连接关系&#xff1a;串联 (2)采样控制点&#xff1a;CELL0 - CELL5 (3)端子P1和P3&#xff1a;BAT和BAT- (4)开关S1&#xff1a;控制充放电回路的机械开关 二、BMS控制板 (1)主控MCU 电源 复位 晶振 (2)LED指示灯&#xff1a;4电量指示 1调试指…

洛谷P5250 【深基17.例5】木材仓库(c嘎嘎)

题目链接&#xff1a;P5250 【深基17.例5】木材仓库 - 洛谷 | 计算机科学教育新生态 题目难度&#xff1a;普及/提高 解题心得:本题借鉴了大佬的做法&#xff08;因为没想多好的处理方法~~&#xff09;&#xff0c;本题可以用map&#xff0c;对于操作1&#xff0c;存的话直接另…

pyqt和pycharm环境搭建

安装 python安装&#xff1a; https://www.python.org/downloads/release/python-3913/ python3.9.13 64位(记得勾选Path环境变量) pycharm安装&#xff1a; https://www.jetbrains.com/pycharm/download/?sectionwindows community免费版 换源&#xff1a; pip config se…

ArcGIS Pro地形图四至角图经纬度标注与格网标注

今天来看看ArcGIS Pro 如何在地形图上设置四至角点的经纬度。方里网标注。如下图的地形图左下角经纬度标注。 如下图方里网的标注 如下为本期要介绍的例图&#xff0c;如下&#xff1a; 图片可点击放大 接下来我们来介绍一下 推荐学习&#xff1a;GIS入门模型构建器Arcpy批量…

深度学习与图像处理(国产深度学习框架——飞桨官方指定教材)

计算机视觉从小白到大师之路 《深度学习与图像处理&#xff08;PaddlePaddle版&#xff09;》这一本就够了 1.引言 随着人工智能技术的飞速发展&#xff0c;各行各业对深度学习、图像处理相关领域的人才需求日益迫切。本书旨在通过系统的理论讲解与丰富的实战案例&#xff0…

Bluetooth Spec【0】蓝牙核心架构

蓝牙核心系统由一个主机、一个主控制器和零个或多个辅助控制器组成蓝牙BR/ EDR核心系统的最小实现包括了由蓝牙规范定义的四个最低层和相关协议&#xff0c;以及一个公共服务层协议&#xff1b;服务发现协议&#xff08;SDP&#xff09;和总体配置文件要求在通用访问配置文件&a…

代码随想录Day51 99. 岛屿数量,99. 岛屿数量,100. 岛屿的最大面积。

1.岛屿数量深搜 卡码网题目链接&#xff08;ACM模式&#xff09;(opens new window) 题目描述&#xff1a; 给定一个由 1&#xff08;陆地&#xff09;和 0&#xff08;水&#xff09;组成的矩阵&#xff0c;你需要计算岛屿的数量。岛屿由水平方向或垂直方向上相邻的陆地连接…

【机器学习与数据挖掘实战】案例06:基于Apriori算法的餐饮企业菜品关联分析

【作者主页】Francek Chen 【专栏介绍】 ⌈ ⌈ ⌈机器学习与数据挖掘实战 ⌋ ⌋ ⌋ 机器学习是人工智能的一个分支,专注于让计算机系统通过数据学习和改进。它利用统计和计算方法,使模型能够从数据中自动提取特征并做出预测或决策。数据挖掘则是从大型数据集中发现模式、关联…

突破传统,探索单页网站的强大潜力!

单页网站简单、直接&#xff0c;而且设计通常令人惊叹&#xff0c;非常适合展示关键信息而不会让访问者不知所措。 然而&#xff0c;构建单页网站有其自身的挑战&#xff0c;尤其是在 SEO 方面。由于内容数量有限且针对特定关键字的页面较少&#xff0c;可能很难在 SERP 中进行…

攻防世界web新手第四题easyphp

<?php highlight_file(__FILE__); $key1 0; $key2 0;$a $_GET[a]; $b $_GET[b];if(isset($a) && intval($a) > 6000000 && strlen($a) < 3){if(isset($b) && 8b184b substr(md5($b),-6,6)){$key1 1;}else{die("Emmm...再想想&quo…