Apache SeaTunnel MongoDB CDC 使用指南

随着数据驱动决策的重要性日益凸显,实时数据处理成为企业竞争力的关键。SeaTunnel MongoDB CDC(Change Data Capture) 源连接器的推出,为开发者提供了一个高效、灵活的工具,以实现对 MongoDB 数据库变更的实时捕获和处理。

file

本文将深入探讨该连接器的主要特性、支持的数据源信息、配置选项以及如何创建数据同步作业,助力开发者更好地利用 SeaTunnel 进行数据集成和实时数据分析。这些更新旨在为开发者提供更为丰富的数据处理能力,帮助他们更有效地捕获和处理来自 MongoDB 的变更数据。

支持的引擎

SeaTunnel Zeta
Flink

主要特性

  • 批处理
  • 流处理
  • 精确一次
  • 列投影
  • 并行度
  • 支持用户定义分片

功能描述

MongoDB CDC 源连接器允许从 MongoDB 数据库读取快照数据和增量数据。

支持的数据源信息

要使用 MongoDB CDC 连接器,需要以下依赖。它们可以通过 install-plugin.sh 脚本或从 Maven 中央仓库下载。

数据源支持的版本依赖
MongoDB通用下载

可用性设置

  1. MongoDB版本:MongoDB 版本 >= 4.0。
  2. 集群部署:副本集或分片集群。
  3. 存储引擎:WiredTiger 存储引擎。
  4. 权限:changeStream 和 read
use admin;
db.createRole(
    {
        role: "strole",
        privileges: [{
            resource: { db: "", collection: "" },
            actions: [
                "splitVector",
                "listDatabases",
                "listCollections",
                "collStats",
                "find",
                "changeStream" ]
        }],
        roles: [
            { role: 'read', db: 'config' }
        ]
    }
);

db.createUser(
  {
      user: 'stuser',
      pwd: 'stpw',
      roles: [
         { role: 'strole', db: 'admin' }
      ]
  }
);

数据类型映射

以下表格列出了从 MongoDB BSON 类型到 SeaTunnel 数据类型的字段数据类型映射。

MongoDB BSON 类型SeaTunnel 数据类型
ObjectIdSTRING
StringSTRING
BooleanBOOLEAN
BinaryBINARY
Int32INTEGER
Int64BIGINT
DoubleDOUBLE
Decimal128DECIMAL
DateDATE
TimestampTIMESTAMP
ObjectROW
ArrayARRAY

对于 MongoDB 中的特定类型,我们使用扩展 JSON 格式将它们映射到 SeaTunnel STRING 类型。

MongoDB BSON 类型SeaTunnel STRING 表示
Symbol{"_value": {"$symbol": "12"}}
RegularExpression{"_value": {"$regularExpression": {"pattern": "^9$", "options": "i"}}}
JavaScript{"_value": {"$code": "function() { return 10; }"}}
DbPointer{"_value": {"$dbPointer": {"$ref": "db.coll", "$id": {"$oid": "63932a00da01604af329e33c"}}}}
提示

在 SeaTunnel 中使用 DECIMAL 类型时,请注意最大范围不能超过 34 位数字,这意味着你应该使用 decimal(34, 18)。

名称类型必须默认值描述
hostsString-MongoDB 服务器的主机名和端口对的逗号分隔列表。例如:localhost:27017,localhost:27018
usernameString-连接 MongoDB 时使用的数据库用户名。
passwordString-连接 MongoDB 时使用的密码。
databaseList-要监视更改的数据库名称。如果未设置,则会捕获所有数据库。数据库还支持正则表达式,以监视与正则表达式匹配的多个数据库。例如:db1,db2。
collectionList-数据库中要监视更改的集合名称。如果未设置,则会捕获所有集合。集合也支持正则表达式,以监视与完全限定的集合标识符匹配的多个集合。例如:db1.coll1,db2.coll2。
connection.optionsString-MongoDB 的连接选项的和号分隔列表。例如:replicaSet=test&connectTimeoutMS=300000。
batch.sizeLong1024游标批大小。
poll.max.batch.sizeEnum1024轮询新数据时包含在单个批次中的更改流文档的最大数量。
poll.await.time.msLong1000等待检查更改流上的新结果之前的时间量。
heartbeat.interval.msString0发送心跳消息之间的时间长度(以毫秒为单位)。使用 0 禁用。
incremental.snapshot.chunk.size.mbLong64增量快照的块大小(MB)。
common-options-源插件通用参数,请参考源通用选项获取详情。

