从零搭建:Canal实时数据管道打通MySQL与Elasticsearch

Canal实时同步Mysql BinlogElasticsearch


文章目录

  • Canal实时同步Mysql **Binlog**至**Elasticsearch**
      • 一. 环境准备
        • 1.环境检查
          • 检查`Mysql`是否开启`BinLog`
          • 开启Mysql Binlog
          • Java环境检查
        • 2.新建测试库和表
        • 3.新建Es索引
      • 二.**部署 Canal Server**
        • **2.1 解压安装包**
        • **2.2 配置 Canal Server**
        • **2.3 启动 Canal Server**
      • **三. 部署 Canal Adapter(同步到 Elasticsearch)**
        • **3.1 配置 Adapter**
        • **3.2 配置数据映射**
        • **3.3 启动 Adapter**
      • **4. 验证同步**
        • **4.1 插入测试数据到 MySQL**
        • **4.2 查询 Elasticsearch**

一. 环境准备

  • 操作系统:Linux(Ubuntu 20.04)
  • Java 环境:JDK 8+(建议 OpenJDK 11)
  • MySQL:已启用 Binlog(ROW 模式),并创建 Canal 用户
  • Elasticsearch:已部署(版本 7.x 或 8.x)
  • Canal 二进制包:从 Canal Release 下载 canal.deployer-1.1.8.tar.gzcanal.adapter-1.1.8.tar.gz
1.环境检查
  • 检查Mysql是否开启BinLog
#root账号执行
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';

输出如下证明已经打开:

image-20250211103029832

创建 Canal 用户并授权:

#创建用户
CREATE USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'Password@123';
# 给新创建账户赋予从库权限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

# 刷新权限
FLUSH PRIVILEGES;

如果没打开BinLog可以通过如下方法打开:

  • 开启Mysql Binlog

修改my.cnf文件,加入如下内容:

log_bin=mysql-bin
binlog_format=ROW
binlog_expire_logs_seconds=172800
expire_logs_days=2

log_bin:启用二进制日志,日志文件会以 mysql-bin 为前缀,并依次生成日志文件(例如:mysql-bin.000001mysql-bin.000002 等)。

binlog_format:设置使用的二进制日志格式,在 MySQL 8.0 版本中,binlog_format 的默认值已经变为 ROW。所以,即使你在配置文件中没有明确设置 binlog_format,MySQL 会默认使用 ROW 作为二进制日志格式。在较早的 MySQL 版本中默认值是 STATEMENT

binlog_expire_logs_seconds=172800expire_logs_days=2:这些配置设置了二进制日志的过期时间(默认情况下,MySQL 会保留二进制日志,直到它们过期或达到日志文件数的限制)。在这种情况下,日志会在 2 天后过期。

配置好后重启Mysql:

systemctl restart mysqld.service
  • Java环境检查
