最新版Flink CDC MySQL同步Elasticsearch(一)

1.环境准备

首先我们要基于Flink CDC MySQL同步MySQL的环境基础上(flink-1.17.1、Java8、MySQL8)搭建Elasticsearch7-17-10和Kibana 7.17.10。笔者已经搭建好环境,这里不做具体演示了,如果需要Es的搭建教程情况笔者其他博客

注意: 建议生产环境统一使用稳定版本Flink1.16.*。笔者这里只是作为教程编写采用当下最新版本,生产环境不推荐使用

2.编译flink-sql-connector-mysql-cdc

最新版本flink-1.17.1 mysql同步Es具体jar依赖版本如下所示:

注意:下载链接仅适用于稳定版本,SNAPSHOT依赖需要您自己构建。

flink-sql-connector-elasticsearch7-3.0.1-1.17.jar

flink-sql-connector-mysql-cdc-2.5-SNAPSHOT.jar(需要自行进行构建编译,笔者构建的已经上次至次博客。需要可以进行下载,csdn需要积分下载,无法设置免费的,需要免费版可以直接联系笔者)

下载所需的JAR包并放在下面flink-1.17.1/lib/:

git clone https://github.com/ververica/flink-cdc-connectors.git
cd flink-cdc-connectors
mvn clean install -DskipTests

3.建立mysql和Es映射关系表

使用以下命令启动 Flink SQL CLI:

./bin/sql-client.sh

我们应该看到 CLI 客户端的欢迎屏幕。
在这里插入图片描述首先,每 3 秒启用一次检查点

-- Flink SQL                   
Flink SQL> SET execution.checkpointing.interval = 3s;

编辑源数据库Flink Sql代码,如下所示:

CREATE TABLE products (
 id INT NOT NULL,
 name STRING,
 description STRING,
 PRIMARY KEY(id) NOT ENFORCED
) WITH (
 'connector' = 'mysql-cdc', #引入的CDC jar包驱动,没有引入会报错提示需要引入
 'hostname' = '192.168.50.163',#源数据库连接host地址,可以根据自己的具体设置,此处为笔者本机的
 'port' = '3306', #源数据库端口
 'username' = 'root',#源数据库账号
 'password' = '*****',#源数据库密码
 'database-name' = 'mydb',#源数据库
 'table-name' = 'products'#源数据库表
);

在Flink SQL 执行以下语句创建从相应数据库表捕获更改数据的表

-- Flink SQL
Flink SQL> CREATE TABLE products (
>     id INT,
>     name STRING,
>     description STRING,
>     PRIMARY KEY (id) NOT ENFORCED
>   ) WITH (
>     'connector' = 'mysql-cdc',
>     'hostname' = '192.168.50.163',
>     'port' = '3306',
>     'username' = 'root',
>     'password' = '****',
>     'database-name' = 'mydb',
>     'table-name' = 'products'
>   );

在es创建要同步的目标索引,具体语句如下:

PUT product1
{
  "settings": {
    "number_of_shards": 12,
    "number_of_replicas": 0
  },
  "mappings": {
    "properties": {
      "id": {
        "type": "integer"
      },
      "name": {
        "type": "keyword"
      },
      "description": {
        "type": "text"
      }
    }
  }
}

编辑目标ES映射Flink Sql代码,如下所示:

   CREATE TABLE product1 (
    id INT,
    name STRING,
    description STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
     'connector' = 'elasticsearch-7',#目标ES版本,最新目前支持7
     'hosts' = 'http://192.168.50.236:9200',#连接信息
     'index' = 'product1'#索引信息
 );

注意: 本文Es为测试版本没有配置账号密码,如果有账号密码配置即可 ‘username’ = ‘xxxx’,‘password’=‘xxxx’

建立目标索引与Flink SQL的映射关系,具体语句如下:

-- Flink SQL
 CREATE TABLE product1 (

>     id INT,
>     name STRING,
>     description STRING,
>     PRIMARY KEY (id) NOT ENFORCED
>   ) WITH (
>      'connector' = 'elasticsearch-7',#目标ES版本,最新目前支持7
>      'hosts' = 'http://192.168.50.236:9200',#连接信息
>      'index' = 'product1'#索引信息
>  );

