Flink常见面试题

1、Flink 的四大特征(基石)

2、Flink 中都有哪些 Source,哪些 Sink,哪些算子(方法)

预定义Source

基于本地集合的source(Collection-based-source)

基于文件的source(File-based-source)

基于网络套接字(socketTextStream)

自定义Source

SourceFunction:非并行数据源(并行度只能=1) --接口

RichSourceFunction:多功能非并行数据源(并行度只能=1) --类

ParallelSourceFunction:并行数据源(并行度能够>=1) --接口

RichParallelSourceFunction:多功能并行数据源(并行度能够>=1) --类 【建议使用的】

3、什么是侧道输出流,有什么用途

侧输出-SideOutput
Flink 通过watermark在短时间内允许了乱序到来的数据

通过延迟数据处理机制,可以处理长期迟到的数据。

但总有那么些数据来的晚的太久了。允许迟到1天的设置,它迟到了2天才来。

对于这样的迟到数据,水印无能为力,设置allowedLateness也无能为力,那对于这样的数据Flink就只能任其丢掉了吗?

不会,Flink的两个迟到机制尽量确保了数据不会错过了属于他们的窗口,但是真的迟到太久了,Flink也有一个机制将这些数据收集起来

保存成为一个DataStream,然后,交由开发人员自行处理。

那么这个机制就叫做侧输出机制(Side Output)

4、Flink 中两个流如何合并为一个流

Union

union可以合并多个同类型的流

将多个DataStream 合并成一个DataStream

【注意】:union合并的DataStream的类型必须是一致的

connect

connect可以连接2个不同类型的流(最后需要处理后再输出)

DataStream,DataStream → ConnectedStreams:连接两个保持他们类型的数据流,两个数据流被 Connect 之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化【一国两制】,两个流相互独立, 作为对比Union后是真的变成一个流了。

和union类似,但是connect只能连接两个流,两个流之间的数据类型可以同,对两个流的数据可以分别应用不同的处理逻辑.

5、Flink 中两个流如何 join

Join 算子提供的语义为 “Window join”,即按照指定字段和(滚动/滑动/会话)窗口进行内连接(InnerJoin)。Join 将有相同 Key 并且位于同一窗口中的两条流的元素进行关联。

Join 可以支持处理时间和事件时间两种时间特征。

1.1 滚动窗口Join

当在滚动窗口上进行 Join 时,所有有相同 Key 并且位于同一滚动窗口中的两条流的元素两两组合进行关联,并最终传递到 JoinFunction 或 FlatJoinFunction 进行处理。

如上图所示,我们定义了一个大小为 2 秒的滚动窗口,最终产生 [0,1],[2,3],… 这种形式的数据。上图显示了每个窗口中橘色流和绿色流的所有元素成对组合。需要注意的是,在滚动窗口 [6,7] 中,由于绿色流中不存在要与橘色流中元素 6、7 相关联的元素,因此该窗口不会输出任何内容。

1.2 滑动窗口Join 

当在滑动窗口上进行 Join 时,所有有相同 Key 并且位于同一滑动窗口中的两条流的元素两两组合进行关联,并最终传递到 JoinFunction 进行处理。

如上图所示,我们定义了一个窗口大小为 2 秒、滑动步长为 1 秒的滑动窗口。需要注意的是,一个元素可能会落在不同的窗口中,因此会在不同窗口中发生关联,例如,绿色流中的0元素。当滑动窗口中一个流的元素在另一个流中没有相对应的元素,则不会输出该元素。

6、Flink 中都有哪些 window,什么是滑动,滚动窗口

Window可以分成两类:

CountWindow:按照指定的数据条数生成一个Window,与时间无关。

滚动计数窗口,每隔N条数据,统计前N条数据

滑动计数窗口,每隔N条数据,统计前M条数据

TimeWindow:按照时间生成Window。

滚动时间窗口,每隔N时间,统计前N时间范围内的数据,窗口长度N,滑动距离N

