Flink Kafka[输入/输出] Connector

本章重点介绍生产环境中最常用到的Flink kafka connector。使用Flink的同学,一定会很熟悉kafka,它是一个分布式的、分区的、多副本的、 支持高吞吐的、发布订阅消息系统。生产环境环境中也经常会跟kafka进行一些数据的交换,比如利用kafka consumer读取数据,然后进行一系列的处理之后,再将结果写出到kafka中。这里会主要分两个部分进行介绍,一是Flink kafka Consumer,一个是Flink kafka Producer

Flink 输入输出至 Kafka案例

首先看一个例子来串联下Flink kafka connector。代码逻辑里主要是从 kafka里读数据,然后做简单的处理,再写回到kafka中。首先需要引入 flink-kafka相关的pom.xml依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>1.10.0</version>
</dependency>

分别从如何构造一个Source sinkFunctionFlink提供了现成的构造FlinkKafkaConsumerProducer的接口,可以直接使用。这里需要注意,因为kafka有多个版本,多个版本之间的接口协议会不同。Flink针对不同版本的kafka有相应的版本的ConsumerProducer。例如:针对 08091011版本,Flink对应的consumer分别是FlinkKafkaConsumer 0809010011producer也是。

 package com.zzx.flink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import scala.Tuple2;
import scala.tools.nsc.transform.patmat.Logic;

import java.util.Properties;

/**
 * @description: Flink 从kafka 中读取数据并写入kafka
 * @author: zzx
 * @createDate: 2020/7/22
 * @version: 1.0
 */
public class FlinkKafkaExample {
    public static void main(String[] args) throws Exception{
        //ParameterTool 从参数中读取数据
        final ParameterTool params = ParameterTool.fromArgs(args);

        //设置执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //使参数在web界面中可用
        env.getConfig().setGlobalJobParameters(params);
        /**  TimeCharacteristic 中包含三种时间类型
         * @PublicEvolving
         * public enum TimeCharacteristic {
         * ​    //以operator处理的时间为准,它使用的是机器的系统时间来作为data stream的时间
         *     ProcessingTime,
         * ​    //以数据进入flink streaming data flow的时间为准
         *     IngestionTime,
         * ​    //以数据自带的时间戳字段为准,应用程序需要指定如何从record中抽取时间戳字段
         *     EventTime
         * }
         */
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        /**
         * CheckpointingMode:    EXACTLY_ONCE(执行一次)  AT_LEAST_ONCE(至少一次)
         */
        env.enableCheckpointing(60*1000, CheckpointingMode.EXACTLY_ONCE);

        //------------------------------------------source start -----------------------------------
        String sourceTopic = "sensor";
        String bootstrapServers = "hadoop1:9092";
        // kafkaConsumer 需要的配置参数
        Properties props = new Properties();
        // 定义kakfa 服务的地址,不需要将所有broker指定上
        props.put("bootstrap.servers", bootstrapServers);
        // 制定consumer group
        props.put("group.id", "test");
        // 是否自动确认offset
        props.put("enable.auto.commit", "true");
        // 自动确认offset的时间间隔
        props.put("auto.commit.interval.ms", "1000");
        // key的序列化类
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // value的序列化类
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //从kafka读取数据,需要实现 SourceFunction 他给我们提供了一个
        FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<String>(sourceTopic, new SimpleStringSchema(), props);
        //------------------------------------------source end -----------------------------------------

        //------------------------------------------sink start -----------------------------------
        String sinkTopic = "topic";
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);
        properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<String>(sinkTopic, new SimpleStringSchema(), properties);
        //------------------------------------------sink end --------------------------------------

        //FlinkKafkaConsumer011 继承自 RichParallelSourceFunction
        env.addSource(consumer)
            .map(new MapFunction<String, Tuple2<Long,String>>(){
                @Override
                public Tuple2<Long, String> map(String s) throws Exception {
                    return new Tuple2<>(1L,s);
                }
            })
            .filter(k -> k != null)
            .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple2<Long, String>>(Time.seconds(5)) {
                @Override
                public long extractTimestamp(Tuple2<Long, String> element) {
                    return element._1;
                }
            })
            .map(k ->k.toString())
            .addSink(producer);

        //执行
        env.execute("FlinkKafkaExample");
    }
}

