使用Canal将MySQL数据同步到ES(Linux)

一、Canal官网文档

去到官方文档根据官网文档进行操作:

QuickStart · alibaba/canal Wiki · GitHub

二、开启服务器中MySQL的binlog

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

进行修改:

注意注意:记得修改后一定要重启MySQL,我就是因为忘记重启了找了一天的bug ,不记得重启会出现后面第八点那个问题。

 登录root

mysql -uroot -p
CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

三、将canal-deployer下载到服务器

Releases · alibaba/canal · GitHub

cd /tmp/canal
tar zxvf canal.deployer-1.1.8-SNAPSHOT.tar.gz

解压后:

四、修改对应配置文件

五、启动canal 服务端

sh bin/startup.sh

参考:linux下启动Nacos报错解决:which: no javac in (/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin)_java_silence_fengxuan-GitCode 开源社区

 这个是我服务器jdk下载的位置,去找到自己对应的位置即可

编辑下面这个文件

在这个文件内添加下面信息

export JAVA_HOME=/www/server/java/jdk-17.0.8/          
export JRE_HOME=$JAVA_HOME/jre
export CLASSPATH=$JAVA_HOME/lib:$JRE_HOME/lib:$CLASSPATH
export PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$PATH

再次执行启动命令

查看输出日志:

启动成功!服务端启动成功!

六、客户端        

查看官方使用例子

ClientExample · alibaba/canal Wiki · GitHub

引入依赖

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
</dependency>

code如下:

修改后启动:

然后我插入一条数据,也是监听到了。

 测试成功。

七、同步ES(Sync ES)

 

大概需要注意这几个东西。

下面给出我的部分配置:

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: -1
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111 #记得去防火墙开启这个端口
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    kafka.bootstrap.servers: 127.0.0.1:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
    # rocketMQ consumer
    rocketmq.namespace:
    rocketmq.namesrv.addr: 127.0.0.1:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
    # rabbitMQ consumer
    rabbitmq.host:
    rabbitmq.virtual.host:
    rabbitmq.username:
    rabbitmq.password:
    rabbitmq.resource.ownerId:

  srcDataSources:
    defaultDS:
     url: jdbc:mysql://你的服务器IP:3306/你的DataSource名?useUnicode=true&useSSL=false&serverTimezone=UTC
     username: canal
     password: canal
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: es7
        hosts: http://你的服务器IP:9200
        properties: 
         mode: rest #9200端口的话就写rest
         cluster.name: elasticsearch

datasourceKey: defaultDS #源数据源的key,对应上面配置的srcDatasources中的值
destination: example #canal的instance或者MO的topic
groupId: g1 #对应MQ模式下的groupId,只会同步对应groupId的数据
esMapping:
  _index: user #es 的索引名称
  _id: _id #es 的 id,如果不配置该项必须配置下面的pk项 id则会由es自动分配
# 下面写你的MySQL需要配置表的查询语句(完整的语句)
  sql: "SELECT
    u.id AS _id,
    u.id ,
    u.openid,
    u.quick_user_id,
    u.mark_number,
    u.brief_introduction
FROM
    user u;" # sq1映射
commitBatch: 500 # 提交批大小

开启服务:

1.进入对应目录

cd /tmp/canal/canal.adapter

2.启动服务

sh bin/startup.sh

再去查看log日志:

启动成功

3.如果想要停止服务

 sh bin/stop.sh

七、测试同步

新增一条数据

更新一条数据

修改同步成功,注意上面一个配置文件中我写的查询语句并不完整,实际上是需要完整的,注意和自己表的查询语句一致,注意注意。。

 

八、曾遇过的bug

如果出现下面这种情况,可能是修改MySQL后没有重启MySQL。。。实习的时候被这这个问题折磨了一天,搜了各种办法,很逆天。

