Flink将数据写入MySQL(JDBC)

一、写在前面

在实际的生产环境中,我们经常会把Flink处理的数据写入MySQL、Doris等数据库中,下面以MySQL为例,使用JDBC的方式将Flink的数据实时数据写入MySQL。

二、代码示例

2.1 版本说明

        <flink.version>1.14.6</flink.version>
        <spark.version>2.4.3</spark.version>
        <hadoop.version>2.8.5</hadoop.version>
        <hbase.version>1.4.9</hbase.version>
        <hive.version>2.3.5</hive.version>
        <java.version>1.8</java.version>
        <scala.version>2.11.8</scala.version>
        <mysql.version>8.0.22</mysql.version>
        <scala.binary.version>2.11</scala.binary.version>

2.2 导入相关依赖

 <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<!--mysql连接器依赖-->
<dependency>
   <groupId>mysql</groupId>
   <artifactId>mysql-connector-java</artifactId>
   <version>8.0.22</version>
</dependency>

2.3 连接数据库,创建表

mysql> CREATE TABLE `ws` ( 
      `id` varchar(100) NOT NULL
      ,`ts` bigint(20) DEFAULT NULL
      ,`vc` int(11) DEFAULT NULL, PRIMARY KEY (`id`) 
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8

2.4 创建POJO类

package com.flink.POJOs;


import java.util.Objects;

/**
 * TODO POJO类的特点
 * 类是公有(public)的
 * 有一个无参的构造方法
 * 所有属性都是公有(public)的
 * 所有属性的类型都是可以序列化的
 */
public class WaterSensor {
    //类的公共属性
    public String id;
    public Long ts;
    public Integer vc;

    //无参构造方法
    public WaterSensor() {
        //System.out.println("调用了无参数的构造方法");
    }

    public WaterSensor(String id, Long ts, Integer vc) {
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }

    //生成get和set方法
    public void setId(String id) {
        this.id = id;
    }

    public void setTs(Long ts) {
        this.ts = ts;
    }

    public void setVc(Integer vc) {
        this.vc = vc;
    }

    public String getId() {
        return id;
    }

    public Long getTs() {
        return ts;
    }

    public Integer getVc() {
        return vc;
    }

    //重写toString方法
    @Override
    public String toString() {
        return "WaterSensor{" +
                "id='" + id + '\'' +
                ", ts=" + ts +
                ", vc=" + vc +
                '}';
    }

    //重写equals和hasCode方法
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        WaterSensor that = (WaterSensor) o;
        return id.equals(that.id) && ts.equals(that.ts) && vc.equals(that.vc);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, ts, vc);
    }
}
//scala的case类?

2.5 自定义map函数

package com.flink.POJOs;

import org.apache.flink.api.common.functions.MapFunction;

public class WaterSensorMapFunction implements MapFunction<String, WaterSensor> {
    @Override
    public WaterSensor map(String value) throws Exception {
        String[] datas = value.split(",");
        return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
    }
}

2.5 Flink2MySQL

package com.flink.DataStream.Sink;

import com.flink.POJOs.WaterSensor;
import com.flink.POJOs.WaterSensorMapFunction;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
 * Flink 输出到 MySQL(JDBC)
 */
public class flinkSinkJdbc {
    public static void main(String[] args) throws Exception {
        //TODO 创建Flink上下文执行环境
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        streamExecutionEnvironment.setParallelism(1);
        //TODO Source
        DataStreamSource<String> dataStreamSource = streamExecutionEnvironment.socketTextStream("localhost", 8888);
        //TODO Transfer
        SingleOutputStreamOperator<WaterSensor> waterSensorSingleOutputStreamOperator = dataStreamSource.map(new WaterSensorMapFunction());
        /**TODO 写入 mysql
         * 1、只能用老的 sink 写法
         * 2、JDBCSink 的 4 个参数:
         *   第一个参数: 执行的 sql,一般就是 insert into
         *   第二个参数: 预编译 sql, 对占位符填充值
         *   第三个参数: 执行选项 ---->攒批、重试
         *   第四个参数: 连接选项---->url、用户名、密码
         */
        SinkFunction<WaterSensor> sinkFunction = JdbcSink.sink("insert into ws values(?,?,?)",
                new JdbcStatementBuilder<WaterSensor>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {
                        preparedStatement.setString(1, waterSensor.getId());
                        preparedStatement.setLong(2, waterSensor.getTs());
                        preparedStatement.setInt(3, waterSensor.getVc());
                        System.out.println("数据写入成功:"+'('+waterSensor.getId()+","+waterSensor.getTs()+","+waterSensor.getVc()+")");
                    }
                }
                , JdbcExecutionOptions
                        .builder()
                        .withMaxRetries(3)         // 重试次数
                        .withBatchSize(100)        // 批次的大小:条数
                        .withBatchIntervalMs(3000) // 批次的时间
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://localhost:3306/dw?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
                        .withUsername("root")
                        .withPassword("********")
                        .withConnectionCheckTimeoutSeconds(60) // 重试的超时时间
                        .build()
        );
        //TODO 写入到Mysql
        waterSensorSingleOutputStreamOperator.addSink(sinkFunction);