滑动时间窗口,每隔N时间,统计前M时间范围内的数据,窗口长度M,滑动距离N

会话窗口,按照会话划定的窗口

7、flink 中都有哪些时间语义,对于 event_time 中数据迟到的处理(数据乱序)

EventTime:事件(数据)时间,是事件/数据真真正正发生时/产生时的时间。

IngestionTime:摄入时间,是事件/数据到达流处理系统的时间。

ProcessingTime:处理时间,是事件/数据被处理/计算时的系统的时间。

迟到处理:

水印:对于迟到数据不长;

allowedLateness: 迟到时间很长;

侧道输出:对于迟到时间特别长。 

8、flink 中的状态指的是什么?有哪些状态,你使用过哪些状态,哪个项目使用到了状态

有状态计算和无状态计算

  • 无状态计算:
  • 不需要考虑历史数据, 相同的输入,得到相同的输出!如:map, 将每个单词记为1, 进来一个hello, 得到(hello,1),再进来一个hello,得到的还是(hello,1)
  • 有状态计算:
  • 需要考虑历史数据, 相同的输入,可能会得到不同的输出!
    • 如:sum/reduce/maxBy, 对单词按照key分组聚合,进来一个(hello,1),得到(hello,1), 再进来一个(hello,1), 得到的结果为(hello,2)

注意: Flink默认已经支持了无状态和有状态计算!

例如WordCount代码:已经做好了状态维护, 输入hello,输出(hello,1),再输入hello,输出(hello,2)。

Flink有两种基本类型的状态:托管状态(Managed State)和原生状态(Raw State)。

两者的区别:Managed State是由Flink管理的,Flink帮忙存储、恢复和优化,Raw State是开发者自己管理的,需要自己序列化。

托管状态
   - KeyedState ( 在keyBy之后可以使用状态 )
      - ValueState  (存储一个值)
      - ListState   (存储多个值)
      - MapState    (存储key-value) 
   - OperatorState ( 没有keyBy的情况下也可以使用 ) [不用]
 - 原生状态 (不用)

9、flink 中 checkpoint 是什么,如何设置。

Checkpoint:快照点, 是Flink中所有有状态的Operator在某一个时刻的State快照信息/存档信息。

一句话概括: Checkpoint就是State的快照。

可使用以下方法来设置:

package com.bigdata.day06;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


/**
 * @基本功能:
 * @program:FlinkDemo
 * @author: 闫哥
 * @create:2023-11-24 09:18:30
 **/
public class _01CheckPointDemo {

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // 在windows运行,将数据提交hdfs,会出现权限问题,使用这个语句解决。
        System.setProperty("HADOOP_USER_NAME", "root");
        // 在这个基础之上,添加快照
        // 第一句:开启快照,每隔1s保存一次快照
        env.enableCheckpointing(1000);
        // 第二句:设置快照保存的位置
        env.setStateBackend(new FsStateBackend("hdfs://bigdata01:9820/flink/checkpoint"));
        // 第三句: 通过webui的cancel按钮,取消flink的job时,不删除HDFS的checkpoint目录
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        //2. source-加载数据
        DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999);
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                String[] arr = s.split(",");
                return Tuple2.of(arr[0], Integer.valueOf(arr[1]));
            }
        });
        //3. transformation-数据处理转换
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = mapStream.keyBy(0).sum(1);


        result.print();
        //4. sink-数据输出


        //5. execute-执行
        env.execute();
    }
}

10、flink 中的重启策略 (流式计算中的重启策略)

重启策略的意义:流式数据是不可能停止的,假如有一条错误数据导致程序直接退出,后面的大量数据是会丢失的,对公司来讲,意义是重大的,损失是惨重的。

重启策略是一个单独的策略,如果你配置了 checkpoint 含有重启策略的,如果你没有 checkpoint 也可以自行配置重启策略,总之重启策略和 checkpoint 没有必然联系。