echo $JAVA_HOME
image-20250211111637904
2.新建测试库和表
 CREATE DATABASE IF NOT EXISTS canal default charset utf8 COLLATE utf8_general_ci;
 
 CREATE TABLE `test_user` (
  `id` bigint unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '姓名',
  `sex` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '性别',
  `tel` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '电话',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC;
3.新建Es索引
curl -X PUT "http://<your es IP>:9200/test_user" -H 'Content-Type: application/json' -u <es账号>:<es 密码> -d'
{
  "mappings": {
    "properties": {
      "id": {
        "type": "long"
      },
      "title": {
        "type": "text"
      },
      "sex": {
        "type": "text"
      },
      "tel": {
        "type": "text"
      }
    }
  }
}
'

二.部署 Canal Server

2.1 解压安装包
# 创建目录
mkdir -p /opt/canal/server /opt/canal/adapter

# 解压 Server
tar -zxvf canal.deployer-1.1.8.tar.gz -C /opt/canal/server

# 解压 Adapter
tar -zxvf canal.adapter-1.1.8.tar.gz -C /opt/canal/adapter
2.2 配置 Canal Server

修改配置文件 /opt/canal/server/conf/canal.properties

# tcp bind ip
canal.ip =127.0.0.1
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112

# 目标实例名称(默认 example)
canal.destinations = example

# 持久化模式(默认内存,可选 H2/MySQL)
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml

这里主要修改canal.ip其他保持默认即可。

修改实例配置 /opt/canal/server/conf/example/instance.properties

#被同步的mysql地址,填写自己的IP地址
canal.instance.master.address=127.0.0.1:3306
#第一步创建的数据库从库权限账号/密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=Password@123
#数据库连接编码 
canal.instance.connectionCharset = UTF-8 
#Binlog 过滤规则(监控所有库表)
canal.instance.filter.regex=.*\\..*
#指定了 Canal 消费者(比如 MQ 客户端)读取和写入消息的目标主题,保持默认即可
canal.mq.topic=example
2.3 启动 Canal Server
cd /opt/canal/server/bin
./startup.sh

# 查看日志
tail -f /opt/canal/server/logs/canal/canal.log
tail -f /opt/canal/server/logs/example/example.log

image-20250211153418697

image-20250211153400835

image-20250211153538656

可以看到日志没有明显报错,且进程已经启动,则表示Canal Server已经启动成功。

image-20250211153842261

三. 部署 Canal Adapter(同步到 Elasticsearch)

3.1 配置 Adapter

修改配置文件 /opt/canal/adapter/conf/application.yml

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
    
canal.conf:
  mode: tcp # 客户端的模式,可选tcp kafka rocketMQ
  flatMessage: true # 扁平message开关, 是否以json字符串形式投递数据, 仅在kafka/rocketMQ模式下有效
  zookeeperHosts:    # 对应集群模式下的zk地址
  syncBatchSize: 1000 # 每次同步的批数量
  retries: 0 # 重试次数, -1为无限重试
  timeout: # 同步超时时间, 单位毫秒
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111 #配置canal-server的地址
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
 
  srcDataSources: # 源数据库配置
    defaultDScanal是测试数据库
      url: jdbc:mysql://<yourIP>:3306/canal?useUnicode=true&useSSL=true #数据库连接,canal是测试用的数据库
      username: root #数据库账号
      password: Pass@1234 #数据库密码
  canalAdapters: # 适配器列表
  - instance: example # canal实例名,和上述Server的配置一样
    groups: # 分组列表
    - groupId: g1 # 分组id, 如果是MQ模式将用到该值
      outerAdapters:
      - name: logger # 日志打印适配器
      - name: es8 # ES同步适配器根据自己的es版本来
        hosts: <your IP>:9200 # ES连接地址
        properties:
          mode: rest # 模式可选transport端口(9300) 或者 rest端口(9200)
          security.auth: elastic:123456 #  连接es的用户和密码,仅rest模式使用
          cluster.name: elasticsearch # ES集群名称

如何获取es集群名称,命令输出的cluster_name就是上面需要配置的集群名字:

curl -u elastic:<esPass> -X GET "http://<es IP>:9200/_cluster/health?pretty"

image-20250211170653195

3.2 配置数据映射

创建 Elasticsearch 映射文件 /opt/canal/adapter/conf/es8/mytest_user.yml

dataSourceKey: defaultDS # 源数据源的key, 对应上面application配置的srcDataSources中的值
destination: example  # canal的instance或者MQ的topic
groupId: g1 # 对应MQ模式下的groupId, 只会同步对应groupId的数据
esMapping:
  _index: test_user # es 的索引名称
  _id: _id  # es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配
  sql: "SELECT
         tb.id AS _id,
         tb.name,
         tb.sex,
         tb.tel
        FROM
         test_user us"        # sql映射
  etlCondition: "where p.id>={}"   #etl的条件参数
  commitBatch: 3000   # 提交批大小
3.3 启动 Adapter
cd /opt/canal/adapter/bin
./startup.sh

#查看日志
tail -f /opt/canal/adapter/logs/adapter/adapter.log

会输出很多数据库变更的日志:

image-20250211171145018

image-20250211171208031

4. 验证同步

4.1 插入测试数据到 MySQL
#执行sql
INSERT INTO test_user (name, sex, tel) VALUES ('Paco', '男', '123456789');

image-20250211171534121

image-20250211171547780

4.2 查询 Elasticsearch
curl -u elastic:<esPass> -X GET "http://<esIP>:9200/test_user/_search?pretty"

也可以在工具上查看,这边是Eage插件:

image-20250211171753695

image-20250211171808091

至此,即可验证可同步成功。我们可以修改数据测试,看是否能同步。

image-20250211171849802

image-20250211172502715

然后我们测试修改Es的数据:

image-20250211172542248

image-20250211172555087

可以发现数据库并没有变,至此Canal单向实时同步Mysql BinlogElasticsearch就配置完成了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/969306.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Excel 合并列数据

场景 要求每行数据的每个字段的内容不能以 [2,3,33,22] 形式展示 要求独立成列形式如下 代码 maven 依赖 <dependency><groupId>org.apache.poi</groupId><artifactId>poi</artifactId><version>3.17</version></dependency>…

从零到一:基于Rook构建云原生Ceph存储的全面指南(下)

接上篇&#xff1a;《从零到一&#xff1a;基于Rook构建云原生Ceph存储的全面指南&#xff08;上&#xff09;》 链接: link 六.Rook部署云原生CephFS文件系统 6.1 部署cephfs storageclass cephfs文件系统与RBD服务类似&#xff0c;要想在kubernetes pod里使用cephfs&#…

人工智能之深度学习的革命性突破

深度学习的革命性突破 深度学习是机器学习的一个子领域&#xff0c;通过模拟人脑神经网络的结构和功能&#xff0c;实现对复杂数据的高效处理。近年来&#xff0c;深度学习在计算机视觉、自然语言处理、语音识别等领域取得了革命性突破。本文将深入探讨深度学习的核心架构、突…

C#快速排序QuickSort将递归算法修改为堆栈Stack非递归方式

我们知道,方法的调用是采用Stack的方式[后进先出:LIFO], 在DeepSeek中快速搜索C#快速排序, 搜索结果如图: 我们会发现是采用递归的方式 . 递归的优点: 简单粗暴,类似于直接写数学公式,因代码量较少,易于理解.递归与循环迭代的运行次数都是一致的 递归的缺点: 占用大量的内…

Django开发入门 – 3.用Django创建一个Web项目

Django开发入门 – 3.用Django创建一个Web项目 Build A Web Based Project With Django By JacksonML 本文简要介绍如何利用最新版Python 3.13.2来搭建Django环境&#xff0c;以及创建第一个Django Web应用项目&#xff0c;并能够运行Django Web服务器。 创建该Django项目需…

SQL布尔盲注、时间盲注

一、布尔盲注 布尔盲注&#xff08;Boolean-based Blind SQL Injection&#xff09;是一种SQL注入技术&#xff0c;用于在应用程序不直接显示数据库查询结果的情况下&#xff0c;通过构造特定的SQL查询并根据页面返回的不同结果来推测数据库中的信息。这种方法依赖于SQL查询的…

【Python网络爬虫】爬取网站图片实战

【Python网络爬虫】爬取网站图片实战 Scrapying Images on Website in Action By Jackson@ML *声明:本文简要介绍如何利用Python爬取网站数据图片,仅供学习交流。如涉及敏感图片或者违禁事项,请注意规避;笔者不承担相关责任。 1. 创建Python项目 1) 获取和安装最新版…