        streamExecutionEnvironment.execute();
    }
}

2.6 启动necat、Flink,观察数据库写入情况

nc -lk 9999 #启动necat、并监听8888端口,写入数据

在这里插入图片描述
启动Flink程序
在这里插入图片描述
查看数据库写入是否正常
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/105125.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

UE5 Blueprint发送http请求

一、下载插件HttpBlueprint、Json Blueprint Utilities两个插件是互相依赖的&#xff0c;启用&#xff0c;重启项目 目前两个是Beta的状态&#xff0c;如果你使用的平台支持就可以使用&#xff0c;我们的项目因为需要取Header的值&#xff0c;所有没法使用这两个插件&#xff0…

单链表的定义(数据结构与算法)

单链表的定义 单链表是一种常见的数据结构&#xff0c;用于存储元素的序列。它由一系列节点组成&#xff0c;每个节点包含一个数据元素和一个指向下一个节点的引用&#xff08;指针&#xff09;。单链表中的节点之间通过指针连接起来&#xff0c;形成一个线性结构。单链表是一种…

【Elasticsearch】es脚本编程使用详解

目录 一、es脚本语言介绍 1.1 什么是es脚本 1.2 es脚本支持的语言 1.3 es脚本语言特点 1.4 es脚本使用场景 二、环境准备 2.1 docker搭建es过程 2.1.1 拉取es镜像 2.1.2 启动容器 2.1.3 配置es参数 2.1.4 重启es容器并访问 2.2 docker搭建kibana过程 2.2.1 拉取ki…

Git的远程仓库

Git的远程仓库 添加远程仓库从远程库克隆 添加远程仓库 你在本地创建了一个Git仓库后&#xff0c;又想在GitHub创建一个Git仓库&#xff0c;并且让这两个仓库进行远程同步&#xff0c;这样&#xff0c;GitHub上的仓库既可以作为备份&#xff0c;又可以让其他人通过该仓库来协作…

如何在VScode中让printf输出中文

如何在VScode中让printf输出中文&#xff1f; 1、在“Visual Studio Code”图标上右击&#xff0c;弹出对话框。见下图&#xff1a; 2、点击“以管理员身份运行”&#xff0c;得到下图&#xff1a; 3、点击“UTF-8”按钮&#xff0c;得到下图&#xff1a; 4、点击“通过编码重…

Python---练习:使用for循环嵌套实现打印九九乘法表

思考&#xff1a; 外层循环主要用于控制循环的行数&#xff0c;内层循环用于控制列数。 基本语法&#xff1a; # 外层循环 for i in 序列1:# 内层循环for j in 序列2:循环体 序列1 序列2 &#xff0c;就可以是range(1, 10) -----也就是从1&#xff0c;到9。 参考while循环…

【vector题解】杨辉三角 | 删除有序数组中的重复项 | 只出现一次的数字Ⅱ

杨辉三角 力扣&#xff08;LeetCode&#xff09;官网 - 全球极客挚爱的技术成长平台 给定一个非负整数 numRows&#xff0c;生成「杨辉三角」的前 numRows 行。 在「杨辉三角」中&#xff0c;每个数是它左上方和右上方的数的和。 示例 1: 输入: numRows 5 输出: [[1],[1,1…

动态规划(记忆化搜索)

AcWing 901. 滑雪 给定一个 R行 C 列的矩阵&#xff0c;表示一个矩形网格滑雪场。 矩阵中第 i 行第 j 列的点表示滑雪场的第 i 行第 j 列区域的高度。 一个人从滑雪场中的某个区域内出发&#xff0c;每次可以向上下左右任意一个方向滑动一个单位距离。 当然&#xff0c;一个人能…

鸿蒙跨平台框架来了ArkUi-X

