FlinkX学习

FlinkX学习

FlinkX安装

由于flinkx已经改名chunjun 官网已不存在

(https://gitee.com/lugela/flinkx#flinkx)这里可以看到flinkx的操作文档

1、上传并解压

unzip flinkx-1.10.zip -d /usr/local/soft/

2、配置环境变量

FLINKX_HOME=/usr/local/soft/flinkx-1.10
export PATH=$FLINKX_HOME/bin:$PATH

3、给bin/flinkx这个文件加上执行权限

 chmod +x flinkx

4、修改配置文件,设置运行端口

vim flinkconf/flink-conf.yaml

## web服务端口,不指定的话会随机生成一个
rest.bind-port: 8888

启动

命令行参数选项

  • model
    • 描述:执行模式,也就是flink集群的工作模式
      • local: 本地模式
      • standalone: 独立部署模式的flink集群
      • yarn: yarn模式的flink集群,需要提前在yarn上启动一个flink session,使用默认名称"Flink session cluster"
      • yarnPer: yarn模式的flink集群,单独为当前任务启动一个flink session,使用默认名称"Flink per-job cluster"
    • 必选:否
    • 默认值:local
  • job
    • 描述:数据同步任务描述文件的存放路径;该描述文件中使用json字符串存放任务信息。
    • 必选:是
    • 默认值:无
  • pluginRoot
    • 描述:插件根目录地址,也就是打包后产生的pluginRoot目录。
    • 必选:是
    • 默认值:无
  • flinkconf
    • 描述:flink配置文件所在的目录(单机模式下不需要),如/opt/dtstack/flink-1.8.1/conf
    • 必选:否
    • 默认值:无
  • yarnconf
    • 描述:Hadoop配置文件(包括hdfs和yarn)所在的目录(单机模式下不需要),如/hadoop/etc/hadoop
    • 必选:否
    • 默认值:无
  • flinkLibJar
    • 描述:flink lib所在的目录(单机模式下不需要),如/opt/dtstack/flink-1.8.1/lib
    • 必选:否
    • 默认值:无
  • confProp
    • 描述:flink相关参数,如{“flink.checkpoint.interval”:200000}
    • 必选:否
    • 默认值:无
  • queue
    • 描述:yarn队列,如default
    • 必选:否
    • 默认值:无
  • pluginLoadMode
    • 描述:yarnPer模式插件加载方式:
      • classpath:提交任务时不上传插件包,需要在yarn-node节点pluginRoot目录下部署插件包,但任务启动速度较快
      • shipfile:提交任务时上传pluginRoot目录下部署插件包的插件包,yarn-node节点不需要部署插件包,任务启动速度取决于插件包的大小及网络环境
    • 必选:否
    • 默认值:classpath

FlinkX概述

FlinkX是在是袋鼠云内部广泛使用的基于flink的分布式离线和实时的数据同步框架,实现了多种异构数据源之间高效的数据迁移。

不同的数据源头被抽象成不同的Reader插件,不同的数据目标被抽象成不同的Writer插件。理论上,FlinkX框架可以支持任意数据源类型的数据同步工作。作为一套生态系统,每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。

FlinkX是一个基于Flink的批流统一体的数据同步工具,既可以采集静态的数据,比如MySQL,HDFS等,也可以采集实时变化的数据,比如MySQL binlog,Kafka等

在底层实现上,FlinkX依赖Flink,数据同步任务会被翻译成StreamGraph在Flink上执行.

image-20220619223533456.png

FlinkX的简单使用

MySQL2HDFS

场景

将mysql Y1数据库下的Student表数据写入HDFS上的指定路径中

参考文档

mysqlreader:(https://gitee.com/lugela/flinkx/blob/1.10_release/docs/offline/reader/mysqlreader.md)

hdfswriter:(https://gitee.com/lugela/flinkx/blob/1.10_release/docs/offline/writer/hdfswriter.md)

创建mysql2hdfs.json文件

{
    "job": {
        "content": [
            {
                "reader": {
                    "parameter": {
                        "username": "root",
                        "password": "123456",
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://master:3306/Y1?useUnicode=true&characterEncoding=utf8&useSSL=false"
                                ],
                                "table": [
                                    "Student"
                                ]
                            }
                        ],
                        "column": [
                            "*"
                        ],
                        "where": "Sid > 05 ",
                        "requestAccumulatorInterval": 2
                    },
                    "name": "mysqlreader"
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "path": "hdfs://master:9000/bigdata30/flinkx/out1",
                        "defaultFS": "hdfs://master:9000",
                        "column": [
                            {
                                "name": "col1",
                                "index": 0,
                                "type": "string"
                            },{
                                "name": "col2",
                                "index": 1,
                                "type": "string"
                            },{
                                "name": "col3",
                                "index": 2,
                                "type": "string"
                            },{
                                "name": "col4",
                                "index": 3,
                                "type": "string"
                            }
                        ],
                        "fieldDelimiter": ",",
                        "fileType": "text",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "restore": {
                "isRestore": false,
                "isStream": false
            },
            "errorLimit": {},
            "speed": {
                "channel": 1
            }
        }
    }
}

运行模式

  • 单机模式:对应Flink集群的单机模式
  • standalone模式:对应Flink集群的分布式模式
  • yarn模式:对应Flink集群的yarn模式
  • yarnPer模式: 对应Flink集群的Per-job模式

运行:

flinkx -mode local -job ./mysql2hdfs.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/

监听日志:

flinkx 任务启动后,会在执行命令的目录下生成一个nohup.out文件

tail -f nohup.out

通过web界面查看任务运行情况

http://master:8888

hdfs上出现文件:

image.png

查看该文件:

hdfs dfs -cat /bigdata30/flinkx/out1/0.44b7d6c8dcaadcc14ae55fb482f9fb27.0

出现Sid大于05的学生:

image.png

MySQLToHive

hivewrite:(https://github.com/oceanos/flinkx/blob/1.8_release/docs/hivewriter.md)

配置文件:

{
    "job": {
        "content": [
            {
                "reader": {
                    "parameter": {
                        "username": "root",
                        "password": "123456",
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://master:3306/Y1?useUnicode=true&characterEncoding=utf8&useSSL=false"
                                ],
                                "table": [
                                    "Student"
                                ]
                            }
                        ],
                        "column": [
                            "*"
                        ],
                        "where": "Sid > 05 ",
                        "requestAccumulatorInterval": 2
                    },
                    "name": "mysqlreader"
                },
                "writer": {
                    "name": "hivewriter",
                    "parameter": {
                        "jdbcUrl": "jdbc:hive2://master:10000/bigdata30",
                        "username": "",
                        "password": "",
                        "fileType": "text",
                        "fieldDelimiter": ",",
                        "writeMode": "overwrite",
                        "compress": "",
                        "charsetName": "UTF-8",
                        "maxFileSize": 1073741824,
                        "tablesColumn": "{\"Student\":[{\"key\":\"SId\",\"type\":\"string\"},{\"key\":\"Sname\",\"type\":\"string\"},{\"key\":\"Sage\",\"type\":\"string\"},{\"key\":\"Ssex\",\"type\":\"string\"}]}",
                        "defaultFS": "hdfs://master:9000"
                    }
                }
            }
        ],
        "setting": {
            "restore": {
                "isRestore": false,
                "isStream": false
            },
            "errorLimit": {},
            "speed": {
                "channel": 1
            }
        }
    }
}

