使用Flink实现Kafka到MySQL的数据流转换:一个基于Flink的实践指南

使用Flink实现Kafka到MySQL的数据流转换

在现代数据处理架构中,Kafka和MySQL是两种非常流行的技术。Kafka作为一个高吞吐量的分布式消息系统,常用于构建实时数据流管道。而MySQL则是广泛使用的关系型数据库,适用于存储和查询数据。在某些场景下,我们需要将Kafka中的数据实时地写入到MySQL数据库中,本文将介绍如何使用Apache Flink来实现这一过程。

在这里插入图片描述

环境准备

在开始之前,请确保你的开发环境中已经安装并配置了以下组件:
Apache Flink 准备相关pom依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>EastMoney</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.14.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
            <version>1.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>1.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala_2.11</artifactId>
            <version>1.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.11</artifactId>
            <version>1.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>1.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.14.0</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.25</version>
        </dependency>
    </dependencies>

</project>

Kafka消息队列

1. 启动zookeeper
 zkServer start
2. 启动kafka服务
 kafka-server-start /opt/homebrew/etc/kafka/server.properties
3. 创建topic
 kafka-topics --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic east_money
6. 生产数据
 kafka-console-producer --broker-list localhost:9092 --topic east_money

MySQL数据库
初始化mysql表

