状态的一致性和FlinkSQL

状态一致性

一致性其实就是结果的正确性。精确一次是指数据有可能被处理多次,但是结果只有一个。
三个级别:

  1. 最多一次:1次或0次,有可能丢数据
  2. 至少一次:1次或n次,出错可能会重试
    • 输入端只要可以做到数据重放,即在出错后,可以重新发送一样的数据
  3. 精确一次:数据只会发送1次
    • 幂等写入:多次重复操作不影响结果,有可能出现某个值由于数据重放,导致结果回到原先的值,然后逐渐恢复。
    • 预写日志:
      1. 先把结果数据作为日志状态保存起来
      2. 进行检查点保存时,也会将这些结果数据一并做持久化存储
      3. 在收到检查点完成的通知时,将所有结果数据一次性写入外部系统
    • 预写日志缺点:这种再次确认的方式,如果写入成功返回的ack出现故障,还是会出现数据重复。
    • 两阶段提交(2PC):数据写入过程和数据提交分为两个过程,如果写入过程没有发生异常,就将事务进行提交。
      • 算子节点在收到第一个数据时,就开启一个事务,然后提交数据,在下一个检查点到达前都是预写入,如果下一个检查点正常,再进行最终提交。
      • 对外部系统有一定的要求,要能够识别事务ID,事务的重复提交应该是无效的。
      • 即barrier到来时,如果结果一致,就提交事务,否则进行事务回滚

Flink和Kafka连接时的精确一次保证

  • 开启检查点
  • 开启事务隔离级别,读已提交
  • 注意设置kafka超时时间为10分钟
public class Flink02_KafkaToFlink {
    public static void main(String[] args) {
        //1.创建运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //默认是最大并行度
        env.setParallelism(1);

        //开启检查点
        env.enableCheckpointing(1000L);

        //kafka source
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("hadoop102:9092,hadoop103:9092")
                .setGroupId("flinkb")
                .setTopics("topicA")
                //优先使用消费者组 记录的Offset进行消费,如果offset不存在,根据策略进行重置
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
                .setValueOnlyDeserializer(new SimpleStringSchema())
                //如果还有别的配置需要指定,统一使用通用方法
                .setProperty("isolation.level", "read_committed")
                .build();

        DataStreamSource<String> ds = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkasource");

        //处理过程


        //kafka Sink
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers("hadoop102:9092,hadoop103:9092")
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.<String>builder()
                                .setTopic("first")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build()
                )

                //语义
                //AT_LEAST_ONCE:至少一次,表示数据可能重复,需要考虑去重操作
                //EXACTLY_ONCE:精确一次
                //kafka transaction timeout is larger than broker
                //kafka超时时间:1H
                //broker超时时间:15分钟

//                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)//数据传输的保障
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)//数据传输的保障
                .setTransactionalIdPrefix("flink"+ RandomUtils.nextInt(0,100000))
