Flink学习记录

可以快速搭建一个Flink编写程序

mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.17.1 \
    -DgroupId=com.zxx.langhuan \
    -DartifactId=langhuan-flink \
    -Dversion=1.0.0-SNAPSHOT \
    -Dpackage=com.zxx.langhuan.flink \
    -DinteractiveMode=false

Flink 的 Java DataStream API 可以将任何可序列化的对象转化为流。Flink 自带的序列化器有

  • 基本类型,即 String、Long、Integer、Boolean、Array
  • 复合类型:Tuples、POJOs 和 Scala case classes

Flink 的原生序列化器可以高效地操作 tuples 和 POJOs。

对于 Java,Flink 自带有 Tuple0 到 Tuple25 类型。

POJOs

如果满足以下条件,Flink 将数据类型识别为 POJO 类型(并允许“按名称”字段引用):

  • 该类是公有且独立的(没有非静态内部类)
  • 该类有公有的无参构造函数
  • 类(及父类)中所有的所有不被 static、transient 修饰的属性要么是公有的(且不被 final 修饰),要么是包含公有的 getter 和 setter 方法,这些方法遵循 Java bean 命名规范。

基本的 stream source

// DataStream API
// fromElements
StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Person> flintstones = env.fromElements(
                new Person("Fred", 35),
                new Person("Wilma", 35),
                new Person("Pebbles", 2));

// fromCollection
List<Person> people = new ArrayList<Person>();
people.add(new Person("Fred", 35));
people.add(new Person("Wilma", 35));
people.add(new Person("Pebbles", 2));
DataStream<Person> flintstones = env.fromCollection(people);

// socketTextStream
DataStream<String> lines = env.socketTextStream("localhost", 9999)

// readTextFile
DataStream<String> lines = env.readTextFile("file:///path");

// customSource
DataStreamSource<OUT> out = env.addSource(SourceFunction<OUT> function);



// Table API
// 创建TableEnvironment(表环境)。 创建表环境时,你可以设置作业属性,定义应用的批流模式,以及创建数据源。 我们先创建一个标准的表环境,并选择流式执行器。
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
// 批处理模式
// EnvironmentSettings settings = EnvironmentSettings.inBatchMode();
TableEnvironment tEnv = TableEnvironment.create(settings);

tEnv.executeSql("CREATE TABLE transactions (\n" +
    "    account_id  BIGINT,\n" +
    "    amount      BIGINT,\n" +
    "    transaction_time TIMESTAMP(3),\n" +
    "    WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND\n" +
    ") WITH (\n" +
    "    'connector' = 'kafka',\n" +
    "    'topic'     = 'transactions',\n" +
    "    'properties.bootstrap.servers' = 'kafka:9092',\n" +
    "    'format'    = 'csv'\n" +
    ")");

tEnv.executeSql("CREATE TABLE spend_report (\n" +
    "    account_id BIGINT,\n" +
    "    log_ts     TIMESTAMP(3),\n" +
    "    amount     BIGINT\n," +
    "    PRIMARY KEY (account_id, log_ts) NOT ENFORCED" +
    ") WITH (\n" +
    "    'connector'  = 'jdbc',\n" +
    "    'url'        = 'jdbc:mysql://mysql:3306/sql-demo',\n" +
    "    'table-name' = 'spend_report',\n" +
    "    'driver'     = 'com.mysql.jdbc.Driver',\n" +
    "    'username'   = 'sql-demo',\n" +
    "    'password'   = 'demo-sql'\n" +
    ")");

Table transactions = tEnv.from("transactions");
report(transactions).executeInsert("spend_report");

支持的connectors

DataStream ConnectorsTable API Connectors
DataGen
Kafka
Cassandra
DynamoDB
ElasticSearch
Firehose
Kinesis
MongoDB
OpenSearch
文件系统
RabbitMQ
Google Cloud PubSub
Hybrid Source
Pulsar
JDBC
Upsert Kafka
HBase
Print
BlackHole
Hive

场景说明

一个 Flink 集群总是包含一个 JobManager 以及一个或多个 Flink TaskManager。
JobManager 负责处理 Job 提交、 Job 监控以及资源管理。
Flink TaskManager 运行 worker 进程, 负责实际任务 Tasks 的执行,而这些任务共同组成了一个 Flink Job。 
 

有界流:批处理

无界流:流处理

无状态的转换

map()

flatmap()

keyBy()

以下情况,一个类不能作为 key

  1. 它是一种 POJO 类,但没有重写 hashCode() 方法而是依赖于 Object.hashCode() 实现。
  2. 它是任意类的数组。

