Flink_DataStreamAPI_输出算子Sink

Flink_DataStreamAPI_输出算子Sink

  • 1连接到外部系统
  • 2输出到文件
  • 3输出到Kafka
  • 4输出到MySQL(JDBC)
  • 5自定义Sink输出

Flink作为数据处理框架,最终还是要把计算处理的结果写入外部存储,为外部应用提供支持。

在这里插入图片描述

1连接到外部系统

Flink的DataStream API专门提供了向外部写入数据的方法:addSink。与addSource类似,addSink方法对应着一个“Sink”算子,主要就是用来实现与外部系统连接、并将数据提交写入的;Flink程序中所有对外的输出操作,一般都是利用Sink算子完成的。
Flink1.12以前,Sink算子的创建是通过调用DataStream的.addSink()方法实现的。

stream.addSink(new SinkFunction());

addSink方法同样需要传入一个参数,实现的是SinkFunction接口。在这个接口中只需要重写一个方法invoke(),用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用。
Flink1.12开始,同样重构了Sink架构,

stream.sinkTo()

当然,Sink多数情况下同样并不需要我们自己实现。之前我们一直在使用的print方法其实就是一种Sink,它表示将数据流写入标准控制台打印输出。Flink官方为我们提供了一部分的框架的Sink连接器。如下图所示,列出了Flink官方目前支持的第三方系统连接器:
在这里插入图片描述
我们可以看到,像Kafka之类流式系统,Flink提供了完美对接,source/sink两端都能连接,可读可写;而对于Elasticsearch、JDBC等数据存储系统,则只提供了输出写入的sink连接器。
除Flink官方之外,Apache Bahir框架,也实现了一些其他第三方系统与Flink的连接器。
在这里插入图片描述
除此以外,就需要用户自定义实现sink连接器了。

2输出到文件

Flink专门提供了一个流式文件系统的连接器:FileSink,为批处理和流处理提供了一个统一的Sink,它可以将分区文件写入Flink支持的文件系统。

FileSink支持行编码(Row-encoded)和批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构建器(builder),可以直接调用FileSink的静态方法:
-行编码: FileSink.forRowFormat(basePath,rowEncoder)。
-批量编码: FileSink.forBulkFormat(basePath,bulkWriterFactory)。

示例:

public class SinkFile {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 每个目录中,都有 并行度个数的 文件在写入
        env.setParallelism(2);
        // 必须开启checkpoint,否则一直都是 .inprogress
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);


        DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(
                new GeneratorFunction<Long, String>() {
                    @Override
                    public String map(Long value) throws Exception {
                        return "Number:" + value;
                    }
                },
                Long.MAX_VALUE,
                RateLimiterStrategy.perSecond(1000),
                Types.STRING
        );
        DataStreamSource<String> dataGen = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generator");
        // 输出到文件系统
        FileSink<String> fieSink = FileSink
                // 输出行式存储的文件,指定路径、指定编码
                .<String>forRowFormat(new Path("f:/tmp"), new SimpleStringEncoder<>("UTF-8"))
                // 输出文件的一些配置: 文件名的前缀、后缀
                .withOutputFileConfig(
                        OutputFileConfig.builder()
                                .withPartPrefix("atguigu-")
                                .withPartSuffix(".log")
                                .build()
                )
                // 按照目录分桶:如下,就是每个小时一个目录
                .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH", ZoneId.systemDefault()))
                // 文件滚动策略:  1分钟 或 1m
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofMinutes(1))
                                .withMaxPartSize(new MemorySize(1024*1024))
                                .build()
                )
                .build();
        dataGen.sinkTo(fieSink);
        env.execute();
    }
}

3输出到Kafka

(1)添加Kafka 连接器依赖
由于我们已经测试过从Kafka数据源读取数据,连接器相关依赖已经引入,这里就不重复介绍了。
(2)启动Kafka集群
(3)编写输出到Kafka的示例代码
输出无key的record:

public class SinkKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 如果是精准一次,必须开启checkpoint(后续章节介绍)
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);


        SingleOutputStreamOperator<String> sensorDS = env
                .socketTextStream("hadoop102", 7777);
        /**
         * Kafka Sink:
         * TODO 注意:如果要使用 精准一次 写入Kafka,需要满足以下条件,缺一不可
         * 1、开启checkpoint(后续介绍)
         * 2、设置事务前缀
         * 3、设置事务超时时间:   checkpoint间隔 <  事务超时时间  < max的15分钟
         */
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                // 指定 kafka 的地址和端口
                .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
                // 指定序列化器:指定Topic名称、具体的序列化
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.<String>builder()
                                .setTopic("ws")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build()
                )
                // 写到kafka的一致性级别: 精准一次、至少一次
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                // 如果是精准一次,必须设置 事务的前缀
                .setTransactionalIdPrefix("atguigu-")
                // 如果是精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15分钟
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10*60*1000+"")
                .build();


        sensorDS.sinkTo(kafkaSink);
        env.execute();
    }
}