DEBuG c.a.0.canal.client.adapter.es,core.service.Essyncservice -ml: {"data":null,"database":"quick pickup","destination": "example","es":1732938564088,"groupId":"g1" "isDd1":false,"old":nul1,"okNames":[1,"sal

最后去重启了一下,结果发现成功了,成功监听到信息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/926814.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

CENet及多模态情感计算实战

✨✨ 欢迎大家来访Srlua的博文&#xff08;づ&#xffe3;3&#xffe3;&#xff09;づ╭❤&#xff5e;✨✨ &#x1f31f;&#x1f31f; 欢迎各位亲爱的读者&#xff0c;感谢你们抽出宝贵的时间来阅读我的文章。 我是Srlua小谢&#xff0c;在这里我会分享我的知识和经验。&am…

基于深度学习和卷积神经网络的乳腺癌影像自动化诊断系统(PyQt5界面+数据集+训练代码)

乳腺癌是全球女性中最常见的恶性肿瘤之一&#xff0c;早期准确诊断对于提高生存率具有至关重要的意义。传统的乳腺癌诊断方法依赖于放射科医生的经验&#xff0c;然而&#xff0c;由于影像分析的复杂性和人类判断的局限性&#xff0c;准确率和一致性仍存在挑战。近年来&#xf…

【热门主题】000074 深度学习模型:探索与应用

前言&#xff1a;哈喽&#xff0c;大家好&#xff0c;今天给大家分享一篇文章&#xff01;并提供具体代码帮助大家深入理解&#xff0c;彻底掌握&#xff01;创作不易&#xff0c;如果能帮助到大家或者给大家一些灵感和启发&#xff0c;欢迎收藏关注哦 &#x1f495; 目录 【热…

MacOS使用VSCode编写C++程序如何配置clang编译环境

前言 这段时间在练习写C和Python&#xff0c;用vscode这个开发工具&#xff0c;调试的时候遇到一些麻烦&#xff0c;浪费了很多时间&#xff0c;因此整理了这个文档。将详细的细节描述清楚&#xff0c;避免与我遇到同样问题的人踩坑。 1.开发环境的配置 vscode的开发环境配置…

Scala关于成绩的常规操作

score.txt中的数据&#xff1a; 姓名&#xff0c;语文&#xff0c;数学&#xff0c;英语 张伟&#xff0c;87&#xff0c;92&#xff0c;88 李娜&#xff0c;90&#xff0c;85&#xff0c;95 王强&#xff0c;78&#xff0c;90&#xff0c;82 赵敏&#xff0c;92&#xff0c;8…

【实战】在Koa.js中实现文件上传的接口 (本地存储)

目录 环境准备 使用 koa-body 中间件获取上传的文件 使用 Postman 测试 使用 koa-static 中间件生成图片链接 编写前端页面上传文件 文件上传是一个基本的功能&#xff0c;每个系统几乎都会有&#xff0c;比如上传图片、上传Excel等。那么在Node Koa应用中如何实现一个支持…

使用html语言完成拼多多移动端导航栏的设计-大连东软信息学院计算机科学与技术专业高级网页设计基础课题

目录 前言 一、效果图 二、图标的使用 三、代码的编写 四、运行效果 五、文档编写 前言 1.本文所讲内容来自辽宁大连东软信息学院计算机与技术专业高级网页设计&#xff08;专升本&#xff09;课程期中四级项目课题之一&#xff0c;题目要求是自主选择相应的APP移动端&…

从语法、功能、社区和使用场景来比较 Sass 和 LESS

一&#xff1a;可以从语法、功能、社区和使用场景来比较 Sass 和 LESS&#xff1a; 1&#xff1a;语法 原始的 Sass 采用的是缩进而不是大括号&#xff0c;后续的 Sass 版本与 LESS 一样使用与 CSS 类似的语法&#xff1a; address {.fa.fa-mobile-phone {margin: 0 3px 0 2…

7. 现代卷积神经网络

文章目录 7.1. 深度卷积神经网络&#xff08;AlexNet&#xff09;7.2. 使用块的网络&#xff08;VGG&#xff09;7.3. 网络中的网络&#xff08;NiN&#xff09;7.4. 含并行连结的网络&#xff08;GoogLeNet&#xff09;7.5. 批量规范化7.5.1. 训练深层网络7.5.2. 批量规范化层…

sqlmap详细使用

SQLmap使用详解 SQLmap&#xff08;常规&#xff09;使用步骤 1、查询注入点 python sqlmap.py -u http://127.0.0.1/sqli-labs/Less-1/?id12、查询所有数据库 python sqlmap.py -u http://127.0.0.1/sqli-labs/Less-1/?id1 --dbs3、查询当前数据库 python sqlmap.py -u htt…

React+TS+css in js 练习

今天分享的内容是动态规划的经典问题--0-1 背包问题 0-1背包问题的描述如下:给定一组物品,每种物品都有自己的重量和价值,背包的总容量是固定的。我们需要从这些物品中挑选一部分,使得背包内物品的总价值最大,同时不超过背包的总容量。 举个例子&#xff1a;假设这组物品的质量…

刷题日常(找到字符串中所有字母异位词,​ 和为 K 的子数组​,​ 滑动窗口最大值​,全排列)

找到字符串中所有字母异位词 给定两个字符串 s 和 p&#xff0c;找到 s 中所有 p 的 异位词的子串&#xff0c;返回这些子串的起始索引。不考虑答案输出的顺序。 题目分析&#xff1a; 1.将p里面的字符先丢进一个hash1中&#xff0c;只需要在S字符里面找到多少个和他相同的has…

《C++ Primer Plus》学习笔记|第8章 函数探幽 (24-11-30更新)

文章目录 8.1 内联函数8.2 引用变量8.2.1 创建引用变量8.2.2 将引用用作函数参数8.2.3 引用的属性和特别之处特点1&#xff1a;在计算过程中&#xff0c;传入的形参的值也被改变了。特点2&#xff1a;使用引用的函数参数只接受变量&#xff0c;而不接受变量与数值的运算左值引用…

[2024年1月28日]第15届蓝桥杯青少组stema选拔赛C++中高级(第二子卷、编程题(1))

参考程序&#xff1a; #include <iostream> #include <algorithm> // 用于 std::sortusing namespace std;int main() {int a, b, c;cin >> a >> b >> c;// 将三个数放入一个数组中int arr[3] {a, b, c};// 对数组进行排序sort(arr, arr 3);…

基于hexo框架的博客搭建流程

这篇博文讲一讲hexo博客的搭建及文章管理&#xff0c;也算是我对于暑假的一个交代 &#xff01;&#xff01;&#xff01;注意&#xff1a;下面的操作是基于你已经安装了node.js和git的前提下进行的&#xff0c;并且拥有github账号 创建一个blog目录 在磁盘任意位置创建一个…

基于Java Springboot传统戏曲推广微信小程序

一、作品包含 源码数据库设计文档万字PPT全套环境和工具资源部署教程 二、项目技术 前端技术&#xff1a;Html、Css、Js、Vue、Element-ui 数据库&#xff1a;MySQL 后端技术&#xff1a;Java、Spring Boot、MyBatis 三、运行环境 开发工具&#xff1a;IDEA/eclipse 微信…

数据结构--树二叉树顺序结构存储的二叉树(堆)

前言 前面我们学习了顺序表、链表、栈和队列&#xff0c;这些都是线性的数据结构。今天我们要来学习一种非线性的数据结构——树。 树的概念及结构 树的概念 树是一种非线性的数据结构&#xff0c;是由n&#xff08;n≥0&#xff09;个有效结点组成的一个具有层次关系的集合…

网络安全运行与维护 加固练习题

1. 提交用户密码的最小长度要求。 输入代码: cat /etc/pam.d/common-password 提交答案: flag{20} 2.提交iptables配置以允许10.0.0.0/24网段访问22端口的命令。 输入代码: iptables -A INPUT -p tcp -s 10.0.0.0/24 --dport 22 -j ACCEPT 提交答案: flag{iptables -A I…

【汇编语言】call 和 ret 指令(三) —— 深度解析汇编语言中的批量数据传递与寄存器冲突

文章目录 前言1. 批量数据的传递1.1 存在的问题1.2 如何解决这个问题1.3 示例演示1.3.1 问题说明1.3.2 程序实现 2. 寄存器冲突问题的引入2.1 问题引入2.2 分析与解决问题2.2.1 字符串定义方式2.2.2 分析子程序功能2.2.3 得到子程序代码 2.3 子程序的应用2.3.1 示例12.3.2 示例…

Java 泛型详细解析

泛型的定义 泛型类的定义 下面定义了一个泛型类 Pair&#xff0c;它有一个泛型参数 T。 public class Pair<T> {private T start;private T end; }实际使用的时候就可以给这个 T 指定任何实际的类型&#xff0c;比如下面所示&#xff0c;就指定了实际类型为 LocalDate…