reduce() 和其他聚合算子  

数据流转换

有状态的转换

在 Flink 不参与管理状态的情况下,你的应用也可以使用状态,但 Flink 为其管理状态提供了一些引人注目的特性:

  • 本地性: Flink 状态是存储在使用它的机器本地的,并且可以以内存访问速度来获取
  • 持久性: Flink 状态是容错的,例如,它可以自动按一定的时间间隔产生 checkpoint,并且在任务失败后进行恢复
  • 纵向可扩展性: Flink 状态可以存储在集成的 RocksDB 实例中,这种方式下可以通过增加本地磁盘来扩展空间
  • 横向可扩展性: Flink 状态可以随着集群的扩缩容重新分布
  • 可查询性: Flink 状态可以通过使用 状态查询 API 从外部进行查询。

Rich Functions

  • open(Configuration c):仅在算子初始化时调用一次。可以用来加载一些静态数据,或者建立外部服务的链接等。
  • close()
  • getRuntimeContext():为整套潜在有趣的东西提供了一个访问途径,最明显的,它是你创建和访问 Flink 状态的途径。
  • setRuntimeContext()
  • getIterationRuntimeContext()

ValueState:要理解这个代表的不仅仅是一个单独的布尔类型变量,而是一个分布式的共享键值存储。(相同的key共享)

  • value()
  • update()
  • clear()

Flink 明确支持以下三种时间语义:

  • 事件时间(event time): 事件产生的时间,记录的是设备生产(或者存储)事件的时间

  • 摄取时间(ingestion time): Flink 读取事件时记录的时间

  • 处理时间(processing time): Flink pipeline 中具体算子处理事件的时间

Watermarks: 定义何时停止等待较早的事件

//        AssignerWithPunctuatedWatermarks 在每个事件上都可以提供水位线,因此水位线可以更加灵活和精确地根据事件的特性进行生成。
//        AssignerWithPeriodicWatermarks 在一定的时间间隔内提供水位线,因此水位线的生成不会频繁,适用于事件的时间戳变化频率较高,而水位线的变化频率较低的情况。
//        在选择使用哪个接口时,可以根据具体的业务需求和事件流的特点来决定。如果事件的时间戳和水位线的变化较为频繁,或者需要更精确的控制,可以选择 AssignerWithPunctuatedWatermarks。如果事件的时间戳变化较为平稳,或者水位线的变化不需要那么频繁,可以选择 AssignerWithPeriodicWatermarks。

// WatermarkStrategy 接口包含以下方法:
//        withTimestampAssigner:指定如何从事件中提取事件时间戳(timestamp)。
//        withIdleness:配置是否在流处于空闲状态时发出水位线,默认为不发出水位线。
//        withIdlenessTimeout:指定流处于空闲状态多久后发出水位线。
//        withTimestampAssignerAndIdlenessTimeout:同时指定事件时间戳提取逻辑和流空闲状态的配置。

// 通常,你可以通过静态工厂方法 WatermarkStrategy.forXXX() 来创建特定的水位线策略,其中 XXX 可以是以下几种类型之一:
//        forBoundedOutOfOrderness:适用于乱序但有界的事件流,根据最大允许的乱序程度指定固定的延迟时间。
//        forMonotonousTimestamps:适用于单调递增的事件流,例如源自某些消息队列的事件流。
//        forGenerator:通过自定义的水位线生成器函数来创建水位线策略。
//        forPeriodicBoundedOutOfOrderness:适用于周期性有界乱序事件流,指定固定的延迟时间和乱序间隔。
//        forEventTime:适用于事件时间已经正确嵌入在事件中的情况,不需要提取时间戳。

DataStream<Event> stream = ...;

WatermarkStrategy<Event> strategy = WatermarkStrategy
        .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withTimestampAssigner((event, timestamp) -> event.timestamp);

DataStream<Event> withTimestampsAndWatermarks =
    stream.assignTimestampsAndWatermarks(strategy);

Windows

Flink 有一些内置的窗口分配器,如下所示:

  • 滚动时间窗口
    • 每分钟页面浏览量
    • TumblingEventTimeWindows.of(Time.minutes(1))
  • 滑动时间窗口
    • 每10秒钟计算前1分钟的页面浏览量
    • SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(10))
  • 会话窗口
    • 每个会话的网页浏览量,其中会话之间的间隔至少为30分钟
    • EventTimeSessionWindows.withGap(Time.minutes(30))

以下都是一些可以使用的间隔时间 Time.milliseconds(n)Time.seconds(n)Time.minutes(n)Time.hours(n), 和 Time.days(n)

