【flink番外篇】9、Flink Table API 支持的操作示例(8)- 时态表的join(scala版本)

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列
    本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列
    本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列
    本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列
    本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列
    本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引


文章目录

  • Flink 系列文章
  • 一、maven依赖
  • 二、示例:时态表的join(scala版本)
      • 1)、统计需求对应的SQL
      • 2)、Without connnector 实现代码
      • 3)、With CSVConnector 实现代码


本文给以scala的语言给出来Table API 针对时态表的join操作。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文除了maven依赖外,没有其他依赖。

本文需要有kafka的运行环境。

本文更详细的内容可参考文章:

17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)

本专题分为以下几篇文章:
【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表
【flink番外篇】9、Flink Table API 支持的操作示例(2)- 通过Table API 和 SQL 创建视图
【flink番外篇】9、Flink Table API 支持的操作示例(3)- 通过API查询表和使用窗口函数的查询
【flink番外篇】9、Flink Table API 支持的操作示例(4)- Table API 对表的查询、过滤操作
【flink番外篇】9、Flink Table API 支持的操作示例(5)- 表的列操作
【flink番外篇】9、Flink Table API 支持的操作示例(6)- 表的聚合(group by、Distinct、GroupBy/Over Window Aggregation)操作
【flink番外篇】9、Flink Table API 支持的操作示例(7)- 表的join操作(内联接、外联接以及联接自定义函数等)
【flink番外篇】9、Flink Table API 支持的操作示例(8)- 时态表的join(scala版本)
【flink番外篇】9、Flink Table API 支持的操作示例(9)- 表的union、unionall、intersect、intersectall、minus、minusall和in的操作
【flink番外篇】9、Flink Table API 支持的操作示例(10)- 表的OrderBy、Offset 和 Fetch、insert操作
【flink番外篇】9、Flink Table API 支持的操作示例(11)- Group Windows(tumbling、sliding和session)操作
【flink番外篇】9、Flink Table API 支持的操作示例(12)- Over Windows(有界和无界的over window)操作
【flink番外篇】9、Flink Table API 支持的操作示例(13)- Row-based(map、flatmap、aggregate、group window aggregate等)操作
【flink番外篇】9、Flink Table API 支持的操作示例(14)- 时态表的join(java版本)
【flink番外篇】9、Flink Table API 支持的操作示例(1)-完整版
【flink番外篇】9、Flink Table API 支持的操作示例(2)-完整版

一、maven依赖

本文maven依赖参考文章:【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表 中的依赖,为节省篇幅不再赘述。

二、示例:时态表的join(scala版本)

该示例来源于:https://developer.aliyun.com/article/679659
假设有一张订单表Orders和一张汇率表Rates,那么订单来自于不同的地区,所以支付的币种各不一样,那么假设需要统计每个订单在下单时候Yen币种对应的金额。
在这里插入图片描述

1)、统计需求对应的SQL

SELECT o.currency, o.amount, r.rate
  o.amount * r.rate AS yen_amount
FROM
  Orders AS o,
  LATERAL TABLE (Rates(o.rowtime)) AS r
WHERE r.currency = o.currency

2)、Without connnector 实现代码

object TemporalTableJoinTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)
    env.setParallelism(1)
// 设置时间类型是 event-time  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // 构造订单数据
    val ordersData = new mutable.MutableList[(Long, String, Timestamp)]
    ordersData.+=((2L, "Euro", new Timestamp(2L)))
    ordersData.+=((1L, "US Dollar", new Timestamp(3L)))
    ordersData.+=((50L, "Yen", new Timestamp(4L)))
    ordersData.+=((3L, "Euro", new Timestamp(5L)))

    //构造汇率数据
    val ratesHistoryData = new mutable.MutableList[(String, Long, Timestamp)]
    ratesHistoryData.+=(("US Dollar", 102L, new Timestamp(1L)))
    ratesHistoryData.+=(("Euro", 114L, new Timestamp(1L)))
    ratesHistoryData.+=(("Yen", 1L, new Timestamp(1L)))
    ratesHistoryData.+=(("Euro", 116L, new Timestamp(5L)))
    ratesHistoryData.+=(("Euro", 119L, new Timestamp(7L)))

