使用Apache Flink实现实时数据同步与清洗:MySQL和Oracle到目标MySQL的ETL流程

使用Apache Flink实现实时数据同步与清洗:MySQL和Oracle到目标MySQL的ETL流程

CSDN开发云
实现数据同步的ETL(抽取、转换、加载)过程通常涉及从源系统(如数据库、消息队列或文件)中抽取数据,进行必要的转换,然后将数据加载到目标系统(如另一个数据库或数据仓库)。在这里,我们将展示如何使用Apache Flink来实现一个从MySQL或Oracle数据库抽取数据并同步到另一个MySQL数据库的ETL过程。

  • 1. 从源数据库(MySQL和Oracle)实时抽取数据
  • 2. 对数据进行清洗和转换
  • 3. 将转换后的数据写入目标数据库(MySQL)
    请添加图片描述

我们将使用Apache Flink来实现这个流程。Flink具有强大的数据流处理能力,适合处理实时数据同步和转换任务。

环境准备

  • 确保MySQL和Oracle数据库运行**,并创建相应的表。
  • 创建Spring Boot项目,并添加Flink、MySQL JDBC、和Oracle JDBC驱动的依赖。

第一步:创建源和目标数据库表

假设我们有以下三个表:

  • source_mysql_table(MySQL中的源表)
  • source_oracle_table(Oracle中的源表)
  • target_table(目标MySQL表)

MySQL源表

CREATE DATABASE source_mysql_db;
USE source_mysql_db;

CREATE TABLE source_mysql_table (
    id INT AUTO_INCREMENT PRIMARY KEY,
    user_id VARCHAR(255) NOT NULL,
    action VARCHAR(255) NOT NULL,
    timestamp VARCHAR(255) NOT NULL
);

Oracle源表

CREATE TABLE source_oracle_table (
    id NUMBER GENERATED BY DEFAULT ON NULL AS IDENTITY,
    user_id VARCHAR2(255) NOT NULL,
    action VARCHAR2(255) NOT NULL,
    timestamp VARCHAR2(255) NOT NULL,
    PRIMARY KEY (id)
);

目标MySQL表

CREATE DATABASE target_db;
USE target_db;

CREATE TABLE target_table (
    id INT AUTO_INCREMENT PRIMARY KEY,
    user_id VARCHAR(255) NOT NULL,
    action VARCHAR(255) NOT NULL,
    timestamp VARCHAR(255) NOT NULL
);

第二步:添加项目依赖

在pom.xml中添加Flink、MySQL和Oracle相关的依赖:

<dependencies>
    <!-- Spring Boot dependencies -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>

    <!-- Apache Flink dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>

    <!-- MySQL JDBC driver -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.23</version>
    </dependency>

    <!-- Oracle JDBC driver -->
    <dependency>
        <groupId>com.oracle.database.jdbc</groupId>
        <artifactId>ojdbc8</artifactId>
        <version>19.8.0.0</version>
    </dependency>
</dependencies>

第三步:编写Flink ETL任务

创建一个Flink任务类来实现ETL逻辑。

创建一个POJO类表示数据结构

package com.example.flink;

public class UserAction {
    private int id;
    private String userId;
    private String action;
    private String timestamp;

    // Getters and setters
    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public String getAction() {
        return action;
    }

    public void setAction(String action) {
        this.action = action;
    }

    public String getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(String timestamp) {
        this.timestamp = timestamp;
    }
}

编写Flink任务类

package com.example.flink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

@Component
public class FlinkETLJob implements CommandLineRunner {

    @Override
    public void run(String... args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 从MySQL读取数据
        DataStream<UserAction> mysqlDataStream = env.addSource(new MySQLSource());

        // 从Oracle读取数据
        DataStream<UserAction> oracleDataStream = env.addSource(new OracleSource());

        // 合并两个数据流
        DataStream<UserAction> mergedStream = mysqlDataStream.union(oracleDataStream);

        // 清洗和转换数据
        DataStream<UserAction> transformedStream = mergedStream.map(new MapFunction<UserAction, UserAction>() {
            @Override
            public UserAction map(UserAction value) throws Exception {
                // 进行清洗和转换
                value.setAction(value.getAction().toUpperCase());
                return value;
            }
        });

        // 将数据写入目标MySQL数据库
        transformedStream.addSink(new MySQLSink());

        // 执行任务
        env.execute("Flink ETL Job");
    }

