Flink 流式读写文件、文件夹

文章目录

  • 一、flink 流式读取文件夹、文件
  • 二、flink 写入文件系统——StreamFileSink
  • 三、查看完整代码

一、flink 流式读取文件夹、文件

Apache Flink针对文件系统实现了一个可重置的source连接器,将文件看作流来读取数据。如下面的例子所示:

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        TextInputFormat textInputFormat = new TextInputFormat(null);
        DataStreamSource<String> source = env.readFile(textInputFormat, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 30000L);

StreamExecutionEnvironment.readFile()接收如下参数:

  • FileInputFormat参数,负责读取文件中的内容。
  • 文件路径。如果文件路径指向单个文件,那么将会读取这个文件。如果路径指向一个文件夹,FileInputFormat将会扫描文件夹中所有的文件。
  • PROCESS_CONTINUOUSLY将会周期性的扫描文件,以便扫描到文件新的改变。
  • 30000L表示多久扫描一次监听的文件。

FileInputFormat是一个特定的InputFormat,用来从文件系统中读取文件。FileInputFormat分两步读取文件。首先扫描文件系统的路径,然后为所有匹配到的文件创建所谓的input splits。一个input split将会定义文件上的一个范围,一般通过读取的开始偏移量和读取长度来定义。在将一个大的文件分割成一堆小的splits以后,这些splits可以分发到不同的读任务,这样就可以并行的读取文件了。FileInputFormat的第二步会接收一个input split,读取被split定义的文件范围,然后返回对应的数据。

DataStream应用中使用的FileInputFormat需要实现CheckpointableInputFormat接口。这个接口定义了方法来做检查点和重置文件片段的当前的读取位置。

在Flink 1.7中,Flink提供了一些类,这些类继承了FileInputFormat,并实现了CheckpointableInputFormat接口。TextInputFormat一行一行的读取文件,而CsvInputFormat使用逗号分隔符来读取文件。

二、flink 写入文件系统——StreamFileSink

该Sink不但可以将数据写入到各种文件系统中,而且整合了checkpoint机制来保证Exacly Once语义,还可以对文件进行分桶存储,还支持以列式存储的格式写入,功能更强大。

streamFileSink中输出的文件,其生命周期会经历3中状态:

  • in-progress Files 当前文件正在写入中
  • Pending Files 当处于 In-progress 状态的文件关闭closed了,就变为 Pending 状态
  • Finished Files 在成功的 Checkpoint 后,Pending 状态将变为 Finished 状态
    下面是一个简答的例子 , 将接收到的数据流 ,写入到文件中保存 !

数据文件格式是行式存储格式

        BucketAssigner<String, String> assigner = new DateTimeBucketAssigner<>("yyyy-MM-dd", ZoneId.of("Asia/Shanghai"));
        StreamingFileSink<String> fileSink = StreamingFileSink.<String>forRowFormat(
                new Path(savePath),
                new SimpleStringEncoder<>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(TimeUnit.MINUTES.toMillis(20))//至少包含 20 分钟的数据
                                .withInactivityInterval(TimeUnit.MINUTES.toMillis(20))//最近 20 分钟没有收到新的数据
                                .withMaxPartSize(1024 * 1024 * 1024)//文件大小已达到 1 GB
                                .build())
                .withBucketAssigner(assigner)
                .build();

其中特别说明了,如果使用 FileSink 在 STREAMING 模式的时候,必须开启 checkpoint,不然的话会导致每个分片文件一直处于 in-progress 或者 pending 状态,不能保证整个写入流程的安全性。

所以在我们上述的示例中,我们并未开启 checkpoint 导致写出文件一直处于 inprogress 状态。如果加上 checkpoint 后:
在这里插入图片描述
将数据以列式存储的格式输出到文件中

三、查看完整代码

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

import java.time.ZoneId;
import java.util.concurrent.TimeUnit;