如下创建代码中涉及的"sensor" Topic

[root@hadoop1 kafka_2.11-2.2.2]# bin/kafka-topics.sh --create --zookeeper hadoop1:2181 --topic sensor --replication-factor 2 --partitions 4

Flink kafka Consumer

反序列化数据: 因为kafka中数据都是以二进制byte形式存储的。读到Flink系统中之后,需要将二进制数据转化为具体的javascala对象。具体需要实现一个schema类定义如何序列化和反序列数据。反序列化时需要实现DeserializationSchema
口,并重写deserialize(byte[] message)函数,如果是反序列化kafkakv的数据时,需要实现KeyedDeserializationSchema接口,并重写 deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)函数。

另外Flink中也提供了一些常用的序列化反序列化的schema类。例如,SimpleStringSchema,按字符串方式进行序列化、反序列化。TypeInformationSerializationSchema,它可根据FlinkTypeInformation信息来推断出需要选择的schemaJsonDeserializationSchema使用 jackson反序列化 json格式消息,并返回ObjectNode,可以使用get(“property”)方法来访问相应字段。
[点击并拖拽以移动] ​

消费起始位置设置

如何设置作业消费kafka起始位置的数据,这一部分Flink也提供了非常好的封装。在构造好的FlinkKafkaConsumer类后面调用如下相应函数,设置合适的起始位置。
【1】setStartFromGroupOffsets,也是默认的策略,从group offset位置读取数据,group offset指的是kafka broker端记录的某个group的最后一次的消费位置。但是kafka broker端没有该group信息,会根据kafka的参数auto.offset.reset的设置来决定从哪个位置开始消费。
setStartFromEarliest,从kafka最早的位置开始读取。
setStartFromLatest,从kafka最新的位置开始读取。
setStartFromTimestamp(long),从时间戳大于或等于指定时间戳的位置开始读取。Kafka时间戳,是指kafka为每条消息增加另一个时戳。该时戳可以表示消息在proudcer端生成时的时间、或进入到kafka broker时的时间。
setStartFromSpecificOffsets,从指定分区的offset位置开始读取,如指定的offsets中不存某个分区,该分区从group offset位置开始读取。此时需要用户给定一个具体的分区、offset的集合。

一些具体的使用方法可以参考下图。需要注意的是,因为Flink框架有容错机制,如果作业故障,如果作业开启checkpoint,会从上一次 checkpoint状态开始恢复。或者在停止作业的时候主动做savepoint,启动作业时从savepoint开始恢复。这两种情况下恢复作业时,作业消费起始位置是从之前保存的状态中恢复,与上面提到跟kafka这些单独的配置无关。
[点击并拖拽以移动] ​

topic 和 partition 动态发现

实际的生产环境中可能有这样一些需求:
场景一,有一个Flink作业需要将五份数据聚合到一起,五份数据对应五个kafka topic,随着业务增长,新增一类数据,同时新增了一个 kafka topic,如何在不重启作业的情况下作业自动感知新的topic
场景二,作业从一个固定的kafka topic读数据,开始该topic10partition,但随着业务的增长数据量变大,需要对kafka partition个数进行扩容,由10个扩容到20。该情况下如何在不重启作业情况下动态感知新扩容的partition
针对上面的两种场景,首先需要在构建FlinkKafkaConsumer时的properties中设置flink.partition-discovery.interval-millis参数为非负值,表示开启动态发现的开关,以及设置的时间间隔。此时FlinkKafkaConsumer内部会启动一个单独的线程定期去kafka获取最新的meta信息。针对场景一,还需在构建FlinkKafkaConsumer时,topic的描述可以传一个正则表达式(如下图所示)描述的pattern。每次获取最新kafka meta时获取正则匹配的最新topic列表。针对场景二,设置前面的动态发现参数,在定期获取kafka最新meta信息时会匹配新的partition。为了保证数据的正确性,新发现的partition从最早的位置开始读取。
[点击并拖拽以移动] ​

commit offset 方式