//                .setProperty(ProducerConfig.RETRIES_CONFIG,"10")
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,"60*1000*10")//10分钟
                .build();

        ds.map(
                JSON::toJSONString
        ).sinkTo(kafkaSink);//写入到kafka 生产者

        ds.sinkTo(kafkaSink);

        try {
            env.execute();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

FlinkSQL1.17

FlinkSQL不同版本的接口仍在变化,有变动查看官网。
在官网这个位置可以查看Flink对于以来的一些官方介绍。
在这里插入图片描述
Table依赖剖析
三个依赖:
1. flink-table-api-java-uber-1.17.2.jar (所有的Java API)
2. flink-table-runtime-1.17.2.jar (包含Table运行时)
3. flink-table-planner-loader-1.17.2.jar (查询计划器,即SQL解析器)

静态导包:在import后添加static,并在类后面加上*导入全部。主要是为了方便使用下面的 $ 方法,否则 $ 方法前面都要添加Expressions的类名前缀

table.where($("vc").isGreaterOrEqual(100))
                .select($("id"),$("vc"),$("ts"))
                .execute()
                .print();

程序架构

  1. 准备环境
    • 流表环境:基于流创建表环境
    • 表环境:从操作层面与流独立,底层处理还是流
  2. 创建表
    • 基于流:将流转换为表
    • 连接器表
  3. 转换处理
    • 基于Table对象,使用API进行处理
    • 基于SQL的方式,直接写SQL处理
  4. 输出
    • 基于Table对象或连接器表,输出结果
    • 表转换为流,基于流的方式输出

流处理中的表

  • 处理的数据对象
    • 关系:字段元组的有界集合
    • 流处理:字段元组的无限序列
  • 对数据的访问
    • 关系:可以得到完整的
    • 流处理:数据是动态的

因此处理过程中的表是动态表,必须要持续查询。

流表转换

持续查询

  • 追加查询:窗口查询的结果通过追加的方式添加到表的末尾,使用toDataStream
  • 更新查询:窗口查询的结果会对原有的结果进行修改, 使用toChangeLogStream
  • 如果不清楚是什么类型,直接使用toChangeLogSteam()将表转换为流
public class Flink04_TableToStreamQQ {
    public static void main(String[] args) {
        //1.创建运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //默认是最大并行度
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        SingleOutputStreamOperator<Event> ds = env.socketTextStream("hadoop102", 8888)
                .map(
                        line -> {
                            String[] fields = line.split(",");
                            return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim()));
                        }
                );

        Table table = tableEnv.fromDataStream(ds);

        tableEnv.createTemporaryView("t1", table);

        //SQL
        String appendSQL = "select user, url, ts from t1 where user <> 'zhangsan'";
        //需要在查询过程中更新上一次的值
        String updateSQL = "select user, count(*) cnt from t1 group by user";

        Table resultTable = tableEnv.sqlQuery(updateSQL);

        //表转换为流
        //doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[user], select=[user, COUNT(*) AS cnt])
//        DataStream<Row> rowDs = tableEnv.toDataStream(resultTable);

        //有更新操作时,使用toChangelogStream(),它即支持追加,也支持更新查询
        DataStream<Row> rowDs = tableEnv.toChangelogStream(resultTable);

        rowDs.print();

        try {
            env.execute();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

将动态表转换为流

  • 仅追加流:如果表的结果都是追加查询
  • Retract撤回流:
    • 包含两类消息,添加消息和撤回消息
    • 下游需要根据这两类消息进行处理
  • 更新插入流:
    • 两种消息:更新插入消息(带key)和删除消息

连接器

  • DataGen和Print连接器
public class Flink01_DataGenPrint {
    public static void main(String[] args) {
        //TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
        //1. 准备表环境, 基于流环境,创建表环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);



        //DataGen
        String createTable =
                " create table t1 ( " +
                        "  id STRING , " +
                        "  vc INT ," +
                        "  ts BIGINT " +
                        " ) WITH (" +
                        "  'connector' = 'datagen' ,"  +
                        "  'rows-per-second' = '1' ," +
                        "  'fields.id.kind' = 'random' , " +
                        "  'fields.id.length' = '6' ," +
                        "  'fields.vc.kind' = 'random' , " +
                        "  'fields.vc.min' = '100' , " +
                        "  'fields.vc.max' = '1000' ," +
                        "  'fields.ts.kind' = 'sequence' , " +
                        "  'fields.ts.start' = '1000000' , " +
                        "  'fields.ts.end' = '100000000' " +
                        " )" ;


        tableEnv.executeSql(createTable);

        //Table resultTable = tableEnv.sqlQuery("select * from t1 where vc >= 200");
        //.execute().print();

        //print
        String sinkTable =
                "create table t2(" +
                        "id string," +
                        "vc int," +
                        "ts bigint" +
                        ") with (" +
                        "   'connector' = 'print', " +
                        "   'print-identifier' = 'print>' " +
                       ")";
        tableEnv.executeSql(sinkTable);
        tableEnv.executeSql("insert into t2 select id, vc, ts from t1 where vc >= 200");
    }
}
  • 文件连接器
public class Flink02_FileConnector {
    public static void main(String[] args) {
        TableEnvironment tableEnvironment = TableEnvironment.create(EnvironmentSettings.newInstance().build());

        //FileSource
        String sourceTable =
                " create table t1 ( " +
                        "  id STRING , " +
                        "  vc INT ," +
                        "  ts BIGINT," +
                        //"  `file.name` string not null METADATA," + 文件名字由于系统原因无法识别盘符后面的冒号
                        "  `file.size` bigint not null METADATA" +
                        " ) WITH (" +
                        "  'connector' = 'filesystem' ,"  +
                        "  'path' = 'input/ws.txt' ,"  +
                        "  'format' = 'csv' "  +
                        " )" ;

        tableEnvironment.executeSql(sourceTable);

        //tableEnvironment.sqlQuery(" select * from t1 ").execute().print();

        //转换处理...

        //File sink
        String sinkTable =
                " create table t2 ( " +
                        "  id STRING , " +
                        "  vc INT ," +
                        "  ts BIGINT," +
                        //"  `file.name` string not null METADATA," + 文件名字由于系统原因无法识别盘符后面的冒号
                        "  file_size bigint" +
                        " ) WITH (" +
                        "  'connector' = 'filesystem' ,"  +
                        "  'path' = 'output' ,"  +
                        "  'format' = 'json' "  +
                        " )" ;

        tableEnvironment.executeSql(sinkTable);

        tableEnvironment.executeSql("insert into t2 " +
                "select id, vc, ts, `file.size` from t1");
    }
}
  • kafka连接器
public class Flink03_KafkaConnector {
    public static void main(String[] args) {
        TableEnvironment tableEnvironment = TableEnvironment.create(EnvironmentSettings.newInstance().build());

        //kafka source
        String sourceTable =
                " create table t1 ( " +
                        "  id STRING , " +
                        "  vc INT ," +
                        "  ts BIGINT," +
                        "  `topic` string not null METADATA," +
                        "  `partition` int not null METADATA," +
                        "  `offset` bigint not null METADATA" +
                        " ) WITH (" +
                        "  'connector' = 'kafka' ,"  +
                        "  'properties.bootstrap.servers' = 'hadoop102:9092,hadoop103:9092' ,"  +
                        "  'topic' = 'topicA', "  +
                        "  'properties.group.id' = 'flinksql', "  +
                        "  'value.format' = 'csv', "  +
                        "  'scan.startup.mode' = 'group-offsets',"  +
                        "  'properties.auto.offset.reset' = 'latest' "  +
                        " )" ;

        //创建表
        tableEnvironment.executeSql(sourceTable);

        //打印查询结果
        //tableEnvironment.sqlQuery(" select * from t1 ").execute().print();

        //转换处理...

        //kafka Sink
        String sinkTable =
                " create table t2 ( " +
                        "  id STRING , " +
                        "  vc INT ," +
                        "  ts BIGINT," +
                        "  `topic` string " +
                        " ) WITH (" +
                        "  'connector' = 'kafka' ,"  +
                        "  'properties.bootstrap.servers' = 'hadoop102:9092,hadoop103:9092' ,"  +
                        "  'topic' = 'topicB', "  +
                        "  'sink.delivery-guarantee' = 'at-least-once', "  +
                        //"  'properties.transaction.timeout.ms' = '', "  +
                        //"  'sink.transactional-id-prefix' = 'xf', "  +
                        //"  'properties.group.id' = 'flinksql', "  +
                        "  'value.format' = 'json' "  +
                        //"  'scan.startup.mode' = 'group-offsets',"  +
                        //"  'properties.auto.offset.reset' = 'latest' "  +
                        " )" ;

        tableEnvironment.executeSql(sinkTable);

        tableEnvironment.executeSql("insert into t2 " +
                "select id, vc, ts, `topic` from t1");


    }
}
  • Jdbc连接器

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/247731.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

vue3 添加编辑页使用 cron 表达式生成

示例效果图 1、添加组件 <template><div class"v3c"><ul class"v3c-tab"><li class"v3c-tab-item" :class"{ v3c-active: tabActive 1 }" click"onHandleTab(1)">秒</li><li class&qu…

【Linux系统编程二十一】:(进程通信3)--消息队列/信号量(system v标准的内核数据结构的设计模式)

【Linux系统编程二十】&#xff1a;消息队列/信号量(system v标准的内核数据结构的设计模式&#xff09; 一.消息队列二.system v标准的内核数据结构的设计三.四个概念(互斥/临界)四.信号量1.多线程并发访问2.计数器3.原子的4.总结 一.消息队列 一个叫做a进程啊&#xff0c;一个…

乐益达教育网页

目录 一、网页效果 二、html代码 三、CSS代码 四、JS代码 一、网页效果 二、html代码 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, in…

Milesight VPN server.js 任意文件读取漏洞(CVE-2023-23907)

0x01 产品简介 MilesightVPN 是一款软件&#xff0c;一个 Milesight 产品的 VPN 通道设置过程更加完善&#xff0c;并可通过网络服务器界面连接状态。 0x02 漏洞概述 MilesightVPN server.js接口处存在文件读取漏洞&#xff0c;攻击者可通过该漏洞读取系统重要文件&#xff…

LeetCode-42. 接雨水【栈 数组 双指针 动态规划 单调栈】

LeetCode-42. 接雨水【栈 数组 双指针 动态规划 单调栈】 题目描述&#xff1a;解题思路一&#xff1a;单调栈&#xff0c;维护一个单调递减栈。每当遇到当前元素大于栈顶元素就出栈&#xff0c;在出栈时更新答案。当遇到出栈的情况&#xff0c;若单调栈栈左边有一个元素则必有…

Java医院信息化建设云HIS系统源码

云HIS提供标准化、信息化、可共享的医疗信息管理系统&#xff0c;实现医患事务管理和临床诊疗管理等标准医疗管理信息系统的功能。优化就医、管理流程&#xff0c;提升患者满意度、基层首诊率&#xff0c;通过信息共享、辅助诊疗等手段&#xff0c;提高基层医生的服务能力构建和…

Ubuntu20.04 下编译安装 ffmpeg 和 ffplay

Ubuntu20.04 下编译安装 ffmpeg 和 ffplay 一、下载源码包二、安装依赖库三、编译四、添加环境变量五、验证是否成功六、问题 一、下载源码包 1.1 官方下载链接&#xff1a;http://ffmpeg.org/download.html 最新版本为6.1&#xff0c;点击 Download Source Code下载即可 &…

【深度学习】强化学习(二)马尔可夫决策过程

文章目录 一、强化学习问题1、交互的对象2、强化学习的基本要素3、策略&#xff08;Policy&#xff09;4、马尔可夫决策过程1. 基本元素2. 交互过程的表示3. 马尔可夫过程&#xff08;Markov Process&#xff09;4. 马尔可夫决策过程&#xff08;MDP&#xff09;5. 轨迹的概率计…

监控pod 容器外网请求网络带宽,过滤掉内网、基于k8spacket开发、prometheus开发export

首先安装k8spacket 安装k8spacket遇到问题&#xff0c;下载插件一直能不能下载成功&#xff0c;pod不能启动。所有手动下载处理。 helm repo add k8spacket https://k8spacket.github.io/k8spacket-helm-chart helm pull k8spacket/k8spacket打开values.yaml 文件 手动下载插…

Axure元件库的介绍以及个人简介和登录界面案例展示

目录 一. 元件介绍 二. 基本元件的使用 2.1 形状元件 2.2 图片元件 2.3 占位符 2.4 文本 2.5 线段元件 2.6 热区文件 三. 表单元件的使用 3.1 文本框 3.2 文本域 3.3 下拉列表 3.4 列表框 3.5 复选框 3.6 单选按钮 四. 菜单与表格元件的使用 4.1 树 4.2 表格…

【CSS】用 CSS 写一个渐变色边框的输入框

Using_CSS_gradients MDN 多渐变色输入框&#xff0c;群友问了下&#xff0c;就试着写了下&#xff0c;看了看 css 渐变色 MDN 文档&#xff0c;其实很简单&#xff0c;代码记录下&#xff1a; <!DOCTYPE html> <html lang"en"><head><meta ch…

2024美赛备战-美赛必备技能(matlab 和SPSS入门必备)

( 一 )Matlab 1.数值计算和符号计算功能 Matlab 以矩阵作为数据操作的基本单位&#xff0c;它的指令表达式与数学、工程中 常用的符号、表达式十分相似&#xff0c;故用Matlab 来解算问题要比用C、FORTRAN 等 语 言完成相同的事情简捷得多&#xff0c;使学者易于学习和掌握…

python如何发送企业微信群消息

一、创建机器人&#xff0c;并获取webhook 1.1 进入企业微信中&#xff0c;添加群机器人&#xff0c;添加完成后可以获取到一个webhook的地址 1.2 群机器人企业微信接口的调用可以参考这个文件 https://developer.work.weixin.qq.com/document/path/99110#%E5%A6%82%E4%BD%…

【UE5.2】从零开始控制角色移动、游泳、下潜、上浮

目录 效果 步骤 一、项目准备 二、控制角色移动 三、控制角色游泳 四、实现角色潜水、上浮 五、解决在水面上浮的Bug 效果 步骤 一、项目准备 1. 新建一个空白工程&#xff0c;创建一个Basic关卡&#xff0c;添加第三人称游戏资源到内容浏览器 2. 在插件中启用“W…

[C++]——学习模板

了解模板——初阶 前言&#xff1a;一、模板1.1 什么是模板1.2 模板的概念1.3 模板可以做什么1.4 泛型模板 二、函数模板2.1 函数模板概念和格式2.2 函数模板原理2.3 函数模板实例化2.3.1 隐式实例化2.3.2 显式实例化 2.4 模板参数的匹配原则2.5 函数模板声明定义分离 三、类模…

YOLOv8改进 | 2023Neck篇 | 轻量级跨尺度特征融合模块CCFM(附yaml文件+添加教程)

一、本文介绍 本文给大家带来的改进机制是轻量级跨尺度特征融合模块CCFM&#xff08;Cross-Scale Feature Fusion Module&#xff09;其主要原理是&#xff1a;将不同尺度的特征通过融合操作整合起来&#xff0c;以增强模型对于尺度变化的适应性和对小尺度对象的检测能力。我将…

电子学会C/C++编程等级考试2021年03月(六级)真题解析

C/C++等级考试(1~8级)全部真题・点这里 第1题:生日相同 2.0 在一个有180人的大班级中,存在两个人生日相同的概率非常大,现给出每个学生的名字,出生月日。试找出所有生日相同的学生。 时间限制:1000 内存限制:65536输入 第一行为整数n,表示有n个学生,n ≤ 180。此后每…

Linux中的fork()函数

目录 1.现象 2.如何实现的&#xff1f; 1.现象 1.fork()函数是用来创建一个字进程的&#xff1a; 如果这个进程是子进程&#xff0c;那么返回值返回0&#xff0c;如果是父进程的话&#xff0c;那么返回子进程的pid&#xff0c;以便父进程找到子进程&#xff0c;因为子进程的p…

理解数字化转型:3个阶段、2个分类和3类价值

导读&#xff1a;数字化转型是基于IT技术提供业务所需要的支持&#xff0c;让业务和技术真正产生交互而诞生的。我们可以从概念及内涵、分类、价值等多个维度来理解企业数字化转型。 01 数字化转型的概念及内涵 数字化转型运用5G、人工智能、大数据、云计算等新一代数字技术&a…

【信息学奥赛】拼在起跑线上,想入道就别落下自己!

编程无难事&#xff0c;只怕有心人&#xff0c;学就是了&#xff01; 文章目录 1 信息学奥赛简介2 信息学竞赛的经验回顾3 优秀参考图书推荐《信息学奥赛一本通关》4 高质量技术圈开放 1 信息学奥赛简介 信息学奥赛&#xff0c;作为全国中学生学科奥林匹克“五大学科竞赛”之一…