使用Flink SQL添加mysql和Es映射表数据关联关系

-- Flink SQL
Flink SQL> insert into product1 select * from products;

4.时区问题处理

错误:
The MySQL server has a timezone offset (28800 seconds ahead of UTC) which does not match the configured timezone Etc/UTC. Specify the right server-time-zone to avoid inconsistencies for time-related fields.

解决思路:

  • Flink集群开启NTP服务器 时间同步
  • 把服务器时区改成和数据库一样的时间本文为(Asia/Shanghai)
  • 配置Flink sql的时区为Asia/Shanghai,具体命令如下所示:
Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';

注意:这是笔者遇到的问题,具体问题具体解决即可

5.具体实现结果

整体实现结果如下图所示:

Flink 运行任务

在这里插入图片描述

mysql 源数据表数据

在这里插入图片描述

Es目标索引已经数据查询图

在这里插入图片描述至此,笔者的Flink CDC MySQL同步Elasticsearch第一篇讲解完毕,希望能帮助到搭建

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/35236.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

JVM源码剖析之Java对象创建过程

关于 "Java的对象创建" 这个话题分布在各种论坛、各种帖子,文章的水平参差不齐。并且大部分仅仅是总结 "面试宝典" 的流程,小部分就是copy其他帖子,极少能看到拿源码作为论证。所以特意写下这篇文章。 版本信息如下&…

Eclipse显示层级目录结构(像IDEA一样)

有的小伙伴使用IDEA习惯了,可能进入公司里面要求使用eclipse,但是eclipse默认目录是并列显示,而不是层级显示。部分人用起来感觉十分不方便。我们可以更改一下设置。 1、打开eclipse,找到这里 2、选择PackagePresentation 3、选…

Github-提交PR指南

1. Fork你将要提交PR的repo 2. 将你fork下来的repo克隆到你的本地 git clone your_repo.git Cloning into ultralytics... remote: Enumerating objects: 8834, done. remote: Counting objects: 100% (177/177), done. remote: Compressing objects: 100% (112/112), done. …

第二步:STM32F407ZGT6资源介绍

1.1 STM32F407ZGT6资源描述 内核: 32位 高性能ARM Cortex-M4处理器 时钟:高达168M,实际还可以超屏一点点 支持FPU(浮点运算)和DSP指令 IO口: STM32F407ZGT6: 144引脚 114个IO 大部分IO口都耐5V(模拟通道除外) …

C# .NET 如何调用 SAP RFC 接口

1.分析传参结构 SAP 传参格式对应 .NET 参数格式 SAP 参数.NET 参数参数类型import(导入)——关联类型为数据元素Param单个变量参数import(导出)——关联类型为结构体Struct结构体tableTable表 下面是 SAP 对应参数类型: 2.web.config 配置 配置文件需要客户端…

win10安装pytorch GPU

我记得以前安装过深度学习库GPU版本, 需要安装cuda什么的,翻了下还真写过一篇win10安装tensorflow的文章,但是流程不止不详细,还不清晰。这次就再记录一遍 这次安装的是pytorch,这么多年似乎pytorch要逐渐统一深度学习…

【算法与数据结构】232、LeetCode用栈实现队列

文章目录 一、题目二、解法三、完整代码 所有的LeetCode题解索引,可以看这篇文章——【算法和数据结构】LeetCode题解。 一、题目 二、解法 思路分析:这道题要求我们用栈模拟队列(工作上一定没人这么搞)。程序当中,pus…

spring之BeanFactory

spring之BeanFactory DefaultListableBeanFactory示例代码类继承实现结构 BeanFactory是Bean工厂,所以很明显,BeanFactory会负责创建Bean,并且提供获取Bean的API。 DefaultListableBeanFactory 在Spring源码中,BeanFactory接口存…

自定义的车牌号键盘组件

<template><view class"keyboard-wrap" v-if"kbShow"><view class"head"><view class"done" tap"done"><text class"iconfont iconxiala-"></text>关闭</view></vi…

spring boot + Apache tika 实现文档内容解析