Flink kafka consumer commit offset方式需要区分是否开启了checkpoint。如果checkpoint关闭,commit offset要依赖于kafka客户端的auto commit。 需设置enable.auto.commitauto.commit.interval.ms参数到consumer properties,就会按固定的时间间隔定期auto commit offsetkafka如果开启checkpoint,这个时候作业消费的offsetFlink会在state中自己管理和容错。此时提交offsetkafka,一般都是作为外部进度的监控,想实时知道作业消费的位置和lag情况。此时需要setCommitOffsetsOnCheckpointstrue来设置当checkpoint成功时提交offsetkafka。此时commit offset的间隔就取决于checkpoint的间隔,所以此时从kafka一侧看到的lag可能并非完全实时,如果checkpoint间隔比较长lag曲线可能会是一个锯齿状。
[点击并拖拽以移动] ​

Timestamp Extraction/Watermark 生成

我们知道当Flink作业内使用EventTime属性时,需要指定从消息中提取时间戳和生成水位的函数。FlinkKakfaConsumer构造的source后直接调用assignTimestampsAndWatermarks函数设置水位生成器的好处是此时是每个partition一个watermark assigner,如下图。source生成的时戳为多个partition时戳对齐后的最小时戳。此时在一个source读取多个partition,并且partition之间数据时戳有一定差距的情况下,因为在 sourcewatermarkpartition级别有对齐,不会导致数据读取较慢partition数据丢失。
[点击并拖拽以移动] ​

Flink kafka Producer

【1】Producer分区: 使用FlinkKafkaProducerkafka中写数据时,如果不单独设置partition策略,会默认使用FlinkFixedPartitioner,该 partitioner分区的方式是task所在的并发idtopicpartition数取余:parallelInstanceId % partitions.length
○ 此时如果sink4paritition1,则4task往同一个partition中写数据。但当sink task < partition个数时会有部分partition没有数据写入,例如sink task2partition总数为4,则后面两个partition将没有数据写入。
○ 如果构建FlinkKafkaProducer时,partition设置为null,此时会使用kafka producer默认分区方式,非key写入的情况下,使用round-robin的方式进行分区,每个task都会轮循的写下游的所有partition。该方式下游的partition数据会比较均衡,但是缺点是partition个数过多的情况下需要维持过多的网络连接,即每个task都会维持跟所有partition所在broker的连接。
[点击并拖拽以移动] ​

容错

Flink kafka 09010版本下,通过setLogFailuresOnlyfalsesetFlushOnCheckpointtrue, 能达到at-least-once语义。setLogFailuresOnly默认为false,是控制写kafka失败时,是否只打印失败的log不抛异常让作业停止。setFlushOnCheckpoint,默认为true,是控制是否在 checkpointfluse数据到kafka,保证数据已经写到kafka。否则数据有可能还缓存在kafka客户端的buffer中,并没有真正写出到kafka,此时作业挂掉数据即丢失,不能做到至少一次的语义。
Flink kafka 011版本下,通过两阶段提交的sink结合kafka事务的功能,可以保证端到端精准一次。
[点击并拖拽以移动] ​

疑问与解答

【问题一】:Flink consumer的并行度的设置:是对应topicpartitions个数吗?要是有多个主题数据源,并行度是设置成总体的 partitions数吗?
【解答】: 这个并不是绝对的,跟topic的数据量也有关,如果数据量不大,也可以设置小于partitions个数的并发数。但不要设置并发数大于partitions总数,因为这种情况下某些并发因为分配不到partition导致没有数据处理。
【问题二】: 如果partitionernull的时候是round-robin发到每一个partition ?如果有key的时候行为是kafka那种按照key分布到具体分区的行为吗?
【解答】: 如果在构造FlinkKafkaProducer时,如果没有设置单独的partitioner,则默认使用FlinkFixedPartitioner,此时无论是带key的数据,还是不带key。如果主动设置partitionernull时,不带key的数据会round-robin轮询的方式写出到partition,带key的数据会根据key,相同key数据分区的相同的partition
【问题三】: 如果checkpoint时间过长,offset未提交到kafka,此时节点宕机了,重启之后的重复消费如何保证呢?
【解答】: 首先开启checkpointoffsetFlink通过状态state管理和恢复的,并不是从kafkaoffset位置恢复。在checkpoint机制下,作业从最近一次checkpoint恢复,本身是会回放部分历史数据,导致部分数据重复消费,Flink引擎仅保证计算状态的精准一次,要想做到端到端精准一次需要依赖一些幂等的存储系统或者事务操作。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/276122.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