在hive中建表:

CREATE TABLE `bigdata30`.`Student`(
    `SId` STRING,
    `Sname` STRING,
    `Sage` STRING,
    `Ssex` STRING
				)
PARTITIONED BY ( 
  `pt` string)
ROW FORMAT DELIMITED 
  FIELDS TERMINATED BY ',';

启动hiveserver2

启动任务

flinkx -mode local -job ./mysql2hive.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/

运行发现报错 无法解决。

翻阅chunjun官网 在hive-sink中发现 只支持hive1.x hive2.x 现hive版本为3.1.2 不支持 猜测报错原因

尝试使用chunjun 解决

MySQLToHBase

场景

将mysql Y1数据库中的Student表数据写入HBase flinkx_Student表中

配置文件

{
    "job": {
        "content": [
            {
                "reader": {
                    "parameter": {
                        "username": "root",
                        "password": "123456",
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://master:3306/Y1?useUnicode=true&characterEncoding=utf8&useSSL=false"
                                ],
                                "table": [
                                    "Student"
                                ]
                            }
                        ],
                        "column": [
                            "*"
                        ],
                        "where": "Sid > 05 ",
                        "requestAccumulatorInterval": 2
                    },
                    "name": "mysqlreader"
                },
                "writer": {
                    "name": "hbasewriter",
                    "parameter": {
                        "hbaseConfig": {
                            "hbase.zookeeper.property.clientPort": "2181",
                            "hbase.rootdir": "hdfs://master:9000/hbase",
                            "hbase.cluster.distributed": "true",
                            "hbase.zookeeper.quorum": "master,node1,node2",
                            "zookeeper.znode.parent": "/hbase"
                        },
                        "table": "flinkx_Student",
                        "rowkeyColumn": "$(cf1:SId)",
                        "column": [
                            {
                                "name": "cf1:SId",
                                "type": "string"
                            },
                            {
                                "name": "cf1:Sname",
                                "type": "string"
                            },
                            {
                                "name": "cf1:Sage",
                                "type": "string"
                            },
                            {
                                "name": "cf1:Ssex",
                                "type": "string"
                            }
                        ]
                    }
                }
            }
        ],
        "setting": {
            "restore": {
                "isRestore": false,
                "isStream": false
            },
            "errorLimit": {},
            "speed": {
                "channel": 1
            }
        }
    }
}