窗口应用函数 

我们有三种最基本的操作窗口内的事件的选项:

  1. 像批量处理,ProcessWindowFunction 会缓存 Iterable 和窗口内容,供接下来全量计算;
  2. 或者像流处理,每一次有事件被分配到窗口时,都会调用 ReduceFunction 或者 AggregateFunction 来增量计算;
  3. 或者结合两者,通过 ReduceFunction 或者 AggregateFunction 预聚合的增量计算结果在触发窗口时, 提供给 ProcessWindowFunction 做全量计算。
DataStream<SensorReading> input = ...;

input
    .keyBy(x -> x.key)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .process(new MyWastefulMax());

public static class MyWastefulMax extends ProcessWindowFunction<
        SensorReading,                  // 输入类型
        Tuple3<String, Long, Integer>,  // 输出类型
        String,                         // 键类型
        TimeWindow> {                   // 窗口类型

    @Override
    public void process(
            String key,
            Context context,
            Iterable<SensorReading> events,
            Collector<Tuple3<String, Long, Integer>> out) {

        int max = 0;
        for (SensorReading event : events) {
            max = Math.max(event.value, max);
        }
        out.collect(Tuple3.of(key, context.window().getEnd(), max));
    }
}

// Context 接口 展示大致如下:
public abstract class Context implements java.io.Serializable {
    public abstract W window();

    public abstract long currentProcessingTime();
    public abstract long currentWatermark();

    public abstract KeyedStateStore windowState();
    public abstract KeyedStateStore globalState();
}

// windowState 和 globalState 可以用来存储当前的窗口的 key、窗口或者当前 key 的每一个窗口信息。这在一些场景下会很有用,试想,我们在处理当前窗口的时候,可能会用到上一个窗口的信息。
// 增量聚合示例
DataStream<SensorReading> input = ...;

input
    .keyBy(x -> x.key)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .reduce(new MyReducingMax(), new MyWindowFunction());

private static class MyReducingMax implements ReduceFunction<SensorReading> {
    public SensorReading reduce(SensorReading r1, SensorReading r2) {
        return r1.value() > r2.value() ? r1 : r2;
    }
}

private static class MyWindowFunction extends ProcessWindowFunction<
    SensorReading, Tuple3<String, Long, SensorReading>, String, TimeWindow> {

    @Override
    public void process(
            String key,
            Context context,
            Iterable<SensorReading> maxReading,
            Collector<Tuple3<String, Long, SensorReading>> out) {

        SensorReading max = maxReading.iterator().next();
        out.collect(Tuple3.of(key, context.window().getEnd(), max));
    }
}

晚到的事件

  • 旁路输出

OutputTag<Event> lateTag = new OutputTag<Event>("late"){};

SingleOutputStreamOperator<Event> result = stream
    .keyBy(...)
    .window(...)
    .sideOutputLateData(lateTag)
    .process(...);

DataStream<Event> lateStream = result.getSideOutput(lateTag);
  • 指定允许的延迟
stream
    .keyBy(...)
    .window(...)
    .allowedLateness(Time.seconds(10))
    .process(...);

如果延迟的事件需要特殊处理,可以从Context中获取水位线时间来做比较

public void processElement(
        TaxiFare fare,
        Context ctx,
        Collector<Tuple3<Long, Long, Float>> out) throws Exception {

    long eventTime = fare.getEventTime();
    TimerService timerService = ctx.timerService();

    if (eventTime <= timerService.currentWatermark()) {
        // 事件延迟;其对应的窗口已经触发。
    } else {
        // 其他处理
    }
}

深入了解窗口操作

滑动窗口是通过复制来实现的

滑动窗口分配器可以创建许多窗口对象,并将每个事件复制到每个相关的窗口中。例如,如果您每隔 15 分钟就有 24 小时的滑动窗口,则每个事件将被复制到 4 * 24 = 96 个窗口中。

时间窗口会和时间对齐

仅仅因为我们使用的是一个小时的处理时间窗口并在 12:05 开始运行您的应用程序,并不意味着第一个窗口将在 1:05 关闭。第一个窗口将长 55 分钟,并在 1:00 关闭。

请注意,滑动窗口和滚动窗口分配器所采用的 offset 参数可用于改变窗口的对齐方式。有关详细的信息,请参见 滚动窗口 和 滑动窗口 。

window 后面可以接 window

比如说:

stream
    .keyBy(t -> t.key)
    .window(<window assigner>)
    .reduce(<reduce function>)
    .windowAll(<same window assigner>)
    .reduce(<same reduce function>);