代码随想录刷题题Day24

刷题的第二十四天&#xff0c;希望自己能够不断坚持下去&#xff0c;迎来蜕变。&#x1f600;&#x1f600;&#x1f600; 刷题语言&#xff1a;C Day24 任务 ● 491.递增子序列 ● 46.全排列 ● 47.全排列 II 1 递增子序列 491.递增子序列 思路&#xff1a; 本题求自增子序…

Translation翻译插件

Translation插件是为IntelliJ IDEA开发的&#xff0c;因此只能在IntelliJ IDEA中使用。但是&#xff0c;如果你需要在其他软件中进行翻译&#xff0c;可以考虑使用其他的翻译工具或服务。例如&#xff0c;一些在线翻译网站&#xff08;如Google翻译、百度翻译等&#xff09;提供…

由浅入深走进Python异步编程【协程与yield】(含代码实例讲解 || 迭代器、生成器、协程、yield from)

写在前面 从底层到第三方库&#xff0c;全面讲解python的异步编程。这节讲述的是python异步编程的底层原理第一节&#xff0c;详细了解需要配合下一节观看哦。纯干货&#xff0c;无概念&#xff0c;代码实例讲解。 本系列有6章左右&#xff0c;点击头像或者专栏查看更多内容&…

C++学习实践(一)高频面试问题总结(附详细答案)

文章目录 一、基础常见面试题1、数组和链表区别2、深拷贝和浅拷贝相关问题的区别3、a和a区别4、c内存模型5、四种强制转换和应用场景 二、指针相关1、指针和引用的区别2、函数指针和指针函数3、传指针、引用和值4、常量指针和指针常量5、野指针6、智能指针的用法 三、关键字作用…

YOLOv8可视化:引入多种可视化CAM方法,为科研保驾护航

💡💡💡本文内容:调用pytorch下的CAM可视化库,支持十多种可视化方法,打开“黑盒”,让YOLOv8变得相对可解释性 收录 YOLOv8原创自研 https://blog.csdn.net/m0_63774211/category_12511737.html?spm=1001.2014.3001.5482 💡💡💡全网独家首发创新(原创),适…

桥接模式-举例

概叙&#xff1a;桥接模式用一种巧妙的方式处理多层继承存在的问题&#xff0c; 用抽象关联取代了传统的多层继承&#xff0c; 将类之间的静态继承关系转换为动态的对象组合关系&#xff0c; 使得系统更加灵活&#xff0c;并易于扩展&#xff0c; 同时有效控制了系统中类的个数…

企业如何购买腾讯云服务器?(详细指南)

腾讯云服务器购买流程直接在官方秒杀活动上购买比较划算&#xff0c;在云服务器CVM或轻量应用服务器页面自定义购买价格比较贵&#xff0c;但是自定义购买云服务器CPU内存带宽配置选择范围广&#xff0c;活动上购买只能选择固定的活动机&#xff0c;选择范围窄&#xff0c;但是…

专题四:前缀和

前缀和 一.一维前缀和(模板)&#xff1a;1.思路一&#xff1a;暴力解法2.思路二&#xff1a;前缀和思路 二. 二维前缀和(模板)&#xff1a;1.思路一&#xff1a;构造前缀和数组 三.寻找数组的中心下标&#xff1a;1.思路一&#xff1a;前缀和 四.除自身以外数组的乘积&#xff…

php最常出现的错误

目录 1. E_WARNING&#xff1a;为 foreach &#xff08;&#xff09;提供的参数无效 2. PDOException&#xff1a;拒绝SQLSTATEHY000连接 3.错误使用empty函数 1. E_WARNING&#xff1a;为 foreach &#xff08;&#xff09;提供的参数无效 PHP foreach构造在PHP 4中引入&am…

web3方向产品调研

每次互联网形态的改变&#xff0c;都会对世界产生很大的影响&#xff0c;上一次对社会产生重大影响的互联网形态&#xff08;Web2.0&#xff09;催生了一批改变人类生活和信息交互方式的企业。 目录 概述DAO是什么&#xff1f;为什么我们需要DAO? 金融服务金融桥接及周边服务D…