在hbase中创建flinkx_Student表

create 'flinkx_Student','cf1'

启动

flinkx -mode local -job ./mysql2hbase.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/

hbase中的flinkx_Student表出现数据

image.png

MySQLToMySQL

场景

将mysql Y1数据库中的Student表数据写入datax1数据库中的Student2表中

配置文件 mysql2mysql.json

{
    "job": {
      "content": [
        {
          "reader": {
            "name": "mysqlreader",
            "parameter": {
              "column": [
                {
                  "name": "SId",
                  "type": "string"
                },
                {
                  "name": "Sname",
                  "type": "string"
                },
                {
                  "name": "Sage",
                  "type": "string"
                },
                {
                  "name": "Ssex",
                  "type": "string"
                }
              ],
              "username": "root",
              "password": "123456",
              "connection": [
                {
                  "jdbcUrl": [
                    "jdbc:mysql://master:3306/Y1?useSSL=false"
                  ],
                  "table": [
                    "Student"
                  ]
                }
              ]
            }
          },
          "writer": {
            "name": "mysqlwriter",
            "parameter": {
              "username": "root",
              "password": "123456",
              "connection": [
                {
                  "jdbcUrl": "jdbc:mysql://master:3306/datax1?useSSL=false",
                  "table": [
                    "Student2"
                  ]
                }
              ],
              "writeMode": "insert",
              "column": [
                {
                    "name": "SId",
                    "type": "string"
                  },
                  {
                    "name": "Sname",
                    "type": "string"
                  },
                  {
                    "name": "Sage",
                    "type": "string"
                  },
                  {
                    "name": "Ssex",
                    "type": "string"
                  }
              ]
            }
          }
        }
      ],
      "setting": {
        "speed": {
          "channel": 1,
          "bytes": 0
        }
      }
    }
  }

在mysql datax1数据库中建表:

create table if not exists datax1.Student2(
    SID varchar(10),
    Sname varchar(100),
    Sage varchar(100),
    Ssex varchar(10)
)CHARSET = utf8 COLLATE utf8_general_ci;

运行:

flinkx -mode local -job ./mysql2mysql.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/

进入网页查看:

master:8888

image.png

查看Student2表 数据已导入:

image.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/762387.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

基于Java实现图像浏览器的设计与实现

