[flink 实时流基础] 输出算子(Sink)

学习笔记
Flink作为数据处理框架,最终还是要把计算处理的结果写入外部存储,为外部应用提供支持。
image.png


文章目录

      • **连接到外部系统**
      • **输出到文件**
      • 输出到 Kafka
      • 输出到 mysql
      • 自定义 sink

连接到外部系统

Flink的DataStream API专门提供了向外部写入数据的方法:addSink。与addSource类似,addSink方法对应着一个“Sink”算子,主要就是用来实现与外部系统连接、并将数据提交写入的;Flink程序中所有对外的输出操作,一般都是利用Sink算子完成的。
Flink1.12以前,Sink算子的创建是通过调用DataStream的.addSink()方法实现的。
stream.addSink(new SinkFunction(…));
addSink方法同样需要传入一个参数,实现的是SinkFunction接口。在这个接口中只需要重写一个方法invoke(),用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用。
Flink1.12开始,同样重构了Sink架构,
stream.sinkTo(…)
当然,Sink多数情况下同样并不需要我们自己实现。之前我们一直在使用的print方法其实就是一种Sink,它表示将数据流写入标准控制台打印输出。Flink官方为我们提供了一部分的框架的Sink连接器。如下图所示,列出了Flink官方目前支持的第三方系统连接器:

https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/connectors/datastream/overview/
image.png

我们可以看到,像Kafka之类流式系统,Flink提供了完美对接,source/sink两端都能连接,可读可写;而对于Elasticsearch、JDBC等数据存储系统,则只提供了输出写入的sink连接器。
除Flink官方之外,Apache Bahir框架,也实现了一些其他第三方系统与Flink的连接器。
image.png
除此以外,就需要用户自定义实现sink连接器了。

输出到文件

Flink专门提供了一个流式文件系统的连接器:FileSink,为批处理和流处理提供了一个统一的Sink,它可以将分区文件写入Flink支持的文件系统。
FileSink支持行编码(Row-encoded)和批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构建器(builder),可以直接调用FileSink的静态方法:

  • 行编码: FileSink.forRowFormat(basePath,rowEncoder)。
  • 批量编码: FileSink.forBulkFormat(basePath,bulkWriterFactory)。
public class SinkFile {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 每个目录中,都有 并行度个数的 文件在写入
        env.setParallelism(2);
        
        // 必须开启checkpoint,否则一直都是 .inprogress
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);


        DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(
                new GeneratorFunction<Long, String>() {
                    @Override
                    public String map(Long value) throws Exception {
                        return "Number:" + value;
                    }
                },
                Long.MAX_VALUE,
                RateLimiterStrategy.perSecond(1000),
                Types.STRING
        );

        DataStreamSource<String> dataGen = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generator");

        // 输出到文件系统
        FileSink<String> fieSink = FileSink
                // 输出行式存储的文件,指定路径、指定编码
                .<String>forRowFormat(new Path("f:/tmp"), new SimpleStringEncoder<>("UTF-8"))
                // 输出文件的一些配置: 文件名的前缀、后缀
                .withOutputFileConfig(
                        OutputFileConfig.builder()
                                .withPartPrefix("atguigu-")
                                .withPartSuffix(".log")
                                .build()
                )
                // 按照目录分桶:如下,就是每个小时一个目录
                .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH", ZoneId.systemDefault()))
                // 文件滚动策略:  1分钟 或 1m
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofMinutes(1))
                                .withMaxPartSize(new MemorySize(1024*1024))
                                .build()
                )
                .build();
        dataGen.sinkTo(fieSink);

        env.execute();
    }
}

输出到 Kafka

(1)添加Kafka 连接器依赖
由于我们已经测试过从Kafka数据源读取数据,连接器相关依赖已经引入,这里就不重复介绍了。
(2)启动Kafka集群
(3)编写输出到Kafka的示例代码

public class SinkKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 如果是精准一次,必须开启checkpoint(后续章节介绍)
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);


        SingleOutputStreamOperator<String> sensorDS = env
                .socketTextStream("hadoop102", 7777);

        /**
         * Kafka Sink:
         * TODO 注意:如果要使用 精准一次 写入Kafka,需要满足以下条件,缺一不可
         * 1、开启checkpoint(后续介绍)
         * 2、设置事务前缀
         * 3、设置事务超时时间:   checkpoint间隔 <  事务超时时间  < max的15分钟
         */
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                // 指定 kafka 的地址和端口
                .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
                // 指定序列化器:指定Topic名称、具体的序列化
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.<String>builder()
                                .setTopic("ws")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build()
                )
                // 写到kafka的一致性级别: 精准一次、至少一次
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                // 如果是精准一次,必须设置 事务的前缀
                .setTransactionalIdPrefix("atguigu-")
                // 如果是精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15分钟
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10*60*1000+"")
                .build();


        sensorDS.sinkTo(kafkaSink);


        env.execute();
    }
}