// 进行订单表 event-time 的提取
    val orders = env
      .fromCollection(ordersData)
      .assignTimestampsAndWatermarks(new OrderTimestampExtractor[Long, String]())
      .toTable(tEnv, 'amount, 'currency, 'rowtime.rowtime)

// 进行汇率表 event-time 的提取
    val ratesHistory = env
      .fromCollection(ratesHistoryData)
      .assignTimestampsAndWatermarks(new OrderTimestampExtractor[String, Long]())
      .toTable(tEnv, 'currency, 'rate, 'rowtime.rowtime)

// 注册订单表和汇率表
    tEnv.registerTable("Orders", orders)
    tEnv.registerTable("RatesHistory", ratesHistory)
    val tab = tEnv.scan("RatesHistory");
// 创建TemporalTableFunction
    val temporalTableFunction = tab.createTemporalTableFunction('rowtime, 'currency)
//注册TemporalTableFunction
tEnv.registerFunction("Rates",temporalTableFunction)

    val SQLQuery =
      """
        |SELECT o.currency, o.amount, r.rate,
        |  o.amount * r.rate AS yen_amount
        |FROM
        |  Orders AS o,
        |  LATERAL TABLE (Rates(o.rowtime)) AS r
        |WHERE r.currency = o.currency
        |""".stripMargin

    tEnv.registerTable("TemporalJoinResult", tEnv.SQLQuery(SQLQuery))

    val result = tEnv.scan("TemporalJoinResult").toAppendStream[Row]
    // 打印查询结果
    result.print()
    env.execute()
  }

}

  • OrderTimestampExtractor 实现如下
import java.SQL.Timestamp

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

class OrderTimestampExtractor[T1, T2]
  extends BoundedOutOfOrdernessTimestampExtractor[(T1, T2, Timestamp)](Time.seconds(10)) {
  override def extractTimestamp(element: (T1, T2, Timestamp)): Long = {
    element._3.getTime
  }
}

3)、With CSVConnector 实现代码

在实际的生产开发中,都需要实际的Connector的定义,下面我们以CSV格式的Connector定义来开发Temporal Table JOIN Demo。

1、genEventRatesHistorySource

def genEventRatesHistorySource: CsvTableSource = {

    val csvRecords = Seq(
      "ts#currency#rate",
      "1#US Dollar#102",
      "1#Euro#114",
      "1#Yen#1",
      "3#Euro#116",
      "5#Euro#119",
      "7#Pounds#108"
    )
    // 测试数据写入临时文件
    val tempFilePath =
      FileUtils.writeToTempFile(csvRecords.mkString(CommonUtils.line), "csv_source_rate", "tmp")

    // 创建Source connector
    new CsvTableSource(
      tempFilePath,
      Array("ts","currency","rate"),
      Array(
        Types.LONG,Types.STRING,Types.LONG
      ),
      fieldDelim = "#",
      rowDelim = CommonUtils.line,
      ignoreFirstLine = true,
      ignoreComments = "%"
    )
  }

2、genRatesOrderSource


def genRatesOrderSource: CsvTableSource = {

    val csvRecords = Seq(
      "ts#currency#amount",
      "2#Euro#10",
      "4#Euro#10"
    )
    // 测试数据写入临时文件
    val tempFilePath =
      FileUtils.writeToTempFile(csvRecords.mkString(CommonUtils.line), "csv_source_order", "tmp")

    // 创建Source connector
    new CsvTableSource(
      tempFilePath,
      Array("ts","currency", "amount"),
      Array(
        Types.LONG,Types.STRING,Types.LONG
      ),
      fieldDelim = "#",
      rowDelim = CommonUtils.line,
      ignoreFirstLine = true,
      ignoreComments = "%"
    )
  }

3、主程序

import java.io.File

import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.book.utils.{CommonUtils, FileUtils}
import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.types.Row

object CsvTableSourceUtils {