图像浏览器的设计与实现 前言一、需求分析选题意义应用意义功能需求关键技术系统用例图设计JPG系统用例图图片查看系统用例图 二、概要设计JPG.javaPicture.java 三、详细设计类图JPG.java UML类图picture.java UML类图 界面设计JPG.javapicture.java 四、源代码JPG.javapictur…

Eclipse 2024最新版本分享

一、软件介绍 Eclipse是一个开源的、基于Java的可扩展开发平台,最初由IBM公司开发,后于2001年贡献给开源社区,并由Eclipse基金会负责管理和开发。 如果在官网上下载比较慢,可以试试从云盘中下载,解压即可使用。 二、下…

免费开源的后端API服务-supabase安装和使用-简直是前端学习者福音

文章目录 它是什么安装和部署关于安装关于部署1、注册用户2、创建组织3、创建项目 创建数据库表(填充内容)填充数据库表 使用postman联调API 它是什么 一个开源免费的后端框架,firebase的替代品。可以简单理解类似于headless cms&#xff0c…

【Llama 2的使用方法】

Llama 2是Meta AI(Facebook的母公司Meta的AI部门)开发并开源的大型语言模型系列之一。Llama 2是在其前身Llama模型的基础上进行改进和扩展的,旨在提供更强大的自然语言处理能力和更广泛的应用场景。 以下是Llama 2的一些关键特性和更新点&am…

【SGX系列教程】(三)Intel-SGX 官方示例分析(SampleCode)——SampleEnclave

文章目录 一. 引言二. README2.1 项目目的2.2 构建和执行示例代码的步骤2.3 配置参数解释2.4 配置文件分析2.5 启动令牌初始化 三. 重点代码分析3.1 App文件夹3.1.1 App/App.cpp3.1.2 App/Edger8rSyntax文件夹3.1.2.1 App/Edger8rSyntax/Arrays.cpp3.1.2.2 App/Edger8rSyntax/F…

js实现blockly后台解释器,可以单步执行,可以调用c/c++函数

实现原理 解析blockly语法树,使用js管理状态,实际使用lua执行,c/c函数调用使用lua调用c/c函数的能力 可以单行执行 已实现if功能 TODO for循环功能 函数功能 单步执行效果图 直接执行效果图 源代码 //0 暂停 1 单步执行 2 断点 //创建…

Lipschitz 连续,绝对连续

1. Lipschitz 连续 经常听到这个名词, Lipschitz 连续比普通连续更强,不仅要求函数连续,还要求函数的梯度小于一个正实数。 在单变量实数函数上的定义可以是: 对于定义域内任意两个 x 1 x_1 x1​ and x 2 x_2 x2​, 存在一个…

AI与EHS管理结合:融合创新,赋能绿色安全生产

随着科技的不断进步,人工智能AI已经在我们的日常生活中扮演了重要角色。在环保、健康和安全这个重要领域,也就是我们常说的EHS管理中,AI也正发挥着神奇的作用。 咱们知道,一个公司要想好好运转,确保工人安全、保护环境…

SpringBoot实现图片添加水印

提示&#xff1a;今日完成图片添加水印功能 后续可能还会继续完善这个功能 文章目录 目录 文章目录 前端部分 后端 Xml Controller层 Sercive层 Service实现层 Config配置层 application.properties 文件后缀名获取 常量定义 前端部分 <!DOCTYPE html> <htm…

NC13611 树(dfs序+区间dp)

链接 思路&#xff1a; 容易知道对于同一种颜色的子图一定是仅由该颜色的点连通的。设我们要划分的个数为x&#xff08;x<k&#xff09;&#xff0c;也就是说我们要选出x-1条边&#xff0c;这里有种情况。那么我们需要选出x种颜色&#xff0c;这里有种情况。然后我们需要将…

samba服务的搭建与使用

关闭selinux #暂时关闭selinux 查看selinux状态 [rootlocalhost ~]# getenforce Disabled [rootlocalhost ~]# 如果此处是‘enforcing’&#xff0c;则执行下列代码 [rootlocalhost ~]# setenforce 0 再次查看selinux状态 [rootlocalhost ~]# getenforce permissive #永久关…

