FlinkCDC-MYSQL批量写入

一、运行环境

(1)Flink:1.17.2

(2)Scala:2.12.20

(3)Mysql:5.7.43     ##开启binlog

二、代码示例

         思路:通过滚动窗口收集一批数据推给sink消费。binlog日志对于dataStream是没有key的,那么需要给每条数据造一个key。两种方式:(1)通过UUID创建一个全局key。(2)解析数据中的时间字段作为key。

package org.example;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import lombok.val;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.example.sink.MyJdbcSinkFunction;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.UUID;

public class FLCDCToMySql {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        Properties properties = new Properties();
        properties.setProperty("snapshot.locking.mode", "none");

        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .username("root")
                .password("root321")
                .databaseList("test")
                .tableList("test.t_class")
                .debeziumProperties(properties)
                .deserializer(new JsonDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.earliest())
                .build();

        DataStreamSource<String> dataStream = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "Mysql Source");
        // 10s一批数据推给sink
        SingleOutputStreamOperator<List<String>> outputStream = dataStream.map(new MapFunction<String, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(String s) throws Exception {
                        return Tuple2.of(UUID.randomUUID().toString(), s);
                    }
                }).keyBy(x -> x.f0).window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .process(new ProcessWindowFunction<Tuple2<String, String>, List<String>, String, TimeWindow>() {
                    @Override
                    public void process(String s, ProcessWindowFunction<Tuple2<String, String>, List<String>, String, TimeWindow>.Context context, Iterable<Tuple2<String, String>> iterable, Collector<List<String>> collector) throws Exception {
                        List datalist = new ArrayList();
                        for(Tuple2<String, String> element : iterable){
                            datalist.add(element.f1);
                        }
                        collector.collect(datalist);
                    }
                });
//        outputStream.print();
        outputStream.addSink(new MyJdbcSinkFunction());
        env.execute("flink cdc demo ");

    }
}

        批次写入思路:使用executeBatch方式提交,url中需添加rewriteBatchedStatements=true 。

package org.example.sink;

import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.example.RecordData;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.List;

public class MyJdbcSinkFunction extends RichSinkFunction<List<String>> {

    private int batchSize = 10;
    private Connection connection = null;
    private PreparedStatement statement = null;
    private String jbcUrl = String.format("jdbc:mysql://localhost:3306/test?&useSSL=false&rewriteBatchedStatements=true");
    private String insertSql = String.format("insert into %s values (?,?,?,?,?,?)","t_demo");

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = DriverManager.getConnection(jbcUrl,"root","xxxxxx");
        connection.setAutoCommit(false);

        statement = connection.prepareStatement(insertSql);
    }

    @Override
    public void invoke(List<String> value, Context context) throws Exception {
        super.invoke(value, context);
        System.out.println("dataInfo: "+ value);
        int flag = 0;          //处理数据条数的标记
        int batchSize = 1000;  //批次commit
        int dataSize = value.size();

        String before = "";
        String after = "";
        String op = "";
        String dbname = "";
        String tablename = "";
        String ts_ms = "";

        for (int i = 0; i < dataSize; i++){
            String data = value.get(i);
            JSONObject object = JSONObject.parseObject(data);

            op = String.valueOf(object.get("op"));
            if("c".equals(op)){
                after = String.valueOf(object.get("after"));
            }else if("d".equals(op)){
                before = String.valueOf(object.get("before"));
            }else if("u".equals(op)){
                before = String.valueOf(object.get("before"));
                after = String.valueOf(object.get("after"));
            }

            JSONObject sourceObj = JSONObject.parseObject(object.get("source").toString());
            dbname = String.valueOf(sourceObj.get("db"));
            tablename = String.valueOf(sourceObj.get("table"));
            ts_ms = String.valueOf(sourceObj.get("ts_ms"));

            statement.setString(1, before);
            statement.setString(2, after);
            statement.setString(3, op);
            statement.setString(4, dbname);
            statement.setString(5, tablename);
            statement.setString(6, ts_ms);
            statement.addBatch();

            flag = flag + 1;
            if(i % batchSize == 0 ){
                statement.executeBatch();
                connection.commit();
                statement.clearBatch();
                flag = 0;  //批次提交后重置
            }
        }
        if(flag > 0){   //不满批次的提交
            statement.executeBatch();
            connection.commit();
            statement.clearBatch();
        }

    }

    @Override
    public void close() throws Exception {
        super.close();
        connection.close();
    }
}

