Flink流批一体计算(18):PyFlink DataStream API之计算和Sink

目录

1. 在上节数据流上执行转换操作,或者使用 sink 将数据写入外部系统。

2. File Sink

File Sink

Format Types 

Row-encoded Formats 

Bulk-encoded Formats 

桶分配

滚动策略

3. 如何输出结果

集合数据到客户端,execute_and_collect方法将收集数据到客户端内存

将结果发送到DataStream sink connector

将结果发送到Table & SQL sink connector

4. 执行 PyFlink DataStream API 作业。


1. 在上节数据流上执行转换操作,或者使用 sink 将数据写入外部系统。

本教程使用 FileSink 将结果数据写入文件中。

def split(line):
    yield from line.split()

# compute word count
ds = ds.flat_map(split) \
    .map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
    .key_by(lambda i: i[0]) \
    .reduce(lambda i, j: (i[0], i[1] + j[1]))

ds.sink_to(
    sink=FileSink.for_row_format(
        base_path=output_path,
        encoder=Encoder.simple_string_encoder())
    .with_output_file_config(
        OutputFileConfig.builder()
        .with_part_prefix("prefix")
        .with_part_suffix(".ext")
        .build())
    .with_rolling_policy(RollingPolicy.default_rolling_policy())
    .build()
)

sink_to函数,将DataStream数据发送到自定义sink connector,仅支持FileSink,可用于batch和streaming执行模式。

2. File Sink

Streaming File Sink是Flink1.7中推出的新特性,是为了解决如下的问题:

大数据业务场景中,经常有一种场景:外部数据发送到kafka中,flink作为中间件消费kafka数据并进行业务处理;处理完成之后的数据可能还需要写入到数据库或者文件系统中,比如写入hdfs中。

Streaming File Sink就可以用来将分区文件写入到支持 Flink FileSystem 接口的文件系统中,支持Exactly-Once语义。这种sink实现的Exactly-Once都是基于Flink checkpoint来实现的两阶段提交模式来保证的,主要应用在实时数仓、topic拆分、基于小时分析处理等场景下。

Streaming File Sink 是社区优化后添加的connector,推荐使用。

Streaming File Sink更灵活,功能更强大,可以自己实现序列化方法

Streaming File Sink有两个方法可以输出到文件:行编码格式forRowFormat 和  块编码格式forBulkFormat。

forRowFormat 比较简单,只提供了SimpleStringEncoder写文本文件,可以指定编码。

由于流数据本身是无界的,所以,流数据将数据写入到分桶(bucket)中。默认使用基于系统时间(yyyy-MM-dd--HH)的分桶策略。在分桶中,又根据滚动策略,将输出拆分为 part 文件。

Flink 提供了两个分桶策略,分桶策略实现了

org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner 接口:

BasePathBucketAssigner,不分桶,所有文件写到根目录;

DateTimeBucketAssigner,基于系统时间(yyyy-MM-dd--HH)分桶。

除此之外,还可以实现BucketAssigner接口,自定义分桶策略。

Flink 提供了两个滚动策略,滚动策略实现了

org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy 接口:

DefaultRollingPolicy 当超过最大桶大小(默认为 128 MB),或超过了滚动周期(默认为 60 秒),或未写入数据处于不活跃状态超时(默认为 60 秒)的时候,滚动文件;

OnCheckpointRollingPolicy 当 checkpoint 的时候,滚动文件。

File Sink

File Sink 将传入的数据写入存储桶中。考虑到输入流可以是无界的,每个桶中的数据被组织成有限大小的 Part 文件。 完全可以配置为基于时间的方式往桶中写入数据,比如可以设置每个小时的数据写入一个新桶中。这意味着桶中将包含一个小时间隔内接收到的记录。

桶目录中的数据被拆分成多个 Part 文件。对于相应的接收数据的桶的 Sink 的每个 Subtask,每个桶将至少包含一个 Part 文件。将根据配置的滚动策略来创建其他 Part 文件。 对于 Row-encoded Formats默认的策略是根据 Part 文件大小进行滚动,需要指定文件打开状态最长时间的超时以及文件关闭后的非活动状态的超时时间。 对于 Bulk-encoded Formats 在每次创建 Checkpoint 时进行滚动,并且用户也可以添加基于大小或者时间等的其他条件。

重要:  STREAMING 模式下使用 FileSink 需要开启 Checkpoint 功能。 文件只在 Checkpoint 成功时生成。如果没有开启 Checkpoint 功能,文件将永远停留在 in-progress 或者 pending 的状态,并且下游系统将不能安全读取该文件数据。

Format Types 