可能我们会猜测以 Flink 的能力,想要做到这样看起来是可行的(前提是你使用的是 ReduceFunction 或 AggregateFunction ),但不是。

之所以可行,是因为时间窗口产生的事件是根据窗口结束时的时间分配时间戳的。例如,一个小时小时的窗口所产生的所有事件都将带有标记一个小时结束的时间戳。后面的窗口内的数据消费和前面的流产生的数据是一致的。

空的时间窗口不会输出结果

事件会触发窗口的创建。换句话说,如果在特定的窗口内没有事件,就不会有窗口,就不会有输出结果。

延迟事件可能导致延迟合并

会话窗口的实现是基于窗口的一个抽象能力,窗口可以 聚合。会话窗口中的每个数据在初始被消费时,都会被分配一个新的窗口,但是如果窗口之间的间隔足够小,多个窗口就会被聚合。延迟事件可以弥合两个先前分开的会话间隔,从而产生一个虽然有延迟但是更加准确地结果。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/70043.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

《全生命周期眼健康管理》助力健康科学用眼

8月8日下午&#xff0c;烟台正大光明眼科医院眼健康管理中心张提主任受邀来到烟台市残疾人事务综合服务中心&#xff0c;为残联康复训练教师及相关工作人员进行了《全生命周期眼健康管理》讲座。 烟台正大光明眼科医院眼健康管理中心张提主任 “全生命周期眼健康”这一理念其宗…

流水线时序调度之规避冲突

1 写在前面的&#xff1a; 其实略微一个大点的机器&#xff0c;一个测试流程需要若干个步骤&#xff0c;都可以用流水线的思维去看待它&#xff1b; 我之前也没往流水线的角度去考虑&#xff0c;那有些机器的时序调度是不好理解的&#xff0c;甚至计算个通量都很麻烦&#xff…

15-1_Qt 5.9 C++开发指南_Qt多媒体模块概述

多媒体功能指的主要是计算机的音频和视频的输入、输出、显示和播放等功能&#xff0c;Qt 的多媒体模块为音频和视频播放、录音、摄像头拍照和录像等提供支持&#xff0c;甚至还提供数字收音机的支持。本章将介绍 Qt 多媒体模块的功能和使用。 文章目录 1. Qt 多媒体模块概述2. …

pgAdmin开发工具之ERD

pgAdmin 是一个免费的 PostgreSQL 管理与开发平台&#xff0c;这篇文章介绍了它的安装与使用。 今天我们要介绍的是它的一个开发功能&#xff1a;实体关系图&#xff08;ERD&#xff09;工具。 ERD 工具可以使用图形化的方式表示数据库中的表、字段以及表之间的关系。 pgAdmin…

vue svg画渐变色线条

基于业务需求需要&#xff0c;需要使用svg画渐变色弧线并且采用虚线。并且封装成组件。 一、path路径 path路径是svg中最强大的图形&#xff0c;可以绘制各种svg所有能画的图形。 路径中的线是由d属性来绘制&#xff0c;属性参数由各种命令组成&#xff0c;以下是它的基本命…

selenium.webdriver Python爬虫教程

文章目录 selenium安装和使用 selenium安装和使用 pip install selenium 下载对应的浏览器驱动 实例化浏览器 from selenium import webdriverbrowser webdriver.Chrome()元素定位 控制浏览器

Jmeter设置中文的两种方式,建议使用第二种

方案一 进入jmeter图像化界面&#xff0c;选择Options下的Choose Language&#xff0c;再选择Chinese(Simplified)。这个就是选择语言为简体中文&#xff08;缺陷&#xff1a;这个只是在本次使用时为中文&#xff0c;下次打开默认还是英文的&#xff09; 方案二&#xff08;…

时序预测 | Python实现LSTM长短期记忆网络时间序列预测(电力负荷预测)

时序预测 | Python实现LSTM长短期记忆网络时间序列预测(电力负荷预测) 目录 时序预测 | Python实现LSTM长短期记忆网络时间序列预测(电力负荷预测)效果一览基本描述模型结构程序设计参考资料效果一览

Redis安装配置远程连接

1. yum 安装 redis&#xff1a; 直接使用命令&#xff0c;将 redis 安装到 linux 服务器中&#xff1a; yum -y install redis 2. 启动 redis&#xff1a; 在 xshell 里&#xff0c;可以使用下面命令&#xff0c;以后台方式启动 redis&#xff1a; [rootVM-8-17-centos /]…

Vue3+Ts+Vite项目全局配置Element-Plus主题色