Apache tika是Apache开源的一个文档解析工具。Apache Tika可以解析和提取一千多种不同的文件类型(如PPT、XLS和PDF)的内容和格式&#xff0c;并且Apache Tika提供了多种使用方式&#xff0c;既可以使用图形化操作页面&#xff08;tika-app&#xff09;&#xff0c;又可以独立部…

Python实现微信发送文件实例

新建Python文件&#xff1a;wx_file.py&#xff0c;代码如下 # -*- coding: utf-8 -*- # Author : CxiuM # Time : 2023-07-06 10:12 # Name : wx_operation.py"""微信群发消息"""import os import time import subprocessimport requests …

【ElasticSearch】DSL查询语法

文章目录 1、DSL查询分类2、DSL基本语法3、全文检索查询4、精确查询5、地理查询6、复合查询--相关性打分算法7、复合查询之Function Score Query8、复合查询之BooleanQuery 1、DSL查询分类 Elasticsearch提供了基于JSON的DSL&#xff08;Domain Specific Language&#xff09;…

Java版本工程项目管理系统源码

Java版工程项目管理系统 Spring CloudSpring BootMybatisVueElementUI前后端分离 功能清单如下&#xff1a; 首页 工作台&#xff1a;待办工作、消息通知、预警信息&#xff0c;点击可进入相应的列表 项目进度图表&#xff1a;选择&#xff08;总体或单个&#xff09;项目显示…

Librosa库——语音识别,语音音色识别训练及应用

很多同学以为语音识别是非常难的&#xff0c;其实并不然&#xff0c;起初我也是这么认为&#xff0c;但后来发现语音识别是最简单的&#xff0c;因为同学们可能不知道Python有一个音频处理库Librosa&#xff0c;这个库非常的强大&#xff0c;可以进行音频处理、频谱表示、幅度转…

CSS(持续更新!~)

二&#xff1a; 进阶&#xff1a; 只打算起到装饰作用的图片就建议就背景图片 块级标签就是&#xff1a;独占一行的标签&#xff08;比如div&#xff09;并且可以加宽加高 行内元素&#xff1a;就是不会独占一行的标签&#xff08;比如a&#xff0c;span等等&#xff0c;不可以…

TensorFlow项目练手(二)——猫狗熊猫的分类任务

项目介绍 通过猫狗熊猫图片来对图片进行识别&#xff0c;分类出猫狗熊猫的概率&#xff0c;文章会分成两部分&#xff0c;从基础网络模型->利用卷积网络经典模型Vgg。 基础网络模型 基础的网络模型主要是用全连接层来分类&#xff0c;比较经典的方法&#xff0c;也是祖先…

MinGW编译OpenCV 过程记录

1.下载源码opencv-3.4.10.zip &#xff0c;可以在OpenCV官网下载Releases - OpenCV 解压缩如下: 2.下载Mingw64工具&#xff0c;需要支持posix 并设置系统环境目录&#xff0c;下载的文件名x86_64-8.1.0-release-posix-sjlj-rt_v6-rev0.7z (可以在网上找) 3.使用Cmake工具构建…

微信小程序个人中心展示样式(2)

这是之前的详细的看这里 因为这是好多年前写的了&#xff0c;好多人私信我代码有问题。正好今天有时间简单的还原下代码 话不多说先看图(图片样式自己搞奥~~~~我也好久没弄了这就是个参考demo) 以下是一个使用微信小程序开发的个人中心展示详情的示例&#xff1a; 在微信开发…

基于PyQt5的桌面图像调试仿真平台开发(10)色彩矩阵

系列文章目录 基于PyQt5的桌面图像调试仿真平台开发(1)环境搭建 基于PyQt5的桌面图像调试仿真平台开发(2)UI设计和控件绑定 基于PyQt5的桌面图像调试仿真平台开发(3)黑电平处理 基于PyQt5的桌面图像调试仿真平台开发(4)白平衡处理 基于PyQt5的桌面图像调试仿真平台开发(5)…

解决问题:通配符的匹配很全面, 但无法找到元素 ‘context:component-scan‘ 的声明~

异常描述如下&#xff1a; 产生异常原因&#xff1a; 因为在配置文件中没有找到<context:component-scan />元素的声明&#xff0c;解决办法&#xff1a;将XML配置文件中的声明改为下述代码&#xff1a; <beans xmlns"http://www.springframework.org/schema/b…