使用Apache Flink实现MySQL数据读取和写入的完整指南

1. 导言:

Apache Flink是一款功能强大的流式处理引擎,可用于实时处理大规模数据。本文将介绍如何使用Flink与MySQL数据库进行交互,以清洗股票数据为例。

2. 环境准备:

首先,确保已安装Apache Flink并配置好MySQL数据库。导入相关依赖包,并创建必要的Table。同时需要提前创建好mysql表,一行source表,一张sink表。

CREATE TABLE `re_stock_code_price` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `code` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '股票代码',
  `name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '股票名称',
  `close` double DEFAULT NULL COMMENT '最新价',
  `change_percent` double DEFAULT NULL COMMENT '涨跌幅',
  `change` double DEFAULT NULL COMMENT '涨跌额',
  `volume` double DEFAULT NULL COMMENT '成交量(手)',
  `amount` double DEFAULT NULL COMMENT '成交额',
  `amplitude` double DEFAULT NULL COMMENT '振幅',
  `turnover_rate` double DEFAULT NULL COMMENT '换手率',
  `peration` double DEFAULT NULL COMMENT '市盈率',
  `volume_rate` double DEFAULT NULL COMMENT '量比',
  `hign` double DEFAULT NULL COMMENT '最高',
  `low` double DEFAULT NULL COMMENT '最低',
  `open` double DEFAULT NULL COMMENT '今开',
  `previous_close` double DEFAULT NULL COMMENT '昨收',
  `pb` double DEFAULT NULL COMMENT '市净率',
  `create_time` varchar(64) NOT NULL COMMENT '写入时间',
  `rise` int NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=11207 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci 

CREATE TABLE `t_stock_code_price` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `code` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '股票代码',
  `name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '股票名称',
  `close` double DEFAULT NULL COMMENT '最新价',
  `change_percent` double DEFAULT NULL COMMENT '涨跌幅',
  `change` double DEFAULT NULL COMMENT '涨跌额',
  `volume` double DEFAULT NULL COMMENT '成交量(手)',
  `amount` double DEFAULT NULL COMMENT '成交额',
  `amplitude` double DEFAULT NULL COMMENT '振幅',
  `turnover_rate` double DEFAULT NULL COMMENT '换手率',
  `peration` double DEFAULT NULL COMMENT '市盈率',
  `volume_rate` double DEFAULT NULL COMMENT '量比',
  `hign` double DEFAULT NULL COMMENT '最高',
  `low` double DEFAULT NULL COMMENT '最低',
  `open` double DEFAULT NULL COMMENT '今开',
  `previous_close` double DEFAULT NULL COMMENT '昨收',
  `pb` double DEFAULT NULL COMMENT '市净率',
  `create_time` varchar(64) NOT NULL COMMENT '写入时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=11207 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci 
package org.east;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment;