自定义序列化器,实现带key的record:

public class SinkKafkaWithKey {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
        env.setRestartStrategy(RestartStrategies.noRestart());


        SingleOutputStreamOperator<String> sensorDS = env
                .socketTextStream("hadoop102", 7777);
        /**
         * 如果要指定写入kafka的key,可以自定义序列化器:
         * 1、实现 一个接口,重写 序列化 方法
         * 2、指定key,转成 字节数组
         * 3、指定value,转成 字节数组
         * 4、返回一个 ProducerRecord对象,把key、value放进去
         */
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
                .setRecordSerializer(
                        new KafkaRecordSerializationSchema<String>() {
                            @Nullable
                            @Override
                            public ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {
                                String[] datas = element.split(",");
                                byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);
                                byte[] value = element.getBytes(StandardCharsets.UTF_8);
                                return new ProducerRecord<>("ws", key, value);
                            }
                        }
                )
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("atguigu-")
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "")
                .build();


        sensorDS.sinkTo(kafkaSink);
        env.execute();
    }
}

(4)运行代码,在Linux主机启动一个消费者,查看是否收到数据

[atguigu@hadoop102 ~]$ 
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic ws

4输出到MySQL(JDBC)

写入数据的MySQL的测试步骤如下。
(1)添加依赖
添加MySQL驱动:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.27</version>
</dependency>

官方还未提供flink-connector-jdbc的1.17.0的正式依赖,暂时从apache snapshot仓库下载,pom文件中指定仓库路径:

<repositories>
    <repository>
        <id>apache-snapshots</id>
        <name>apache snapshots</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
    </repository>
</repositories>

添加依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>1.17-SNAPSHOT</version>
</dependency>

如果不生效,还需要修改本地maven的配置文件,mirrorOf中添加如下标红内容:

	<mirror>
            <id>aliyunmaven</id>
            <mirrorOf>*,!apache-snapshots</mirrorOf>
            <name>阿里云公共仓库</name>
            <url>https://maven.aliyun.com/repository/public</url>
       </mirror>

(2)启动MySQL,在test库下建表ws

mysql>     
CREATE TABLE `ws` (
  `id` varchar(100) NOT NULL,
  `ts` bigint(20) DEFAULT NULL,
  `vc` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

(3)编写输出到MySQL的示例代码

public class SinkMySQL {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("hadoop102", 7777)
                .map(new WaterSensorMapFunction());


        /**
         * TODO 写入mysql
         * 1、只能用老的sink写法: addsink
         * 2、JDBCSink的4个参数:
         *    第一个参数: 执行的sql,一般就是 insert into
         *    第二个参数: 预编译sql, 对占位符填充值
         *    第三个参数: 执行选项 ---》 攒批、重试
         *    第四个参数: 连接选项 ---》 url、用户名、密码
         */
        SinkFunction<WaterSensor> jdbcSink = JdbcSink.sink(
                "insert into ws values(?,?,?)",
                new JdbcStatementBuilder<WaterSensor>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {
                        //每收到一条WaterSensor,如何去填充占位符
                        preparedStatement.setString(1, waterSensor.getId());
                        preparedStatement.setLong(2, waterSensor.getTs());
                        preparedStatement.setInt(3, waterSensor.getVc());
                    }
                },
                JdbcExecutionOptions.builder()
                        .withMaxRetries(3) // 重试次数
                        .withBatchSize(100) // 批次的大小:条数
                        .withBatchIntervalMs(3000) // 批次的时间
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://hadoop102:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
                        .withUsername("root")
                        .withPassword("000000")
                        .withConnectionCheckTimeoutSeconds(60) // 重试的超时时间
                        .build()
        );


        sensorDS.addSink(jdbcSink);


        env.execute();
    }
}

(4)运行代码,用客户端连接MySQL,查看是否成功写入数据。

5自定义Sink输出

