【API篇】六、Flink输出算子Sink

文章目录

  • 1、输出到外部系统
  • 2、输出到文件
  • 3、输出到KafKa
  • 4、输出到MySQL(JDBC)
  • 5、自定义Sink输出

Flink做为数据处理引擎,要把最终处理好的数据写入外部存储,为外部系统或应用提供支持。与输入算子Source相对应的,输出算子为Sink。

在这里插入图片描述
前面一直在用的print就是一种Sink,用来将数据流写到控制台打印

在这里插入图片描述

1、输出到外部系统

Flink程序中所有对外的输出操作,利用Sink算子完成

Flink1.12以前,Sink算子的创建是通过调用DataStream的.addSink()方法

stream.addSink(new SinkFunction());
//重写SinkFunction接口的invoke方法,用来将指定的值写入到外部系统中
//invoke方法在每条数据记录到来时都会调用。

Flink1.12开始,Sink算子的创建是通过调用DataStream的.sinkTo()方法

stream.sinkTo()

Flink官网为我们提供了一部分的框架的Sink连接器:

Flink官方为我们提供了一部分的框架的Sink连接器

source/sink即可读可写,能做为数据源连接,也能做为下游去输出。

2、输出到文件

先引入Flink流式文件系统的连接器FileSink的依赖:

<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-connector-files</artifactId>
	<version>${flink.version}</version>
</dependency>

FileSink支持行编码(Row-encoded)和批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构建器(builder):

  • 行编码: FileSink.forRowFormat(basePath,rowEncoder)
  • 批量编码: FileSink.forBulkFormat(basePath,bulkWriterFactory)

下面演示实现读往d盘下的tmp目录写数据(tmp目录不用提前创建,不存在会自动创建):

public class SinkFile {
    public static void main(String[] args) throws Exception {
    
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 每个目录中,都有 并行度个数的 文件是正在写入状态
        env.setParallelism(1);

        // 必须开启checkpoint,否则文件一直都是 .inprogress状态,即正在写入
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);

		//生成器模拟一个数据源
        DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(
                new GeneratorFunction<Long, String>() {
                    @Override
                    public String map(Long value) throws Exception {
                        return "Number:" + value;
                    }
                },
                Long.MAX_VALUE,
                RateLimiterStrategy.perSecond(1000), //每秒生成1000条数据
                Types.STRING
        );

        DataStreamSource<String> dataGen = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generator");

        // 输出到文件系统
        FileSink<String> fieSink = FileSink
                // 输出行式存储的文件,指定路径、指定编码
                .<String>forRowFormat(new Path("f:/tmp"), new SimpleStringEncoder<>("UTF-8"))
                // 输出文件的一些配置: 文件名的前缀、后缀,new也行,这里展示build方式创建配置对象
                .withOutputFileConfig(
                        OutputFileConfig.builder()
                                .withPartPrefix("code9527")
                                .withPartSuffix(".log")
                                .build()
                )
                // 按照目录分桶:如下,就是每个小时一个目录。ZoneId.systemDefault()即系统默认时区,也可是ZoneId类中的其他时区
                .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH", ZoneId.systemDefault()))
                // 文件滚动策略:  1分钟 或 1m
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofMinutes(1))
                                .withMaxPartSize(new MemorySize(1024*1024))
                                .build()
                )
                .build();


        dataGen.sinkTo(fieSink);

        env.execute();
    }
}

运行,看下效果:inprocess,此时文件正在写入数据,不可读。一个这个inprocess文件,因为上面并行度设置的1

在这里插入图片描述

总结:重点还是FileSink对象的创建

  • 输出行/批文件存储的文件,可指定文件路径、文件编码、文件前后缀

  • 按目录分桶,传参的接口实现类对象自选,demo中是按照时间给文件夹命名

  • 特别注意文件滚动策略,是达到指定时间或者文件到达指定大小,是或的关系

  • FileSink对象创建完后,直接流对象调用sinkTo即可完成写入到文件的动作

3、输出到KafKa

添加KafKa连接器的依赖:

<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-connector-kafka</artifactId>
	<version>${flink.version}</version>
</dependency>

以下用socket模拟无界流,来演示数据输出到KafKa:

public class SinkKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 如果是精准一次,必须开启checkpoint,否则无法写入Kafka
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);

        SingleOutputStreamOperator<String> sensorDS = env
                .socketTextStream("node1", 9527);

        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                // 指定 kafka 的地址和端口
                .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
                // 指定序列化器:指定Topic名称、具体的序列化
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.<String>builder()
                                .setTopic("topic1")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build()
                )
                // 写到kafka的一致性级别: 精准一次、至少一次
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                // 如果是精准一次,必须设置 事务的前缀
                .setTransactionalIdPrefix("test-")
                // 如果是精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15分钟
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10*60*1000+"")
                .build();


        sensorDS.sinkTo(kafkaSink);


        env.execute();
    }
}

