一文说清flink从编码到部署上线

引言:目前flink的文章比较多,但一般都关注某一特定方面,很少有一个文章,从一个简单的例子入手,说清楚从编码、构建、部署全流程是怎么样的。所以编写本文,自己做个记录备查同时跟大家分享一下。本文以简单的mysql cdc为例展开说明。
环境说明:MySQL:5.7;flink:1.14.0;hadoop:3.0.0;操作系统:CentOS 7.6;JDK:1.8.0_401。

1.MySQL

1.1 创建数据库和测试数据

数据库脚本:

CREATE DATABASE `flinktest`;
USE `flinktest`;
CREATE TABLE `products` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) NOT NULL,
  `description` varchar(512) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8mb4;

insert  into `products`(`id`,`name`,`description`) values 
(1,'aaa','aaaa'),
(2,'ccc','ccc'),
(3,'dd','ddd'),
(4,'eeee','eee'),
(5,'ffff','ffff'),
(6,'hhhh','hhhh'),
(7,'iiii','iiii'),
(8,'jjjj','jjjj');

账号使用root就行。

1.2 开启binlog

参考:https://core815.blog.csdn.net/article/details/144233298
踩坑:测试过程中发现mysql 9.0一直无法获取更新的数据,最终使用的5.7。

2.编码

2.1 主要实现

package com.zl;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.util.Arrays;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

import static com.mysql.cj.conf.PropertyKey.useSSL;

public class MysqlExample {
    public static void main(String[] args) throws Exception {

        List<String> SYNC_TABLES = Arrays.asList("flinktest.products");
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("10.86.37.169")
                .port(3306)
                .databaseList("flinktest")
                .tableList(String.join(",", SYNC_TABLES))
                .username("root")
                .password("pwd")
                .startupOptions(StartupOptions.initial())
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        /// 配置flink访问页面-开始
       /* Configuration config = new Configuration();
       // 启用 Web UI,访问地址【http://ip:port】
        config.setBoolean("web.ui.enabled", true); 
        config.setString(RestOptions.BIND_PORT,"8081");
//        这个使用jar直接运行可以,如果提交给yarn会报错,需要改为getExecutionEnvironment()
        StreamExecutionEnvironment env = StreamExecutionEnvironment
        .createLocalEnvironmentWithWebUI(config);*/
        ///配置flink访问页面-结束

        StreamExecutionEnvironment env = StreamExecutionEnvironment
        .getExecutionEnvironment();
        env.setParallelism(1);

        /// 设置CK存储-开始(不需要可注释掉)
        // hadoop部署见:https://core815.blog.csdn.net/article/details/144022938
        // hdfs访问地址见:/home/hadoop-3.3.3/etc/hadoop/core-site.xml
        env.getCheckpointConfig()
        .setCheckpointStorage("hdfs://10.86.97.191:9000"+"/flinktest/");
        env.getCheckpointConfig().setCheckpointInterval(3000);
        /// 设置CK存储-结束

        // 如果不能正常读取mysql的binlog:
        //①可能是mysql没有打开binlog或者mysql版本不支持(当前在mysql5.7.20环境下,功能正常);
        // ②可能是数据库ip、port、账号、密码错误。
        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .setParallelism(1).print();

        env.execute("Print MySQL Snapshot + Binlog");

    }

}