如果我们想将数据存储到我们自己的存储设备中,而Flink并没有提供可以直接使用的连接器,就只能自定义Sink进行输出了。与Source类似,Flink为我们提供了通用的SinkFunction接口和对应的RichSinkDunction抽象类,只要实现它,通过简单地调用DataStream的.addSink()方法就可以自定义写入任何外部存储。

stream.addSink(new MySinkFunction<String>());

在实现SinkFunction的时候,需要重写的一个关键方法invoke(),在这个方法中我们就可以实现将流里的数据发送出去的逻辑。
这种方式比较通用,对于任何外部存储系统都有效;不过自定义Sink想要实现状态一致性并不容易,所以一般只在没有其它选择时使用。实际项目中用到的外部连接器Flink官方基本都已实现,而且在不断地扩充,因此自定义的场景并不常见。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/917402.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

RAG经验论文《FACTS About Building Retrieval Augmented Generation-based Chatbots》笔记

《FACTS About Building Retrieval Augmented Generation-based Chatbots》是2024年7月英伟达的团队发表的基于RAG的聊天机器人构建的文章。 这篇论文在待读列表很长时间了&#xff0c;一直没有读&#xff0c;看题目以为FACTS是总结的一些事实经验&#xff0c;阅读过才发现FAC…

【Android compose原创组件】在Compose里面实现内容不满一屏也可以触发边界阻尼效果的一种可用方法

创意背景 在安卓 View 传统命令式开发里面提供了非常多稳定美观体验好的组件&#xff0c;但是目前Compose还未有可用的组件&#xff0c;比如View中可以使用 coordinatorlayout 的滚动效果可以实现局部&#xff08;即使内容不满一屏也可以触发滚动边界阻尼效果&#xff09;&…

Android笔记(三十六):封装一个Matrix从顶部/底部对齐的ImageView

背景 ImageView的scaleType默认显示图片是这样&#xff0c;但是有时候设计稿需求希望图片左右能紧贴着ImageView左右边缘&#xff0c;又不破坏图片的比例&#xff0c;用自带的matrix&#xff0c;centerCrop等都可以满足 但是都会造成图片的某些区域被裁剪了&#xff0c;如果设…

docker desktop运行rabittmq容器,控制台无法访问

docker desktop运行rabittmq容器&#xff0c;控制台无法访问 启动过程&#xff1a;…此处缺略&#xff0c;网上一大堆 原因 原因是在Docker上运行的RabbitMQ&#xff0c;默认情况下是没有启用管理插件和管理页面的 解决办法 使用命令 docker exec -it 容器id /bin/bash 进…

重拾CSS,前端样式精读-媒体查询

前言 本文收录于CSS系列文章中&#xff0c;欢迎阅读指正 说到媒体查询&#xff0c;大家首先想到的可能是有关响应式的知识点&#xff0c;除此之外&#xff0c;它还可以用于条件加载资源&#xff0c;字体大小&#xff0c;图像和视频的优化&#xff0c;用户界面调整等等方面&am…

使用 Grafana api 查询 Datasource 数据

一、使用grafana 的api 接口 官方API 二、生成Api key 点击 Administration -》Users and accss -》Service accounts 进入页面 点击Add service account 创建 service account 点击Add service account token 点击 Generate token , 就可以生成 api key 了 三、进入grafana…

uniapp luch-request 使用教程+响应对象创建

1. 介绍 luch-request 是一个基于 Promise 开发的 uni-app 跨平台、项目级别的请求库。它具有更小的体积、易用的 API 和方便简单的自定义能力。luch-request 支持请求和响应拦截、全局挂载、多个全局配置实例、自定义验证器、文件上传/下载、任务操作、自定义参数以及多拦截器…

革新人脸图片智能修复

&#x1f3e1;作者主页&#xff1a;点击&#xff01; &#x1f916;编程探索专栏&#xff1a;点击&#xff01; ⏰️创作时间&#xff1a;2024年11月16日20点46分 神秘男子影, 秘而不宣藏。 泣意深不见, 男子自持重, 子夜独自沉。 论文链接 点击开启你的论文编程之旅…

OpenGL ES 文字渲染方式有几种?

在音视频或 OpenGL 开发中,文字渲染是一个高频使用的功能,比如制作一些酷炫的字幕、为视频添加水印、设置特殊字体等等。 实际上 OpenGL 并没有定义渲染文字的方式,所以我们最能想到的办法是:将带有文字的图像上传到纹理,然后进行纹理贴图。 本文分别介绍下在应用层和 C+…

Javaweb-day12(登录认证)