关于 Kafka Sink,如果要使用精准一次写入Kafka,需要满足以下条件,缺一不可

  • 开启checkpoint(后续介绍)
  • 设置事务前缀
  • 设置事务超时时间: checkpoint间隔 < 事务超时时间 < max的15分钟

如果要指定写入kafka的key,可以自定义序列化器:

  • 实现 一个接口,重写 序列化 方法
  • 指定key,转成 字节数组
  • 指定value,转成 字节数组
  • 返回一个 ProducerRecord对象,把key、value放进去
public class SinkKafkaWithKey {
    public static void main(String[] args) throws Exception {
    
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
        env.setRestartStrategy(RestartStrategies.noRestart());

        SingleOutputStreamOperator<String> sensorDS = env
                .socketTextStream("node1", 9527);
        /**
         *指定写入kafka的key,可以自定义序列化器:
         */
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
                .setRecordSerializer(
                        new KafkaRecordSerializationSchema<String>() {

                            @Nullable
                            @Override
                            public ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {
                                String[] datas = element.split(",");  //输入的测试数据格式为a,b,c,所以这里先分割一下
                                byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);
                                byte[] value = element.getBytes(StandardCharsets.UTF_8);
                                return new ProducerRecord<>("topic1", key, value);
                            }
                        }
                )
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("test-")
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "")
                .build();


        sensorDS.sinkTo(kafkaSink);


        env.execute();
    }
}

4、输出到MySQL(JDBC)

添加MySQL驱动依赖:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.27</version>
</dependency>

在这里插入图片描述

再引入flink-jdbc连接器依赖:

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>3.1.1-1.17</version>
</dependency>



PS:

教学视频中提到了另一种情况,这里记录下。即:官方还未提供flink-connector-jdbc的某高版本的正式依赖,如1.17.0(当前时间已有),暂时从apache snapshot仓库下,因此引入依赖前,先在pom文件中指定仓库路径

<repositories>
    <repository>
        <id>apache-snapshots</id>  <!--这个id后面setting.xml里有用-->
        <name>apache-snapshots</name>
		<url>https://repository.apache.org/content/repositories/snapshots/</url>
    </repository>
</repositories>

再引入flink-jdbc连接器依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>1.17-SNAPSHOT</version>
</dependency>

如果不生效,还需要修改本地maven的配置文件,mirrorOf中添加!apache-snapshots

<mirror>
	<id>aliyunmaven</id>
	<mirrorOf>*,!apache-snapshots</mirrorOf>   <!--即除了apache-snapshots,其余的都去阿里仓库下,!即排除,后面的名称是pom中定义的那个-->
	<name>阿里云公共仓库</name>
	<url>https://maven.aliyun.com/repository/public</url>
</mirror>


根据你的数据类型,建立对应结构的表,这里根据要接收的自定义对象WaterSensor建表test:

mysql>     
CREATE TABLE `ws` (
  `id` varchar(100) NOT NULL,
  `ts` bigint(20) DEFAULT NULL,
  `vc` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

输出到MySQL的Demo代码:

public class SinkMySQL {
    public static void main(String[] args) throws Exception {
    
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("node01", 9527)
                .map(new WaterSensorMapFunction());  //输入的信息映射转为自定义的WaterSensor实体类对象

        SinkFunction<WaterSensor> jdbcSink = JdbcSink.sink(
                "insert into ws values(?,?,?)",
                new JdbcStatementBuilder<WaterSensor>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {
                        //每收到一条WaterSensor,如何去填充占位符
                        preparedStatement.setString(1, waterSensor.getId());
                        preparedStatement.setLong(2, waterSensor.getTs());
                        preparedStatement.setInt(3, waterSensor.getVc());
                    }
                },
                JdbcExecutionOptions.builder()
                        .withMaxRetries(3) // 重试次数
                        .withBatchSize(100) // 批次的大小:条数
                        .withBatchIntervalMs(3000) // 批次的时间
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://node01:3306/testDB?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
                        .withUsername("root")
                        .withPassword("admin123")
                        .withConnectionCheckTimeoutSeconds(60) // 重试的超时时间
                        .build()
        );
        sensorDS.addSink(jdbcSink);
        env.execute();
    }
}

总结: 写入mysql时注意只能用老的sink写法: addsink,此外JdbcSink的4个参数:

  • 第一个参数: 执行的sql,一般就是 insert into搭配占位符
  • 第二个参数: 预编译sql对象, 对占位符填充值
  • 第三个参数: 执行选项 ,比如批次大小、重试时间
  • 第四个参数: 数据库连接选项 , url、用户名、密码