三、插入目标表结果

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/906675.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

集合(数组、链表、map)

目录 Collection包结构 和collections区别 List 数组和arrayList 区别 数组下标为什么从0开始&#xff1f; ArrayList 动态数组 LinkedList双向链表增删快 增删快 链表 单链表和双链表区别 Arraylist VS LinkedList 区别 数组和List之间转换 ArrayList 、LinkedList…

多线程和线程同步基础篇学习笔记(Linux)

大丙老师教学视频&#xff1a;10-线程死锁_哔哩哔哩_bilibili 目录 大丙老师教学视频&#xff1a;10-线程死锁_哔哩哔哩_bilibili 线程概念 为什么要有线程 线程和进程的区别 在处理多任务的时候为什么线程数量不是越多越好? Linux提供的线程API 主要接口 线程创建 pth…

希望十一月对我好一点:C++之多态(2)--多态的原理(部分)

多态的原理 虚函数表指针 下⾯编译为32位程序的运⾏结果是什么&#xff08;&#xff09; A. 编译报错 B.运⾏报错C.8 D.12 class Base{public:virtual void Func1(){cout << "Func1()" << endl;}protected:int _b 1;char _ch x;};int main(){Base b…

Java: 遍历 Map

Java: 遍历 Map package animals;import java.util.HashMap; import java.util.Iterator; import java.util.Map;/*** Description :** author : HMF* Date : Created in 15:33 2024/11/1* version :*/ public class Test002 {public static void main(String[] args){Map<S…

基于vue+neo4j 的中药方剂知识图谱可视化系统

前言 历时一周时间&#xff0c;中药大数据R02系统中药开发完毕&#xff0c;该系统通过scrapy工程获取中药数据&#xff0c;使用python pandas预处理数据生成知识图谱和其他相关数据&#xff0c;利用vuespringbootneo4jmysql 开发系统&#xff0c;具体功能请看本文介绍。 简要…

01.如何用DDD重构老项目

学习资料来源&#xff1a;DDD独家秘籍视频合集 https://space.bilibili.com/24690212/channel/collectiondetail?sid1940048&ctype0 文章目录 动机DDD与重构实践重构? 重写从一开始就采用DDD重构步骤1. 添加领域模块2.分离出有价值的代码3.迁移到领域模块4.重复2,3 动机 …

WPF+MVVM案例实战(十八)- 自定义字体图标按钮的封装与实现(ABD类)

文章目录 1、案例效果1、按钮分类2、ABD类按钮实现描述1.文件创建与代码实现2、样式引用与控件封装3、按钮案例演示1、页面实现与文件创建2、运行效果如下3、总结4、源代码获取1、案例效果 1、按钮分类 在WPF开发中,最常见的就是按钮的使用,这里我们总结以下大概的按钮种类,…

Python 5个数据容器

列表&#xff08;list&#xff09; 列表的定义 定义空列表&#xff1a; 变量名 [] 或 变量名 list() 定义变量&#xff1a; 变量名 [元素1&#xff0c;元素2&#xff0c;元素3&#xff0c;... ] 取出列表元素 列表 [下标索引] 从前向后&#xff0c;从0开始&#xff…

使用语言模型进行文本摘要的五个级别(llm)

视频链接&#xff1a;5 Levels Of LLM Summarizing: Novice to Expert

Qt5.15.x源码编译

介绍&#xff1a; QT5.15以上版本已经不提供现成的集成软件了。所以当我们项目中需要用到5.15以上的版本时&#xff0c;只能自己对源码进行编译来生成一个环境了&#xff08;Qt提供了在线升级&#xff0c;但是在线升级中没有MinGW版本了&#xff09; 背景&#xff1a; 我们想要…

Ubuntu 系统、Docker配置、Docker的常用软件配置(下)

前言 书接上文&#xff0c;现在操作系统已经有了&#xff0c;作为程序的载体Docker也安装配置好了&#xff0c;接下来我们需要让Docker发挥它的法力了。 Docker常用软件的安装 1.Redis 缓存安装 1.1 下载 docker pull redis:7.4.1 #可改为自己需要的版本 1.2 创建本地目录存储…

Redis全系列学习基础篇之位图(bitmap)常用命令的解析

文章目录 描述常用命令及解析常用命令解析 应用场景统计不确定时间周期内用户登录情况思路分析实现 统计某一特定时间内活跃用户(登录一次即算活跃)的数量思路分析与实现 描述 bitmap是redis封装的用于针对位(bit)的操作,其特点是计算效率高&#xff0c;占用空间少,常被用来统计…

面试题:JVM(四)

new对象流程&#xff1f;&#xff08;龙湖地产&#xff09; 对象创建方法&#xff0c;对象的内存分配。&#xff08;360安全&#xff09; 1. 对象实例化 创建对象的方式有几种&#xff1f; 创建对象的步骤 指针碰撞&#xff1a;以指针为分界线&#xff0c;一边是已连续使用的…

手写实现call,apply,和bind方法

手写实现call&#xff0c;apply和bind方法 call&#xff0c;apply和bind方法均是改变this指向的硬绑定方法&#xff0c;要想手写实现此三方法&#xff0c;都要用到一个知识点&#xff0c;即对象调用函数时&#xff0c;this会指向这个对象&#xff08;谁调用this就指向谁&#…

【python ASR】win11-从0到1使用funasr实现本地离线音频转文本

文章目录 前言一、前提条件安装环境Python 安装安装依赖,使用工业预训练模型最后安装 - torch1. 安装前查看显卡支持的最高CUDA的版本&#xff0c;以便下载torch 对应的版本的安装包。torch 中的CUDA版本要低于显卡最高的CUDA版本。2. 前往网站下载[Pytorch](https://pytorch.o…

AI驱动无人驾驶:安全与效率能否兼得?

内容概要 如今&#xff0c;人工智能正以其神奇的魔力驱动着无人驾驶的浪潮&#xff0c;带来了无数令人兴奋的可能性。这一领域的最新动态显示&#xff0c;AI技术在车辆的决策过程和实时数据分析中发挥着重要作用&#xff0c;帮助车辆更聪明地应对复杂的交通环境。通过实时监测…

从头开始学PHP之面向对象

首先介绍下最近情况&#xff0c;因为最近入职了且通勤距离较远&#xff0c;导致精力不够了&#xff0c;而且我发现&#xff0c;人一旦上了班&#xff0c;下班之后就不想再进行任何脑力劳动了&#xff08;对大部分牛马来说&#xff0c;精英除外&#xff09;。 话不多说进入今天的…

Systemd:现代 Linux 系统服务管理的核心

Systemd&#xff1a;现代 Linux 系统服务管理的核心 引言 Systemd 是一种现代的系统和服务管理器&#xff0c;用于在 Linux 系统启动时初始化用户空间&#xff0c;并通过服务管理和资源控制实现系统的自动化管理。自发布以来&#xff0c;Systemd 已逐渐取代传统的 SysVinit 和…

Linux初阶——线程(Part3):POSIX 信号量 CP 模型变体

一、什么是 POSIX 信号量 信号量本质就是一个统计资源数量的计数器。​​​​​​​ 1、PV 操作 pv操作就是一种让信号量变化的操作。其中 P 操作可以让信号量减 1&#xff08;如果信号量大于 0&#xff09;&#xff0c;V 操作可以让信号量加 1. 2、信号量类型——sem_t 3…

《女巫攻击:潜伏在网络背后的隐秘威胁与防御策略》

目录 引言 一、基本概念 二、攻击机制 三、Sybil攻击类型 1、直接通信 2、间接通信 3、伪造身份 4、盗用身份 5、同时攻击 6、非同时攻击 四、攻击影响 五、防御措施 总结 引言 随着区块链技术和去中心化网络的迅速发展&#xff0c;网络安全问题也愈发引起关注。其…