1. 导言:
Apache Flink是一款功能强大的流式处理引擎,可用于实时处理大规模数据。本文将介绍如何使用Flink与MySQL数据库进行交互,以清洗股票数据为例。
2. 环境准备:
首先,确保已安装Apache Flink并配置好MySQL数据库。导入相关依赖包,并创建必要的Table。同时需要提前创建好MySQL表:一张source表,一张sink表。
-- Sink table: receives the cleaned stock quotes written by the Flink job.
-- It has the same columns as `t_stock_code_price` plus the derived `rise` flag.
CREATE TABLE `re_stock_code_price` (
`id` bigint NOT NULL AUTO_INCREMENT,
`code` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '股票代码',
`name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '股票名称',
`close` double DEFAULT NULL COMMENT '最新价',
`change_percent` double DEFAULT NULL COMMENT '涨跌幅',
`change` double DEFAULT NULL COMMENT '涨跌额',
`volume` double DEFAULT NULL COMMENT '成交量(手)',
`amount` double DEFAULT NULL COMMENT '成交额',
`amplitude` double DEFAULT NULL COMMENT '振幅',
`turnover_rate` double DEFAULT NULL COMMENT '换手率',
`peration` double DEFAULT NULL COMMENT '市盈率',
`volume_rate` double DEFAULT NULL COMMENT '量比',
`hign` double DEFAULT NULL COMMENT '最高',
`low` double DEFAULT NULL COMMENT '最低',
`open` double DEFAULT NULL COMMENT '今开',
`previous_close` double DEFAULT NULL COMMENT '昨收',
`pb` double DEFAULT NULL COMMENT '市净率',
`create_time` varchar(64) NOT NULL COMMENT '写入时间',
-- Flag computed by the cleansing query: 1 when change_percent > 0, otherwise 0.
`rise` int NOT NULL,
PRIMARY KEY (`id`)
-- The trailing semicolon terminates the statement so the next CREATE TABLE can follow in the same script.
) ENGINE=InnoDB AUTO_INCREMENT=11207 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
-- Source table: raw stock quotes that the Flink job reads through the JDBC connector.
CREATE TABLE `t_stock_code_price` (
`id` bigint NOT NULL AUTO_INCREMENT,
`code` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '股票代码',
`name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '股票名称',
`close` double DEFAULT NULL COMMENT '最新价',
`change_percent` double DEFAULT NULL COMMENT '涨跌幅',
`change` double DEFAULT NULL COMMENT '涨跌额',
`volume` double DEFAULT NULL COMMENT '成交量(手)',
`amount` double DEFAULT NULL COMMENT '成交额',
`amplitude` double DEFAULT NULL COMMENT '振幅',
`turnover_rate` double DEFAULT NULL COMMENT '换手率',
`peration` double DEFAULT NULL COMMENT '市盈率',
`volume_rate` double DEFAULT NULL COMMENT '量比',
`hign` double DEFAULT NULL COMMENT '最高',
`low` double DEFAULT NULL COMMENT '最低',
`open` double DEFAULT NULL COMMENT '今开',
`previous_close` double DEFAULT NULL COMMENT '昨收',
`pb` double DEFAULT NULL COMMENT '市净率',
`create_time` varchar(64) NOT NULL COMMENT '写入时间',
PRIMARY KEY (`id`)
-- The trailing semicolon terminates the statement so it can be run as part of a multi-statement script.
) ENGINE=InnoDB AUTO_INCREMENT=11207 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
package org.east;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment;
// Excerpt of the job's entry point: it sets up the Flink environments and registers
// the source and sink tables. The braces opened here are closed in the complete listing.
object TableETL {
def main(args: Array[String]): Unit = {
// Streaming-mode execution environment and the Table API environment built on top of it.
val senv = StreamExecutionEnvironment.getExecutionEnvironment
.setRuntimeMode(RuntimeExecutionMode.STREAMING)
val tEnv = StreamTableEnvironment.create(senv)
// Source table: a Flink temporary table bound to the MySQL table t_stock_code_price
// through the JDBC connector (columns abbreviated in this excerpt).
val source_table =
"""
CREATE TEMPORARY TABLE t_stock_code_price (
id BIGINT NOT NULL,
code STRING NOT NULL,
-- 其他字段...
create_time STRING NOT NULL,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/mydb',
'driver' = 'com.mysql.cj.jdbc.Driver',
'table-name' = 't_stock_code_price',
'username' = 'root',
'password' = '12345678'
)
""".stripMargin
// Sink table: a Flink temporary table bound to the MySQL table re_stock_code_price;
// it additionally declares the rise column.
val sink_table =
"""
CREATE TEMPORARY TABLE re_stock_code_price (
id BIGINT NOT NULL,
code STRING NOT NULL,
-- 其他字段...
create_time STRING NOT NULL,
rise INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/mydb',
'driver' = 'com.mysql.cj.jdbc.Driver',
'table-name' = 're_stock_code_price',
'username' = 'root',
'password' = '12345678'
)
""".stripMargin
// Execute both DDL statements so the tables are registered in the table environment.
tEnv.executeSql(source_table)
tEnv.executeSql(sink_table)
在这段代码中,我们首先创建了Flink的流式执行环境和StreamTableEnvironment。然后,我们通过JDBC连接器定义了两个临时表,分别映射MySQL中存放原始股票数据的源表和存放清洗后数据的目标表。
3. 数据清洗:
接下来,我们执行数据清洗操作,并将结果写入目标表。
// Run the cleansing query and write its result into the sink table.
// executeSql only submits the INSERT job and returns immediately; await() blocks
// until the job has finished, so the rows are written before the program continues.
tEnv.executeSql("INSERT INTO re_stock_code_price " +
"SELECT *, CASE WHEN change_percent > 0 THEN 1 ELSE 0 END AS rise FROM t_stock_code_price").await()
在这里,我们计算了股票涨跌情况,并将结果写入到目标表中。在这个例子中,我们假设change_percent字段表示股票价格的变化百分比:rise字段为1表示股票上涨(change_percent > 0),为0表示未上涨(下跌、平盘或change_percent为空)。
4. 结果展示:
最后,我们查询目标表并打印结果(对应下方完整代码末尾的 select 查询与 print() 调用)。
5. 完整代码:
下面是完整的代码:
package org.east;
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
/**
 * Flink Table API job that reads stock quotes from the MySQL table
 * `t_stock_code_price`, derives a `rise` flag from `change_percent`,
 * writes the result to the MySQL table `re_stock_code_price`, and finally
 * prints the contents of the sink table.
 *
 * Both MySQL tables must already exist; they are accessed through the
 * Flink JDBC connector using the connection options embedded in the DDL below.
 */
object TableETL {

  /**
   * Job entry point.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
      .setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(senv)

    // Source: temporary table bound to the MySQL table t_stock_code_price.
    // `close` and `open` are back-quoted because they are reserved words in Flink SQL.
    val sourceTableDdl =
      """
        |CREATE TEMPORARY TABLE t_stock_code_price (
        | id BIGINT NOT NULL,
        | code STRING NOT NULL,
        | name STRING NOT NULL,
        | `close` DOUBLE,
        | change_percent DOUBLE,
        | change DOUBLE,
        | volume DOUBLE,
        | amount DOUBLE,
        | amplitude DOUBLE,
        | turnover_rate DOUBLE,
        | peration DOUBLE,
        | volume_rate DOUBLE,
        | hign DOUBLE,
        | low DOUBLE,
        | `open` DOUBLE,
        | previous_close DOUBLE,
        | pb DOUBLE,
        | create_time STRING NOT NULL,
        | PRIMARY KEY (id) NOT ENFORCED
        |) WITH (
        | 'connector' = 'jdbc',
        | 'url' = 'jdbc:mysql://localhost:3306/mydb',
        | 'driver' = 'com.mysql.cj.jdbc.Driver',
        | 'table-name' = 't_stock_code_price',
        | 'username' = 'root',
        | 'password' = '12345678'
        |)
        |""".stripMargin

    // Sink: temporary table bound to the MySQL table re_stock_code_price;
    // same columns as the source plus the derived rise flag.
    val sinkTableDdl =
      """
        |CREATE TEMPORARY TABLE re_stock_code_price (
        | id BIGINT NOT NULL,
        | code STRING NOT NULL,
        | name STRING NOT NULL,
        | `close` DOUBLE,
        | change_percent DOUBLE,
        | change DOUBLE,
        | volume DOUBLE,
        | amount DOUBLE,
        | amplitude DOUBLE,
        | turnover_rate DOUBLE,
        | peration DOUBLE,
        | volume_rate DOUBLE,
        | hign DOUBLE,
        | low DOUBLE,
        | `open` DOUBLE,
        | previous_close DOUBLE,
        | pb DOUBLE,
        | create_time STRING NOT NULL,
        | rise int,
        | PRIMARY KEY (id) NOT ENFORCED
        |) WITH (
        | 'connector' = 'jdbc',
        | 'url' = 'jdbc:mysql://localhost:3306/mydb',
        | 'driver' = 'com.mysql.cj.jdbc.Driver',
        | 'table-name' = 're_stock_code_price',
        | 'username' = 'root',
        | 'password' = '12345678'
        |)
        |""".stripMargin

    tEnv.executeSql(sourceTableDdl)
    tEnv.executeSql(sinkTableDdl)

    // executeSql submits the INSERT job asynchronously and returns right away.
    // Block until it finishes; otherwise the SELECT below can read the sink
    // table before any row has been written.
    tEnv
      .executeSql("insert into re_stock_code_price select *,case when change_percent>0 then 1 else 0 end as rise from t_stock_code_price")
      .await()

    // Read the sink table back and print its rows to stdout.
    val sinkRows = tEnv.executeSql("select * from re_stock_code_price")
    sinkRows.print()
  }
}
如有遇到问题可以找小编沟通交流哦。另外小编帮忙辅导大课作业,学生毕设等。不限于python,java,大数据,模型训练等。