MySQL数据迁移至ClickHouse
一、生成测试数据表和数据 1.在MySQL创建数据表和数据 2.在ClickHouse创建数据表
二、生成模板文件 1.模板文件内容 2.模板文件参数详解 2.1 全局设置 2.2 数据读取(Reader) 2.3 数据写入(Writer) 2.4 性能设置
三、案例
一、生成测试数据表和数据
1.在MySQL创建数据表和数据
# 1.创建数据库
CREATE DATABASE test charset=utf8mb4;
USE test;
# 2.创建表
CREATE TABLE User (
userId INT AUTO_INCREMENT PRIMARY KEY,
username VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL UNIQUE,
registrationDate DATETIME NOT NULL,
lastLogin DATETIME,
createTime DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, -- 创建时间
updateTime DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP -- 更新时间
);
# 3.插入测试数据
INSERT INTO test.`User` (username, email, registrationDate, lastLogin) VALUES
('JohnDoe01', 'john.doe01@example.com', '2023-02-01 08:00:00', '2023-02-02 09:00:00'),
('JaneDoe02', 'jane.doe02@example.com', '2023-02-02 10:00:00', '2023-02-03 11:00:00'),
('MikeSmith03', 'mike.smith03@example.com', '2023-02-03 12:00:00', '2023-02-04 13:00:00'),
('LucyBrown04', 'lucy.brown04@example.com', '2023-02-04 14:00:00', '2023-02-05 15:00:00'),
('DavidWilson05', 'david.wilson05@example.com', '2023-02-05 16:00:00', '2023-02-06 17:00:00'),
('LindaTaylor06', 'linda.taylor06@example.com', '2023-02-06 18:00:00', '2023-02-07 19:00:00'),
('RobertJones07', 'robert.jones07@example.com', '2023-02-07 20:00:00', '2023-02-08 21:00:00'),
('PatriciaWhite08', 'patricia.white08@example.com', '2023-02-08 22:00:00', '2023-02-09 23:00:00'),
('MichaelHarris09', 'michael.harris09@example.com', '2023-02-09 08:30:00', '2023-02-10 09:30:00'),
('SarahMartin10', 'sarah.martin10@example.com', '2023-02-10 10:30:00', '2023-02-11 11:30:00');
# 4.批量插入100w数据
# 4.1 创建存储过程
DELIMITER $$
CREATE PROCEDURE InsertUsers()
BEGIN
DECLARE i INT DEFAULT 0;
WHILE i < 1000000 DO
INSERT INTO User (username, email, registrationDate, lastLogin) VALUES
(CONCAT('User', LPAD(i, 7, '0')), CONCAT('user', LPAD(i, 7, '0'), '@example.com'), NOW(), NOW());
SET i = i + 1;
END WHILE;
END$$
DELIMITER ;
# 4.2 调用存储过程,生成100w用户数据
CALL InsertUsers();
2.在ClickHouse创建数据表
CREATE TABLE User (
userId Int32,
username String,
email String,
registrationDate DateTime,
lastLogin Nullable(DateTime),
createTime DateTime, -- 创建时间
updateTime DateTime -- 更新时间
) ENGINE = MergeTree()
ORDER BY userId;
二、生成模板文件
当前安装DataX的目录为:/data/datax
# 1.进入datax的工具目录
cd /data/datax/bin/
# 2.生成模板
python datax.py -r mysqlreader -w clickhousewriter > ../job/mysql_to_clickhouse.json
1.模板文件内容
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [],
"connection": [
{
"jdbcUrl": [],
"table": []
}
],
"password": "",
"username": "",
"where": ""
}
},
"writer": {
"name": "clickhousewriter",
"parameter": {
"batchByteSize": 134217728,
"batchSize": 65536,
"column": [
"col1",
"col2",
"col3"
],
"connection": [
{
"jdbcUrl": "jdbc:clickhouse://<host>:<port>[/<database>]",
"table": [
"table1",
"table2"
]
}
],
"dryRun": false,
"password": "password",
"postSql": [],
"preSql": [],
"username": "username",
"writeMode": "insert"
}
}
}
],
"setting": {
"speed": {
"channel": ""
}
}
}
}
2.模板文件参数详解
2.1 全局设置
job: 定义了整个数据迁移作业的配置。
content: 包含了一个或多个数据同步任务的列表。
2.2 数据读取(Reader)
reader: 定义了数据来源的相关配置。
name: 使用的读取插件名称,这里是mysqlreader,表示从MySQL数据库读取数据。 parameter: 读取数据时的参数配置。
column: 需要读取的列名列表。这里指定了从MySQL表中读取userId, username, email, registrationDate, lastLogin这几个字段。 connection: 数据库连接信息。
jdbcUrl: 数据库的JDBC连接URL。需要替换<your_mysql_host>, <your_mysql_port>, <your_mysql_database>为实际的MySQL服务器地址、端口和数据库名。 table: 指定要读取数据的表名列表,在这个例子中是User表。 password: 用于连接MySQL数据库的密码。 username: 用于连接MySQL数据库的用户名。 where: 可以指定一个WHERE条件来过滤读取的数据,这里留空表示不过滤,读取所有数据。
2.3 数据写入(Writer)
writer: 定义了数据目的地的相关配置。
name: 使用的写入插件名称,这里是clickhousewriter,表示数据将被写入到ClickHouse数据库。 parameter: 写入数据时的参数配置。
batchByteSize: 指定每个批次写入的最大字节数。这里设置为134217728,约等于128MB。 batchSize: 指定每个批次写入的记录数。这里设置为65536。 column: 指定写入到目标表的列名。应与读取的列对应。 connection: 目标数据库的连接信息。
jdbcUrl: ClickHouse的JDBC连接URL。需要替换, , [/]为实际的ClickHouse服务器地址、端口和数据库名。 table: 指定要写入数据的表名,在这个例子中是User表。 dryRun: 是否进行干运行(不实际写入数据)。这里设置为false,表示将实际执行数据写入。 password: 用于连接ClickHouse数据库的密码。 postSql: 在数据写入完成后执行的SQL语句列表,这里留空。 preSql: 在数据写入前执行的SQL语句列表,这里留空。 username: 用于连接ClickHouse数据库的用户名。 writeMode: 写入模式,这里设置为insert,表示通过INSERT语句进行数据写入。
2.4 性能设置
setting: 定义了作业的全局设置。
speed: 控制数据同步的速度。
channel: 指定并发通道的数量,这里设置为4,意味着数据迁移任务将并行执行,使用4个并发通道。
三、案例
1.全量数据迁移
1.1 配置迁移模板
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"userId",
"username",
"email",
"registrationDate",
"lastLogin",
"createTime",
"updateTime"
],
"connection": [
{
"jdbcUrl": ["jdbc:mysql://192.168.86.128:3306/test?useUnicode=true&characterEncoding=utf-8"],
"table": [
"User"
]
}
],
"password": "xxx",
"username": "root",
"where": ""
}
},
"writer": {
"name": "clickhousewriter",
"parameter": {
"batchByteSize": 134217728,
"batchSize": 65536,
"column": [
"userId",
"username",
"email",
"registrationDate",
"lastLogin",
"createTime",
"updateTime"
],
"connection": [
{
"jdbcUrl": "jdbc:clickhouse://192.168.86.128:8123/default",
"table": [
"User"
]
}
],
"dryRun": false,
"password": "qwe123",
"postSql": [],
"preSql": [],
"username": "root",
"writeMode": "insert"
}
}
}
],
"setting": {
"speed": {
"channel": 4
}
}
}
}
1.2.运行迁移命令
python /data/datax/bin/datax.py /data/datax/job/mysql_to_clickhouse.json
2.增量数据迁移
主要差别在于,需要有一个createTime字段,代表源数据的创建时间,那么更新的时候,只迁移过滤这个时间段的数据,达到增量数据迁移
2.1 配置迁移模板
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"userId",
"username",
"email",
"registrationDate",
"lastLogin",
"createTime",
"updateTime"
],
"connection": [
{
"jdbcUrl": ["jdbc:mysql://192.168.86.128:3306/test?useUnicode=true&characterEncoding=utf-8"],
"table": [
"User"
]
}
],
"password": "qwe123",
"username": "root",
"where": "createTime>='${startDatetime} 00:00:00' and createTime<='${endDatetime} 23:59:59'"
}
},
"writer": {
"name": "clickhousewriter",
"parameter": {
"batchByteSize": 134217728,
"batchSize": 65536,
"column": [
"userId",
"username",
"email",
"registrationDate",
"lastLogin",
"createTime",
"updateTime"
],
"connection": [
{
"jdbcUrl": "jdbc:clickhouse://192.168.86.128:8123/default",
"table": [
"User"
]
}
],
"dryRun": false,
"password": "qwe123",
"postSql": [],
"preSql": [],
"username": "root",
"writeMode": "insert"
}
}
}
],
"setting": {
"speed": {
"channel": 4
}
}
}
}
2.2 运行迁移命令
python /data/datax/bin/datax.py /data/datax/job/mysql_to_clickhouse.json -p "-DstartDatetime=2024-02-09 -DendDatetime=2024-02-10"