CREATE TABLE `t_stock_code_price` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `code` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '股票代码',
  `name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '股票名称',
  `close` double DEFAULT NULL COMMENT '最新价',
  `change_percent` double DEFAULT NULL COMMENT '涨跌幅',
  `change` double DEFAULT NULL COMMENT '涨跌额',
  `volume` double DEFAULT NULL COMMENT '成交量(手)',
  `amount` double DEFAULT NULL COMMENT '成交额',
  `amplitude` double DEFAULT NULL COMMENT '振幅',
  `turnover_rate` double DEFAULT NULL COMMENT '换手率',
  `peration` double DEFAULT NULL COMMENT '市盈率',
  `volume_rate` double DEFAULT NULL COMMENT '量比',
  `hign` double DEFAULT NULL COMMENT '最高',
  `low` double DEFAULT NULL COMMENT '最低',
  `open` double DEFAULT NULL COMMENT '今开',
  `previous_close` double DEFAULT NULL COMMENT '昨收',
  `pb` double DEFAULT NULL COMMENT '市净率',
  `create_time` varchar(64) NOT NULL COMMENT '写入时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5605 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci

步骤解释

获取流执行环境:首先,我们通过StreamExecutionEnvironment.getExecutionEnvironment获取Flink的流执行环境,并设置其运行模式为流处理模式。

创建流表环境:接着,我们通过StreamTableEnvironment.create创建一个流表环境,这个环境允许我们使用SQL语句来操作数据流。

val senv = StreamExecutionEnvironment.getExecutionEnvironment
      .setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(senv)

定义Kafka数据源表:我们使用一个SQL语句创建了一个Kafka表re_stock_code_price_kafka,这个表代表了我们要从Kafka读取的数据结构和连接信息。

tEnv.executeSql(
      "CREATE TABLE re_stock_code_price_kafka (" +
        "`id` BIGINT," +
        "`code` STRING," +
        "`name` STRING," +
        "`close` DOUBLE NULL," +
        "`change_percent` DOUBLE," +
        "`change` DOUBLE," +
        "`volume` DOUBLE," +
        "`amount` DOUBLE," +
        "`amplitude` DOUBLE," +
        "`turnover_rate` DOUBLE," +
        "`operation` DOUBLE," +
        "`volume_rate` DOUBLE," +
        "`high` DOUBLE ," +
        "`low` DOUBLE," +
        "`open` DOUBLE," +
        "`previous_close` DOUBLE," +
        "`pb` DOUBLE," +
        "`create_time` STRING," +
        "rise int"+
        ") WITH (" +
        "'connector' = 'kafka'," +
        "'topic' = 'east_money'," +
        "'properties.bootstrap.servers' = '127.0.0.1:9092'," +
        "'properties.group.id' = 'mysql2kafka'," +
        "'scan.startup.mode' = 'earliest-offset'," +
        "'format' = 'csv'," +
        "'csv.field-delimiter' = ','" +
        ")"
    )

    val result = tEnv.executeSql("select * from re_stock_code_price_kafka")

定义MySQL目标表:然后,我们定义了一个MySQL表re_stock_code_price,指定了与MySQL的连接参数和表结构。

val sink_table: String =
      """
        |CREATE TEMPORARY TABLE re_stock_code_price (
        |  id BIGINT NOT NULL,
        |  code STRING NOT NULL,
        |  name STRING NOT NULL,
        |  `close` DOUBLE,
        |  change_percent DOUBLE,
        |  change DOUBLE,
        |  volume DOUBLE,
        |  amount DOUBLE,
        |  amplitude DOUBLE,
        |  turnover_rate DOUBLE,
        |  peration DOUBLE,
        |  volume_rate DOUBLE,
        |  hign DOUBLE,
        |  low DOUBLE,
        |  `open` DOUBLE,
        |  previous_close DOUBLE,
        |  pb DOUBLE,
        |  create_time STRING NOT NULL,
        |  rise int,
        |  PRIMARY KEY (id) NOT ENFORCED
        |) WITH (
        |   'connector' = 'jdbc',
        |   'url' = 'jdbc:mysql://localhost:3306/mydb',
        |   'driver' = 'com.mysql.cj.jdbc.Driver',
        |   'table-name' = 're_stock_code_price',
        |   'username' = 'root',
        |   'password' = '12345678'
        |)
        |""".stripMargin
    tEnv.executeSql(sink_table)

数据转换和写入:最后,我们执行了一个插入操作,将从Kafka读取的数据转换并写入到MySQL中。

tEnv.executeSql("insert into re_stock_code_price select * from re_stock_code_price_kafka")

result.print()

全部代码

package org.east

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object Kafka2Mysql {
  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
      .setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(senv)

    tEnv.executeSql(
      "CREATE TABLE re_stock_code_price_kafka (" +
        "`id` BIGINT," +
        "`code` STRING," +
        "`name` STRING," +
        "`close` DOUBLE NULL," +
        "`change_percent` DOUBLE," +
        "`change` DOUBLE," +
        "`volume` DOUBLE," +
        "`amount` DOUBLE," +
        "`amplitude` DOUBLE," +
        "`turnover_rate` DOUBLE," +
        "`operation` DOUBLE," +
        "`volume_rate` DOUBLE," +
        "`high` DOUBLE ," +
        "`low` DOUBLE," +
        "`open` DOUBLE," +
        "`previous_close` DOUBLE," +
        "`pb` DOUBLE," +
        "`create_time` STRING," +
        "rise int"+
        ") WITH (" +
        "'connector' = 'kafka'," +
        "'topic' = 'east_money'," +
        "'properties.bootstrap.servers' = '127.0.0.1:9092'," +
        "'properties.group.id' = 'mysql2kafka'," +
        "'scan.startup.mode' = 'earliest-offset'," +
        "'format' = 'csv'," +
        "'csv.field-delimiter' = ','" +
        ")"
    )

    val result = tEnv.executeSql("select * from re_stock_code_price_kafka")


    val sink_table: String =
      """
        |CREATE TEMPORARY TABLE re_stock_code_price (
        |  id BIGINT NOT NULL,
        |  code STRING NOT NULL,
        |  name STRING NOT NULL,
        |  `close` DOUBLE,
        |  change_percent DOUBLE,
        |  change DOUBLE,
        |  volume DOUBLE,
        |  amount DOUBLE,
        |  amplitude DOUBLE,
        |  turnover_rate DOUBLE,
        |  peration DOUBLE,
        |  volume_rate DOUBLE,
        |  hign DOUBLE,
        |  low DOUBLE,
        |  `open` DOUBLE,
        |  previous_close DOUBLE,
        |  pb DOUBLE,
        |  create_time STRING NOT NULL,
        |  rise int,
        |  PRIMARY KEY (id) NOT ENFORCED
        |) WITH (
        |   'connector' = 'jdbc',
        |   'url' = 'jdbc:mysql://localhost:3306/mydb',
        |   'driver' = 'com.mysql.cj.jdbc.Driver',
        |   'table-name' = 're_stock_code_price',
        |   'username' = 'root',
        |   'password' = '12345678'
        |)
        |""".stripMargin
    tEnv.executeSql(sink_table)
    tEnv.executeSql("insert into re_stock_code_price select * from re_stock_code_price_kafka")


    result.print()
    print("数据打印完成!!!")
  }
}

如有遇到问题可以找小编沟通交流哦。另外小编帮忙辅导大课作业,学生毕设等。不限于python,java,大数据,模型训练等。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/502540.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

算法学习——LeetCode力扣动态规划篇3

算法学习——LeetCode力扣动态规划篇3 494. 目标和 494. 目标和 - 力扣&#xff08;LeetCode&#xff09; 描述 给你一个非负整数数组 nums 和一个整数 target 。 向数组中的每个整数前添加 ‘’ 或 ‘-’ &#xff0c;然后串联起所有整数&#xff0c;可以构造一个 表达式 …

用JSch实现远程传输文件并打包成jar

本文将简单介绍一下 JSch 这个Java的第三方库的一个简单用法&#xff0c;并以此为实例&#xff0c;讲解 IntelliJ 中打包成 jar 包的2种方式。 实现目标 我们的目标是&#xff0c;做出一个jar包&#xff0c;它能够实现类似于 scp 命令的远程传输文件的功能。用法如下&#xf…

成为嵌入式编程高手:C语言学习网站大揭秘!

介绍&#xff1a;嵌入式C语言是针对嵌入式系统开发的一种编程语言&#xff0c;它基于标准的C语言&#xff0c;但进行了特定的优化和调整&#xff0c;以适应嵌入式环境的特殊需求。以下是对嵌入式C语言的详细介绍&#xff1a; 语法基础&#xff1a;嵌入式C语言在语法上与标准C语…

支付后打开半屏小程序能力的相关调整通知

来源&#xff1a;小程序官方公告 各位开发者&#xff1a; 打开半屏小程序 能力是微信团队提供的一项方便用户从小程序便捷打开另一个小程序的轻量化体验能力。为了优化用户体验&#xff0c;避免用户在没有预期的情况下以半屏方式打开另一个小程序&#xff0c;微信团队将回收支…

代码学习记录31---动态规划开始

随想录日记part31 t i m e &#xff1a; time&#xff1a; time&#xff1a; 2024.03.29 主要内容&#xff1a;今天开始要学习动态规划的相关知识了&#xff0c;今天的内容主要涉及四个方面&#xff1a; 理论基础 ; 斐波那契数 ;爬楼梯 ;使用最小花费爬楼梯。 理论基础 509. 斐…

平价运动型蓝牙耳机哪个牌子好?精心筛选五大必购产品分享!

蓝牙耳机已成为现代人生活中不可或缺的一部分&#xff0c;特别是那些追求健康、热爱运动的朋友们&#xff0c;平价且实用的运动型蓝牙耳机更是他们的首选&#xff0c;在众多的品牌与型号中&#xff0c;如何选择一款既符合预算又满足运动需求的蓝牙耳机呢&#xff1f;今天就让我…

个人优势能力测评,寻找你的天赋

个人优势能力测评&#xff0c;用来发现自己的天赋&#xff0c;也被称之为多元智力测评&#xff0c;该理论认为人的智力不仅仅是逻辑思维能力&#xff0c;每个人的天赋不同&#xff0c;具有多样性&#xff0c;目前的智力测试基本上都以逻辑思维&#xff0c;推理能力为主&#xf…

C++项目——集群聊天服务器项目(九)客户端异常退出业务

服务器端应检测到客户端是否异常退出&#xff0c;因此本节来实现客户端异常退出&#xff0c;项目流程见后文 一、客户端异常退出业务流程 &#xff08;1&#xff09;在业务模块定义处理客户端异常退出的函数 &#xff08;2&#xff09;集群聊天服务器项目(八&#xff09;提到…

剑指Offer题目笔记23(归并排序)

面试题77&#xff1a; 问题&#xff1a; ​ 输入一个链表的头节点&#xff0c;将该链表排序。 解决方案&#xff1a; ​ 使用归并排序。将链表分为两个子链表&#xff0c;在对两个子链表排序后再将它们合并为一个排序的链表。 源代码&#xff1a; /*** Definition for sin…

Python循环语句for

主要解决什么样的问题&#xff1a;具有重复性、规律性的问题 循环四要素&#xff1a; 循环的开始&#xff08;从第1步开始&#xff1b;从第1步开始/从起点开始&#xff09; 循环的继续条件&#xff08;还没走到第10步&#xff1b;没有碰到墙/就是看距离&#xff09; 循环体&…

算法学习——LeetCode力扣动态规划篇4

算法学习——LeetCode力扣动态规划篇4 377. 组合总和 Ⅳ 377. 组合总和 Ⅳ - 力扣&#xff08;LeetCode&#xff09; 描述 给你一个由 不同 整数组成的数组 nums &#xff0c;和一个目标整数 target 。请你从 nums 中找出并返回总和为 target 的元素组合的个数。 题目数据保…

JSQLParserException异常

前言 SQL中加入了租户字段&#xff0c;报这个错&#xff0c;可以查出数据&#xff0c;但是不多&#xff1b;SQL检查无问题 解决 原因一 引入新的SQL解析器检查解析SQL&#xff0c;与mybatis多租户无关 参考 <!--jsqlparser版本太低也无法解析&#xff0c;如2.0--> &…

左侧或水平导航菜单栏与main区域联动

系列文章目录 一、elementui 导航菜单栏和Breadcrumb 面包屑关联 二、左侧导航菜单栏与main区域联动 文章目录 系列文章目录前言一、实现步骤1.<el-menu>中设置属性router为true2.<el-menu-item>中设置路由 route"/"3.<el-main>里设置路由出口4…

Android Studio Iguana | 2023.2.1 补丁 1

Android Studio Iguana | 2023.2.1 Canary 3 已修复的问题Android Gradle 插件 问题 295205663 将 AGP 从 8.0.2 更新到 8.1.0 后&#xff0c;任务“:app:mergeReleaseClasses”执行失败 问题 298008231 [Gradle 8.4][升级] 由于使用 kotlin gradle 插件中已废弃的功能&#…

游戏行业行业竞争越来越激烈,遇到DDoS攻击遭受严重损失该如何解决

近年来&#xff0c;我们见证了数字化的快速发展&#xff0c;随着这样的发展&#xff0c;网络的威胁也逐渐增多&#xff0c;在网络攻击门槛不断降低&#xff0c;行业竞争越来越激烈&#xff0c;游戏行业的DDoS攻击如雨点般密集&#xff0c;在整个DDoS攻击的份额中&#xff0c;游…

C语言goto语句介绍

在C语言中&#xff0c;goto语句是一种流程控制语句&#xff0c;用于无条件地转移到程序中的特定标签位置。尽管goto语句在编程中具有一定的争议&#xff0c;但在某些情况下&#xff0c;它可以提供一种简单有效的解决方案。本文将深入介绍C语言中的goto语句&#xff0c;包括其基…

android安卓英语学习课设

一、关于这个项目ELAPP 该项目是一个基于java开发的服务器-客户端模式的安卓英语学习软件&#xff0c;主要功能点就是背单词&#xff0c;中英文翻译&#xff0c;OCR文字翻译。 服务器端使用springboot&#xff0c;mybatisplus&#xff0c;MySQL&#xff0c;mongodb&#xff0…

Solo 开发者周刊 (第9期):Dawwin首位人工智能编程师或将改变未来?

这里会整合 Solo 社区每周推广内容、产品模块或活动投稿&#xff0c;每周五发布。在这期周刊中&#xff0c;我们将深入探讨开源软件产品的开发旅程&#xff0c;分享来自一线独立开发者的经验和见解。本杂志开源&#xff0c;欢迎投稿。 好文推荐 Dawwin首位人工智能编程师&#…

企业数据被新型.rmallox勒索病毒加密,应该如何还原?

.rmallox勒索病毒为什么难以解密&#xff1f; .rmallox勒索病毒难以解密的主要原因在于其采用了高强度的加密算法&#xff0c;并且这些算法被有效地实施在了病毒程序中。具体来说&#xff0c;.rmallox勒索病毒使用了RSA和AES这两种非常成熟的加密算法。RSA是一种非对称加密算法…