FileSink 不仅支持 Row-encoded 也支持 Bulk-encoded,例如 Apache Parquet 这两种格式可以通过如下的静态方法进行构造:

  • Row-encoded sink: FileSink.forRowFormat(basePath, rowEncoder)
  • Bulk-encoded sink: FileSink.forBulkFormat(basePath, bulkWriterFactory)

不论创建 Row-encoded Format 或者 Bulk-encoded Format Sink 时,都必须指定桶的路径以及对数据进行编码的逻辑。

Row-encoded Formats 

Row-encoded Format 需要指定一个 Encoder,在输出数据到文件过程中被用来将单个行数据序列化为 Outputstream

除了 bucket assignerRowFormatBuilder 还允许用户指定以下属性:

  • Custom RollingPolicy :自定义滚动策略覆盖 DefaultRollingPolicy
  • bucketCheckInterval (默认值 = 1 min) :基于滚动策略设置的检查时间间隔
data_stream = ...
sink = FileSink \
    .for_row_format(OUTPUT_PATH, Encoder.simple_string_encoder("UTF-8")) \
    .with_rolling_policy(RollingPolicy.default_rolling_policy(
        part_size=1024 ** 3, rollover_interval=15 * 60 * 1000, inactivity_interval=5 * 60 * 1000)) \
    .build()
data_stream.sink_to(sink)

这个例子中创建了一个简单的 Sink,默认的将记录分配给小时桶。 例子中还指定了滚动策略,当满足以下三个条件的任何一个时都会将 In-progress 状态文件进行滚动:

  • 包含了至少15分钟的数据量
  • 从没接收延时5分钟之外的新纪录
  • 文件大小已经达到 1GB(写入最后一条记录之后)

Bulk-encoded Formats 

Bulk-encoded Sink 的创建和 Row-encoded 的相似,但不需要指定 Encoder,而是需要指定 BulkWriter.Factory BulkWriter 定义了如何添加和刷新新数据以及如何最终确定一批记录使用哪种编码字符集的逻辑。

Flink 内置了5 BulkWriter 工厂类:

  • ParquetWriterFactory
  • AvroWriterFactory
  • SequenceFileWriterFactory
  • CompressWriterFactory
  • OrcBulkWriterFactory

重要 Bulk-encoded Format 仅支持一种继承了 CheckpointRollingPolicy 类的滚动策略。 在每个 Checkpoint 都会滚动。另外也可以根据大小或处理时间进行滚动。

桶分配

桶的逻辑定义了如何将数据分配到基本输出目录内的子目录中。

Row-encoded Format Bulk-encoded Format使用了 DateTimeBucketAssigner 作为默认的分配器。 默认的分配器 DateTimeBucketAssigner 会基于使用了格式为 yyyy-MM-dd--HH 的系统默认时区来创建小时桶。日期格式(  桶大小)和时区都可以手动配置。

还可以在格式化构造器中通过调用 .withBucketAssigner(assigner) 方法指定自定义的 BucketAssigner

Flink 内置了两种 BucketAssigners

  • DateTimeBucketAssigner :默认的基于时间的分配器
  • BasePathBucketAssigner :分配所有文件存储在基础路径上(单个全局桶)

PyFlink 只支持 DateTimeBucketAssigner  BasePathBucketAssigner 

滚动策略

RollingPolicy 定义了何时关闭给定的 In-progress Part 文件,并将其转换为 Pending 状态,然后在转换为 Finished 状态。 Finished 状态的文件,可供查看并且可以保证数据的有效性,在出现故障时不会恢复。  STREAMING 模式下,滚动策略结合 Checkpoint 间隔(到下一个 Checkpoint 成功时,文件的 Pending 状态才转换为 Finished 状态)共同控制 Part 文件对下游 readers 是否可见以及这些文件的大小和数量。在 BATCH 模式下,Part 文件在 Job 最后对下游才变得可见,滚动策略只控制最大的 Part 文件大小。

Flink 内置了两种 RollingPolicies

  • DefaultRollingPolicy
  • OnCheckpointRollingPolicy

PyFlink 只支持 DefaultRollingPolicy  OnCheckpointRollingPolicy 

3. 如何输出结果

Print

ds.print()

Collect results to client

集合数据到客户端,execute_and_collect方法将收集数据到客户端内存

with ds.execute_and_collect() as results:

    for result in results:

        print(result)

将结果发送到DataStream sink connector

add_sink函数,将DataStream数据发送到sink connector,此函数仅支持FlinkKafkaProducer, JdbcSink和StreamingFileSink,仅在streaming执行模式下使用

from pyflink.common.typeinfo import Types
from pyflink.datastream.connectors import FlinkKafkaProducer
from pyflink.common.serialization import JsonRowSerializationSchema