  def genWordCountSource: CsvTableSource = {
    val csvRecords = Seq(
      "words",
      "Hello Flink",
      "Hi, Apache Flink",
      "Apache FlinkBook"
    )
    // 测试数据写入临时文件
    val tempFilePath =
      FileUtils.writeToTempFile(csvRecords.mkString("$"), "csv_source_", "tmp")

    // 创建Source connector
    new CsvTableSource(
      tempFilePath,
      Array("words"),
      Array(
        Types.STRING
      ),
      fieldDelim = "#",
      rowDelim = "$",
      ignoreFirstLine = true,
      ignoreComments = "%"
    )
  }


  def genRatesHistorySource: CsvTableSource = {

    val csvRecords = Seq(
      "rowtime ,currency   ,rate",
    "09:00:00   ,US Dollar  , 102",
    "09:00:00   ,Euro       , 114",
    "09:00:00  ,Yen        ,   1",
    "10:45:00   ,Euro       , 116",
    "11:15:00   ,Euro       , 119",
    "11:49:00   ,Pounds     , 108"
    )
    // 测试数据写入临时文件
    val tempFilePath =
      FileUtils.writeToTempFile(csvRecords.mkString("$"), "csv_source_", "tmp")

    // 创建Source connector
    new CsvTableSource(
      tempFilePath,
      Array("rowtime","currency","rate"),
      Array(
        Types.STRING,Types.STRING,Types.STRING
      ),
      fieldDelim = ",",
      rowDelim = "$",
      ignoreFirstLine = true,
      ignoreComments = "%"
    )
  }

  def genEventRatesHistorySource: CsvTableSource = {

    val csvRecords = Seq(
      "ts#currency#rate",
      "1#US Dollar#102",
      "1#Euro#114",
      "1#Yen#1",
      "3#Euro#116",
      "5#Euro#119",
      "7#Pounds#108"
    )
    // 测试数据写入临时文件
    val tempFilePath =
      FileUtils.writeToTempFile(csvRecords.mkString(CommonUtils.line), "csv_source_rate", "tmp")

    // 创建Source connector
    new CsvTableSource(
      tempFilePath,
      Array("ts","currency","rate"),
      Array(
        Types.LONG,Types.STRING,Types.LONG
      ),
      fieldDelim = "#",
      rowDelim = CommonUtils.line,
      ignoreFirstLine = true,
      ignoreComments = "%"
    )
  }

  def genRatesOrderSource: CsvTableSource = {

    val csvRecords = Seq(
      "ts#currency#amount",
      "2#Euro#10",
      "4#Euro#10"
    )
    // 测试数据写入临时文件
    val tempFilePath =
      FileUtils.writeToTempFile(csvRecords.mkString(CommonUtils.line), "csv_source_order", "tmp")

    // 创建Source connector
    new CsvTableSource(
      tempFilePath,
      Array("ts","currency", "amount"),
      Array(
        Types.LONG,Types.STRING,Types.LONG
      ),
      fieldDelim = "#",
      rowDelim = CommonUtils.line,
      ignoreFirstLine = true,
      ignoreComments = "%"
    )
  }


  /**
    * Example:
    * genCsvSink(
    *   Array[String]("word", "count"),
    *   Array[TypeInformation[_] ](Types.STRING, Types.LONG))
    */
  def genCsvSink(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = {
    val tempFile = File.createTempFile("csv_sink_", "tem")
    if (tempFile.exists()) {
      tempFile.delete()
    }
    new CsvTableSink(tempFile.getAbsolutePath).configure(fieldNames, fieldTypes)
  }

}

4、运行结果
在这里插入图片描述

以上,本文给以scala的语言给出来Table API 针对时态表的join操作。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文更详细的内容可参考文章:

17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)