注意:此时如果有checkpoint ,是不会出现异常的,需要将checkpoint的代码关闭,再重启程序。会发现打印了异常,那为什么checkpoint的时候不打印,因为并没有log4j的配置文件,需要搞一个这样的配置文件才行。

11、什么是维表 join,如何实现,你在哪个项目中使用过维表 join

所谓的维表Join: 进入Flink的数据,需要关联另外一些存储设备的数据,才能计算出来结果,那么存储在外部设备上的表称之为维表,可能存储在mysql也可能存储在hbase 等。

实现:

通过定义一个类实现RichMapFunction,在open()中读取维表数据加载到内存中,在kafka流map()方法中与维表数据进行关联。

RichMapFunction中open方法里加载维表数据到内存的方式特点如下:

  • 优点:实现简单
  • 缺点:因为数据存于内存,所以只适合小数据量并且维表数据更新频率不高的情况下。虽然可以在open中定义一个定时器定时更新维表,但是还是存在维表更新不及时的情况。另外,维表是变化慢,不是一直不变的,只是变化比较缓慢而已。

以前的方式是将维表数据存储在Redis、HBase、MySQL等外部存储中,实时流在关联维表数据的时候实时去外部存储中查询,这种方式特点如下:

  • 优点:维度数据量不受内存限制,可以存储很大的数据量。
  • 缺点:因为维表数据在外部存储中,读取速度受制于外部存储的读取速度;另外维表的同步也有延迟。

使用cache来减轻访问压力

可以使用缓存来存储一部分常访问的维表数据,以减少访问外部系统的次数,比如使用Guava Cache。维表一般的特点是变化比较慢。在智慧城市项12目使用过。用它来存储一些预热的数据在内存中方便取出。

12、flinksql 如何读取 kafka 或者 mysql 的数据。

可通过以下代码直接实现:


import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @基本功能:
 * @program:FlinkDemo
 * @author: 闫哥
 * @create:2023-11-28 11:00:51
 **/
public class _02KafkaConnectorDemo {

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 如果是建表语句:executeSql  这个返回值是TableResult
        // 如果是查询语句:sqlQuery    这个返回的是Table (有用)
        // 新建一个表,用于存储 kafka消息
        TableResult tableResult = tEnv.executeSql("CREATE TABLE table1 (\n" +
                "  `user_id` int,\n" +
                "  `page_id` int,\n" +
                "  `status` STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'topic1',\n" +
                "  'properties.bootstrap.servers' = 'bigdata01:9092',\n" +
                "  'properties.group.id' = 'testGroup',\n" +
                "  'scan.startup.mode' = 'latest-offset',\n" +
                "  'format' = 'json'\n" +
                ")");

        // 新建一个表,用于存储kafka中的topic2中的数据
        tEnv.executeSql("CREATE TABLE table2 (\n" +
                "  `user_id` int,\n" +
                "  `page_id` int,\n" +
                "  `status` STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'topic2',\n" +
                "  'properties.bootstrap.servers' = 'bigdata01:9092',\n" +
                "  'format' = 'json'\n" +
                ")");

        tEnv.executeSql("insert into table2 select * from table1 where status ='success'");

        // 以上代码已经写完了,下面是两个步骤分开的写法
        //TODO 3.transformation/查询
        // Table result = tEnv.sqlQuery("select user_id,page_id,status from table1 where status='success'");
        //输出到Kafka    DDL
        // tEnv.executeSql("insert into table2 select * from " + result);