提示:

  • 如果集合变更速度较慢,强烈建议为 heartbeat.interval.ms 参数设置大于 0 的适当值。当我们从检查点或保存点恢复 SeaTunnel 作业时,心跳事件可以将 resumeToken 推进以避免其过期。
  • MongoDB 对单个文档有 16MB 的限制。更改文档包括附加信息,因此即使原始文档不大于 15MB,更改文档也可能超过 16MB 限制,导致更改流操作终止。
  • 建议使用不可变的分片键。在 MongoDB 中,分片键在启用事务后允许修改,但更改分片键可能导致频繁的分片迁移,造成额外的性能开销。此外,修改分片键还可能导致更新查找功能变得无效,在 CDC(更改数据捕获)场景中导致不一致的结果。

如何创建 MongoDB CDC 数据同步作业

将 CDC 数据打印到客户端

以下示例演示如何创建一个从 MongoDB 读取 CDC 数据并在本地客户端打印的数据同步作业:

env {
  # 您可以在此处设置引擎配置
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  MongoDB-CDC {
    hosts = "mongo0:27017"
    database = ["inventory"]
    collection = ["inventory.products"]
    username = stuser
    password = stpw
    schema = {
      fields {
        "_id" : string,
        "name" : string,
        "description" : string,
        "weight" : string
      }
    }
  }
}

# 在本地客户端打印读取的 MongoDB 数据
sink {
  Console {
    parallelism = 1
  }
}

将 CDC 数据写入 MysqlDB

以下示例演示如何创建一个从 MongoDB 读取 CDC 数据并写入 mysql 数据库的数据同步作业:

env {
  # You can set engine configuration here
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  MongoDB-CDC {
    hosts = "mongo0:27017"
    database = ["inventory"]
    collection = ["inventory.products"]
    username = stuser
    password = stpw
  }
}

sink {
  jdbc {
    url = "jdbc:mysql://mysql_cdc_e2e:3306"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "st_user"
    password = "seatunnel"

    generate_sink_sql = true
    # You need to configure both database and table
    database = mongodb_cdc
    table = products
    primary_keys = ["_id"]
  }
}

多表同步

以下示例演示如何创建一个读取 mongodb 多库表 CDC 数据并在本地客户端打印的数据同步作业:

env {
  # You can set engine configuration here
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  MongoDB-CDC {
    hosts = "mongo0:27017"
    database = ["inventory","crm"]
    collection = ["inventory.products","crm.test"]
    username = stuser
    password = stpw
  }
}

# Console printing of the read Mongodb data
sink {
  Console {
    parallelism = 1
  }
}

提示: 多库表 CDC 同步不能指定 schema,只能下游输出 json 数据。这是因为 MongoDB 不提供查询元数据信息,所以如果想支持多表,所有表只能作为一个结构读取。

使用正则表达式匹配多表

以下示例演示如何创建一个通过正则表达式读取 mongodb 多库表数据并在本地客户端打印的数据同步作业:

匹配示例表达式描述
前缀匹配^(test).*匹配数据库名或表名以 test 为前缀的,如 test1, test2 等。
后缀匹配.*[p$]匹配数据库名或表名以 p 为后缀的,如 cdcp, edcp 等。
```
env {
# You can set engine configuration here
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
}

source { MongoDB-CDC { hosts = "mongo0:27017" # So this example is used (^(test).|^(tpc).|txc|.[p$]|t{2}).(t[5-8]|tt),matching txc.tt、test2.test5. database = ["(^(test).|^(tpc).|txc|.[p$]|t{2})"] collection = ["(t[5-8]|tt)"] username = stuser password = stpw } }

Console printing of the read Mongodb data

sink { Console { parallelism = 1 } }


### 实时流数据格式

{ _id : { }, // Identifier of the open change stream, can be assigned to the 'resumeAfter' parameter for subsequent resumption of this change stream "operationType" : " ", // The type of change operation that occurred, such as: insert, delete, update, etc. "fullDocument" : { }, // The full document data involved in the change operation. This field does not exist in delete operations "ns" : {
"db" : " ", // The database where the change operation occurred "coll" : " " // The collection where the change operation occurred }, "to" : { // These fields are displayed only when the operation type is 'rename' "db" : " ", // The new database name after the change "coll" : " " // The new collection name after the change }, "source":{ "ts_ms":" ", // The timestamp when the change operation occurred "table":" " // The collection where the change operation occurred "db":" ", // The database where the change operation occurred "snapshot":"false" // Identify the current stage of data synchronization }, "documentKey" : { "_id" : }, // The _id field value of the document involved in the change operation "updateDescription" : { // Description of the update operation "updatedFields" : { }, // The fields and values that the update operation modified "removedFields" : [ " ", ... ] // The fields and values that the update operation removed } "clusterTime" : , // The timestamp of the Oplog log entry corresponding to the change operation "txnNumber" : , // If the change operation is executed in a multi-document transaction, this field and value are displayed, representing the transaction number "lsid" : { // Represents information related to the Session in which the transaction is located "id" : , "uid" : } }

```

