随着数据驱动决策的重要性日益凸显,实时数据处理成为企业竞争力的关键。SeaTunnel MongoDB CDC(Change Data Capture) 源连接器的推出,为开发者提供了一个高效、灵活的工具,以实现对 MongoDB 数据库变更的实时捕获和处理。
本文将深入探讨该连接器的主要特性、支持的数据源信息、配置选项以及如何创建数据同步作业,助力开发者更好地利用 SeaTunnel 进行数据集成和实时数据分析。这些更新旨在为开发者提供更为丰富的数据处理能力,帮助他们更有效地捕获和处理来自 MongoDB 的变更数据。
支持的引擎
SeaTunnel Zeta
Flink
主要特性
- 批处理
- 流处理
- 精确一次
- 列投影
- 并行度
- 支持用户定义分片
功能描述
MongoDB CDC 源连接器允许从 MongoDB 数据库读取快照数据和增量数据。
支持的数据源信息
要使用 MongoDB CDC 连接器,需要以下依赖。它们可以通过 install-plugin.sh 脚本或从 Maven 中央仓库下载。
数据源 | 支持的版本 | 依赖 |
---|---|---|
MongoDB | 通用 | 下载 |
可用性设置
- MongoDB版本:MongoDB 版本 >= 4.0。
- 集群部署:副本集或分片集群。
- 存储引擎:WiredTiger 存储引擎。
- 权限:changeStream 和 read
use admin;
db.createRole(
{
role: "strole",
privileges: [{
resource: { db: "", collection: "" },
actions: [
"splitVector",
"listDatabases",
"listCollections",
"collStats",
"find",
"changeStream" ]
}],
roles: [
{ role: 'read', db: 'config' }
]
}
);
db.createUser(
{
user: 'stuser',
pwd: 'stpw',
roles: [
{ role: 'strole', db: 'admin' }
]
}
);
数据类型映射
以下表格列出了从 MongoDB BSON 类型到 SeaTunnel 数据类型的字段数据类型映射。
MongoDB BSON 类型 | SeaTunnel 数据类型 |
---|---|
ObjectId | STRING |
String | STRING |
Boolean | BOOLEAN |
Binary | BINARY |
Int32 | INTEGER |
Int64 | BIGINT |
Double | DOUBLE |
Decimal128 | DECIMAL |
Date | DATE |
Timestamp | TIMESTAMP |
Object | ROW |
Array | ARRAY |
对于 MongoDB 中的特定类型,我们使用扩展 JSON 格式将它们映射到 SeaTunnel STRING 类型。
MongoDB BSON 类型 | SeaTunnel STRING 表示 |
---|---|
Symbol | {"_value": {"$symbol": "12"}} |
RegularExpression | {"_value": {"$regularExpression": {"pattern": "^9$", "options": "i"}}} |
JavaScript | {"_value": {"$code": "function() { return 10; }"}} |
DbPointer | {"_value": {"$dbPointer": {"$ref": "db.coll", "$id": {"$oid": "63932a00da01604af329e33c"}}}} |
提示
在 SeaTunnel 中使用 DECIMAL 类型时,请注意最大范围不能超过 34 位数字,这意味着你应该使用 decimal(34, 18)。
名称 | 类型 | 必须 | 默认值 | 描述 |
---|---|---|---|---|
hosts | String | 是 | - | MongoDB 服务器的主机名和端口对的逗号分隔列表。例如:localhost:27017,localhost:27018 |
username | String | 否 | - | 连接 MongoDB 时使用的数据库用户名。 |
password | String | 否 | - | 连接 MongoDB 时使用的密码。 |
database | List | 是 | - | 要监视更改的数据库名称。如果未设置,则会捕获所有数据库。数据库还支持正则表达式,以监视与正则表达式匹配的多个数据库。例如:db1,db2。 |
collection | List | 是 | - | 数据库中要监视更改的集合名称。如果未设置,则会捕获所有集合。集合也支持正则表达式,以监视与完全限定的集合标识符匹配的多个集合。例如:db1.coll1,db2.coll2。 |
connection.options | String | 否 | - | MongoDB 的连接选项的和号分隔列表。例如:replicaSet=test&connectTimeoutMS=300000。 |
batch.size | Long | 否 | 1024 | 游标批大小。 |
poll.max.batch.size | Enum | 否 | 1024 | 轮询新数据时包含在单个批次中的更改流文档的最大数量。 |
poll.await.time.ms | Long | 否 | 1000 | 等待检查更改流上的新结果之前的时间量。 |
heartbeat.interval.ms | String | 否 | 0 | 发送心跳消息之间的时间长度(以毫秒为单位)。使用 0 禁用。 |
incremental.snapshot.chunk.size.mb | Long | 否 | 64 | 增量快照的块大小(MB)。 |
common-options | 否 | - | 源插件通用参数,请参考源通用选项获取详情。 |
提示:
- 如果集合变更速度较慢,强烈建议为 heartbeat.interval.ms 参数设置大于 0 的适当值。当我们从检查点或保存点恢复 SeaTunnel 作业时,心跳事件可以将 resumeToken 推进以避免其过期。
- MongoDB 对单个文档有 16MB 的限制。更改文档包括附加信息,因此即使原始文档不大于 15MB,更改文档也可能超过 16MB 限制,导致更改流操作终止。
- 建议使用不可变的分片键。在 MongoDB 中,分片键在启用事务后允许修改,但更改分片键可能导致频繁的分片迁移,造成额外的性能开销。此外,修改分片键还可能导致更新查找功能变得无效,在 CDC(更改数据捕获)场景中导致不一致的结果。
如何创建 MongoDB CDC 数据同步作业
将 CDC 数据打印到客户端
以下示例演示如何创建一个从 MongoDB 读取 CDC 数据并在本地客户端打印的数据同步作业:
env {
# 您可以在此处设置引擎配置
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
}
source {
MongoDB-CDC {
hosts = "mongo0:27017"
database = ["inventory"]
collection = ["inventory.products"]
username = stuser
password = stpw
schema = {
fields {
"_id" : string,
"name" : string,
"description" : string,
"weight" : string
}
}
}
}
# 在本地客户端打印读取的 MongoDB 数据
sink {
Console {
parallelism = 1
}
}
将 CDC 数据写入 MysqlDB
以下示例演示如何创建一个从 MongoDB 读取 CDC 数据并写入 mysql 数据库的数据同步作业:
env {
# You can set engine configuration here
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
}
source {
MongoDB-CDC {
hosts = "mongo0:27017"
database = ["inventory"]
collection = ["inventory.products"]
username = stuser
password = stpw
}
}
sink {
jdbc {
url = "jdbc:mysql://mysql_cdc_e2e:3306"
driver = "com.mysql.cj.jdbc.Driver"
user = "st_user"
password = "seatunnel"
generate_sink_sql = true
# You need to configure both database and table
database = mongodb_cdc
table = products
primary_keys = ["_id"]
}
}
多表同步
以下示例演示如何创建一个读取 mongodb 多库表 CDC 数据并在本地客户端打印的数据同步作业:
env {
# You can set engine configuration here
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
}
source {
MongoDB-CDC {
hosts = "mongo0:27017"
database = ["inventory","crm"]
collection = ["inventory.products","crm.test"]
username = stuser
password = stpw
}
}
# Console printing of the read Mongodb data
sink {
Console {
parallelism = 1
}
}
提示: 多库表 CDC 同步不能指定 schema,只能下游输出 json 数据。这是因为 MongoDB 不提供查询元数据信息,所以如果想支持多表,所有表只能作为一个结构读取。
使用正则表达式匹配多表
以下示例演示如何创建一个通过正则表达式读取 mongodb 多库表数据并在本地客户端打印的数据同步作业:
匹配示例 | 表达式 | 描述 |
---|---|---|
前缀匹配 | ^(test).* | 匹配数据库名或表名以 test 为前缀的,如 test1, test2 等。 |
后缀匹配 | .*[p$] | 匹配数据库名或表名以 p 为后缀的,如 cdcp, edcp 等。 |
``` | ||
env { | ||
# You can set engine configuration here | ||
parallelism = 1 | ||
job.mode = "STREAMING" | ||
checkpoint.interval = 5000 | ||
} |
source { MongoDB-CDC { hosts = "mongo0:27017" # So this example is used (^(test).|^(tpc).|txc|.[p$]|t{2}).(t[5-8]|tt),matching txc.tt、test2.test5. database = ["(^(test).|^(tpc).|txc|.[p$]|t{2})"] collection = ["(t[5-8]|tt)"] username = stuser password = stpw } }
Console printing of the read Mongodb data
sink { Console { parallelism = 1 } }
### 实时流数据格式
{ _id : {
}, // Identifier of the open change stream, can be assigned to the 'resumeAfter' parameter for subsequent resumption of this change stream "operationType" : "
", // The type of change operation that occurred, such as: insert, delete, update, etc. "fullDocument" : {
}, // The full document data involved in the change operation. This field does not exist in delete operations "ns" : {
"db" : "
", // The database where the change operation occurred "coll" : "
" // The collection where the change operation occurred }, "to" : { // These fields are displayed only when the operation type is 'rename' "db" : "
", // The new database name after the change "coll" : "
" // The new collection name after the change }, "source":{ "ts_ms":"
", // The timestamp when the change operation occurred "table":"
" // The collection where the change operation occurred "db":"
", // The database where the change operation occurred "snapshot":"false" // Identify the current stage of data synchronization }, "documentKey" : { "_id" :
}, // The _id field value of the document involved in the change operation "updateDescription" : { // Description of the update operation "updatedFields" : {
}, // The fields and values that the update operation modified "removedFields" : [ "
", ... ] // The fields and values that the update operation removed } "clusterTime" :
, // The timestamp of the Oplog log entry corresponding to the change operation "txnNumber" :
, // If the change operation is executed in a multi-document transaction, this field and value are displayed, representing the transaction number "lsid" : { // Represents information related to the Session in which the transaction is located "id" :
, "uid" :
} }
```
到此本指南就结束了,MongoDB CDC Sink连接器的发布,不仅强化了 Apache SeaTunnel 在数据集成领域的地位,也为开发者提供了更多的可能性。
Apache SeaTunnel 社区也期待您的参与和贡献,共同迈向更广阔的数据处理未来,让我们携手共建一个更加强大、开放、互助的社区!
本文由 白鲸开源科技 提供发布支持!