    public static class MySQLSource implements SourceFunction<UserAction> {
        private static final String JDBC_URL = "jdbc:mysql://localhost:3306/source_mysql_db";
        private static final String JDBC_USER = "source_user";
        private static final String JDBC_PASSWORD = "source_password";
        private volatile boolean isRunning = true;

        @Override
        public void run(SourceContext<UserAction> ctx) throws Exception {
            try (Connection connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD)) {
                while (isRunning) {
                    String sql = "SELECT * FROM source_mysql_table";
                    try (PreparedStatement statement = connection.prepareStatement(sql);
                         ResultSet resultSet = statement.executeQuery()) {
                        while (resultSet.next()) {
                            UserAction userAction = new UserAction();
                            userAction.setId(resultSet.getInt("id"));
                            userAction.setUserId(resultSet.getString("user_id"));
                            userAction.setAction(resultSet.getString("action"));
                            userAction.setTimestamp(resultSet.getString("timestamp"));
                            ctx.collect(userAction);
                        }
                    }
                    Thread.sleep(5000); // 模拟实时数据流,每5秒查询一次
                }
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }

    public static class OracleSource implements SourceFunction<UserAction> {
        private static final String JDBC_URL = "jdbc:oracle:thin:@localhost:1521:orcl";
        private static final String JDBC_USER = "source_user";
        private static final String JDBC_PASSWORD = "source_password";
        private volatile boolean isRunning = true;

        @Override
        public void run(SourceContext<UserAction> ctx) throws Exception {
            try (Connection connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD)) {
                while (isRunning) {
                    String sql = "SELECT * FROM source_oracle_table";
                    try (PreparedStatement statement = connection.prepareStatement(sql);
                         ResultSet resultSet = statement.executeQuery()) {
                        while (resultSet.next()) {
                            UserAction userAction = new UserAction();
                            userAction.setId(resultSet.getInt("id"));
                            userAction.setUserId(resultSet.getString("user_id"));
                            userAction.setAction(resultSet.getString("action"));
                            userAction.setTimestamp(resultSet.getString("timestamp"));
                            ctx.collect(userAction);
                        }
                    }
                    Thread.sleep(5000); // 模拟实时数据流,每5秒查询一次
                }
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }

    public static class MySQLSink extends RichFlatMapFunction<UserAction, Void> {
        private static final String JDBC_URL = "jdbc:mysql://localhost:3306/target_db";
        private static final String JDBC_USER = "target_user";
        private static final String JDBC_PASSWORD = "target_password";
        private transient Connection connection;
        private transient PreparedStatement statement;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD);
            String sql = "INSERT INTO target_table (user_id, action, timestamp) VALUES (?, ?, ?)";
            statement = connection.prepareStatement(sql);
        }

        @Override
        public void flatMap(UserAction value, Collector<Void> out) throws Exception {
            statement.setString(1, value.getUserId());
            statement.setString(2, value.getAction());
            statement.setString(3, value.getTimestamp());
            statement.executeUpdate();
        }