【docker知识】快速找出服务器中占用内存较高的容器

本文由Markdown语法编辑器编辑完成。 1.背景&#xff1a; 近期在处理现场问题&#xff0c;观察服务器时&#xff0c;会遇到某些进程占用较高内存的情况。由于我们的服务&#xff0c;基本上都是以容器的方式在运行&#xff0c;因此就需要找到&#xff0c;到底是哪个容器&#…

图数据库neo4j进阶(一):csv文件导入节点及关系

CSV 一、load csv二、neo4j-admin import<一>、导入入口<二>、文件准备<三>、命令详解 一、load csv 在neo4j Browser中使用Cypher语句LOAD CSV,对于数据量比较大的情况,建议先运行create constraint语句来生成约束 create constraint for (s:Student) req…

10. Hbase Compaction命令

一. 什么是Compaction 在 HBase 中&#xff0c;频繁进行数据插入、更新和删除操作会生成许多小的 HFile&#xff0c;当 HFile 数量增多时&#xff0c;会影响HBase的读写性能。此外&#xff0c;垃圾数据的存在也会增加存储需求。因此&#xff0c;定期进行 Compact操作&#xff…

【工业场景】用YOLOv8实现火灾识别

火灾识别任务是工业领域急需关注的重点安全事项,其应用场景和背景意义主要体现在以下几个方面: 应用场景:工业场所:在工厂、仓库等工业场所中,火灾是造成重大财产损失和人员伤亡的主要原因之一。利用火灾识别技术可以及时发现火灾迹象,采取相应的应急措施,保障人员安全和…