登录功能 登录校验&#xff08;重点&#xff09; 登录校验指的是在服务器接收到浏览器发送过来的请求之后&#xff0c;首先要对这个请求进行校验&#xff0c;先要校验一下用户登录了没有 怎么来实现登录校验的操作呢&#xff1f;具体的实现思路可以分为两部分&#xff1a; 在…

DBeaver中PostgreSQL数据库显示不全的解决方法

本文介绍在DBeaver中&#xff0c;连接PostgreSQL后&#xff0c;数据库显示不全的解决方法。 最近&#xff0c;在DBeaver中连接了本地的PostgreSQL数据库。但是连接后打开这个数据库时发现&#xff0c;其所显示的Databases不全。如下图所示&#xff0c;Databases只显示了一个pos…

计算机视觉 1-8章 (硕士)

文章目录 零、前言1.先行课程&#xff1a;python、深度学习、数字图像处理2.查文献3.环境安装 第一章&#xff1a;概论1.计算机视觉的概念2.机器学习 第二章&#xff1a;图像处理相关基础1.图像的概念2.图像处理3.滤波器4.卷积神经网络CNN5.图像的多层表示&#xff1a;图像金字…

如何使用EasyExcel生成多列表组合填充的复杂Excel示例

作者&#xff1a;Funky_oaNiu 一、&#xff08;需求&#xff09;生成的表格效果&#xff1a;二、搞一个模板文件三、建立对应的表格实体类四、开始填充五、Vue3前端发起请求下载六、官方文档及AI问答 一、&#xff08;需求&#xff09;生成的表格效果&#xff1a; 其中只有顶部…

手机ip地址异常怎么解决

在现代社会中&#xff0c;手机已成为我们日常生活中不可或缺的一部分&#xff0c;无论是工作、学习还是娱乐&#xff0c;都离不开网络的支持。然而&#xff0c;有时我们会遇到手机IP地址异常的问题&#xff0c;这不仅会影响我们的网络体验&#xff0c;还可能带来安全隐患。本文…

Python酷库之旅-第三方库Pandas(218)

目录 一、用法精讲 1021、pandas.DatetimeIndex.inferred_freq属性 1021-1、语法 1021-2、参数 1021-3、功能 1021-4、返回值 1021-5、说明 1021-6、用法 1021-6-1、数据准备 1021-6-2、代码示例 1021-6-3、结果输出 1022、pandas.DatetimeIndex.indexer_at_time方…

基于 CentOS7.6 的 Docker 下载常用的容器(MySQLRedisMongoDB),解决拉取容器镜像失败问题

安装MySQL&Redis&MongoDB mysql选择是8版本&#xff0c;redis是选择4版本、mongoDB选择最新版&#xff0c;也可以根据自己的需要进行下载对应的版本&#xff0c;无非就是容器名:版本号 这样去拉去相关的容器镜像。如果你还不会在服务器中安装 docker&#xff0c;可以查…

讯飞、阿里云、腾讯云:Android 语音合成服务对比选择

在 移动端 接入语音合成方面&#xff0c;讯飞和腾讯云等都是优秀的选择&#xff0c;但各有其特点和优势。咱们的需求是需要支持普通话/英语/法语三种语言&#xff0c;以下是对各个平台的详细比较&#xff1a; 一、讯飞语音合成介绍 与语音听写相反&#xff0c;语音合成是将一段…

设计模式之责任链模式(Chain Of Responsibility)

一、责任链模式介绍 1、责任链模式介绍 职责链模式(chain of responsibility pattern) 定义: 避免将一个请求的发送者与接收者耦合在 一起&#xff0c;让多个对象都有机会处理请求。将接收请求的对象连接成一条链&#xff0c;并且沿着这条链 传递请求&#xff0c;直到有一个对…

游戏引擎学习第12天

视频参考:https://www.bilibili.com/video/BV1yom9YnEWY 这节没讲什么东西&#xff0c;主要是改了一下音频的代码 后面有介绍一些alloc 和malloc,VirtualAlloc 的东西 _alloca 函数&#xff08;或 alloca&#xff09;分配的是栈内存&#xff0c;它的特点是&#xff1a; 生命周…

更改liunx的磁盘名称

目录 1. 问题的提出 2. 机器环境说明 3. 解决方法 1. 问题的提出 今天在Linux上部署软件&#xff0c;发现要部署软件的硬盘名称带中文&#xff0c;当访问该磁盘时&#xff0c;中文则被转为长长的一串数字字符串&#xff0c;这很不方便&#xff0c;于是需要将带有中文的磁盘名…