到此本指南就结束了,MongoDB CDC Sink连接器的发布,不仅强化了 Apache SeaTunnel 在数据集成领域的地位,也为开发者提供了更多的可能性。

Apache SeaTunnel 社区也期待您的参与和贡献,共同迈向更广阔的数据处理未来,让我们携手共建一个更加强大、开放、互助的社区!

本文由 白鲸开源科技 提供发布支持!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/453201.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

支付模块-01微信支付和支付宝支付接口调研

支付接口调研 一般情况下,一个网站要支持在线支付功能通常接入第三方支付平台,比如:微信支付、支付宝、其它的聚合支付平台 微信支付方式 支付方式描述应用场景付款码支付指用户展示微信钱包内的付款码给商户系统扫描后直接完成支付适用于线…

【嵌入式——QT】QPainter基本绘图

【嵌入式——QT】QPainter基本绘图 QPainter与QPaintDevicepaintEvent事件和绘图区QPainter主要属性QPen主要功能QBrush主要功能QPainter绘制基本图形方法图示代码示例 QPainter与QPaintDevice QPainter是用来进行绘图操作的类,QPaintDevice是一个可以使用QPainter…

力扣hot100题解(python版69-73题)

69、有效的括号 给定一个只包括 (,),{,},[,] 的字符串 s ,判断字符串是否有效。 有效字符串需满足: 左括号必须用相同类型的右括号闭合。左括号必须以正确的顺序闭合。每个右括号都有一个对应…

YOLOv9改进策略:注意力机制 | EMA:基于跨空间学习的高效多尺度注意力,效果优于ECA、CBAM、CA

💡💡💡本文改进内容:加入EMA注意力,一种基于跨空间学习的高效多尺度注意力,效果优于ECA、CBAM、CA等经典注意力。 yolov9-c-EMA summary: 970 layers, 51011154 parameters, 51011122 gradients, 238.9 GF…

链动2+1模式与用户留存复购策略:结合消费增值模式的创新应用

大家好,我是吴军,来自一家软件开发公司的产品经理岗位。 今天,我想和大家深入探讨链动21模式,特别是它如何有效应对用户留存和复购的挑战。 尽管有些人认为链动模式已经过时,但我认为它的潜力远未被充分挖掘。链动不仅…

SpringBoot3整合mybatis

SpringBoot3整合mybatis 一、添加mybatis的依赖二、通过XML配置三、通过yum或properties文件配置四、常用注解1.Mapper2.MapperScan 一、添加mybatis的依赖 <!--mybatis--> <dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>…

源聚达科技:抖音今年开店有没有什么新政策

随着电商行业的蓬勃发展&#xff0c;抖音平台作为新兴的社交电商平台&#xff0c;近年来推出了多项新政策以吸引商家入驻&#xff0c;提升用户体验。今年&#xff0c;抖音在开店政策上又有了新的调整和优化&#xff0c;这些变化对于商家来说无疑是重要的风向标。 最新的政策中&…

北京银行助力首批消费类公募REITs成功上市 担任嘉实物美消费REIT托监管行

3月12日&#xff0c;由北京银行担任托监管行并参与战配投资的嘉实物美消费REIT在上交所成功上市。这也让北京银行成为全国首家担任公募REITs托监管银行的城商行&#xff0c;亦是首家参与首批消费基础设施公募REITs战略投资的城商行&#xff0c;成功跻身商业银行综合服务公募REI…

05-ESP32-S3-IDF USART

ESP32-S3 IDF USART详解 USART简介 USART是一种串行通信协议&#xff0c;广泛应用于微控制器和计算机之间的通信。USART支持异步和同步模式&#xff0c;因此它可以在没有时钟信号的情况下&#xff08;异步模式&#xff09;或有时钟信号的情况下&#xff08;同步模式&#xff…

Java项目:48 ssm008医院门诊挂号系统+jsp(含文档)