        @Override
        public void close() throws Exception {
            super.close();
            if (statement != null) {
                statement.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
}

第四步:配置Spring Boot

在application.properties中添加必要的配置:

# Spring Boot configuration
server.port=8080

第五步:运行和测试

  • 启动MySQL和Oracle数据库:确保你的源和目标数据库已经运行,并且创建了相应的数据库和表。
  • 启动Spring Boot应用:启动Spring Boot应用程序,会自动运行Flink ETL任务。
  • 测试Flink ETL任务:插入一些数据到源数据库的表中,验证数据是否同步到目标数据库的表中。

总结

通过上述步骤,你可以在Spring Boot项目中集成Flink并实现实时数据同步和ETL流程。这个示例展示了如何从MySQL和Oracle源数据库实时抽取数据,进行数据清洗和转换,并将结果加载到目标MySQL数据库中。根据你的具体需求,你可以扩展和修改这个示例,处理更复杂的数据转换和加载逻辑。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/730787.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

开发中遇到的错误 - @SpringBootTest 注解爆红

我在使用 SpringBootTest 注解的时候爆红了&#xff0c;ait 回车也导不了包&#xff0c;后面发现是因为没有加依赖&#xff1a; <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId>…

噪声-降噪引脚如何提高系统性能

由于LDO是电子器件&#xff0c;因此它们会自行产生一定量的噪声。选择低噪声LDO并采取措施来降低内部噪声对于生成不会影响系统性能的清洁电源轨而言不可或缺。 识别噪声 理想的 LDO 会生成没有交流元件的电压轨。遗憾的是&#xff0c;LDO 会像其他电子器件一样自行产生噪声。…

详细解读“找不到mfc140u.dll无法继续执行代码”问题

当你打开某个软件或者运行游戏&#xff0c;系统提示mfc140u.dll丢失&#xff0c;此时这个软件或者游戏根本无法运行。其实&#xff0c;mfc140u.dll是动态库文件&#xff0c;它是VS2010编译的软件所产生的&#xff0c;如果电脑运行程序时提示缺少mfc140u.dll文件&#xff0c;程序…

网络虚拟化考题

vrrp讲过吗&#xff1f;&#xff1f;&#xff1f; d 每一层都是什么设备啊 abcd 为啥流量不可控不可视 c是啥意思 讲过吗 abc aNET网络虚拟化是啥啊 为啥&#xff1f;&#xff1f; 啥是CDN&#xff1f;&#xff1f;&#xff1f;&#xff1f;&#xff1f;

汇聚荣做拼多多运营口碑怎么样?

拼多多作为国内领先的电商平台&#xff0c;其运营口碑一直是业界和消费者关注的焦点。汇聚荣作为拼多多的运营服务商&#xff0c;其服务质量直接影响到拼多多平台的用户体验和品牌形象。那么&#xff0c;汇聚荣做拼多多运营口碑怎么样呢? 一、服务响应速度 汇聚荣在服务响应速…

聊聊JSON

引言 JSON的概念 JSON&#xff08;JavaScript Object Notation&#xff09;是一种轻量级的数据交换格式&#xff0c;它基于JavaScript的一个子集&#xff0c;但独立于语言&#xff0c;这意味着它可以被许多编程语言轻松解析。JSON的简洁性和易读性使其成为Web开发中数据交换的…

Focal Network for Image Restoration

Focal Network for Image Restoration 用于图像恢复的焦点网络 Yuning Cui1 Wenqi Ren2* Xiaochun Cao2 Alois Knoll1 1Technical University of Munich 2Shenzhen Campus of Sun Yat-sen University {yuning.cui,knoll}in.tum.de {renwq3,caoxiaochun}mail.sysu.edu.cn 论文…

【python】OpenCV——Color Correction

文章目录 cv2.aruco 介绍imutils.perspective.four_point_transform 介绍skimage.exposure.match_histograms 介绍牛刀小试遇到的问题 参考学习来自 OpenCV基础&#xff08;18&#xff09;使用 OpenCV 和 Python 进行自动色彩校正 cv2.aruco 介绍 一、cv2.aruco模块概述 cv2.…

【2024德国工作】外国人在德国找工作是什么体验?

挺难的&#xff0c;德语应该是所有中国人的难点。大部分中国人进德国公司要么是做中国业务相关&#xff0c;要么是做技术领域的工程师。先讲讲人在中国怎么找德国的工作&#xff0c;顺便延申下&#xff0c;德国工作的真实体验&#xff0c;最后聊聊在今年的德国工作签证申请条件…

网络与协议安全复习 - 电子邮件安全

文章目录 PGP(Pretty Good Privacy)功能 S/MIME(Secure/Multipurpose Internet Mail Extensions)DKIM(Domain Keys Identified Mail) PGP(Pretty Good Privacy) 使用符号&#xff1a; Ks&#xff1a;会话密钥、KRa&#xff1a;A 的私钥、KUa&#xff1a;A 的公钥、EP&#xff…

Android开发系列(六)Jetpack Compose之Box

Box是一个用来组合和控制子元素布局的组件。它可以在一个矩形区域内排列一个或多个子元素&#xff0c;并根据所提供的参数来控制它们的位置、大小和样式。 Box的功能类似传统的FrameLayout。 下面通过示例了解Box的使用方法&#xff0c;首先看一个最简单的示例&#xff0c;如下…

永磁同步电机驱动死区补偿

1 死区效应及补偿 1. 1 死区效应 在本文的电机控制嵌入式系统中,逆变器为三 相电压型桥式逆变电路,如图 1 所示。 在理想状态 下,上桥臂和下桥臂的控制信号满足互补通断原则, 即上桥臂开通时,下桥臂关断,反之亦然。 而在实际 应用中,开关管的通断需要一定的开通时…

大语言模型-Transformer

目录 1.概述 2.作用 3.诞生背景 4.历史版本 5.优缺点 5.1.优点 5.2.缺点 6.如何使用 7.应用场景 7.1.十大应用场景 7.2.聊天机器人 8.Python示例 9.总结 1.概述 大语言模型-Transformer是一种基于自注意力机制&#xff08;self-attention&#xff09;的深度学习…

如何使用Windows备份轻松将数据转移到新电脑?这里有详细步骤

序言 我们都知道那种买了一台新电脑,就想直接上手的感觉。我记得在过去的日子里,要花几个小时传输我的文件,并试图复制我的设置。在当今传输数据的众多方法中,Windows备份提供了一个简单可靠的解决方案。 登录到你的Microsoft帐户 Microsoft在传输过程中使用其云存储来保…

NGINX_六 nginx 日志文件详解

六 nginx 日志文件详解 nginx 日志文件分为 **log_format** 和 **access_log** 两部分log_format 定义记录的格式&#xff0c;其语法格式为log_format 样式名称 样式详情配置文件中默认有log_format main $remote_addr - $remote_user [time_local] "req…

二,SpringFramework

二、SpringFramework实战指南 目录 一、技术体系结构 1.1 总体技术体系1.2 框架概念和理解 二、SpringFramework介绍 2.1 Spring 和 SpringFramework概念2.2 SpringFramework主要功能模块2.3 SpringFramework 主要优势 三、Spring IoC容器和核心概念 3.1 组件和组件管理概念3…

超越GPT-4o!新王Claude 3.5 Sonnet来啦!免费使用

目录 01 比GPT-4o更智能&#xff0c;比Claude 3 Opus快两倍 02 最强视觉Model 03 使用Claude的新方式&#xff1a;Artifacts 04 安全性和透明度 Anthropic刚刚发布了全新大模型Claude 3.5 Sonnet&#xff0c;号称是迄今为止最智能的模型。一文几步教你注册使用Claude 3.5 S…

硬件开发笔记(二十一):外部搜索不到的元器件封装可尝试使用AD21软件的“ManufacturerPart Search”功能

若该文为原创文章&#xff0c;转载请注明原文出处 本文章博客地址&#xff1a;https://hpzwl.blog.csdn.net/article/details/139869584 长沙红胖子Qt&#xff08;长沙创微智科&#xff09;博文大全&#xff1a;开发技术集合&#xff08;包含Qt实用技术、树莓派、三维、OpenCV…

英文字母表

目录 一 设计原型 二 后台源码 一 设计原型 二 后台源码 namespace 英文字母表 {public partial class Form1 : Form{public Form1(){InitializeComponent();}private void Form1_Load(object sender, EventArgs e){foreach (var item in panel1.Controls){if (item ! null)…