2.2 依赖

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
         http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.zl.flinkcdc</groupId>
    <artifactId>FlickCDC</artifactId>
    <packaging>jar</packaging>
    <version>1.0-SNAPSHOT</version>
    <name>FlickCDC</name>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink-version>1.14.0</flink-version>
        <flink-cdc-version>2.4.0</flink-cdc-version>
        <hadoop.version>3.0.0</hadoop.version>
        <slf4j.version>1.7.25</slf4j.version>
        <log4j.version>2.16.0</log4j.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink-version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink-version}</version>
        </dependency>

        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>${flink-cdc-version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-guava</artifactId>
            <version>30.1.1-jre-15.0</version>
        </dependency>
        <!--<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-guava</artifactId>
            <version>18.0-13.0</version>
        </dependency>-->

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink-version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink-version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink-version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_2.11</artifactId>
            <version>${flink-version}</version>
        </dependency>


        <!--        hadoop相关依赖-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <artifactId>commons-cli</artifactId>
                    <groupId>commons-cli</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>commons-compress</artifactId>
                    <groupId>org.apache.commons</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>guava</artifactId>
                    <groupId>com.google.guava</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jackson-annotations</artifactId>
                    <groupId>com.fasterxml.jackson.core</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jackson-core</artifactId>
                    <groupId>com.fasterxml.jackson.core</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jackson-databind</artifactId>
                    <groupId>com.fasterxml.jackson.core</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <artifactId>asm</artifactId>
                    <groupId>org.ow2.asm</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>avro</artifactId>
                    <groupId>org.apache.avro</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>commons-cli</artifactId>
                    <groupId>commons-cli</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>commons-codec</artifactId>
                    <groupId>commons-codec</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>commons-compress</artifactId>
                    <groupId>org.apache.commons</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>commons-io</artifactId>
                    <groupId>commons-io</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>commons-lang3</artifactId>
                    <groupId>org.apache.commons</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>commons-logging</artifactId>
                    <groupId>commons-logging</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>commons-math3</artifactId>
                    <groupId>org.apache.commons</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>guava</artifactId>
                    <groupId>com.google.guava</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jackson-databind</artifactId>
                    <groupId>com.fasterxml.jackson.core</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jaxb-api</artifactId>
                    <groupId>javax.xml.bind</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>log4j</artifactId>
                    <groupId>log4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>nimbus-jose-jwt</artifactId>
                    <groupId>com.nimbusds</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>zookeeper</artifactId>
                    <groupId>org.apache.zookeeper</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jsr305</artifactId>
                    <groupId>com.google.code.findbugs</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>gson</artifactId>
                    <groupId>com.google.code.gson</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <artifactId>commons-cli</artifactId>
                    <groupId>commons-cli</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>guava</artifactId>
                    <groupId>com.google.guava</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jackson-databind</artifactId>
                    <groupId>com.fasterxml.jackson.core</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>commons-cli</groupId>
            <artifactId>commons-cli</artifactId>
            <version>1.5.0</version>
        </dependency>

        <!--mvn install:install-file -Dfile
        =D:/maven/flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar
       -DgroupId=org.apache.flink -DartifactId
       =flink-shaded-hadoop-3 -Dversion=3.1.1.7.2.9.0-173-9.0 -Dpackaging=jar-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-3</artifactId>
            <version>3.1.1.7.2.9.0-173-9.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <mainClass>com.zl.MysqlExample</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

完整代码见:https://gitee.com/core815/flink-cdc-mysql

3.打包

mvn版本:3.5.4。
到pom.xml所在路径,执行“mvn package”
在这里插入图片描述
打包效果:
在这里插入图片描述

4.jar直接运行

java -jar FlickCDC-1.0-SNAPSHOT-jar-with-dependencies.jar

在这里插入图片描述

5.flink yarn运行

hadoop、flink、yarn环境见:https://core815.blog.csdn.net/article/details/144022938

把FlickCDC-1.0-SNAPSHOT-jar-with-dependencies.jar放到“/home”路径下。

执行下面命令:

flink run-application -t yarn-application -Dparallelism.default=1 -Denv.java.opts=" -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8" -Dtaskmanager.memory.process.size=1g -Dyarn.application.name="FlinkCdcMysql"  -Dtaskmanager.numberOfTaskSlots=1 -c com.zl.MysqlExample /home/FlickCDC-1.0-SNAPSHOT-jar-with-dependencies.jar

控制台看到如下打印:
在这里插入图片描述
yarn管理页面:
在这里插入图片描述
运行日志查看步骤:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
下面即可看到完整日志:
在这里插入图片描述

6.常见问题

6.1 问题1

日志错误:
The MySQL server has a timezone offset (0 seconds ahead of UTC) which does not match the configured timezone Asia/Shanghai. Specify the right server-time-zone to avoid inconsistencies for time-related fields.

解决:
修改my.cnf文件。
[mysqld]
default-time-zone=‘Asia/Shanghai’
重启MySQL服务。

6.2 问题2:hdfs

日志错误:
Permission denied: user=PC2023, access=WRITE, inode=“/”:root:supergroup:drwxr-xr-x

解决:

临时解决
hadoop fs -chmod -R 777 /

6.3 问题3:guava30 guava18冲突

分析:
flink 1.13 cdc2.3的组合容易出这个问题。

解决:
参考:https://developer.aliyun.com/ask/574901
flink 使用1.14.0版本;cdc使用2.4.0版本。

6.4 问题4

日志错误:
/user/root/.flink/application_1733492706887_0002/log4j.properties could only be written to 0 of the 1 minReplication nodes

解决:
https://www.pianshen.com/article/1554707968/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/934960.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

ZUC256 Go Go Go!!!

文章目录 背景运行效果代码 背景 因业务需要使用ZUC算法&#xff0c;GitHub上又没有对ZUC256相对应的Go语言的实现。 吃水不忘挖井人&#xff0c;在这里感谢GmSSL及BouncyCastle两个强大的密码学库&#xff01; 本ZUC256的编写&#xff0c;参考了这两个库及中科院软件院发布的…

图论【Lecode_HOT100】

文章目录 1.岛屿数量No.2002.腐烂的橘子No.9943.课程表No.2074.实现Trie&#xff08;前缀树&#xff09;No.208 1.岛屿数量No.200 class Solution {public int numIslands(char[][] grid) {if (grid null || grid.length 0) {return 0;}int numIslands 0;int rows grid.len…

快速将请求头构建成json结构

1.背景 有时候我们要爬虫(组包)请求一个资源数据,需要构建与原始请求一样的请求头,从浏览器复制过来的请求头,有很多,如果一个一个的配置成json有点慢,那么如何快速构建呢? 今天就使用正则表达式的方式实现 正则表达式实现快速将请求头构建成json结构 将冒号后边的换行符去掉…

数据结构6.3--交换排序

目录 交换排序基本思想 1.冒泡排序 2.快速排序 2.1hoare版本 2.2挖坑法 2.3前后指针版本 交换排序基本思想 所谓交换&#xff0c;就是根据序列中两个记录键值的比较结果来对换这两个记录在序列中的位置&#xff0c;交换排序的特点是&#xff1a;将键值较大的记录向序列的尾…

电脑怎么设置通电自动开机(工控机)

操作系统&#xff1a;win10 第一步&#xff0c;电脑开机时按del键进入bios页面。 第二步&#xff0c;选择advanced下的IT8712 Super IO Configuration 第三步&#xff0c;找到Auto Power On&#xff0c;将其从Power off设置为Power On 第四步&#xff0c;F10保存&#xff0c;大…

LearnOpenGL学习(高级OpenGL -> 高级GLSL,几何着色器,实例化)

高级GLSL 内建变量 顶点着色器 gl_PointSoze : float 输出变量&#xff0c;用于控制渲染 GL_POINTS 型图元时&#xff0c;点的大小。可用于粒子系统。将其设置为 gl_Position.z 时&#xff0c;可以使点的距离越远&#xff0c;大小越大。创建出类似近视眼看远处灯光的效果 gl…

SQL语句错误号:Incorrect integer value: ‘‘ for column ‘poi_id‘ at

SQL语句错误号&#xff1a;Incorrect integer value: for column poi_id at通用解决方案 在MySQL 5.7中&#xff0c;如果你遇到 Incorrect integer value: for column poi_id at row 1 错误&#xff0c;这通常意味着你尝试将一个空字符串插入到需要整数值的字段中。以下是几…

Node.js(v16.13.2版本)安装及环境配置教程

一、进入官网地址下载安装包 https://nodejs.org/zh-cn/download/ 选择对应你系统的Node.js版本&#xff0c;这里我选择的是Windows系统、64位&#xff08;v16.13.2版本&#xff09; 下载后的zip文件 二、解压文件到nodejs&#xff0c;并打开文件夹nodejs&#xff0c;复制解压…

【C++】继承的介绍

继承 1.继承的概念及定义1.1继承的概念&#xff1a;1.2 继承定义1.3继承类模板 2.继承中的函数隐藏3.派生类的默认成员函数4.继承中的切割5.多继承及其菱形继承问题5.1继承模型5.2解决菱形继承问题的方法(虚继承) 6.继承和组合 1.继承的概念及定义 1.1继承的概念&#xff1a; …

指令周期流程图

例题一 例题二 例题三

生成式AI概览与详解

1. 生成式AI概览&#xff1a;什么是大模型&#xff0c;大模型应用场景&#xff08;文生文&#xff0c;多模态&#xff09; 生成式AI&#xff08;Generative AI&#xff09;是指通过机器学习模型生成新的数据或内容的人工智能技术。生成式AI可以生成文本、图像、音频、视频等多种…

设计模式之原型模式:深入浅出讲解对象克隆

~犬&#x1f4f0;余~ “我欲贱而贵&#xff0c;愚而智&#xff0c;贫而富&#xff0c;可乎&#xff1f; 曰&#xff1a;其唯学乎” 原型模式概述 在我们的日常生活中&#xff0c;经常会遇到"复制"这样的场景。比如我们在准备文件时&#xff0c;常常会复印一份原件&a…

集合ArrayList

黑马程序员Java的个人笔记 BV17F411T7Ao p111~p115 目录 集合存储数据类型的特点 创建对象 ArrayList 成员方法 .add 增加元素 .remove 删除元素 .set 修改元素 .get 查询元素 .size 获取长度 基本数据类型对应的包装类 Character 练习 返回多个数据 集合存储…

day10性能测试(2)——Jmeter安装环境+线程组+Jmeter参数化

【没有所谓的运气&#x1f36c;&#xff0c;只有绝对的努力✊】 目录 1、LoadRunner vs Jmeter 1.1 LoadRunner 1.2 Jmeter 1.3 对比小结 2、Jmeter 环境安装 2.1 安装jdk 2.2 安装Jmeter 2.3 小结 3、Jmeter 文件目录结构 4、Jmeter默认配置修改 5、Jmeter元件、组…

【全连接神经网络】核心步骤及其缺陷

前向传播 计算公式&#xff08;其中一种&#xff09; x1/x2&#xff1a;输入值&#xff0c;一般是神经网络上一层的输出或者输入数据本身&#xff0c;上图中表示两个节点w11 w13&#xff1a;权重&#xff0c;在神经网络中&#xff0c;权重是学习的参数&#xff0c;表示每个输入…

自荐一部IT方案架构师回忆录

作者本人毕业于一个不知名大专院校&#xff0c;所读专业计算机科学技术。2009年开始IT职业生涯&#xff0c;至今工作15年。擅长TSQL/Shell/linux等技术&#xff0c;曾经就职于超万人大型集团、国内顶级云厂商、央国企公司。参与过运营商大数据平台、大型智慧城市ICT、云计算、人…

【密码学】SM4算法

一、 SM4算法简介 SM4算法是中国国家密码管理局于2012发布的一种分组密码算法&#xff0c;其官方名称为SMS4&#xff08;SMS4.0&#xff09;&#xff0c;相关标准为GM/T 0002-2012《SM4分组密码算法》。SM4算法的分组长度和密钥长度均为128比特,采用非平衡Feistel结构。采用32…

番外篇 | 关于YOLOv8网络结构中添加注意力机制的常见方法 | Neck网络

前言:Hello大家好,我是小哥谈。注意力机制是一种神经网络模型,它通过赋予输入不同的权重的处理方式,来使得模型对输入信息的处理更加关注重要的部分。注意力机制在自然语言处理、计算机视觉等领域中得到了广泛的应用。🌈 目录 🚀1.基础概念 🚀2.案例说明 案例…

有序集合ZSET【Redis对象篇】

&#x1f3c6; 作者简介&#xff1a;席万里 ⚡ 个人网站&#xff1a;https://dahua.bloggo.chat/ ✍️ 一名后端开发小趴菜&#xff0c;同时略懂Vue与React前端技术&#xff0c;也了解一点微信小程序开发。 &#x1f37b; 对计算机充满兴趣&#xff0c;愿意并且希望学习更多的技…

JSON语法、序列化/反序列化、(JS、JSON、Java对象间转换)、fastjson库、JS内置对象JSON

目录 一、JSON基础。 &#xff08;1&#xff09;什么是JSON&#xff1f; &#xff08;2&#xff09;JSON对象语法。 1、数据结构。 2、键与值的格式。 3、JSON对象在线解析与格式验证网址。 4、JSON对象格式的完整示例。 二、序列化与反序列化。 &#xff08;1&#xff09;序列…