作者主页&#xff1a;舒克日记 简介&#xff1a;Java领域优质创作者、Java项目、学习资料、技术互助 文中获取源码 项目介绍 本选题则旨在通过标签分类管理等方式实现 管理员&#xff1b;个人中心、药房管理、护士管理、医生管理、病人信息管理、科室信息管理、挂号管理、诊断…

如何解决word字体大小显示不一,部分文字无法显示/显式为空白?

问题重现 今天重启后打开word&#xff0c;显示如下&#xff1a; 从第1张图看&#xff0c;字体显示大小不同&#xff0c;第2张图&#xff0c;敲“满分”&#xff0c;无法显示“满”字&#xff0c;而且“分”的大小比一般字体要大。 我的解决方案 – 修复office 采用GPT的建议…

移除元素

文章目录 移除元素删除有序数组中的重复项移动零比较含退格的字符串有序数组的平方 移除元素 双指针 删除指定项且不改变顺序 def removeElement(nums: list[int], val: int) -> int:fast slow 0while fast < len(nums):if nums[fast] ! val:nums[slow] nums[fast]sl…

GEE:将数据设置为任何人可读

一些 Google Earth Engine(GEE) 平台的初学者在分享代码的时候&#xff0c;往往不会对代码中的数据设置成任何人可读。这会导致别人打开代码的时候无法正常运行代码&#xff0c;也就无法帮助你修改和调试代码。针对这个问题&#xff0c;本文记录了对 Assets 和 Imports 中的数据…

24年英语四六级报名,注意这5点否则报名失败

多地3月中旬后开始四六级报名&#xff0c;报名前注意这5点&#xff0c;否则报名失败&#xff01; 1、四六级名额有限?报名需要抢&#xff0c;没有抢到的考生可以提交“候补报名”&#xff0c;还有报名机会 2、有的学校则规定六级考到500分则不能再刷分。 3、很多大学的报名…

Tcl语言:基础入门(三)

相关阅读 Tcl语言https://blog.csdn.net/weixin_45791458/category_12488978.html?spm1001.2014.3001.5482 Tcl中的大括号 大括号{}可以使得被其包围的所有内容被解释为字面量&#xff0c;所以不会进行命令替换&#xff0c;转义符替换&#xff08;大部分情况的转义&#xff0…

视频监控管理系统EasyCVR平台设备增删改操作不生效是什么原因?

国标GB28181协议EasyCVR安防平台可以提供实时远程视频监控、视频录像、录像回放与存储、告警、语音对讲、云台控制、平台级联、磁盘阵列存储、视频集中存储、云存储等丰富的视频能力&#xff0c;平台支持7*24小时实时高清视频监控&#xff0c;能同时播放多路监控视频流&#xf…

气膜建筑是由什么材料制成的?PVDF膜材的革新应用值得期待吗?

随着科技的不断进步和发展&#xff0c;建筑行业也在不断涌现新型的建筑材料。气膜建筑作为其中一种创新的建筑膜材&#xff0c;在体育馆、运动场馆、展览厅等场所得到了广泛的应用。那么&#xff0c;究竟是什么材料构成了气膜建筑呢&#xff1f;轻空间小编将为您详细介绍。 气膜…

ELF技术贴|如何在开发板上实现对Java的支持

Java作为一种功能强大且广泛应用的编程语言&#xff0c;具有广泛的适应性和实用性。在ELF 1开发板上集成Java支持&#xff0c;无疑将赋予嵌入式开发者更广阔的选择空间&#xff0c;今天就为各位小伙伴详细解析如何在ELF 1开发板上成功部署和运行Java环境。 1.拷贝两个压缩包到E…

Caffeine本地缓存快速上手教程,通俗易懂

1. 概述 使用缓存的优点是可以减少直接访问数据库的压力。Caffeine是目前单机版缓存性能最高的&#xff0c;提供了最优的缓存命中率。用法和java中的map集合比较类似&#xff0c;底层使用一个ConcurrencyHashMap来保存所有数据&#xff0c;可以理解为一个增强版的map集合&…

基于SpringBoot的“留守儿童爱心网站”的设计与实现(源码+数据库+文档+PPT)

基于SpringBoot的“留守儿童爱心网站”的设计与实现&#xff08;源码数据库文档PPT) 开发语言&#xff1a;Java 数据库&#xff1a;MySQL 技术&#xff1a;SpringBoot 工具&#xff1a;IDEA/Ecilpse、Navicat、Maven 系统展示 系统首页界面图 宣传新闻界面图 志愿活动界面…