使用Flink实现MySQL到Kafka的数据流转换

使用Flink实现MySQL到Kafka的数据流转换

本篇博客将介绍如何使用Flink将数据从MySQL数据库实时传输到Kafka,这是一个常见的用例,适用于需要实时数据connector的场景。
在这里插入图片描述

环境准备

在开始之前,确保你的环境中已经安装了以下软件:
Apache Flink 准备相关pom依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>EastMoney</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.14.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
            <version>1.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>1.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala_2.11</artifactId>
            <version>1.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.11</artifactId>
            <version>1.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>1.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.14.0</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.25</version>
        </dependency>
    </dependencies>

</project>

MySQL数据库,初始化mysql表

CREATE TABLE `t_stock_code_price` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `code` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '股票代码',
  `name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '股票名称',
  `close` double DEFAULT NULL COMMENT '最新价',
  `change_percent` double DEFAULT NULL COMMENT '涨跌幅',
  `change` double DEFAULT NULL COMMENT '涨跌额',
  `volume` double DEFAULT NULL COMMENT '成交量(手)',
  `amount` double DEFAULT NULL COMMENT '成交额',
  `amplitude` double DEFAULT NULL COMMENT '振幅',
  `turnover_rate` double DEFAULT NULL COMMENT '换手率',
  `peration` double DEFAULT NULL COMMENT '市盈率',
  `volume_rate` double DEFAULT NULL COMMENT '量比',
  `hign` double DEFAULT NULL COMMENT '最高',
  `low` double DEFAULT NULL COMMENT '最低',
  `open` double DEFAULT NULL COMMENT '今开',
  `previous_close` double DEFAULT NULL COMMENT '昨收',
  `pb` double DEFAULT NULL COMMENT '市净率',
  `create_time` varchar(64) NOT NULL COMMENT '写入时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5605 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci

Kafka消息队列

1. 启动zookeeper
 zkServer start
2. 启动kafka服务
 kafka-server-start /opt/homebrew/etc/kafka/server.properties
3. 创建topic
 kafka-topics --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic east_money
4. 消费数据
 kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic east_money --from-beginning

步骤解释

获取流执行环境:首先,我们通过StreamExecutionEnvironment.getExecutionEnvironment获取Flink的流执行环境,并设置其运行模式为流处理模式。

创建流表环境:接着,我们通过StreamTableEnvironment.create创建一个流表环境,这个环境允许我们使用SQL语句来操作数据流。

val senv = StreamExecutionEnvironment.getExecutionEnvironment
      .setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(senv)

定义MySQL数据源表:我们使用一个SQL语句创建了一个临时表t_stock_code_price,这个表代表了我们要从MySQL读取的数据结构和连接信息。

val source_table =
      """
        |CREATE TEMPORARY TABLE t_stock_code_price (
        |  id BIGINT NOT NULL,
        |  code STRING NOT NULL,
        |  name STRING NOT NULL,
        |  `close` DOUBLE,
        |  change_percent DOUBLE,
        |  change DOUBLE,
        |  volume DOUBLE,
        |  amount DOUBLE,
        |  amplitude DOUBLE,
        |  turnover_rate DOUBLE,
        |  peration DOUBLE,
        |  volume_rate DOUBLE,
        |  hign DOUBLE,
        |  low DOUBLE,
        |  `open` DOUBLE,
        |  previous_close DOUBLE,
        |  pb DOUBLE,
        |  create_time STRING NOT NULL,
        |  PRIMARY KEY (id) NOT ENFORCED
        |) WITH (
        |   'connector' = 'jdbc',
        |   'url' = 'jdbc:mysql://localhost:3306/mydb',
        |   'driver' = 'com.mysql.cj.jdbc.Driver',
        |   'table-name' = 't_stock_code_price',
        |   'username' = 'root',
        |   'password' = '12345678'
        |)
        |""".stripMargin

    tEnv.executeSql(source_table)

定义Kafka目标表:然后,我们定义了一个Kafka表re_stock_code_price_kafka,指定了Kafka的连接参数和表结构。

tEnv.executeSql(
      "CREATE TABLE re_stock_code_price_kafka (" +
        "`id` BIGINT," +
        "`code` STRING," +
        "`name` STRING," +
        "`close` DOUBLE," +
        "`change_percent` DOUBLE," +
        "`change` DOUBLE," +
        "`volume` DOUBLE," +
        "`amount` DOUBLE," +
        "`amplitude` DOUBLE," +
        "`turnover_rate` DOUBLE," +
        "`operation` DOUBLE," +
        "`volume_rate` DOUBLE," +
        "`high` DOUBLE," +
        "`low` DOUBLE," +
        "`open` DOUBLE," +
        "`previous_close` DOUBLE," +
        "`pb` DOUBLE," +
        "`create_time` STRING," +
        "rise int"+
        ") WITH (" +
        "'connector' = 'kafka'," +
        "'topic' = 'east_money'," +
        "'properties.bootstrap.servers' = '127.0.0.1:9092'," +
        "'properties.group.id' = 'mysql2kafka'," +
        "'scan.startup.mode' = 'earliest-offset'," +
        "'format' = 'csv'," +
        "'csv.field-delimiter' = ','" +
        ")"
    )

数据转换和写入:最后,我们执行了一个插入操作,将从MySQL读取的数据转换(这里通过case when语句添加了一个新字段rise)并写入到Kafka中。这个可以实现任何的sql etl 来满足我们的需求。

    tEnv.executeSql("insert into re_stock_code_price_kafka select *,case when change_percent>0 then 1 else 0 end as rise from t_stock_code_price")

全部代码

package org.east

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object Mysql2Kafka {

  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
      .setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(senv)

    val source_table =
      """
        |CREATE TEMPORARY TABLE t_stock_code_price (
        |  id BIGINT NOT NULL,
        |  code STRING NOT NULL,
        |  name STRING NOT NULL,
        |  `close` DOUBLE,
        |  change_percent DOUBLE,
        |  change DOUBLE,
        |  volume DOUBLE,
        |  amount DOUBLE,
        |  amplitude DOUBLE,
        |  turnover_rate DOUBLE,
        |  peration DOUBLE,
        |  volume_rate DOUBLE,
        |  hign DOUBLE,
        |  low DOUBLE,
        |  `open` DOUBLE,
        |  previous_close DOUBLE,
        |  pb DOUBLE,
        |  create_time STRING NOT NULL,
        |  PRIMARY KEY (id) NOT ENFORCED
        |) WITH (
        |   'connector' = 'jdbc',
        |   'url' = 'jdbc:mysql://localhost:3306/mydb',
        |   'driver' = 'com.mysql.cj.jdbc.Driver',
        |   'table-name' = 't_stock_code_price',
        |   'username' = 'root',
        |   'password' = '12345678'
        |)
        |""".stripMargin

    tEnv.executeSql(source_table)

    val result = tEnv.executeSql("select * from t_stock_code_price")
    result.print()


    tEnv.executeSql(
      "CREATE TABLE re_stock_code_price_kafka (" +
        "`id` BIGINT," +
        "`code` STRING," +
        "`name` STRING," +
        "`close` DOUBLE," +
        "`change_percent` DOUBLE," +
        "`change` DOUBLE," +
        "`volume` DOUBLE," +
        "`amount` DOUBLE," +
        "`amplitude` DOUBLE," +
        "`turnover_rate` DOUBLE," +
        "`operation` DOUBLE," +
        "`volume_rate` DOUBLE," +
        "`high` DOUBLE," +
        "`low` DOUBLE," +
        "`open` DOUBLE," +
        "`previous_close` DOUBLE," +
        "`pb` DOUBLE," +
        "`create_time` STRING," +
        "rise int"+
        ") WITH (" +
        "'connector' = 'kafka'," +
        "'topic' = 'east_money'," +
        "'properties.bootstrap.servers' = '127.0.0.1:9092'," +
        "'properties.group.id' = 'mysql2kafka'," +
        "'scan.startup.mode' = 'earliest-offset'," +
        "'format' = 'csv'," +
        "'csv.field-delimiter' = ','" +
        ")"
    )
    tEnv.executeSql("insert into re_stock_code_price_kafka select *,case when change_percent>0 then 1 else 0 end as rise from t_stock_code_price")

  }
}

如有遇到问题可以找小编沟通交流哦。另外小编帮忙辅导大课作业,学生毕设等。不限于python,java,大数据,模型训练等。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/503404.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

再次加深理解Java中的并发编程

目录 一、线程、进程、程序 二、线程状态 三、线程的七大参数 四、lock与synchronized锁机制 一&#xff09;、lock与synchronized锁区别 二&#xff09;、synchronized锁原理 三&#xff09;、Lock锁原理 五、synchronized锁升级原理 一&#xff09;、锁升级基础知识 …

超文本传输协议HTTP

HTTP协议 在网络通信中&#xff0c;我们可以自己进行定制协议&#xff0c;但是也有许多已经十分成熟的应用层协议&#xff0c;比如我们下面说的HTTP协议。 HTTP协议简介 HTTP&#xff08;Hyper Text Transfer Protocol&#xff09;协议又叫做超文本传输协议&#xff0c;是一…

JAVAEE之网络编程

1.网络编程 网络编程&#xff0c;指网络上的主机&#xff0c;通过不同的进程&#xff0c;以编程的方式实现网络通信&#xff08;或称为网络数据传输&#xff09;。 当然&#xff0c;我们只要满足进程不同就行&#xff1b; 所以即便是同一个主机&#xff0c;只要是不同进程&am…

算法学习——LeetCode力扣图论篇1

算法学习——LeetCode力扣图论篇1 797. 所有可能的路径 797. 所有可能的路径 - 力扣&#xff08;LeetCode&#xff09; 描述 给你一个有 n 个节点的 有向无环图&#xff08;DAG&#xff09;&#xff0c;请你找出所有从节点 0 到节点 n-1 的路径并输出&#xff08;不要求按特…

elementui 导航菜单栏和Breadcrumb 面包屑关联

系列文章目录 一、elementui 导航菜单栏和Breadcrumb 面包屑关联 文章目录 系列文章目录前言一、elementui 导航菜单栏和Breadcrumb 面包屑怎么关联&#xff1f;二、实现效果三、实现步骤1.本项目演示布局2.添加面包屑2.实现breadcrumbName方法3.监听方法4.路由指配5.路由配置…

【C语言】Infiniband驱动mlx4_reset

一、注释 这个 mlx4_reset 函数负责重置 Mellanox 设备。它保存了设备的 PCI 头信息&#xff0c;然后重置了设备&#xff0c;之后还原保存的 PCI 头信息。请注意&#xff0c;该函数是用英文注释的&#xff0c;下面提供中文注释的版本。以下是该函数的流程&#xff1a; 1. 为保…

springboot项目学习-瑞吉外卖(4)续

1.任务 菜品的添加功能(涉及到两张表的数据添加) 2.菜品添加 功能页面如上&#xff0c;该页面有两个注意点 菜品分类&#xff1a;点击菜品分类后&#xff0c;会展示当前已有菜品&#xff1a;这个功能的实现要从category表里查询数据&#xff0c;然后再做展示口味做法配置&#…

SRS OBS利用RTMP协议实现音视频推拉流

参考&#xff1a;https://ossrs.net/lts/zh-cn/docs/v5/doc/getting-started 1&#xff09;docker直接运行SRS服务&#xff1a; docker run --rm -it -p 1935:1935 -p 1985:1985 -p 8080:8080 registry.cn-hangzhou.aliyuncs.com/ossrs/srs:5运行起来后可以http://localho…

java Web 疫苗预约管理系统用eclipse定制开发mysql数据库BS模式java编程jdbc

一、源码特点 JSP 疫苗预约管理系统是一套完善的web设计系统&#xff0c;对理解JSP java 编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。开发环境为TOMCAT7.0,eclipse开发&#xff0c;数据库为Mysql5.0&#xff0c;使…

kaggle竞赛(房价预测)(Pytorch 06)

一 下载数据集 此数据集由Bart de Cock于2011年收集&#xff0c;涵盖了2006‐2010年期间 亚利桑那州 埃姆斯市的房价。 下载地址&#xff1a; import hashlib import os import tarfile import zipfile import requests#save DATA_HUB dict() DATA_URL http://d2l-data.s3…

“崖山数据库杯”深圳大学程序设计竞赛(正式赛)M题 一图秒

“崖山数据库杯”深圳大学程序设计竞赛&#xff08;正式赛&#xff09;_ACM/NOI/CSP/CCPC/ICPC算法编程高难度练习赛_牛客竞赛OJ (nowcoder.com) —————— 可以去牛客看题解&#xff1a; 题解 | #暂时没想法#_牛客博客 (nowcoder.net) —————— 上面的就是题解了。…

Adobe Illustrator 2023 for Mac/Win:创意无限,设计无界

在数字艺术与设计领域&#xff0c;Adobe Illustrator 2023无疑是一颗璀璨的明星。这款专为Mac和Windows用户打造的矢量图形设计软件&#xff0c;以其强大的功能和卓越的性能&#xff0c;赢得了全球设计师的广泛赞誉。 Adobe Illustrator 2023在继承前代版本优点的基础上&#…

基于ARM内核的智能手环(day1)

整体介绍 智能手环由 ARM 内核 MCU(Cortex-M 系列)、TFTLCD 屏、温湿度传感器、心率传感器、 加速度传感器等主要几部分构成。该平台硬件采用 STM32 芯片&#xff0c;通过对温湿度传感器的驱动编写&#xff0c;获取周围温湿度数据&#xff0c;并在 LCD 屏显示&#xff0c;通过对…

设计模式12--组合模式

定义 案例一 案例二 优缺点

docker配置github仓库ghcr国内镜像加速

文章目录 说明ghcr.io简介配置镜像命令地址命令行方式1panel面板方式方式一&#xff1a;配置镜像加速&#xff0c;命令行拉取方式二&#xff1a;配置镜像仓库&#xff0c;可视化拉取 说明 由于使用的容器需要从github下载镜像&#xff0c;服务器在国外下载速度很慢&#xff0c…

MySQL InnoDB 之 多版本并发控制(MVCC)

多版本并发控制&#xff08;MVCC&#xff0c;Multi-Version Concurrency Control&#xff09;是数据库管理系统中用于提供高并发性和在事务处理中实现隔离级别的一种技术。MVCC 允许系统在不完全锁定数据库资源的情况下&#xff0c;处理多个并发事务&#xff0c;从而提高了数据…

计算机网络实验五:特定主机路由和默认路由

实验五&#xff1a;特定主机路由和默认路由 5.1 实验目的 &#xff08;1&#xff09;学习默认路由的概念和作用 &#xff08;2&#xff09;学习特定路由的概念和作用 &#xff08;3&#xff09;了解网络中路由选择的基本原理和应用 5.2 实验步骤 5.2.1 构建网络拓扑 在栏…

LeetCode - 字母板上的路径

1138. 字母板上的路径 刚看到这道题的时候,我居然想用搜索去做这道题,其实有更优解,用 / %算会更加的快,只需要遍历一次即可.假如说我们要找n,n是第13个字母,那他就位于 13 / 5 2, 13 % 5 3.他就位于三行三列(a为0,0),知道了原理,代码就好写了. class Solution { public:st…

基于51单片机HC05蓝牙环境检测系统

目录 1、概要 2、HC05配对传送数据教程 2.1 进入AT模式 2.2串口软件配置 2.3 异常分析 3、代码编写 4、原理图 5、仿真图 6、实物运行视频 7、小结 资料下载地址&#xff1a;基于51单片机手自动浇花系统 1、概要 本文详细介绍HC05蓝牙模块与51单片机的连接配对过程&#xff0c…

【WEEK5】 【DAY5】DML语言【中文版】

2024.3.29 Friday 目录 3.DML语言3.1.外键&#xff08;了解&#xff09;3.1.1.概念3.1.2.作用3.1.3.添加&#xff08;书写&#xff09;外键的几种方法3.1.3.1.创建表时直接在主动引用的表里写&#xff08;被引用的表的被引用的部分&#xff09;3.1.3.2.先创建表后修改表以添加…