软件开发 | GitHub企业版常见问题解读

什么是GitHub企业版&#xff1f; GitHub企业版是一个企业级软件开发平台&#xff0c;专为现代化开发的复杂工作流程而设计。 作为可扩展的平台解决方案&#xff0c;GitHub企业版使组织能够无缝集成其他工具和功能&#xff0c;并根据特定需求定制开发环境&#xff0c;提高整体…

CEF132 编译指南 MacOS 篇 - depot_tools 安装与配置 (四)

1. 引言 在 CEF132&#xff08;Chromium Embedded Framework&#xff09;的编译过程中&#xff0c;depot_tools 扮演着举足轻重的角色。这套由 Chromium 项目精心打造的脚本和工具集&#xff0c;专门用于获取、管理和更新 Chromium 及其相关项目&#xff08;包括 CEF&#xff…

NLP Word Embeddings

Word representation One-hot形式 在上一周介绍RNN类模型时&#xff0c;使用了One-hot向量来表示单词的方式。它的缺点是将每个单词视为独立的&#xff0c;算法很难学习到单词之间的关系。 比如下面的例子&#xff0c;即使语言模型已经知道orange juice是常用组合词&#xf…

python实现YouTube关键词爬虫(2025/02/11)

在当今数字化时代&#xff0c;YouTube作为全球最大的视频分享平台之一&#xff0c;拥有海量的视频资源。无论是进行市场调研、内容创作还是学术研究&#xff0c;能够高效地获取YouTube上的相关视频信息都显得尤为重要。今天&#xff0c;我将为大家介绍一个基于Python实现的YouT…

Jenkins 配置 Git Parameter 四

Jenkins 配置 Git Parameter 四 一、开启 项目参数设置 勾选 This project is parameterised 二、添加 Git Parameter 如果此处不显示 Git Parameter 说明 Jenkins 还没有安装 Git Parameter plugin 插件&#xff0c;请先安装插件 Jenkins 安装插件 三、设置基本参数 点击…

自然语言处理NLP入门 -- 第三节词袋模型与 TF-IDF

目标 了解词袋模型&#xff08;BoW&#xff09;和 TF-IDF 的概念通过实际示例展示 BoW 和 TF-IDF 如何将文本转换为数值表示详细讲解 Scikit-learn 的实现方法通过代码示例加深理解归纳学习难点&#xff0c;并提供课后练习和讲解 3.1 词袋模型&#xff08;Bag of Words, BoW&a…

C++模板编程——typelist的实现

文章最后给出了汇总的代码&#xff0c;可直接运行 1. typelist是什么 typelist是一种用来操作类型的容器。和我们所熟知的vector、list、deque类似&#xff0c;只不过typelist存储的不是变量&#xff0c;而是类型。 typelist简单来说就是一个类型容器&#xff0c;能够提供一…

fastadmin 接口请求提示跨域

问题描述 小程序项目&#xff0c;内嵌h5页面&#xff0c;在h5页面调用后端php接口&#xff0c;提示跨域。网上查找解决方案如下&#xff1a; 1&#xff0c;设置header // 在入口文件index.php直接写入直接写入 header("Access-Control-Allow-Origin:*"); header(&q…

只需三步!5分钟本地部署deep seek——MAC环境

MAC本地部署deep seek 第一步:下载Ollama第二步:下载deepseek-r1模型第三步&#xff1a;安装谷歌浏览器插件 第一步:下载Ollama 打开此网址&#xff1a;https://ollama.com/&#xff0c;点击下载即可&#xff0c;如果网络比较慢可使用文末百度网盘链接 注&#xff1a;Ollama是…