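Application systems often need to process data in real time, and many frameworks exist for the job. This article walks through one example of using Flink together with Kafka for stream processing.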
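1. Concepts
- Flink: a distributed, highly available, and reliable big-data processing engine that offers an efficient, scalable way to process and analyze real-time data.
- Kafka: messaging middleware for building real-time data pipelines and streaming applications; it scales horizontally, is fault-tolerant, and is "wicked fast".
- flink-connector-kafka: the Kafka connector provided by the Flink project and added as a separate dependency. It lets a Flink application read a data stream from Kafka (Source) or write a data stream to Kafka (Sink).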
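2. Goals
This article processes the stream in three steps:
- Flink acts as a Kafka consumer, reads records from Kafka, and turns them into a Flink data stream;
- Flink applies computation, aggregation, and similar operations to that stream;
- Flink writes the processed data back to Kafka so that it keeps flowing downstream.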
3. Implementation steps
- Create a Maven project and add the dependencies
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>21</java.version>
<flink.version>1.20.0</flink.version>
<flink-kafka.version>3.3.0-1.20</flink-kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink-kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- JSON processing -->
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.53</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<!-- Replace this with the main class of your job -->
<mainClass>com.yanboot.flink.connector.KafkaStreamDataProcess</mainClass>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
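- A Kafka producer simulates the incoming data stream
// KafkaProducerStudy is the author's helper class (not shown); topic and key are String values defined elsewhere.
System.out.println("Kafka producer started, current time: " + LocalDateTime.now());
KafkaProducerStudy kafkaProducerStudy = new KafkaProducerStudy();
KafkaProducer<String, Object> kafkaProducer = kafkaProducerStudy.createKfkaProducer();
// Transactions require transactional.id to be set in the producer configuration.
kafkaProducer.initTransactions();
kafkaProducer.beginTransaction();
// Send 10 records, one per second.
for (int i = 0; i < 10; i++) {
    ProducerRecord<String, Object> record = new ProducerRecord<>(topic, key, kafkaProducerStudy.setKafkaUserValue(i));
    kafkaProducer.send(record);
    Thread.sleep(1000);
}
// Commit so that the records become visible to read_committed consumers.
kafkaProducer.commitTransaction();
kafkaProducer.close();
System.out.println("Kafka producer closed, current time: " + LocalDateTime.now());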
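The helper `setKafkaUserValue(i)` is not shown in this article. Judging only from the output printed in the later steps, each record carries an `id`, a `value`, and a millisecond timestamp `ts`. A minimal stand-in that uses only the JDK might look like the sketch below. The class name `SamplePayload`, the method name `build`, and the 1 to 10 value range are assumptions made for illustration, not the author's code.

```java
import java.util.Random;

// Hypothetical stand-in for the author's setKafkaUserValue(i) helper, which is not shown.
class SamplePayload {

    // Builds a JSON string shaped like {"id":0,"value":5,"ts":1734832964534}.
    // The value range 1..10 is an assumption based on the sample output.
    static String build(int id, Random random, long nowMillis) {
        int value = random.nextInt(10) + 1; // 1..10 inclusive
        return "{\"id\":" + id + ",\"value\":" + value + ",\"ts\":" + nowMillis + "}";
    }

    public static void main(String[] args) {
        Random random = new Random();
        for (int i = 0; i < 3; i++) {
            System.out.println(build(i, random, System.currentTimeMillis()));
        }
    }
}
```

Passing the `Random` and the timestamp in as parameters keeps the method deterministic under test; a real producer would more likely serialize an object with a JSON library such as the fastjson2 dependency already in the project.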
- Flink reads the data stream from Kafka
// Build the KafkaSource
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
        .setBootstrapServers(kafka_server) // Kafka bootstrap servers
        .setTopics(pub_topic) // topic to read
        .setGroupId(groupId) // consumer group ID
        .setStartingOffsets(OffsetsInitializer.latest()) // where consumption starts
        .setValueOnlyDeserializer(new SimpleStringSchema()) // value deserializer
        .build();
// KafkaSource supports several starting-offset strategies:
// 1. OffsetsInitializer.earliest(): always start from the earliest offset
// 2. OffsetsInitializer.latest(): always start from the latest offset
// 3. OffsetsInitializer.timestamp(long timestamp): start from the given timestamp (epoch milliseconds)
// 4. OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST): start from each partition's committed offset; if there is none, start from the earliest offset
// 5. OffsetsInitializer.committedOffsets(): start from each partition's committed offset; if there is none, an exception is thrown
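To make the difference between these strategies concrete, the sketch below models how a starting offset could be chosen for a single partition. It is a simplified illustration in plain Java, not the connector's implementation: the class `StartingOffsetModel`, the enum names, and the sample offsets are all assumptions, and the timestamp strategy is left out because it needs a lookup on the broker.

```java
import java.util.OptionalLong;

// Simplified model of starting-offset selection for one partition.
// Illustration only; this is not code from flink-connector-kafka.
class StartingOffsetModel {

    enum Strategy { EARLIEST, LATEST, COMMITTED_OR_EARLIEST, COMMITTED_OR_FAIL }

    static long resolve(Strategy strategy, long earliest, long latest, OptionalLong committed) {
        switch (strategy) {
            case EARLIEST:
                return earliest;                    // ignore any committed offset
            case LATEST:
                return latest;                      // ignore any committed offset
            case COMMITTED_OR_EARLIEST:
                return committed.orElse(earliest);  // fall back when nothing is committed
            case COMMITTED_OR_FAIL:
                return committed.orElseThrow(
                        () -> new IllegalStateException("no committed offset for this partition"));
            default:
                throw new IllegalArgumentException("unknown strategy: " + strategy);
        }
    }

    public static void main(String[] args) {
        // Assumed partition state: offsets 100..250 are available, the group committed 180.
        OptionalLong committed = OptionalLong.of(180);
        System.out.println(resolve(Strategy.EARLIEST, 100, 250, committed));              // 100
        System.out.println(resolve(Strategy.LATEST, 100, 250, committed));                // 250
        System.out.println(resolve(Strategy.COMMITTED_OR_EARLIEST, 100, 250, committed)); // 180
        // A brand-new consumer group has no committed offset yet.
        System.out.println(resolve(Strategy.COMMITTED_OR_EARLIEST, 100, 250, OptionalLong.empty())); // 100
    }
}
```

In this model the first two strategies never look at the committed offset, which is why a job configured with `latest()` skips whatever was written while it was down, unless it is restored from a checkpoint or savepoint.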
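- Process the data with Flink's basic operators
map operator: a one-to-one transformation that takes each element of the stream and returns a new element
sou.map(new MapFunction<String, String>() {
    @Override
    public String map(String s) throws Exception {
        // Parse the record and tag it with its processing source
        JSONObject jsonObject = JSONObject.parseObject(s);
        jsonObject.put("source", "flink");
        return jsonObject.toString();
    }
}).print();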
//output
//{"id":0,"value":5,"ts":1734832964534,"source":"flink"}
//{"id":1,"value":3,"ts":1734832965640,"source":"flink"}
//{"id":2,"value":7,"ts":1734832966643,"source":"flink"}
//{"id":3,"value":10,"ts":1734832967645,"source":"flink"}
//{"id":4,"value":2,"ts":1734832968648,"source":"flink"}
//{"id":5,"value":2,"ts":1734832969653,"source":"flink"}
//{"id":6,"value":1,"ts":1734832970654,"source":"flink"}
//{"id":7,"value":6,"ts":1734832971657,"source":"flink"}
//{"id":8,"value":6,"ts":1734832972660,"source":"flink"}
//{"id":9,"value":6,"ts":1734832973662,"source":"flink"}
filter operator: filters the stream and keeps only the elements for which the function returns true
sou.map(new MapFunction<String, String>() {
    @Override
    public String map(String s) throws Exception {
        JSONObject jsonObject = JSONObject.parseObject(s);
        jsonObject.put("source", "flink");
        return jsonObject.toString();
    }
}).filter(new FilterFunction<String>() {
    @Override
    public boolean filter(String value) throws Exception {
        // Keep only the records whose id is even
        JSONObject jsonObject = JSONObject.parseObject(value);
        Integer id = jsonObject.getInteger("id");
        return id % 2 == 0;
    }
}).print();
//output
//{"id":0,"value":5,"ts":1734832964534,"source":"flink"}
//{"id":2,"value":7,"ts":1734832966643,"source":"flink"}
//{"id":4,"value":2,"ts":1734832968648,"source":"flink"}
//{"id":6,"value":1,"ts":1734832970654,"source":"flink"}
//{"id":8,"value":6,"ts":1734832972660,"source":"flink"}
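The expected output of the two operators can be checked without a running cluster by replaying the same steps with `java.util.stream`. The sketch below is plain Java rather than Flink code: the class `MapFilterReplay` and the helper `event` are illustrative names, and records are modeled as maps instead of JSON strings so that no JSON library is needed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Replays the map and filter steps on in-memory records. Illustration only, not Flink code.
class MapFilterReplay {

    // Builds one record with the fields seen in the sample output.
    static Map<String, Object> event(int id, int value, long ts) {
        Map<String, Object> e = new LinkedHashMap<>();
        e.put("id", id);
        e.put("value", value);
        e.put("ts", ts);
        return e;
    }

    static List<Map<String, Object>> process(List<Map<String, Object>> input) {
        return input.stream()
                // map: one element in, one new element out, tagged with its source
                .map(e -> {
                    Map<String, Object> copy = new LinkedHashMap<>(e);
                    copy.put("source", "flink");
                    return copy;
                })
                // filter: keep only the elements with an even id
                .filter(e -> ((Integer) e.get("id")) % 2 == 0)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Map<String, Object>> input = new ArrayList<>();
        input.add(event(0, 5, 1734832964534L));
        input.add(event(1, 3, 1734832965640L));
        input.add(event(2, 7, 1734832966643L));
        input.add(event(3, 10, 1734832967645L));
        process(input).forEach(System.out::println);
    }
}
```

Since the filter only reads `id`, it could also run before the map step so that discarded records are never modified; in the Flink job that reordering would not change the result.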
- Flink writes the processed data back to Kafka so that it keeps flowing downstream
KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers(kafka_server)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic(sub_topic)
                .setValueSerializationSchema(new SimpleStringSchema())
                .build()
        )
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();
processResult.sinkTo(sink);
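The sink above uses `DeliveryGuarantee.AT_LEAST_ONCE`, which means that after a failure and restart some records may be written to the output topic more than once. If a downstream consumer cannot tolerate duplicates, one option is to drop repeats by a business key. The sketch below assumes that `id` uniquely identifies a record, which happens to hold for the sample data, and it keeps its state in memory only; the class `IdDeduplicator` is a hypothetical name and is not part of Flink or Kafka.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical consumer-side helper that drops records whose id was already seen.
// State lives in memory only, so it is lost when the consumer restarts.
class IdDeduplicator {

    private final Set<Integer> seen = new HashSet<>();

    // Returns true the first time an id is offered and false for every repeat.
    boolean firstTime(int id) {
        return seen.add(id);
    }

    public static void main(String[] args) {
        IdDeduplicator dedup = new IdDeduplicator();
        int[] receivedIds = {0, 2, 2, 4, 0, 6}; // ids 2 and 0 arrive twice
        for (int id : receivedIds) {
            if (dedup.firstTime(id)) {
                System.out.println("process id " + id);
            }
        }
    }
}
```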
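- A Kafka consumer reads the corresponding topic
// Requires import java.time.Duration in addition to the Kafka client imports.
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "study02-ubuntu:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "iot1");
// properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
// properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// Both deserializers produce String, so the consumer is typed <String, String>.
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// topic must be the topic the Flink job writes to (sub_topic in the job).
// Partitions 0 and 1 are assigned manually instead of calling subscribe(); the topic is assumed to have two partitions.
TopicPartition p0 = new TopicPartition(topic, 0);
TopicPartition p1 = new TopicPartition(topic, 1);
kafkaConsumer.assign(Arrays.asList(p0, p1));
while (true) {
    // poll(long) is deprecated; use the Duration overload.
    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // TODO: process the message
        System.out.println(record.value());
    }
}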
//output
//{"id":0,"value":5,"ts":1734832964534,"source":"flink"}
//{"id":2,"value":7,"ts":1734832966643,"source":"flink"}
//{"id":4,"value":2,"ts":1734832968648,"source":"flink"}
//{"id":6,"value":1,"ts":1734832970654,"source":"flink"}
//{"id":8,"value":6,"ts":1734832972660,"source":"flink"}
Complete code example, in which Flink reads from Kafka, applies the operators, and forwards the result back to Kafka:
package com.yanboot.flink.connector;

import com.alibaba.fastjson2.JSONObject;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaStreamDataProcess {
    private final static String kafka_server = "study02-ubuntu:9092";
    private final static String pub_topic = "sunlei";
    private final static String sub_topic = "sub_sunlei";
    private final static String groupId = "kafka-demo";

    public static void main(String[] args) throws Exception {
        // Set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism
        env.setParallelism(1);
        // Build the KafkaSource
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers(kafka_server) // Kafka bootstrap servers
                .setTopics(pub_topic) // topic to read
                .setGroupId(groupId) // consumer group ID
                // OffsetsInitializer.earliest(): always start from the earliest offset
                // OffsetsInitializer.latest(): always start from the latest offset
                // OffsetsInitializer.timestamp(long timestamp): start from the given timestamp (epoch milliseconds)
                // OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST): start from each partition's committed offset; if there is none, start from the earliest offset
                // OffsetsInitializer.committedOffsets(): start from each partition's committed offset; if there is none, an exception is thrown
                .setStartingOffsets(OffsetsInitializer.latest()) // where consumption starts
                .setValueOnlyDeserializer(new SimpleStringSchema()) // value deserializer
                .build();
        DataStreamSource<String> sou = env.fromSource(kafkaSource, // data source
                WatermarkStrategy.noWatermarks(), // watermark strategy (none here)
                "flink kafka source"); // source name
        SingleOutputStreamOperator<String> processResult = sou.map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                // Parse the record and tag it with its processing source
                JSONObject jsonObject = JSONObject.parseObject(s);
                jsonObject.put("source", "flink");
                return jsonObject.toString();
            }
        }).filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                // Keep only the records whose id is even
                JSONObject jsonObject = JSONObject.parseObject(value);
                Integer id = jsonObject.getInteger("id");
                return id % 2 == 0;
            }
        });
        processResult.print();
        // Write the processed stream to the output topic
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers(kafka_server)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(sub_topic)
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
                )
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();
        processResult.sinkTo(sink);
        // Submit and run the job
        env.execute();
    }
}