        //2. source-加载数据
        //3. transformation-数据处理转换
        //4. sink-数据输出
        //5. execute-执行
        // env.execute();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/929266.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

BERT和RoBERTa;双向表示与单向的简单理解

目录 BERT和RoBERTa大型预训练语言模型 BERT的原理 RoBERTa的原理 举例说明 双向表示与单向的简单理解 除了预训练语言模型,还有什么模型 一、模型类型与结构 二、训练方式与数据 三、应用场景与功能 四、技术特点与优势 BERT和RoBERTa大型预训练语言模型 BERT(Bi…

OpenAI 推出了 Canvas 和 SearchGPT

今天的故事从 ChatGPT 推出的 Canvas 和 SearchGPT 开始。 ©作者|Ninja Geek 来源|神州问学 ChatGPT 在最近推出了令人兴奋的 Canvas 功能&#xff0c;Canvas 不仅带来了 ChatGPT 界面上的变化&#xff0c;还完全改变了人们撰写文档和书写代码的体验&#xff01; Canvas…

CentOS 9 配置静态IP

文章目录 1_问题原因2_nmcli 配置静态IP3_使用配置文件固定IP4_重启后存在的问题5_nmcli 补充 1_问题原因 CentOS 7 于 2014年6月发布&#xff0c;基于 RHEL 7&#xff0c;并在 2024年6月30日 结束维护。 CentOS 9 作为目前的最新版本&#xff0c;今天闲来闲来无事下载下来后…

ProjectSend 身份认证绕过漏洞复现(CVE-2024-11680)

0x01 产品描述: ProjectSend 是一个开源文件共享网络应用程序,旨在促进服务器管理员和客户端之间的安全、私密文件传输。它是一款相当流行的应用程序,被更喜欢自托管解决方案而不是 Google Drive 和 Dropbox 等第三方服务的组织使用。0x02 漏洞描述: ProjectSend r1720 之前…

【Vulkan入门】01-列举物理设备

目录 先叨叨git信息主要逻辑VulkanEnvEnumeratePhysicalDevices()PrintPhysicalDevices() 编译并运行程序 先叨叨 上一篇已经创建了VkInstance&#xff0c;本篇我们问问VkInstance&#xff0c;在当前平台上有多少个支持Vulkan的物理设备。 git信息 repository: https://gite…

小家电出海,沃丰科技助力保障售后服务的及时性与高效性

随着全球化步伐的加快&#xff0c;小家电行业也逐渐迈向国际市场&#xff0c;面向全球消费者提供服务。然而&#xff0c;跨国界的销售和服务挑战也随之而来&#xff0c;尤其是售后服务的及时性与高效性成为了企业亟需解决的问题。沃丰科技凭借其全渠道在线客服、工单系统和视频…

AI RPA 影刀基础教程:开启自动化之旅

RPA 是什么 RPA 就是机器人流程自动化&#xff0c;就是将重复的工作交给机器人来执行。只要是标准化的、重复的、有逻辑行的操作&#xff0c;都可以用 RPA 提效 准备 安装并注册影刀 影刀RPA - 影刀官网 安装 Chrome 浏览器 下载链接&#xff1a;Google Chrome 网络浏览器 …

Qt 2D绘图之五:图形视图框架的结构、坐标系统和框架间的事件处理与传播

参考文章链接: Qt 2D绘图之五:图形视图框架的结构和坐标系统 Qt 2D绘图之六:图形视图框架的事件处理与传播 图形视图框架的结构 在前面讲的基本绘图中,我们可以自己绘制各种图形,并且控制它们。但是,如果需要同时绘制很多个相同或不同的图形,并且要控制它们的移动、…

JavaScript 键盘控制移动

如果你想通过 JavaScript 实现键盘控制对象&#xff08;比如一个方块&#xff09;的移动&#xff0c;下面是一个简单的示例&#xff0c;展示如何监听键盘事件并根据按下的键来移动一个元素。 HTML 和 CSS&#xff1a; <!DOCTYPE html> <html lang"en">…

【Web】0基础学Web—html基本骨架、语义化标签、非语义化标签、列表、表格、表单

0基础学Web—html基本骨架、语义化标签、非语义化标签、列表、表格、表单 html基本骨架语义化标签图片属性a链接 非语义化标签特殊符号标签 列表无序列表结果展示 有序列表结果展示 定义列表结果展示 表格table属性tr属性结果展示 表单单标签form属性input属性selecttextareabu…

find / -name ‘*.jar‘ 需要加上英文单引号 (shell 的通配符展开行为)

文章目录 1. Shell 通配符展开&#xff08;Glob Expansion&#xff09;2. 有引号时的行为&#xff08;推荐&#xff09;3. 无引号时的行为4. 总结原因5. 推荐实践 rootiZuf67xiyefycct0a9rdi3Z:~# find / -name *.jar find: paths must precede expression: o2o.jar Usage: fin…

一次奇妙的getshell之旅

1. 资产收集时发现一个网站&#xff1a; https://xxxxxxxxxx/ischool/publish_page/0/ 发现存在管理员登陆: 这里之前在该旁站找到一个SQL注入&#xff0c;然后找到的这个账户密码&#xff08;这里如何从SQL注入找到账户密码前借鉴前面的报告。&#xff09;&#xff1a; 账号&…

QT6_UI设计——设置控件背景

1、右击选择控件 2、选择背景 color 颜色 background-color 背景颜色 alternate-background-color 交替背景颜色 border-color 边框颜色 border-top-color 边框顶端 border-right-color 边框右边 border-bottom-color 边框底部 border-left-color 边框左边 gridline-color 网…

第十三章 Linux计划任务

注意&#xff1a;进公司和有公司成员离职&#xff0c;一定要问计划任务&#xff0c;防止别人搞破坏背锅 13.1 一次性计划任务(atd服务) 1 安装 atd 服务 yum install -y at systemctl enable atd systemctl start atd ## 启动atd服务 systemctl status atd ## 查看atd服务…

Day28 买卖股票的最佳时机 跳跃游戏 跳跃游戏 II K 次取反后最大化的数组和

贪心算法 part02 122. 买卖股票的最佳时机 II - 力扣&#xff08;LeetCode&#xff09; 求最大利润 将每天的正利润加和 public int maxProfit(int[] prices) {int totalPrices 0;for(int i0;i<prices.length;i){if(i<prices.length-1&&prices[i1]>prices[…

Mac苹果电脑 java前后端开发环境及软件安装教程

本文记录我初次使用macOS系统&#xff0c;m4 mini安装开发软件及环境的全过程&#xff0c;希望能帮助到你&#xff0c;好用的请点赞评论收藏增加热度&#xff0c;让更多Mac小白轻松体验开发&#xff0c;20241129 …

ubuntu20.04安装OpenPcdet,CUDA版本11.8,显卡4090

本文参考这2篇文章的内容&#xff1a;https://blog.csdn.net/jin15203846657/article/details/122735375#comments_25352667 https://zhuanlan.zhihu.com/p/642158810 记录了自己安装OpenPcdet的过程。 OpenPcdet的安装需要cuda和pytorch版本严格关联。本例的CUDA版本&#xf…

Clickhouse MergeTree存储引擎

文章目录 MergeTree特点MergeTree核心参数- ORDER BY- PARTITION BY- PRIMARY KEY- SAMPLE BY- TTL- SETTINGS- index_granularity- index_granularity_bytes- min_index_granularity_bytes- enable_mixed_granularity_parts- use_minimalistic_part_header_in_zookeeper- min_…

将word里自带公式编辑器编辑的公式转换成用mathtype编辑的格式

文章目录 将word里自带公式编辑器编辑的公式转换成用mathtype编辑的格式MathType安装问题MathType30天试用延期MathPage.wll文件找不到问题 将word里自带公式编辑器编辑的公式转换成用mathtype编辑的格式 word自带公式编辑器编辑的公式格式&#xff1a; MathType编辑的格式&a…

期权懂|期权新手指南——个股期权操作方式详细解释

期权小懂每日分享期权知识&#xff0c;帮助期权新手及时有效地掌握即市趋势与新资讯&#xff01; 期权新手指南——个股期权操作方式详细解释 个股期权的操作方式相对复杂&#xff0c;但可以分为场内交易和场外交易两大类。 场内个股期权交易&#xff1a;是指在证券交易所上市交…