前言&#xff1a; 各位同学大家有段时间没有给大家更新博客了 之前鸿蒙推出了鸿ArkUi-X 框架所以就写个文章分享一下 效果图&#xff1a; 首先需要下载支持 ArkUI-X 套件的华为开发工具 DevEco &#xff0c;版本为 4.0 以上&#xff0c;目前可以下载预览版进行体验。下载地址…

Node.js与npm版本比对

Node.js与npm版本比对 Node.js与npm版本比对版本对比表Node版本对比 Node.js与npm版本比对 我们在项目开发过程中&#xff0c;经常会遇到公司一些老的前端工程项目&#xff0c;而我们当前的node及npm版本都是相对比较新的了。 在运行以前工程时&#xff0c;会遇到相关环境不匹…

宝塔Python3.7安装模块报错ModuleNotFoundError: No module named ‘Crypto‘解决办法

前言 今晚遇到一个问题&#xff0c;宝塔服务器上安装脚本的模块时&#xff0c;出现以下报错&#xff0c;这里找到了解决办法 Traceback (most recent call last):File "/www/wwwroot/unifysign/fuck_chaoxing/fuck_xxt.py", line 4, in <module>from Crypto.…

WORD中的表格内容回车行距过大无法调整行距

word插入表格&#xff0c;编辑内容&#xff0c;换行遇到如下问题&#xff1a; 回车后行距过大&#xff0c;无法调整行距。 解决方法&#xff08;并行&#xff09;&#xff1a; 方法1&#xff1a;选中要调整的内容&#xff0c;菜单路径&#xff1a;“编辑-清除-格式” 方法2&am…

解读BOT攻击,探索灵活且准确的安全之道

车票、秒杀、限量球鞋……面对这样的抢购场景&#xff0c;为什么总是落后于人&#xff1f;其实你遇到的并不是真人&#xff0c;而是恶意BOT。恶意的BOT进行信息数据爬取、薅羊毛等攻击行为&#xff0c;正损害着企业和用户的利益。在过去 5 年&#xff0c;几乎每个企业都会遇到由…

JVM相关面试题(每日一练)

1. 什么是垃圾回收机制&#xff1f; 垃圾收集 Garbage Collection 通常被称为“GC”&#xff0c;它诞生于1960年 MIT 的 Lisp 语言&#xff0c;经过半个多世纪&#xff0c;目前已经十分成熟了。 jvm 中&#xff0c;程序计数器、虚拟机栈、本地方法栈都是随线程而生随线程而灭&a…

Git(二)版本控制、发展历史、初始化配置、别名

目录 一、版本控制1.1 为什么要使用版本控制&#xff1f;1.2 集中化的版本控制系统1.3 分布式的版本控制系统1.3 两种版本控制系统对比集中式&#xff08;svn&#xff09;分布式&#xff08;git&#xff09; 二、发展历史三、初始化配置3.1 配置文件3.2 配置内容 四、别名 官网…

esp32-C3固件烧录用户手册

esp32-C3固件烧录用户手册1.4 文章目录 esp32-C3固件烧录用户手册1.4烧录所需硬件软件工具vscodeplatformIOflash_download_tools 插座与USB转TTL模块之间接线esp32-C3版本插座&#xff08;底板4针&#xff09; bin固件和烧录地址获取详细烧录地址和信息获取文件系统程序详细烧…

输入/输出应用程序接口和设备驱动程序接口

文章目录 1.输入/输出应用程序接口1.字符设备接口2.块设备接口3.网络设备接口1.网络设备套接字通信 4.阻塞/非阻塞I/O 2.设备驱动程序接口1.统一标准的设备驱动程序接口 1.输入/输出应用程序接口 1.字符设备接口 get/put系统调用:向字符设备读/写一个字符 2.块设备接口 read/wr…

Android拖放startDragAndDrop拖拽onDrawShadow动态添加View,Kotlin(3)

Android拖放startDragAndDrop拖拽onDrawShadow动态添加View&#xff0c;Kotlin&#xff08;3&#xff09; import android.content.ClipData import android.graphics.Canvas import android.graphics.Point import android.os.Bundle import android.util.Log import android.…

酷开科技 | 酷开系统大屏电视,打造精彩家庭场景

在信息资讯不发达的年代&#xff0c;电视机一直都是个人及家庭重要的信息获取渠道和家庭娱乐中心&#xff0c;是每个家庭必不可少的大家电之一&#xff01;在快节奏的现代生活中&#xff0c;受手机和平板的冲击&#xff0c;电视机这个曾经的客厅“霸主”一度失去了“主角光环”…