Flink读取mysql数据库(java)

代码如下:

package com.weilanaoli.ruge.vlink.flink;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import java.util.logging.Level;
import java.util.logging.Logger;

class MysqlExample {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("xx.xx.xx.xx")    //输入地址
                .port(3306)                 //输入端口
                .databaseList("xx")         //输入库名
                .tableList("xx.test")       //输入表名
                .username("xx")             //输入用户名
                .password("xxxx")           //输入密码
                .startupOptions(StartupOptions.initial())  //读取binlog策略,这个启动选项有五种
                .deserializer(new JsonDebeziumDeserializationSchema()) //配置不要锁表,但是数据一致性不是精准一次,会变成最少一次
                .build();
        
        //配置执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // enable checkpoint
        env.enableCheckpointing(3000);
        DataStreamSource<String> dataStreamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
        SingleOutputStreamOperator<Object> singleOutputStreamOperator = dataStreamSource.process(new ProcessFunction<String, Object>() {
            @Override
            public void processElement(String value, ProcessFunction<String, Object>.Context ctx, Collector<Object> out) {
                try {
                    System.out.println("processElement=====" + value);
                }catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        dataStreamSource.print("原始数据=====");
        env.execute("Print MySQL Snapshot + Binlog");
    }
}

运行结果如下:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/60441.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【数据结构|二叉树遍历】递归与非递归实现前序遍历、中序遍历、后序遍历

递归与非递归实现二叉树的前序遍历、中序遍历、后序遍历。 二叉树图 定义 前序遍历&#xff08;Preorder Traversal&#xff09;&#xff1a; 前序遍历的顺序是先访问根节点&#xff0c;然后按照先左后右的顺序访问子节点。对于上面的二叉树&#xff0c;前序遍历的结果是&…

【React】搭建React项目

最近自己在尝试搭建react项目&#xff0c;其实react项目搭建没有想象中的那么复杂&#xff0c;我们只需要使用一个命令把React架子搭建好&#xff0c;其他的依赖可以根据具体的需求去安装&#xff0c;比如AntDesignMobile的UI框架&#xff0c;执行npm install antd-mobile --sa…

【计算机网络】NAT及Bridge介绍

OSI七层模型 七层模型介绍及举例 为通过网络将人类可读信息通过网络从一台设备传输到另一台设备&#xff0c;必须在发送设备沿 OSI 模型的七层结构向下传输数据&#xff0c;然后在接收端沿七层结构向上传输数据。 数据在 OSI 模型中如何流动 库珀先生想给帕尔梅女士发一封电…

android kernel移植5-RK3568

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言1.添加开发板默认配置文件前言 前面我们已经学会了移植uboot,其实就是把瑞芯微的关于uboot的一些文件的名字和编译指定的文件改为自己定义的问价和名字,那么接下来的Android kernel其实也是…

探索Streamlit中强大而灵活的 st.write() 函数(五):构建丰富多样的应用界面

文章目录 1 前言2 显示HTML的内容3 显示Markdown内容4 显示代码块5 显示DataFrame的交互式表格6 显示音频和视频7 显示图表8 显示图片9 显示地图10 显示PDF文件11 显示文件下载链接12 结语 1 前言 在这篇博文中&#xff0c;我们将着重介绍Streamlit中一个核心而重要的函数&…

java序列化框架全集讲解

一、简介 Java序列化框架是一种用于在Java应用程序中将对象转换为字节流或从字节流反序列化为对象的工具。序列化是将对象的状态转换为字节流的过程&#xff0c;以便可以将其存储在文件中、通过网络传输或在不同的系统之间共享。反序列化是将字节流转换回对象的过程。 Java序列…

【黑马头条之kafka及异步通知文章上下架】

本笔记内容为黑马头条项目的kafka及异步通知文章上下架部分 目录 一、kafka概述 二、kafka安装配置 三、kafka入门 四、kafka高可用设计 1、集群 2、备份机制(Replication&#xff09; 五、kafka生产者详解 1、发送类型 2、参数详解 六、kafka消费者详解 1、消费者…

Django实现音乐网站 ⑹

使用Python Django框架制作一个音乐网站&#xff0c; 本篇主要是在添加编辑过程中对后台歌手功能优化及表模型名称修改、模型继承内容。 目录 表模型名称修改 模型继承 创建抽象基类 其他模型继承 更新表结构 歌手新增、编辑优化 表字段名称修改 隐藏单曲数和专辑数 姓…

复制带随机指针的链表【构造链表深拷贝】

复制带随机指针的链表 文章目录 复制带随机指针的链表 链表复制要求 解题思路 1、拷贝所有节点&#xff0c;并放在对应原节点的后面 2.将每个 random 指向对应的位置。 3.将复制的链表解下来&#xff0c;尾插到一起&#xff0c;并将原链表恢复 源码 先导知识点&#…

C语言手撕单链表

一、链表的概念 链表是一种物理存储结构上非连续、非顺序的存储结构&#xff0c;也就是内存存储不是像顺序表那么连续存储&#xff0c;而是以结点的形式一块一块存储在堆上的&#xff08;用动态内存开辟&#xff09;。 既然在内存上不是连续存储&#xff0c;那我们如何将这一…

第28天-Kubernetes架构,集群部署,Ingress,项目部署,Dashboard

1.K8S集群部署 1.1.k8s快速入门 1.1.1.简介 Kubernetes简称k8s&#xff0c;是用于自动部署&#xff0c;扩展和管理容器化应用程序的开源系统。 中文官网&#xff1a;https://kubernetes.io/zh/中文社区&#xff1a;https://www.kubernetes.org.cn/官方文档&#xff1a;https…

保护模式中段选择子权限校验逻辑详解

保护模式中段选择子权限校验逻辑详解 CPLRPLDPL权限校验逻辑测试 CPL CPL是当前进程的权限级别(Current Privilege Level)&#xff0c;是当前正在执行的代码所在的段的特权级&#xff0c;存在于cs段选择子的后两位的低两位。 段选择子可见部分的数据结构如下&#xff1a; 举例…

基于SpringBoot+Vue的漫画网站设计与实现(源码+LW+部署文档等)

博主介绍&#xff1a; 大家好&#xff0c;我是一名在Java圈混迹十余年的程序员&#xff0c;精通Java编程语言&#xff0c;同时也熟练掌握微信小程序、Python和Android等技术&#xff0c;能够为大家提供全方位的技术支持和交流。 我擅长在JavaWeb、SSH、SSM、SpringBoot等框架…

智能指针shared_ptr:自定义删除器

重点&#xff1a; 1.普通指针转化成智能指针。 2.智能指针创建的时候&#xff0c;第二个参数是自定义删除器&#xff0c;默认情况下&#xff0c;shared_ptr调用delete()函数。 class A { public:void Get() { cout << b << endl; }; private:int b{ 10 }; };clas…

在排序数组中查找元素的第一个和最后一个位置——力扣34

文章目录 题目描述法一 二分查找 题目描述 法一 二分查找 int bsearch_1(int l, int r) {while (l < r){int mid (l r)/2;if (check(mid)) r mid;else l mid 1;}return l; }int bsearch_2(int l, int r) {while (l < r){int mid ( l r 1 ) /2;if (check(mid)) l …

MobPush iOS SDK iOS实时活动

开发工具&#xff1a;Xcode 功能需要: SwiftUI实现UI页面&#xff0c;iOS16.1以上系统使用 功能使用: 需应用为启动状态 功能说明 iOS16.1 系统支持实时活动功能&#xff0c;可以在锁定屏幕上实时获知各种事情的进展&#xff0c;MobPushSDK iOS 4.0.3版本已完成适配&#xf…

序列建模简史(DIN/DIEN/DSIN/BST/MIMN/SIM/ETA/SDIM/TWIN)

序列建模简史(DIN/DIEN/DSIN/BST/MIMN/SIM/ETA/SDIM/TWIN) 史前史 在用户序列专门用于建模之前&#xff0c;一般对序列的建模的处理就是将所有序列行为进行sum/avg pooling操作&#xff0c;将用户的多个序列行为简单聚合成一个Embedding&#xff0c;然后和其他特征一起拼接。…

ansible-playbook roles模块编写lnmp剧本

目录 一&#xff1a;集中式编写lnmp剧本 二&#xff1a;分布式安装lnmp 1、nginx 配置 2、mysql配置 3、php配置 4、运行剧本 一&#xff1a;集中式编写lnmp剧本 vim /etc/ansible/lnmp.yml- name: lnmp playhosts: dbserversremote_user: roottasks:- name: perpare condif…

谷歌云 | 电子商务 | 如何更好地管理客户身份以支持最佳的用户体验

【本文由Cloud Ace整理发布。Cloud Ace是谷歌云全球战略合作伙伴&#xff0c;拥有 300 多名工程师&#xff0c;也是谷歌最高级别合作伙伴&#xff0c;多次获得 Google Cloud 合作伙伴奖。作为谷歌托管服务商&#xff0c;我们提供谷歌云、谷歌地图、谷歌办公套件、谷歌云认证培训…

嵌入式开发学习(STC51-10-直流电机)

内容 直流电机工作约5S后停止 直流电机简介 直流电机是指能将直流电能转换成机械能&#xff08;直流电动机&#xff09;或将机械能转换成直流电能&#xff08;直流发电机&#xff09;的旋转电机&#xff1b; 直流电机的结构应由定子和转子两大部分组成&#xff1b; 直流电…