object TableETL {
    def main(args: Array[String]): Unit = {
        val senv = StreamExecutionEnvironment.getExecutionEnvironment
            .setRuntimeMode(RuntimeExecutionMode.STREAMING)
        val tEnv = StreamTableEnvironment.create(senv)

        // 定义源表
        val source_table =
            """
            CREATE TEMPORARY TABLE t_stock_code_price (
                id BIGINT NOT NULL,
                code STRING NOT NULL,
                -- 其他字段...
                create_time STRING NOT NULL,
                PRIMARY KEY (id) NOT ENFORCED
            ) WITH (
                'connector' = 'jdbc',
                'url' = 'jdbc:mysql://localhost:3306/mydb',
                'driver' = 'com.mysql.cj.jdbc.Driver',
                'table-name' = 't_stock_code_price',
                'username' = 'root',
                'password' = '12345678'
            )
            """.stripMargin

        // 定义目标表
        val sink_table =
            """
            CREATE TEMPORARY TABLE re_stock_code_price (
                id BIGINT NOT NULL,
                code STRING NOT NULL,
                -- 其他字段...
                create_time STRING NOT NULL,
                rise INT,
                PRIMARY KEY (id) NOT ENFORCED
            ) WITH (
                'connector' = 'jdbc',
                'url' = 'jdbc:mysql://localhost:3306/mydb',
                'driver' = 'com.mysql.cj.jdbc.Driver',
                'table-name' = 're_stock_code_price',
                'username' = 'root',
                'password' = '12345678'
            )
            """.stripMargin

        tEnv.executeSql(source_table)
        tEnv.executeSql(sink_table)

在这段代码中,我们首先创建了Flink的流式执行环境和StreamTableEnvironment。然后,我们定义了两个临时表,用于存储原始股票数据和清洗后的数据。

3. 数据清洗:

接下来,我们执行数据清洗操作,并将结果写入目标表。

        // 执行清洗操作,并将结果写入目标表
        tEnv.executeSql("INSERT INTO re_stock_code_price " +
            "SELECT *, CASE WHEN change_percent > 0 THEN 1 ELSE 0 END AS rise FROM t_stock_code_price")

在这里,我们计算了股票涨跌情况,并将结果写入到目标表中。在这个例子中,我们假设change_percent字段表示股票价格的变化百分比,rise字段为1表示股票上涨,为0表示股票下跌。

4. 结果展示:

最后,我们查询目标表并打印结果。

在这里插入图片描述
在这里插入图片描述

5. 完整代码:

下面是完整的代码:

package org.east;
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object TableETL {

  def main(args: Array[String]): Unit = {

    val senv = StreamExecutionEnvironment.getExecutionEnvironment
      .setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(senv)

    val source_table =
      """
        |CREATE TEMPORARY TABLE t_stock_code_price (
        |  id BIGINT NOT NULL,
        |  code STRING NOT NULL,
        |  name STRING NOT NULL,
        |  `close` DOUBLE,
        |  change_percent DOUBLE,
        |  change DOUBLE,
        |  volume DOUBLE,
        |  amount DOUBLE,
        |  amplitude DOUBLE,
        |  turnover_rate DOUBLE,
        |  peration DOUBLE,
        |  volume_rate DOUBLE,
        |  hign DOUBLE,
        |  low DOUBLE,
        |  `open` DOUBLE,
        |  previous_close DOUBLE,
        |  pb DOUBLE,
        |  create_time STRING NOT NULL,
        |  PRIMARY KEY (id) NOT ENFORCED
        |) WITH (
        |   'connector' = 'jdbc',
        |   'url' = 'jdbc:mysql://localhost:3306/mydb',
        |   'driver' = 'com.mysql.cj.jdbc.Driver',
        |   'table-name' = 't_stock_code_price',
        |   'username' = 'root',
        |   'password' = '12345678'
        |)
        |""".stripMargin


    val sink_table =
      """
        |CREATE TEMPORARY TABLE re_stock_code_price (
        |  id BIGINT NOT NULL,
        |  code STRING NOT NULL,
        |  name STRING NOT NULL,
        |  `close` DOUBLE,
        |  change_percent DOUBLE,
        |  change DOUBLE,
        |  volume DOUBLE,
        |  amount DOUBLE,
        |  amplitude DOUBLE,
        |  turnover_rate DOUBLE,
        |  peration DOUBLE,
        |  volume_rate DOUBLE,
        |  hign DOUBLE,
        |  low DOUBLE,
        |  `open` DOUBLE,
        |  previous_close DOUBLE,
        |  pb DOUBLE,
        |  create_time STRING NOT NULL,
        |  rise int,
        |  PRIMARY KEY (id) NOT ENFORCED
        |) WITH (
        |   'connector' = 'jdbc',
        |   'url' = 'jdbc:mysql://localhost:3306/mydb',
        |   'driver' = 'com.mysql.cj.jdbc.Driver',
        |   'table-name' = 're_stock_code_price',
        |   'username' = 'root',
        |   'password' = '12345678'
        |)
        |""".stripMargin


    tEnv.executeSql(source_table)
    tEnv.executeSql(sink_table)
    tEnv.executeSql("insert into re_stock_code_price select *,case when change_percent>0 then 1 else 0 end as rise from t_stock_code_price")

    val user_DS = tEnv.executeSql("select * from re_stock_code_price")

    user_DS.print()
  }
}

如有遇到问题可以找小编沟通交流哦。另外小编帮忙辅导大课作业,学生毕设等。不限于python,java,大数据,模型训练等。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/497662.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C语言例4-35:鸡翁一,值钱五;鸡母一,值钱三;鸡雏三,值钱一。百钱买百鸡、问鸡翁、鸡母和鸡雏各几何?

方法一&#xff1a; 代码如下&#xff1a; //鸡翁一&#xff0c;值钱五&#xff1b;鸡母一&#xff0c;值钱三&#xff1b;鸡雏三&#xff0c;值钱一。百钱买百鸡、问鸡翁、鸡母和鸡雏各几何&#xff1f; //方法一&#xff1a; #include<stdio.h> int main(void) {int x…

Switch 和 PS1 模拟器:3000+ 游戏随心玩 | 开源日报 No.174

Ryujinx/Ryujinx Stars: 26.1k License: MIT Ryujinx 是用 C# 编写的实验性任天堂 Switch 模拟器。 该项目旨在提供出色的准确性和性能、用户友好的界面以及稳定的构建。它已经通过了大约 4050 个测试&#xff0c;其中超过 4000 个可以启动并进入游戏&#xff0c;其中大约 340…

C++:数据类型—字符(9)

什么是字符类型的数据&#xff1a;字符类型用于显示单个字符&#xff0c;比如你的键盘上随便一个字母&#xff0c;就是一个字母 语法&#xff1a;char 变量名 数据值 如&#xff1a;char ch a c和c中字符只占用一个字节 字符变量并不是把字母放到内存中&#xff0c;而是把字…

实战 | 微调训练TrOCR识别弯曲文本

导 读 本文主要介绍如何通过微调训练TrOCR实现弯曲文本识别。 背景介绍 TrOCR&#xff08;基于 Transformer 的光学字符识别&#xff09;模型是性能最佳的 OCR 模型之一。在我们之前的文章中&#xff0c;我们分析了它们在单行打印和手写文本上的表现。 TrOCR—基于Transforme…

java回溯算法笔记

回溯算法综述 回溯用于解决你层for循环嵌套问题&#xff0c;且不剪枝的回溯完全等于暴力搜索。 回溯算法模板https://blog.csdn.net/m0_73065928/article/details/137062099?spm1001.2014.3001.5501 组合问题 不能重复使用的组合问题&#xff08;startindex i1&#xff09…

Mac安装wget流程及异常解决(亲测有效)

目录 1.终端输入wget检查自己是否已经安装过wget,没有安装如下图2. 安装brew1&#xff09;点击brew官网&#xff1a;[官网网址](https://brew.sh)2&#xff09;将命令粘贴到终端&#xff0c;回车执行3&#xff09;输入sudo密码4&#xff09;系统开始自动安装brew&#xff0c;等…

C++的非类型模板参数与模板分离编译(模板显式实例化)

非类型模板参数与模板分离编译&#xff08;模板显式实例化&#xff09; 文章目录 非类型模板参数与模板分离编译&#xff08;模板显式实例化&#xff09;前言一、非类型模板参数二、模版分离编译1. 分离编译概念2. 模版的分离编译问题案例解决方法 总结 前言 ​ 本篇博客文章介…

【python分析实战】成本:揭示电商平台月度开支与成本结构占比 - 过于详细 【收藏】

重点关注本文思路&#xff0c;用python分析&#xff0c;方便大家实验复现&#xff0c;代码每次都用全量的&#xff0c;其他工具自行选择。 全文3000字&#xff0c;阅读10min&#xff0c;操作1小时 企业案例实战欢迎关注专栏 每日更新&#xff1a;https://blog.csdn.net/cciehl/…

深度学习语义分割篇——DeepLabV2原理详解篇

&#x1f34a;作者简介&#xff1a;秃头小苏&#xff0c;致力于用最通俗的语言描述问题 &#x1f34a;专栏推荐&#xff1a;深度学习网络原理与实战 &#x1f34a;近期目标&#xff1a;写好专栏的每一篇文章 &#x1f34a;支持小苏&#xff1a;点赞&#x1f44d;&#x1f3fc;、…

Java八股文(数据结构)

Java八股文の数据结构 数据结构 数据结构 请解释以下数据结构的概念&#xff1a;链表、栈、队列和树。 链表是一种线性数据结构&#xff0c;由节点组成&#xff0c;每个节点包含了指向下一个节点的指针&#xff1b; 栈是一种后进先出&#xff08;LIFO&#xff09;的数据结构&a…

linux中查看内存占用空间

文章目录 linux中查看内存占用空间 linux中查看内存占用空间 使用 df -h 查看磁盘空间 使用 du -sh * 查看每个目录的大小 注意这里是当前目录下的文件大小&#xff0c;查看系统的可以回到根目录 经过查看没有发现任何大的文件夹。 继续下面的步骤 如果您的Linux磁盘已满&a…

安全上网,防止上网被记录(v2ray实现加密通信)

近期听一位亲威说&#xff0c;她在公司休闲的时候上了哪个网站&#xff0c;浏览了过的网站IT部门的人都会知道&#xff0c;这是因为现在大多数网络设备&#xff0c;像路由与交换机都有记录访问网站地址记录功能&#xff0c;涉及还可以设置成记录到交互的内容。要想保密&#xf…

第4章.精通标准提示,引领ChatGPT精准输出

标准提示 标准提示&#xff0c;是引导ChatGPT输出的一个简单方法&#xff0c;它提供了一个具体的任务让模型完成。 如果你要生成一篇新闻摘要。你只要发送指示词&#xff1a;汇总这篇新闻 : …… 提示公式&#xff1a;生成[任务] 生成新闻文章的摘要&#xff1a; 任务&#x…

5.6 物联网RK3399项目开发实录-Android开发之(wulianjishu666)

物联网入门到项目实干案例下载&#xff1a; https://pan.baidu.com/s/1fHRxXBqRKTPvXKFOQsP80Q?pwdh5ug --------------------------------------------------------------------------------------------------------------------------------- U-Boot 使用 前言 RK U-B…

机器学习-生存分析:基于QHScrnomo模型的乳腺癌患者风险评估与个性化预测

一、引言 乳腺癌作为女性常见的恶性肿瘤之一&#xff0c;对女性健康构成威胁。随着医疗技术的不断进步&#xff0c;个性化医疗逐渐成为乳腺癌治疗的重要方向。通过深入研究乳腺癌患者的风险评估和个性化预测&#xff0c;可以帮助医生更准确地制定治疗方案&#xff0c;提高治疗效…

【R语言从0到精通】-1-下载R语言与R最基础内容

在本科&#xff0c;没有人教的情况下&#xff0c;艰难的自学了R语言&#xff0c;因此我想能出一个R语言系列教程&#xff0c;在帮助大家的同时&#xff0c;温故而知新&#xff0c;特别如果你是生物或者医学从业者&#xff0c;那本教程正好合适&#xff0c;因为我也是生物人&…

【计算机网络篇】数据链路层(4.1)可靠传输的相关概念

文章目录 &#x1f354;可靠传输的相关概念⭐分组丢失⭐分组失序⭐分组重复 &#x1f95a;注意 &#x1f354;可靠传输的相关概念 使用差错检测技术&#xff08;例如循环冗余校验CRC&#xff09;&#xff0c;接收方的数据链路层就可以检测出帧在传输过程中是否产生了误码&…

Yarn简介及Windows安装与使用指南

&#x1f31f; 前言 欢迎来到我的技术小宇宙&#xff01;&#x1f30c; 这里不仅是我记录技术点滴的后花园&#xff0c;也是我分享学习心得和项目经验的乐园。&#x1f4da; 无论你是技术小白还是资深大牛&#xff0c;这里总有一些内容能触动你的好奇心。&#x1f50d; &#x…

“预防儿童烧烫伤”科普安全课堂走进嘉鱼县第一小学

为提高嘉鱼县儿童烧烫伤安全意识、隐患识别能力以及突发应急处置能力&#xff0c;3月26日下午&#xff0c;在中国社会福利基金会烧烫伤关爱公益基金、嘉鱼县妇女联合会、嘉鱼县教育局的支持下&#xff0c;嘉鱼县蒲公英社会工作服务中心走进嘉鱼县第一小学开展预防儿童烧烫伤科普…

Unity2018发布安卓报错 Exception: Gradle install not valid

Unity2018发布安卓报错 Exception: Gradle install not valid Exception: Gradle install not valid UnityEditor.Android.GradleWrapper.Run (System.String workingdir, System.String task, System.Action1[T] progress) (at <c67d1645d7ce4b76823a39080b82c1d1>:0) …