自定义序列化器,实现带key的record:

public class SinkKafkaWithKey {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
        env.setRestartStrategy(RestartStrategies.noRestart());


        SingleOutputStreamOperator<String> sensorDS = env
                .socketTextStream("hadoop102", 7777);


        /**
         * 如果要指定写入kafka的key,可以自定义序列化器:
         * 1、实现 一个接口,重写 序列化 方法
         * 2、指定key,转成 字节数组
         * 3、指定value,转成 字节数组
         * 4、返回一个 ProducerRecord对象,把key、value放进去
         */
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers("hadoop102:9092,hadoop103:9092,
        hadoop104:9092")
                .setRecordSerializer(
                        new KafkaRecordSerializationSchema<String>() {

                            @Nullable
                            @Override
                            public ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {
                                String[] datas = element.split(",");
                                byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);
                                byte[] value = element.getBytes(StandardCharsets.UTF_8);
                                return new ProducerRecord<>("ws", key, value);
                            }
                        }
                )
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("atguigu-")
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "")
                .build();


        sensorDS.sinkTo(kafkaSink);


        env.execute();
    }
}

输出到 mysql

写入数据的MySQL的测试步骤如下。
(1)添加依赖
添加MySQL驱动:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.27</version>
</dependency>

官方还未提供flink-connector-jdbc的1.17.0的正式依赖,暂时从apache snapshot仓库下载,pom文件中指定仓库路径:

<repositories>
    <repository>
        <id>apache-snapshots</id>
        <name>apache snapshots</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
    </repository>
</repositories>

添加依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>1.17-SNAPSHOT</version>
</dependency>

如果不生效,还需要修改本地maven的配置文件,mirrorOf中添加如下标红内容:

<mirror>
    <id>aliyunmaven</id>
    <mirrorOf>*,!apache-snapshots</mirrorOf>
    <name>阿里云公共仓库</name>
    <url>https://maven.aliyun.com/repository/public</url>
</mirror>

(2)启动MySQL,在test库下建表ws