serialization_schema = JsonRowSerializationSchema.builder().with_type_info(
    type_info=Types.ROW([Types.INT(), Types.STRING()])).build()

kafka_producer = FlinkKafkaProducer(
    topic='test_sink_topic',
    serialization_schema=serialization_schema,
    producer_config={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'})

ds.add_sink(kafka_producer)

sink_to函数,将DataStream数据发送到自定义sink connector,仅支持FileSink,可用于batch和streaming执行模式

from pyflink.datastream.connectors import FileSink, OutputFileConfig
from pyflink.common.serialization import Encoder

output_path = '/opt/output/'
file_sink = FileSink \
    .for_row_format(output_path, Encoder.simple_string_encoder()) \  .with_output_file_config(OutputFileConfig.builder().with_part_prefix('pre').with_part_suffix('suf').build()) \
    .build()
ds.sink_to(file_sink)

将结果发送到Table & SQL sink connector

Table & SQL connectors也被用于写入DataStream. 首先将DataStream转为Table,然后写入到 Table & SQL sink connector.

from pyflink.common import Row
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(stream_execution_environment=env)
# option 1:the result type of ds is Types.ROW
def split(s):
    splits = s[1].split("|")
    for sp in splits:
        yield Row(s[0], sp)

ds = ds.map(lambda i: (i[0] + 1, i[1])) \
       .flat_map(split, Types.ROW([Types.INT(), Types.STRING()])) \
       .key_by(lambda i: i[1]) \
       .reduce(lambda i, j: Row(i[0] + j[0], i[1]))

# option 1:the result type of ds is Types.TUPLE
def split(s):
    splits = s[1].split("|")
    for sp in splits:
        yield s[0], sp

ds = ds.map(lambda i: (i[0] + 1, i[1])) \
       .flat_map(split, Types.TUPLE([Types.INT(), Types.STRING()])) \
       .key_by(lambda i: i[1]) \
       .reduce(lambda i, j: (i[0] + j[0], i[1]))

# emit ds to print sink
t_env.execute_sql("""
        CREATE TABLE my_sink (
          a INT,
          b VARCHAR
        ) WITH (
          'connector' = 'print'
        )
    """)

table = t_env.from_data_stream(ds)
table_result = table.execute_insert("my_sink")

4. 执行 PyFlink DataStream API 作业。

PyFlink applications 是懒加载的,并且只有在完全构建之后才会提交给集群上执行。

要执行一个应用程序,你只需简单地调用 env.execute()。

env.execute()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/93564.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Unity实现广告滚动播放、循环播放、鼠标切换的效果

效果: 场景结构: 特殊物体:panel下面用排列组件horizent layout group放置多个需要显示的面板,用mask遮罩好。 using System.Collections; using System.Collections.Generic; using DG.Tweening; using UnityEngine; using Unity…

手机盖板IR油墨透光率检测仪T03

手机盖板作为手机最外层玻璃面板,其加工一般有落料、倒边、抛光、镀膜、丝印等多道加工工序组成,其中任何一个工序出现差错,都有可能导致手机盖板产生缺陷,例如漏油、透光、IR孔不良、视窗划伤、油墨区划伤、內污、边花等&#xf…

淘宝免费爬虫数据 商品详情数据 商品销售额销量API

场景:一个宽敞明亮的办公室,一位公司高管坐在办公桌前。 高管(自言自语):淘宝,这个平台上商品真是琳琅满目,应该有不少销售数据吧。我该怎么利用这些数据呢? 突然,房间…

【vue+uniapp】切换本页面(点击导航按钮)就刷新接口

查阅资料:uni-app官网 点击导航中图标,就执行的方法(和methods同级): onTabItemTap(e) {this.getTaskTotal(); },

【PostGreSQL】PostGreSQL到Oracle的数据迁移

项目需要,有个数据需要导入,拿到手一开始以为是mysql,结果是个PostGreSQL的数据,于是装数据库,但这个也不懂呀,而且本系统用的Oracle,于是得解决迁移转换的问题。 总结下来两个思路。 1、Postg…

【Debug】解决RecursionError: maximum recursion depth exceeded in comparison报错

🚀Debug专栏 目录 🚀Debug专栏 ❓❓问题: 🔧🔧分析: 🎯🎯解决方案: ❓❓问题: 循环中报错RecursionError: maximum recursion depth exceeded in compari…

Git基本操作(Idea版)

第一次发布项目(本地->远程) 方式一 通过push的方式推送本地库到远程库(远程已创建好仓库) 这种方式需要提前创建好仓库。 右键点击项目,可以将当前分支的内容 push 到 GitHub 的远程仓库中。 注意&#xff1a…

arcgis+postgresql+postgis使用介绍

关于arcgis在postgresql创建地理数据库我分享一下自己的经历: 众所周知,arcgis如果在oracle中创建地理数据库,必须要使用ArcToolbox里面的地理数据库工具去创建,在里面发现它还可以创建sql_server, postgresql数据库类型&#xf…

1.神经网络基础知识

所有有用的计算机系统都有一个输入和一个输出, 并在输入和输出之间进行某种类型的计算。 神经网络也是如此。 当我们不能精确知道一些事情如何运作时, 我们可以尝试使用模型来估计其运作方式, 在模型中, 包括了我们可以调整的参数…

Java实现excel表数据的批量存储(结合easyexcel插件)

场景:加哥最近在做项目时,苦于系统自身并未提供数据批量导入的功能还不能自行添加上该功能,且自身不想手动一条一条将数据录入系统。随后,自己使用JDBC连接数据库、使用EasyExcel插件读取表格并将数据按照业务逻辑批量插入数据库完…

LeetCode-455-分发饼干-贪心算法

题目描述: 假设你是一位很棒的家长,想要给你的孩子们一些小饼干。但是,每个孩子最多只能给一块饼干。 对每个孩子 i,都有一个胃口值 g[i],这是能让孩子们满足胃口的饼干的最小尺寸;并且每块饼干 j&#xff…

AM62x GPMC并口如何实现“小数据-低时延,大数据-高带宽”—ARM+FPGA低成本通信方案

GPMC并口简介 GPMC(General Purpose Memory Controller)是TI处理器特有的通用存储器控制器接口,支持8/16bit数据位宽,支持128MB访问空间,最高时钟速率133MHz。GPMC是AM62x、AM64x、AM437x、AM335x、AM57x等处理器专用于与外部存储器设备的接口…

Hive/Spark 整库导出/导入脚本

博主历时三年精心创作的《大数据平台架构与原型实现:数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,…

mysql57、mysql80 目录结构 之 Windows

查看mysql 数据存储的位置 /bin:存储可执行文件,主要包含客户端和服务端启动程序,如mysql.exe、mysqld.exe等 /docs:存放一些文档 /include:用于放置一些头文件,如:mysql.h、mysqld_error.h 等 …

taro react/vue h5 中的上传input onchange 值得区别

<inputclassNamebase-input-file-h5typefileacceptimage/*capturecameraonChange{onChangeInput} />1、taro3react 2、taro3vue3

古典加密的C++实现——凯撒密码、单表代换密码

&#x1f64c;秋名山码民的主页 &#x1f602;一个打过一年半的oier&#xff0c;写过一年多的Java&#xff0c;啥都会干一点的普通本科生 &#x1f389;欢迎关注&#x1f50e;点赞&#x1f44d;收藏⭐️留言&#x1f4dd; &#x1f64f;作者水平有限&#xff0c;如发现错误&…

Android 中SettingsActivity(PreferenceFragmentCompat)的简单使用

如果你需要一个简单的APP设置&#xff0c;可以使用sharedPreferences进行存储&#xff0c;我们可以借助AndroidStudio快速创建一个用于设置的Activity&#xff0c;其实它是继承PreferenceFragmentCompat&#xff0c;存储方式用的就是sharedPreferences&#xff0c;只是帮我们节…

vue使用vant中的popup层,在popup层中加搜索功能后,input框获取焦点 ios机型的软键盘不会将popup顶起来的问题

1.使用vant的popup弹出层做了一个piker的选择器,用户需要在此基础上增加筛选功能。也就是输入框 2.可是在ios机型中,input框在获取焦点以后,ios的软键盘弹起会遮盖住我们的popup层,导致体验不是很好 3.在大佬的解答及帮助下,采用窗口滚动的方式解决此方法 <Popupv-model&q…

docker的安装以及基本操作

一.认识docker Docker是一种用于构建、打包和运行应用程序的开源平台。它基于操作系统级虚拟化技术&#xff0c;可以将应用程序和其依赖的库、环境等资源打包到一个可移植的容器中&#xff0c;形成一个轻量级、独立的可执行单元。 开发者在本地编译测试通过的容器可以批量地在…

主流深度学习框架及神经网络模型汇总

目录 主流深度学习框架及神经网络模型汇总 一、人工智能的研究领域和分支 二、主流深度学习框架​编辑 1.TensorFlow 2.PyTorch 3.PaddlePaddle 4.Keras 5.Caffe/Caffe2 6.MXNet 7.Theano 8.Torch 9.CNTK 10.ONNX 三、深度学习移动端推理框架 1.TensorRT 2.TF-…