Unity中Shader 齐次坐标

文章目录 前言一、什么是齐次坐标二、齐次坐标增加分量 w 的意义1、当 w ≠ \neq  0时&#xff1a;2、当 w 0时&#xff1a;3、用方程组&#xff0c;直观的看一下w的意义 前言 在之前的文章中&#xff0c;我们进行了正交相机视图空间转化到裁剪空间的推导。 Unity中Shade…

3DMAX 中的 VR 渲染器如何设置局部区域渲染?

3DMAX 中的 VR 渲染器如何设置局部渲染&#xff1f; 首先我们要得打开渲染设置&#xff0c;在3damx里按F10&#xff0c;调出渲染设置。选定渲染器为Vary渲染器&#xff1a; 设置VR的局部渲染&#xff0c;需要打开帧缓冲&#xff0c;我们在V-ary项下&#xff0c;打开帧缓冲(点击…

腾讯云服务器怎么买划算?最新优惠价格表

2023腾讯云轻量应用服务器优惠价格表&#xff0c;12月最新报价&#xff0c;腾讯云轻量2核2G3M带宽62元一年、2核2G4M轻量服务器118元一年&#xff0c;540元三年、2核4G5M带宽218元一年&#xff0c;756元三年、4核8G12M轻量服务器646元15个月&#xff0c;CVM云服务器S5实例2核2G…

六、Redis 分布式系统

六、Redis 分布式系统 六、Redis 分布式系统6.1 数据分区算法6.1.1 顺序分区6.1.2 哈希分区 6.2 系统搭建与运行6.2.1 系统搭建6.2.2 系统启动与关闭 6.3 集群操作6.3.1 连接集群6.3.2 写入数据6.3.3 集群查询6.3.4 故障转移6.3.5 集群扩容6.3.6 集群收缩 6.4 分布式系统的限制…

vue整理面试题

1. v-if/v-show的区别? v-if"表达式" 当表达式值true&#xff0c;v-if所作用的元素显示 否则隐藏 v-show"表达式" 当表达式值true&#xff0c;v-if所作用的元素显示 否则隐藏 理解&#xff1a; v-if控制元素显示与隐藏&#xff0c;通过js创建dom元素或删除…

统信UOS linux下opencv应用编译时的头文件和库文件路径查找设置方法

☞ ░ 前往老猿Python博客 ░ https://blog.csdn.net/LaoYuanPython 一、引言 老猿原来进行的C和C开发主要是基于windows环境的&#xff0c;目前要在统信UOS操作系统环境下编译opencv应用程序&#xff0c;其环境设置与windows环境下变化很多&#xff0c;今天就来介绍一下在统…

AtCoder Beginner Contest 333

B - Pentagon 没什么好讲的&#xff0c;pass int a[N]; int len[6] { 0,1,2,2,1 }; void solve() {char s1, s2, t1, t2; cin >> s1 >> s2 >> t1 >> t2;if (s2 < s1) swap(s1, s2);if (t2 < t1) swap(t1, t2);int d1 s2 - s1, d2 t2 - t1;if…

设计模式——适配器模式(Adapter Pattern)

概述 适配器模式可以将一个类的接口和另一个类的接口匹配起来&#xff0c;而无须修改原来的适配者接口和抽象目标类接口。适配器模式(Adapter Pattern)&#xff1a;将一个接口转换成客户希望的另一个接口&#xff0c;使接口不兼容的那些类可以一起工作&#xff0c;其别名为包装…

第三方软件测试公司有哪些服务形式?如何收费?

由于软件企业的增多&#xff0c;企业更加注重软件开发&#xff0c;因此会将软件测试工作交由第三方软件测试公司进行。第三方软件测试公司也就是专门做软件测评的外包公司&#xff0c;主要是发现软件漏洞和缺陷以便公正、客观评估软件质量&#xff0c;再出具一份软件测试报告。…

verilog rs232串口模块

前面发了个发送模块&#xff0c;这次补齐&#xff0c;完整。 串口计数器&#xff0c;波特率适配 uart_clk.v module uart_clk(input wire clk,input wire rst_n,input wire tx_clk_en,input wire rx_clk_en,input wire[1:0] baud_sel,output wire tx_clk,output wire rx_clk )…