public class WordTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        //2.设置CK&状态后端
        env.setStateBackend(new FsStateBackend("hdfs://nameservice1/tmp/kafka_test/data/chatgpt/mnbvc/checkpoint"));
        env.enableCheckpointing(1000*60*3);// 每 ** ms 开始一次 checkpoint
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 设置模式为精确一次
        env.getCheckpointConfig().setCheckpointTimeout(1000*60*5);// Checkpoint 必须在** ms内完成,否则就会被抛弃
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// 同一时间只允许一个 checkpoint 进行
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000);// 确认 checkpoints 之间的时间会进行 ** ms
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);// 允许两个连续的 checkpoint 错误
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10,TimeUnit.SECONDS)));//重启策略:重启3次,间隔10s
        // 使用 externalized checkpoints,这样 checkpoint 在作业取消后仍就会被保留
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        
        String sourcePath = "hdfs://nameservice1/ec/data/chatgpt/mnbvc/mnbvc_website/format_com";
        String savePath = "hdfs://nameservice1/ec/data/chatgpt/mnbvc/mnbvc_website/format_filter_01";

        TextInputFormat textInputFormat = new TextInputFormat(null);
        DataStreamSource<String> source = env.readFile(textInputFormat, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 30000L);

        BucketAssigner<String, String> assigner = new DateTimeBucketAssigner<>("yyyy-MM-dd", ZoneId.of("Asia/Shanghai"));
        StreamingFileSink<String> fileSink = StreamingFileSink.<String>forRowFormat(
                new Path(savePath),
                new SimpleStringEncoder<>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(TimeUnit.MINUTES.toMillis(20))//至少包含 20 分钟的数据
                                .withInactivityInterval(TimeUnit.MINUTES.toMillis(20))//最近 20 分钟没有收到新的数据
                                .withMaxPartSize(1024 * 1024 * 1024)//文件大小已达到 1 GB
                                .build())
                .withBucketAssigner(assigner)
                .build();


        source.map(line -> JSONObject.parseObject(line))
                .filter(line -> line.getString("text").length() > 200 && line.getInteger("id") % 7 == 0)
                .map(line -> JSON.toJSONString(line))
                .addSink(fileSink);

        env.execute();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/75782.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

emqx-5.1.4开源版使用记录

emqx-5.1.4开源版使用记录 windows系统安装eqmx 去官网下载 emqx-5.1.4-windows-amd64.zip&#xff0c;然后找个目录解压 进入bin目录,执行命令启动emqx 执行命令 emqx.cmd start使用emqx 访问内置的web管理页面 浏览器访问地址 http://localhost:18083/#/dashboard/overv…

H3C交换机MIB库

非常齐全的官方MIB库 为Zabbix监控华三交换机提供诸多方便。 如下信息提供下载链接和下载账号: MIB清单下载:交换机-新华三集团-H3C MIB库:MIB-新华三集团-H3C

Aspera替代方案:探索这些安全且可靠的文件传输工具

科技的发展日新月异&#xff0c;文件的传输方式也在不断地更新换代。传统的邮件附件、FTP等方式已经难以满足人们对于传输速度和安全性的需求了。近年来&#xff0c;一些新兴的文件传输工具受到了人们的关注&#xff0c;其中除了知名的Aspera之外&#xff0c;还有许多可靠安全的…

梅赛德斯-奔驰将成为首家集成ChatGPT的汽车制造商

ChatGPT的受欢迎程度毋庸置疑。OpenAI这个基于人工智能的工具&#xff0c;每天能够吸引无数用户使用&#xff0c;已成为当下很受欢迎的技术热点。因此&#xff0c;有许多公司都在想方设法利用ChatGPT来提高产品吸引力&#xff0c;卖点以及性能。在汽车领域&#xff0c;梅赛德斯…

springBoot 集中配置管理

springBoot 集中配置管理 项目配置如果上线项目&#xff0c;运维或者开发者可以直接和jar包同目录下创建文件&#xff0c;然后更改属性 项目配置 创建文件&#xff0c;调整配置如果上线项目&#xff0c;运维或者开发者可以直接和jar包同目录下创建文件&#xff0c;然后更改 属…

Redis实现共享Session

Redis实现共享Session 分布式系统中&#xff0c;sessiong共享有很多的解决方案&#xff0c;其中托管到缓存中应该是最常用的方案之一。 1、引入依赖 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM…

undefined reference to `dlopen‘ ‘SSL_library_init‘ `X509_certificate_type‘

使用Crow的时候需要注意crow依赖asio依赖OpenSSL&#xff0c;asio要求1.22以上版本&#xff0c;我使用的是1.26.0&#xff1b; 这个版本的asio要求OpenSSL是1.0.2&#xff0c;其他版本我得机器上编不过&#xff0c;ubuntu上默认带的OpenSSL是1.1.1; 所以我下载了OPENSSL1.2.0重…

QT使用QML实现地图绘制虚线

QML提供了MapPolyline用于在地图上绘制线段&#xff0c;该线段是实线&#xff0c;因此我使用Canvas自定义绘制的方式在地图上绘制线段&#xff0c;如图&#xff1a; 鼠标在地图上点击后&#xff0c;在点击位置添加图标 &#xff0c;当有多个图标被添加到地图上后&#xff0c;计…

mysql-事务特性以及隔离机制

一.ACID 事务&#xff08;Transaction&#xff09;是访问和更新数据库的程序执行单元&#xff1b;事务中可能包含一个或多个sql语句&#xff0c;这些语句要么都执行&#xff0c;要么都不执行。 1.逻辑架构和存储引擎 如上图所示&#xff0c;MySQL服务器逻辑架构从上往下可以分…