mysql>
CREATE TABLE ws (
id varchar(100) NOT NULL,
ts bigint(20) DEFAULT NULL,
vc int(11) DEFAULT NULL,
PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

(3)编写输出到MySQL的示例代码

public class SinkMySQL {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> sensorDS = env
.socketTextStream("hadoop102", 7777)
.map(new WaterSensorMapFunction());


/**
     * TODO 写入mysql
     * 1、只能用老的sink写法: addsink
     * 2、JDBCSink的4个参数:
     *    第一个参数: 执行的sql,一般就是 insert into
     *    第二个参数: 预编译sql, 对占位符填充值
     *    第三个参数: 执行选项 ---》 攒批、重试
     *    第四个参数: 连接选项 ---》 url、用户名、密码
     */
SinkFunction<WaterSensor> jdbcSink = JdbcSink.sink(
    "insert into ws values(?,?,?)",
    new JdbcStatementBuilder<WaterSensor>() {
        @Override
        public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {
            //每收到一条WaterSensor,如何去填充占位符
            preparedStatement.setString(1, waterSensor.getId());
            preparedStatement.setLong(2, waterSensor.getTs());
            preparedStatement.setInt(3, waterSensor.getVc());
        }
    },
    JdbcExecutionOptions.builder()
    .withMaxRetries(3) // 重试次数
    .withBatchSize(100) // 批次的大小:条数
    .withBatchIntervalMs(3000) // 批次的时间
    .build(),
    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
    .withUrl("jdbc:mysql://hadoop102:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
    .withUsername("root")
    .withPassword("000000")
    .withConnectionCheckTimeoutSeconds(60) // 重试的超时时间
    .build()
);


sensorDS.addSink(jdbcSink);


env.execute();
}
}

(4)运行代码,用客户端连接MySQL,查看是否成功写入数据。

自定义 sink

如果我们想将数据存储到我们自己的存储设备中,而Flink并没有提供可以直接使用的连接器,就只能自定义Sink进行输出了。与Source类似,Flink为我们提供了通用的SinkFunction接口和对应的RichSinkDunction抽象类,只要实现它,通过简单地调用DataStream的.addSink()方法就可以自定义写入任何外部存储。
stream.addSink(new MySinkFunction());
在实现SinkFunction的时候,需要重写的一个关键方法invoke(),在这个方法中我们就可以实现将流里的数据发送出去的逻辑。
这种方式比较通用,对于任何外部存储系统都有效;不过自定义Sink想要实现状态一致性并不容易,所以一般只在没有其它选择时使用。实际项目中用到的外部连接器Flink官方基本都已实现,而且在不断地扩充,因此自定义的场景并不常见。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/508283.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

深入了解 Vue 3 中的 Keyframes 动画

在本文中&#xff0c;我们将探讨如何在 Vue 3 中实现 Keyframes 动画。Keyframes 动画允许我们通过定义关键帧来创建复杂的动画效果&#xff0c;从而为用户提供更吸引人的界面体验。 transition动画适合用来创建简单的过渡效果。CSS3中支持使用animation属性来配置更加复杂的动…

学习笔记——《计算机组成原理》

框图 第一章 总线 1、什么是总线&#xff1f; 总线是连接各个部件的信息传输线&#xff0c;是各个部件共享的传输介质。 2、总线特点&#xff1f; 相对于一对一的连线来说&#xff0c;可拓展性更好&#xff0c;也更省空间&#xff0c;但是某个时刻只能有一对部件进行通信。 3、…

如何将平板或手机作为电脑的外接显示器?

先上官网链接&#xff1a;ExtensoDesk 家里有一台华为平板&#xff0c;自从买回来以后除了看视频外&#xff0c;基本没什么作用&#xff0c;于是想着将其作为我电脑的第二个屏幕&#xff0c;提高我学习办公的效率&#xff0c;废物再次利用。最近了解到华为和小米生态有多屏协同…

基于深度学习的停车场车辆检测算法matlab仿真

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 5.算法完整程序工程 1.算法运行效果图预览 上图测试结果如下图所示&#xff1a; 2.算法运行软件版本 matlab2022a 3.部分核心程序 image imread(image_test\test.jpg); image2 image;%图…

关于OcenaBase v4.2中,分区转移和负载均衡的技术解读

OceanBase​​​​​​​​​​​​​​作为一款原生分布式数据库&#xff0c;其核心的技术特性之一是高可扩展性&#xff0c;其具体表现在两个方面&#xff1a; 首先&#xff0c;是灵活的扩缩容能力&#xff0c;包括垂直扩缩容和水平扩缩容&#xff1a; 垂直扩缩容&#xff…

探究云手机的海外原生IP优势

随着全球数字化进程的加速&#xff0c;企业越来越依赖于网络来扩展其业务。在这个数字时代&#xff0c;云手机作为一种创新的通信技术&#xff0c;已经成为了企业网络优化的重要组成部分。云手机支持海外原生IP的特性&#xff0c;为企业在国际市场上的拓展提供了全新的可能性。…

【Node.js从基础到高级运用】二十、Node.js 强大的REPL

引言 Node.js REPL&#xff08;Read-Eval-Print Loop&#xff09;是一种交互式的命令行工具&#xff0c;它允许开发者快速地执行JavaScript代码&#xff0c;并查看结果。这个功能在进行快速原型设计、调试、学习JavaScript或Node.js时非常有用。 启动REPL 首先&#xff0c;确保…

高度不同的流体瀑布css实现方法

商城商品列表 实现瀑布流展示&#xff0c;通过flex或grid实现会导致每行中的列高度一致&#xff0c;无法达到错落有致的感觉&#xff1b; 为此需要用到&#xff1a; CSS columns 属性 columns 属性是一个简写属性&#xff0c;用于设置列宽和列数。 CSS 语法 columns: column-wi…

Java类和对象练习题

练习一 下面代码的运行结果是&#xff08;&#xff09; public static void main(String[] args){String s;System.out.println("s"s);} 解析&#xff1a;本题中的代码不能编译通过&#xff0c;因为在Java当中局部变量必须先初始化&#xff0c;后使用。所以此处编译不…

信息安全技术基础知识总结

一、信息安全基础知识 信息安全基本要素&#xff1a; 1. 机密性&#xff08;C&#xff09;&#xff1a;确保信息不暴露给未授权的实体或进程 2. 完整性&#xff08;I&#xff09;&#xff1a;只有得到允许的人才能修改数据&#xff0c;并且能够判别出数据是否已被篡改 3. 可用性…

论文笔记✍GS3D- An Efficient 3D Object Detection Framework for Autonomous Driving

论文笔记✍GS3D: An Efficient 3D Object Detection Framework for Autonomous Driving &#x1f4dc; Abstract &#x1f528; 主流做法限制 &#xff1a; 我们在自动驾驶场景中提出了一种基于单个 RGB 图像的高效 3D 物体检测框架。我们的工作重点是提取 2D 图像中的底层 3…

降低项目延期概率的5大注意事项

降低项目延期概率对项目非常重要。因为项目延期往往会导致成本增加&#xff0c;降低客户满意度&#xff0c;影响企业在市场上的竞争力&#xff0c;造成资源浪费。因此&#xff0c;我们需要降低项目延期概率&#xff0c;实现企业长远发展。 而降低项目延期概率&#xff0c;一般来…

java基础之高级面试-2024

抽象类和接口有什么区别 定义和设计&#xff1a;抽象类是使用abstract关键字定义的类&#xff0c;可以包含抽象方法和非抽象方法&#xff0c;可以有实例变量和构造方法&#xff1b;接口通过interface关键字定义&#xff0c;只能包含抽象方法、默认方法和静态方法&#xff0c;不…

基于ssm的家政服务中介网(java项目+文档+源码)

风定落花生&#xff0c;歌声逐流水&#xff0c;大家好我是风歌&#xff0c;混迹在java圈的辛苦码农。今天要和大家聊的是一款基于ssm的闲一品交易平台。项目源码以及部署相关请联系风歌&#xff0c;文末附上联系信息 。 项目简介&#xff1a; 家政服务中介网的主要使用者分为…

【二叉树】Leetcode 230. 二叉搜索树中第K小的元素【中等】

二叉搜索树中第K小的元素 给定一个二叉搜索树的根节点 root &#xff0c;和一个整数 k &#xff0c;请你设计一个算法查找其中第 k 个最小元素&#xff08;从 1 开始计数&#xff09;。 示例1&#xff1a; 输入&#xff1a;root [3,1,4,null,2], k 1 输出&#xff1a;1 解…

【iOS ARKit】3D 视频

在AR 中播放视频也是一种常见的需求&#xff0c;如在一个展厅中放置的虚拟电视上播放宣传视频&#xff0c;或者在游戏中为营造氛围而设置的虚拟电视视频播放&#xff0c;或者在识别的2D个人名片上播放自我介绍视频&#xff0c;因视频具有静态图像无法比拟的综合信息展示能力&am…

【学习笔记】java项目—苍穹外卖day02

文章目录 苍穹外卖-day02课程内容1. 新增员工1.1 需求分析和设计1.1.1 产品原型1.1.2 接口设计1.1.3 表设计 1.2 代码开发1.2.1 设计DTO类1.2.2 Controller层1.2.3 Service层接口1.2.4 Service层实现类1.2.5 Mapper层 1.3 功能测试1.3.1 接口文档测试1.3.2 前后端联调测试 1.4 …

深度卷积神经网络(AlexNet)

文章目录 简介 简介 AlexNet由八层组成&#xff1a;五个卷积层、两个全连接隐藏层和一个全连接输出层。 AlexNet使用ReLU而不是sigmoid作为其激活函数。 import torch from torch import nnnet nn.Sequential(# 这里使用一个11*11的更大窗口来捕捉对象。# 同时&#xff0c;…

面试题:MySQL 优化篇

定位慢查询 &#x1f496; 开源工具 调试工具&#xff1a;Arthas&#xff08;阿尔萨斯&#xff09;运维工具&#xff1a;Prometheus&#xff08;普罗米修斯&#xff09;、Skywalking &#x1f496; MySQL 慢查询日志 # 开启 MySQL 慢查询日志开关 slow_query_log1 # 设置慢…

软件测试-基础篇

目录 1 软件测试的生命周期2 软件测试&软件开发生命周期3 如何描述一个bug4 如何定义bug的级别5 bug的生命周期5.1 bug状态转换图 6 如何开始第一测试7 测试的执行和BUG管理7.1 如何发现更多的bug 8 产生争执怎么办&#xff08;处理人际关系&#xff09; 1 软件测试的生命周…