运行,输入数据,查看MySQL:

在这里插入图片描述

5、自定义Sink输出

现有的Flink连接器不能满足需求时,需要自定义连接器进行输出。与Source类似,Flink提供了通用的SinkFunction接口和对应的RichSinkDunction抽象类,实现这个接口,就可通过DataStream的.addSink()方法自定义写入任何的外部存储。

public class MySinkFunction implements SinkFunction<String>{

	@Override
	public void invoke(String value, Context context) throws Exception{
		//输出逻辑
		//value即流中的数据,来一条数据,invoke方法就被调用一次(所以不要在这里创建连接对象)
		//如果你的外部存储必须先创建连接对象,那就用富函数的生命周期方法去创建连接对象
	}
}
stream.addSink(new MySinkFunction<String>());

来一条数据,invoke方法就被调用一次,如果你的外部存储必须先创建连接对象,那就用富函数的生命周期方法去创建连接对象:

public class MySinkFunction implements RichSinkFunction<String>{

	Connection connection = null;

	@Overrdie
	public void open(Configuration parameters) throws Exception{
		connection = new xxConnection(xx);
	}

	@Override
	public void close() throws Exception{
		super.close();
	}

	@Override
	public void invoke(String value, Context context) throws Exception{
		//输出逻辑
		connection.executeXXX(xxx);
		
	}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/106268.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

VSCode远程连接

1 VSCode 远程连接服务器 1、下载vscode 百度搜索vscode&#xff08;注意不是visual studio&#xff09;&#xff0c;进入vscode官网进行下载。 2、安装ssh插件 根据你的操作系统选择对应的版本进行下载和安装。 安装完成之后&#xff0c;启动vscode&#xff0c;选择左侧Exten…

记一次任意文件下载到Getshell

任意文件下载&#xff08;Arbitrary File Download&#xff09;是一种常见的 Web 攻击技术&#xff0c;用于窃取服务器上任意文件的内容。攻击者利用应用程序中的漏洞&#xff0c;通过构造恶意请求&#xff0c;使应用程序将任意文件&#xff08;如配置文件、敏感数据等&#xf…

WebSocket—STOMP详解(官方原版)

WebSocket协议定义了两种类型的消息&#xff08;文本和二进制&#xff09;&#xff0c;但其内容未作定义。该协议定义了一种机制&#xff0c;供客户端和服务器协商在WebSocket之上使用的子协议&#xff08;即更高级别的消息传递协议&#xff09;&#xff0c;以定义各自可以发送…

构建客户门户的痛点及低代码工具解决方案

企业如何做好数字化转型呢&#xff1f; 如果笼统地说起“数字化转型”&#xff0c;这个概念太大了&#xff0c;它涉及到了企业管理中的方方面面。数字化转型是一个持续不断的过程&#xff0c;既要在整体上进行数字规划&#xff0c;也需要从细节入手&#xff0c;将每一个步骤进…

web安全-原发抗抵赖

原发抗抵赖 原发抗抵赖也称不可否认性&#xff0c;主要表现以下两种形式&#xff1a; 数据发送者无法否认其发送数据的事实。例如&#xff0c;A向B发信&#xff0c;事后&#xff0c;A不能否认该信是其发送的。数据接收者事后无法否认其收到过这些数据。例如&#xff0c;A向B发…

【Linux】开发工具

目录 Linux编译器-gcc/g使用执行命令&#xff1a;我们的.o和库是如何链接的? make/Makefile依赖关系、依赖方法 Linux编译器-gcc/g使用 gcc只能编译c语言&#xff0c;g可以编译c语言也可以编译g 背景知识&#xff1a; 预处理&#xff08;进行宏替换)编译&#xff08;生成汇编)…

Spring MVC 中文文档

1. Spring Web MVC Spring Web MVC是建立在Servlet API上的原始Web框架&#xff0c;从一开始就包含在Spring框架中。正式名称 “Spring Web MVC” 来自其源模块的名称&#xff08; spring-webmvc&#xff09;&#xff0c;但它更常被称为 “Spring MVC”。 与Spring Web MVC并…

海南海口大型钢结构件3D扫描全尺寸三维测量平面度平行度检测-CASAIM中科广电

高精度三维扫描技术已经在大型工件制造领域发挥着重要作用&#xff0c;特别是在质量检测环节&#xff0c;高效、高精度&#xff0c;可以轻松实现全尺寸三维测量。本期&#xff0c;CASAIM要分享的应用是在大型钢结构件的关键部位尺寸及形位公差检测。 钢结构件&#xff0c;是将…

用过才知道AI配音软件有多方便,推荐四款高度好评的配音工具~

配音是平时剪辑视频时经常要做的一步&#xff0c;现在很多视频的背景音都是配音而成的&#xff0c;给大家安利4个好用的配音软件&#xff0c;操作简单&#xff0c;还有很多种音色可以选择&#xff0c;有需要的小伙伴可以操作看看。 1.悦音配音 这是个智能配音的软件&#xff0…

利用nicegui开发ai工具示例

from fastapi import FastAPI import uvicorn from nicegui import uiclass PipRequirement:def __init__(self):ui.label("依赖安装与依赖展示")class BasicSettings:def __init__(self):self.project_select ui.select(["test"], label"项目选择&q…

竞赛 深度学习人体跌倒检测 -yolo 机器视觉 opencv python

0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; **基于深度学习的人体跌倒检测算法研究与实现 ** 该项目较为新颖&#xff0c;适合作为竞赛课题方向&#xff0c;学长非常推荐&#xff01; &#x1f947;学长这里给一个题目综合评分(每项满…

Echarts 实现 设备运行状态图(甘特图) 工业大数据展示

let option{tooltip: {formatter: function (params) {let startTime new Date(params.value[1])let endTime new Date(params.value[2]);//北京时间/时间戳转成日常时间function convert(date){var y date.getFullYear();var m date.getMonth() 1;m m < 10 ? "0…

OpenFeign实现分析、源码解析

什么是openfeign? 是springcloud全家桶的组件之一&#xff0c;其核心作用是为Rest API提供高效简洁的rpc调用方式。 为什么只定义接口而没有实现类&#xff1f; 源码解读&#xff08;省略&#xff09; 总结&#xff1a; 源码分析&#xff1a;如何发送http请求&#xff1f; …

美摄AR人像美颜,全新视觉体验

企业越来越重视通过视觉媒体来提升品牌形象和吸引客户。然而&#xff0c;传统的摄影技术往往无法满足企业对于高质量、个性化视觉内容的需求。这时&#xff0c;美摄AR人像美颜解决方案应运而生&#xff0c;它以其独特的技术和优势&#xff0c;为企业带来了全新的视觉体验。 美…

STM32 中断NVIC详解,配置及示例

NVIC全称 Nested Vectored Controller 嵌套向量中断控制器 它是一种硬件设备&#xff0c;用于管理和协调处理器的中断请求。NVIC可以管理多个中断请求&#xff0c;并按优先级处理它们。当一个中断请求到达时&#xff0c;NVIC会确定其优先级并决定是否应该中断当前执行的程序&am…

PHP危险函数

PHP危险函数 文章目录 PHP危险函数PHP 代码执行函数eval 语句assert()语句preg_replace()函数正则表达式里修饰符 回调函数call_user_func()函数array_map()函数 OS命令执行函数system()函数exec()函数shell_exec()函数passthru() 函数popen 函数反引号 实列 通过构造函数可以执…

ps2024滤镜插件Portraiture

Photoshop 是最常用到的综合性的设计工具&#xff0c;虽然PS一直在迭代升级&#xff0c;但是在细节功能上&#xff0c;PS总是无法完全满足全部所有的用户需求&#xff0c;今天coco玛奇朵推荐一个个截至目前最受欢迎的免费的PS插件&#xff0c;有了这些功能扩展的插件后PS如虎添…

openGauss学习笔记-107 openGauss 数据库管理-管理用户及权限-三权分立

文章目录 openGauss学习笔记-107 openGauss 数据库管理-管理用户及权限-三权分立107.1 默认的用户权限107.2 三权分立较非三权分立权限变化说明 openGauss学习笔记-107 openGauss 数据库管理-管理用户及权限-三权分立 默认权限机制和管理员两节的描述基于的是openGauss创建之初…

Java练习题2021-4

"某游戏公司设计了一个奖励活动&#xff0c;给N个用户(1≤N≤10^7)连续编号为1到N&#xff0c;依据用户的编号S发放奖励。 发放奖励规则为&#xff1a; 公司随机设定三个非零正整数x&#xff0c;y&#xff0c;z。 如果S同时是x、y的倍数&#xff0c;奖励2张卡片&#xff1…

中间件安全-CVE 复现K8sDockerJettyWebsphere漏洞复现

目录 服务攻防-中间件安全&CVE 复现&K8s&Docker&Jetty&Websphere中间件-K8s中间件-Jetty漏洞复现CVE-2021-28164-路径信息泄露漏洞CVE-2021-28169双重解码信息泄露漏洞CVE-2021-34429路径信息泄露漏洞 中间件-Docker漏洞复现守护程序 API 未经授权访问漏洞…