本专题分为以下几篇文章:
【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表
【flink番外篇】9、Flink Table API 支持的操作示例(2)- 通过Table API 和 SQL 创建视图
【flink番外篇】9、Flink Table API 支持的操作示例(3)- 通过API查询表和使用窗口函数的查询
【flink番外篇】9、Flink Table API 支持的操作示例(4)- Table API 对表的查询、过滤操作
【flink番外篇】9、Flink Table API 支持的操作示例(5)- 表的列操作
【flink番外篇】9、Flink Table API 支持的操作示例(6)- 表的聚合(group by、Distinct、GroupBy/Over Window Aggregation)操作
【flink番外篇】9、Flink Table API 支持的操作示例(7)- 表的join操作(内联接、外联接以及联接自定义函数等)
【flink番外篇】9、Flink Table API 支持的操作示例(8)- 时态表的join(scala版本)
【flink番外篇】9、Flink Table API 支持的操作示例(9)- 表的union、unionall、intersect、intersectall、minus、minusall和in的操作
【flink番外篇】9、Flink Table API 支持的操作示例(10)- 表的OrderBy、Offset 和 Fetch、insert操作
【flink番外篇】9、Flink Table API 支持的操作示例(11)- Group Windows(tumbling、sliding和session)操作
【flink番外篇】9、Flink Table API 支持的操作示例(12)- Over Windows(有界和无界的over window)操作
【flink番外篇】9、Flink Table API 支持的操作示例(13)- Row-based(map、flatmap、aggregate、group window aggregate等)操作
【flink番外篇】9、Flink Table API 支持的操作示例(14)- 时态表的join(java版本)
【flink番外篇】9、Flink Table API 支持的操作示例(1)-完整版
【flink番外篇】9、Flink Table API 支持的操作示例(2)-完整版

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/284084.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Spring Cloud Function SpEL注入漏洞(CVE-2022-22963)分析

一、概述 2022年3月24日,Pivotal修补了Spring Cloud Function中一个关键的服务器端代码注入漏洞(Spring表达式语言注入),该漏洞有可能导致系统被攻击。Spring是一种流行的开源Java框架,该漏洞与另一个相关的远程代码执…

[区间动态规划] 棋盘分割