MySQL 常见存储引擎详解(一)

本篇主要介绍MySQL中常见的存储引擎。 目录 一、InnoDB引擎 简介 特性 最佳实践 创建InnoDB 存储文件 二、MyISAM存储引擎 简介 特性 创建MyISAM表 存储文件 存储格式 静态格式 动态格式 压缩格式 三、MEMORY存储引擎 简介 特点 创建MEMORY表 存储文件 内…

Ubuntu 24.04-自动安装-Nvidia驱动

教程 但在安全启动模式下可能会报错。 先在Nvidia官网找到GPU对应的驱动版&#xff0c; 1. 在软件与更新中选择合适的驱动 2. ubuntu自动安装驱动 sudo ubuntu-drivers autoinstall显示驱动 ubuntu-drivers devices3. 安装你想要的驱动 sudo apt install nvidia-driver-ve…

【UE 网络】多人游戏开发时应该如何区分客户端逻辑和服务端逻辑 入门篇

目录 0 引言1 服务器和客户端逻辑1.1 服务器职责1.2 客户端职责 2 函数会在客户端执行还是服务端&#xff1f;2.1 只在客户端执行的函数RepNotifyClient RPCMulticast RPC 2.2 只在服务端执行的函数GameModeServer RPC 2.3 在两端都可以执行的函数GetNetMode() 和 HasAuthority…

结构体------“成绩排序”---冒泡----与“输出最高成绩”区别

从大到小或者从小到大排序----冒泡排序---双重循环i,j 比较的时候用的是 排序的时候用的是整体 stu [ j1 ] 和 stu [ j ] 我写错为下面这个&#xff0c;交换的只是学生的出生日期&#xff0c;没有交换整体 #include<stdio.h> #include<string.h>struct student{ch…

EKF+UKF+CKF+PF的效果对比|三维非线性滤波|MATLAB例程

前言 标题里的EKF、UKF、CKF、PF分别为&#xff1a;扩展卡尔曼滤波、无迹卡尔曼滤波、容积卡尔曼滤波、粒子滤波。 EKF是扩展卡尔曼滤波&#xff0c;计算快&#xff0c;最常用于非线性状态方程或观测方程下的卡尔曼滤波。 但是EKF应对强非线性的系统时&#xff0c;估计效果不如…

使用 go-control-plane 自定义服务网格控制面

写在前面 阅读本文需要最起码了解envoy相关的概念 本文只是一个类似于demo的测试&#xff0c;只为了学习istio&#xff0c;更好的理解istio中的控制面和数据面&#xff08;pilot -> proxy&#xff09;是如何交互的&#xff0c;下图的蓝色虚线 先说go-control-plane是什么…

Linux——移动文件或目录,查找文件,which命令

移动文件或目录 作用 - mv命令用于剪切或重命名文件 格式 bash mv [选项] 源文件名称 目标文件名称 注意 - 剪切操作不同于复制操作&#xff0c;因为它会把源文件删除掉&#xff0c;只保留剪切后的文件。 - 如果在同一个目录中将某个文件剪切后还粘贴到当前目录下&#xff0c;…

onnx模型转rknn到部署

简介 最近开始用3568的板子&#xff0c;之前是在用3399&#xff0c;cpu的话3399比3568强&#xff0c;但是3568有1T的npu算力&#xff0c;所以模型移植过来用npu使用&#xff0c;之前用ncnn感觉太慢了&#xff0c;rk的npu使用没有开源&#xff0c;所以没法兼容&#xff0c;只能跑…

基于pycharm对每个工程配置python环境

目录 1 生成环境2 配置pycharm 1 生成环境 设定一个存放虚拟环境的目录&#xff0c;比如可以放在如下目录下&#xff1a; /Users/Name/PycharmProjects/env 然后生成虚拟环境&#xff0c;执行如下操作&#xff1a; python3 -m venv /Users/Name/PycharmProjects/env/agent_pr…