概述 我找了很多博客&#xff0c;想全局配置Elmenet-Plus组件主题色&#xff0c;但都没有效果。所以有了这篇博客&#xff0c;希望能对你有所帮助&#xff01;&#xff01;&#xff01; 文章目录 概述一、先看效果二、创建全局颜色文件2.1 /src/styles 下新建 element-plus.sc…

gitlab-Runner搭建

root wget https://packages.gitlab.com/runner/gitlab-runner/packages/fedora/29/gitlab-runner-12.6.0-1.x86_64.rpm/download.rpm rpm -ivh download.rpm ---- 安装 rpm -Uvh download.rpm -----更新升级 然后运行&#xff1a; gitlab-runner register --url https://git…

虚拟展览馆有哪些优势?如何打造自己的虚拟展览馆

引言&#xff1a; 随着科技的不断创新与发展&#xff0c;虚拟展览馆作为一种全新的文化体验方式&#xff0c;正逐渐引起人们的关注。虚拟展览馆以其便捷、创新、可定制的特点&#xff0c;为参观者提供了前所未有的沉浸式体验。 一&#xff0e;什么是虚拟展览馆&#xff1f; 虚…

C语言学习系列-->看淡指针(1)

文章目录 一、概述二、指针变量和地址2.1 取地址操作符2.2 指针变量和解引用操作符2.2.1 指针变量2.2.2 拆解指针类型2.2.4 解引用操作符 2.3 指针变量的大小 三、指针变量的意义3.1 指针的解引用指针-整数 四、 const修饰指针五、指针运算5.1 指针- 整数5.2 指针-指针5.3 指针…

音视频研发分享:关键帧截图+wasm快照--我又做了一件有益于社会的事情

音视频研发分享&#xff1a;关键帧截图wasm快照--我又做了一件有益于社会的事情 简单的一个视频设备快照功能到底有多费事多费电&#xff1f;新的方法有方法&#xff01; 省了多少电&#xff1f; 简单的一个视频设备快照功能到底有多费事多费电&#xff1f; 以前&#xff0c;我…

C++模板,STL(Standard Template Library)

这篇文章的主要内容是C中的函数模板、类模板、STL的介绍。 希望对C爱好者有所帮助&#xff0c;内容充实且干货&#xff0c;点赞收藏防止找不到&#xff01; 再次感谢每个读者和正在学习编程的朋友莅临&#xff01; 更多优质内容请点击移驾&#xff1a; C收录库&#xff1a;重生…

HTML5 Canvas和Svg:哪个简单且好用?

HTML5 Canvas 和 SVG 都是基于标准的 HTML5 技术&#xff0c;可用于创建令人惊叹的图形和视觉体验。 首先&#xff0c;让我们花几句话介绍HTML5 Canvas和SVG。 什么是Canvas? Canvas&#xff08;通过 标签使用&#xff09;是一个 HTML 元素&#xff0c;用于在用户计算机屏幕…

智能合约 -- 常规漏洞分析 + 实例

1.重入攻击 漏洞分析 攻击者利用合约漏洞&#xff0c;通过fallback()或者receive()函数进行函数递归进行持续取钱。 刚才试了一下可以递归10次&#xff0c;貌似就结束了(version: 0.8.20)。 直接看代码: 银行合约&#xff1a;有存钱、取钱、查看账户余额等函数。攻击合约:…

半关闭、端口复用与IO多路复用

文章目录 半关闭端口复用IO多路复用&#xff08;IO多路转接&#xff09;模型解决措施 sellect缺点 poll应用缺点 epoll应用工作模式 半关闭 使用close(fd);所对应的文件描述符写和读都关闭了。 端口复用 可以解决绑定失败的问题。 IO多路复用&#xff08;IO多路转接&#…

【冒泡排序及其优化】

冒泡排序及其优化 冒泡排序核心思想 冒泡排序的核⼼思想就是&#xff1a;两两相邻的元素进⾏⽐较 1题目举例 给出一个倒序数组&#xff1a;arr[10]{9,8,7,6,5,4,3,2,1,0} 请排序按小到大输出 1.1题目分析 这是一个完全倒序的数组&#xff0c;所以确定冒泡排序的趟数&#xff0…

低代码助力传统制造业数字化转型策略

随着制造强国战略逐步实施&#xff0c;制造行业数字化逐渐进入快车道。提高生产管理的敏捷性、加强产品的全生命周期质量管理是企业数字化转型的核心诉求&#xff0c;也是需要思考的核心价值。就当下传统制造业的核心问题来看&#xff0c;低代码是最佳解决方案&#xff0c;那为…