题目描述 ​ 将一个8*8的棋盘进行如下分割:将原棋盘割下一块矩形棋盘并使剩下部分也是矩形,再将剩下的部分继续如此分割,这样割了(n−1)次后,连同最后剩下的矩形棋盘共有 n 块矩形棋盘。(每次切割都只能沿…

【SpringBoot】SwaggerKnif4j接口文档集成

[TOC] 序:接口文档 ​ 在开发过程中,接口文档是非常重要的一环,在 Spring Boot 中,我们可以通过集成第三方来实现接口文档的自动生成。 ​ 通过注解来描述接口,然后根据这些注解自动生成接口文档,它不仅…

在Mac上恢复SD卡数据的 6 个有效应用程序

慌!SD卡里的照片和视频不小心删了,Mac设备上还恢复不了数据! 遇到这种情况,你需要的是一款可靠的Mac适用的SD卡恢复软件。我们为你准备了一份最佳的SD卡恢复软件列表,并且还有详细的评论。另外,我们还会给…

WinForm开发 - C# RadioButton(单选框) 设置默认选中或取消默认选中

WinForm开发中RadioButton组件使用过程中的小技巧。 1、属性界面操作 如果有多个组件,希望不显示默认选中单选框只需要将其Checked属性全部设置为False即可, 如果希望默认多个组件中显示默认选中,将其Checked属性设置为True。 2、代码实…

二分算法--x的平方根

个人主页:Lei宝啊 愿所有美好如期而遇 二分算法前言 二分算法原理超详细讲解(包括暴力求解,朴素二分查找,二分查找左右端点):二分查找(非朴素)--在排序数组中查找元素的第一个和最后一个位置https://blog.csdn.net/m0_74824254…

5 个让日常编码更简单的 Python 库

今天我们一起来研究一些非常有用的第三方模块,可以使得我们的日常编码变得更加简单方便 sh https://github.com/amoffat/sh 如果曾经在 Python 中使用过 subprocess 库,那么我们很有可能对它感到失望,它不是最直观的库,可能还有些…

Javascript 循环结构while do while for实例讲解

Javascript 循环结构while do while for实例讲解 目录 Javascript 循环结构while do while for实例讲解 一、while语句 二、do…while语句 三、for循环 疑难解答 我们从上一节课知道,JavaScript循环结构总有3种: (1)while语…

2024年【黑龙江省安全员C证】考试及黑龙江省安全员C证找解析

题库来源:安全生产模拟考试一点通公众号小程序 2024年黑龙江省安全员C证考试为正在备考黑龙江省安全员C证操作证的学员准备的理论考试专题,每个月更新的黑龙江省安全员C证找解析祝您顺利通过黑龙江省安全员C证考试。 1、【多选题】下列属于编制安全检查…

WorkQueue模型

WorkQueues,也被称为任务队列模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时的处理。此时就可以使用work模型:让多个消费者绑定到一个队列&…

动力学约束下的运动规划算法——Kinodynamic RRT*算法

一、RRT * 算法回顾 为了更好的理解Kinodynamic RRT*算法,我们先来回顾一下RRT * 算法 RRT * 先通过Sample函数随机选取一个点Xrand,然后通过Near函数找到当前树上距离Xrand最近的一个点Xnear,再通过Steer函数,沿着从Xnear到Xra…

04 HAL库下使用定时器产生一个中断

目录 一、定时器的相关知识点 1.定时器的定义 2. 查看时钟配置 3. 定时器的分类 二、实验开始 1. 配置一个定时器 2.打开定时器的中断配置 引言 在本文的开头我想给大家分享一下单片机工作的两种工作模式轮询和中断(异步), 中断也叫做…

一文搞懂什么是缓存穿透、缓存雪崩、缓存击穿三个概念,以及解决方案

先理解概念:【注:我们这里说的是分布式、高并发环境】 一、缓存穿透是什么? 缓存穿透是指:请求【可以有很多】的数据在缓存、关系型数据库中都不存在,每次来查询都会查询到关系型数据库中。 解决方案: 1、将…

打破数据孤岛:ChatGPT如何打通金融大数据的任督二脉?

文章目录 一、引言二、ChatGPT与金融大数据分析的融合三、实践应用:ChatGPT在金融大数据分析中的优势与挑战四、案例分析:ChatGPT在金融大数据分析中的应用案例五、前景展望:ChatGPT在金融大数据分析领域的未来发展《AI时代Python金融大数据分…

Qt5 安装教程 - 跳过登录界面

Qt5 安装教程 - 跳过登录界面 引言一、下载二、安装三、使用四、修改、维护、卸载 引言 Qt5.14.2及以前的版本有离线安装包,无需登录 (老版本连登录界面也无)。之后的版本需登录进行在线安装。 本文以Qt5.12.2版本为例,说明如何跳过登录界面&#xff0c…

如何解决企业内部FTP文件传输速度过慢和安全问题

在数据化时代里,企业内部的文件传输永远是刚需,而因为 FTP协议的简单、易用、广泛支持等优点,让很多企业早期都普遍使用,随着数量量的增多,和对安全的要求越来越高,FTP也暴露出了一些列问题,小编…

算法逆袭之路(1)

11.29 开始跟进算法题进度! 每天刷4题左右 ,一周之内一定要是统一类型 而且一定稍作总结, 了解他们的内在思路究竟是怎样的!! 12.24 一定要每天早中晚都要复习一下 早中午每段一两道, 而且一定要是同一个类型, 不然刷起来都没有意义 12.26/27: 斐波那契数 爬…

使用 Tkinter 制作一个进制转换工具,好用!

在平时工作学习当中,我们经常会编写一些简单的 Python GUI 工具,以此来完成各种各样的自动化任务,比如批量处理文件,批量处理图片等等。当我们进行这些工具的编写之时,往往只关注了功能的实现,而忽略了页面…

Android 跨进程之间通信(IPC)方式之ContentProvider

Android 跨进程之间通信 Android 跨进程之间通信(IPC)方式之BroadcastReceiverAndroid 跨进程之间通信(IPC)方式之ContentProvider 文章目录 Android 跨进程之间通信前言一、ContentProvider 是什么?二、如何利用ContentProvider跨进程通信1.创建自定义ContentProv…

伺服电机:原点复位

一、原点复位概念 原点复位指的是,在驱动器使能时,触发原点复位功能后,电机将主动查找零点,完成定位功能。 那么问题来了,什么是原点,什么是零点? 原点:即机械原点,可…