对应分析介绍及SPSS案例分析

在开展统计分析的过程中&#xff0c;分类变量&#xff08;定序和定类变量&#xff09;是我们研究的一个重点。通常我们分析分类变量间关系时&#xff0c;最常用的分析方法是卡方检验&#xff0c;其次是逻辑回归和对数线性模型等。 如果类别变量的分类较少&#xff0c;我们可以…

构建 LVS-DR 群集、配置nginx负载均衡。

目录 一、基于 CentOS 7 构建 LVS-DR 群集 1、准备四台虚拟机 2、配置负载调度器&#xff08;192.168.2.130&#xff09; 3、部署共享存储&#xff08;192.168.2.133&#xff09; 4、配置两个Web服务器&#xff08;192.168.2.131、192.168.2.132&#xff09; 测试集群 二…

PHP最简单自定义自己的框架数据库封装调用(五)

1、实现效果调用实现数据增删改查封装 2、创建数据表 CREATE TABLE test (id int(11) NOT NULL AUTO_INCREMENT,name varchar(30) DEFAULT NULL,age int(11) DEFAULT NULL,PRIMARY KEY (id) ) ENGINEMyISAM AUTO_INCREMENT4 DEFAULT CHARSETutf8;3、index.php 入口定义数据库…

云服务 Ubuntu 20.04 版本 使用 Nginx 部署静态网页

所需操作&#xff1a; 1.安装Nginx 2.修改配置文件 3.测试、重启 Nginx 4.内部修改防火墙 5.配置解析 6.测试是否部署成功 1.安装Nginx // 未使用 root 账号 apt-get update // 更新apt-get install nginx // 安装 nginx 1.1.测试是否安装没问题 在网页上输入云服务的公网…

山西电力市场日前价格预测【2023-08-16】

日前价格预测 预测明日&#xff08;2023-08-16&#xff09;山西电力市场全天平均日前电价为363.90元/MWh。其中&#xff0c;最高日前电价为430.17元/MWh&#xff0c;预计出现在19: 30。最低日前电价为318.33元/MWh&#xff0c;预计出现在13: 00。 价差方向预测 1&#xff1a; 实…

css3 瀑布流布局遇见截断下一列展示后半截现象

css3 瀑布流布局遇见截断下一列展示后半截现象 注&#xff1a;css3实现瀑布流布局简直不要太香&#xff5e;&#xff5e;&#xff5e;&#xff5e;&#xff5e; 场景-在uniapp项目中 当瀑布流布局column-grap:10px 相邻两列之间的间隙为10px&#xff0c;column-count:2,2列展…

如何使用CSS实现一个纯CSS的滚动条样式?

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ 使用CSS实现自定义滚动条样式⭐ 写在最后 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 记得点击上方或者右侧链接订阅本专栏哦 几何带你启航前端之旅 欢迎来到前端入门之旅&#xff01;这个专栏是为那些对Web开发感兴趣…

Leetcode 21. 合并两个有序链表

题目描述 题目链接&#xff1a;https://leetcode.cn/problems/merge-two-sorted-lists/description/ 思路 两个链表都是升序链表&#xff0c;新建一个链表&#xff0c;引入伪头节点作为辅助节点&#xff0c;将各节点添加到伪节点之后&#xff0c;再用一个cur节点指向新链表的…

创新方案|超越炒作 – 从产品驱动增长PLG到产品驱动销售PLS的务实策略指南

这篇文章探讨从产品驱动增长到产品驱动销售的策略&#xff0c;超越了产品驱动增长的炒作。尽管产品驱动增长模式被认为是科技公司的灵丹妙药&#xff0c;但要取得成功&#xff0c;通常需要结合更传统的企业模式的要素。研究表明&#xff0c;只有少数采用产品驱动增长模式的公司…

2023上半年京东手机行业品牌销售排行榜(京东数据平台)

后疫情时代&#xff0c;不少行业都迎来消费复苏&#xff0c;我国智能手机市场在今年上半年也实现温和的复苏&#xff0c;手机市场的出货量回暖。 根据鲸参谋平台的数据显示&#xff0c;2023年上半年&#xff0c;京东平台上手机的销量为2830万&#xff0c;环比增长约4%&#xf…

opencv进阶02-在图像上绘制多种几何图形

OpenCV 提供了方便的绘图功能&#xff0c;使用其中的绘图函数可以绘制直线、矩形、圆、椭圆等多种几何图形&#xff0c;还能在图像中的指定位置添加文字说明。 OpenCV 提供了绘制直线的函数 cv2.line()、绘制矩形的函数 cv2.rectangle()、绘制圆的函数cv2.circle()、绘制椭圆的…