使用 Debezium 和 RisingWave 对 MongoDB 进行持续分析

MongoDB 和流式 Join 的挑战

谷歌趋势显示,有关 MongoDB 流式计算的搜索率不断上升

作为一种操作型数据库,MongoDB 在提供快速数据操作和查询性能方面表现十分出色。然而,在维护实时视图或执行流处理任务的内置支持方面,它确实存在一些局限性。例如,MongoDB 不支持连接两个集合并实时刷新结果(尤其是高频率按秒刷新)。

在 MongoDB 中,Join 使用的是 $lookup 聚合算子,而非传统的 SQL 式 Join:

db.users.aggregate([{
  $lookup: {
    from: "products",
    localField: "product_id",
    foreignField: "_id",
    as: "products"
  }
}])

当前的方法虽然有效,但在连接两个以上的集合时会比较繁琐和不便。

MongoDB 提倡去规范化,如果有最佳的 Schema,很多情况下就可以避免使用 Join。但是,实际情况可能会比较混乱,因此有时需要使用 Join。与其依赖复杂的 MongoDB 聚合,不如将此任务委托给 RisingWave 这样的专用流处理系统。

RisingWave 解决方案

一个实用的解决方案是通过 Kafka 将 MongoDB 变更流导入到 RisingWave,从而实现灵活的实时连接。

解决方案

RisingWave 可充当实时数据源的中心枢纽。来自 MongoDB 和 Kafka 的数据可以导入到 RisingWave 并进行连接。RisingWave 的状态后端 Hummock 利用云对象存储,提供了弹性和充足的容量。这能够支持在多个 Source 上执行大型 Join,处理 10 个以上的多路 Join。

在这个数据栈中,Debezium 起着至关重要的作用。它提取 MongoDB oplog 条目并将其导出到 Kafka Topic,然后由 RisingWave 消费。例如:

CREATE TABLE source_name (
   _id jsonb PRIMARY KEY,
   payload jsonb
)
WITH (
   connector='kafka',
   topic='debezium_mongo_json_customers',
   properties.bootstrap.server='172.10.1.1:9090,172.10.1.2:9090',
   scan.startup.mode = 'earliest'
) FORMAT DEBEZIUM_MONGO ENCODE JSON;

创建了 MongoDB Source 之后,我们就可以创建物化视图,实时连接来自 MongoDB 和其他 Source 的数据。

有关此过程的详细步骤,请参阅 RisingWave 文档

处理 MongoDB 的 JSON 数据

RisingWave 可以利用从 PostgreSQL 继承的 JSONB 支持,分析 MongoDB 中的 JSON(BSON)数据。这样,即使在非关系型的 Schema 中也能直接进行摄取和查询。虽然 MongoDB 的原生 API 在数据操作方面表现十分出色,在 Node.js 网络应用程序中尤其如此,但进行分析性工作负载需要不同的专长。SQL 仍是最流行的数据分析语言。目前,RisingWave 提供了超过 30 个 JSON 函数,包括对 JSONPath 的支持。这使得用户在转换 MongoDB 数据时无需编写自定义 UDF。

以下是在 RisingWave 中处理 JSONB 数据的一些示例:

查找年龄在 25 至 30 岁之间的用户:

SELECT *
FROM users
WHERE (payload->>'age')::int BETWEEN 25 AND 30;

通过姓名和电子邮件查找用户:

SELECT *
FROM users
WHERE payload @> '{"name": "Bob"}'
AND payload->>'email' LIKE 'bob@example.com';

结论

RisingWave 能够很好地解析由 Debezium 提取的 MongoDB 变更流。它的云原生存储使其可连接多个 MongoDB 集合,并创建可被其他服务使用的统一流。同时,RisingWave 支持 JSONB,可轻松处理 MongoDB 文档,为实时数据处理挑战提供了强大的解决方案。


RisingWave 是一款基于 Apache 2.0 协议开源的分布式流数据库,致力于为用户提供极致简单、高效的流数据处理与管理能力。RisingWave 采用存算分离架构,实现了高效的复杂查询、瞬时动态扩缩容以及快速故障恢复,并助力用户极大地简化流计算架构,轻松搭建稳定且高效的流计算应用。RisingWave 始终聆听来自社区的声音,并积极回应用户的反馈。目前,RisingWave 已汇聚了近 150 名开源贡献者和近 3000 名社区成员。全球范围内,已有上百个 RisingWave 集群在生产环境中部署。

了解更多:

官网: risingwave.com

入门教程:快速上手 | RisingWave

GitHub:risingwave.com/github

微信公众号:RisingWave中文开源社区

中文社区用户交流群:risingwave_assistant

英文社区用户交流群:https://risingwave.com/slack

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/414321.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

uni-app之android原生插件开发

官网 uni小程序SDK 一 插件简介 1.1 当HBuilderX中提供的能力无法满足App功能需求,需要通过使用Andorid/iOS原生开发实现时,可使用App离线SDK开发原生插件来扩展原生能力。 1.2 插件类型有两种,Module模式和Component模式 Module模式&…

51单片机 wifi连接

一、基本概念 ESP8266是一款集成了WiFi功能的高性能芯片,广泛应用于物联网设备、智能家居、传感器网络等领域。以下是ESP8266的详细讲解: 1. 功能特点:ESP8266集成了TCP/IP协议栈,支持STA(Station)和AP&am…

OpenAI划时代大模型——文本生成视频模型Sora作品欣赏(八)

Sora介绍 Sora是一个能以文本描述生成视频的人工智能模型,由美国人工智能研究机构OpenAI开发。 Sora这一名称源于日文“空”(そら sora),即天空之意,以示其无限的创造潜力。其背后的技术是在OpenAI的文本到图像生成模…

虚拟机安装+固定ip地址

一、下载CentOS 二、安装CentOS 1、打开你的VMware Workstation Pro,并点击“创建新的虚拟机” 2、点选典型(推荐)(T),并点击“下一步” 3、点选稍后安装操作系统(S),并点击“下一步” 4、点选Linux,并点击“下一步” 6、点击“…

tomcat下载搭建

环境:centos7 打开环境先测试是否有网 ping www.baidu.com 在使用ifconfig命令查询ip地址 准备工作做好打开tomcat官网Apache Tomcat - Apache Tomcat 8 Software Downloads 找到tomcat8安装 复制链接 打开centos安装wget 进入到 /usr/local目录中 cd /usr/loc…

SpringMVC 学习(八)之文件上传与下载

目录 1 文件上传 2 文件下载 1 文件上传 SpringMVC 对文件的上传做了很好的封装,提供了两种解析器。 CommonsMultipartResolver:兼容性较好,可以兼容 Servlet3.0 之前的版本,但是它依赖了 commons-fileupload …

Linux 基础之 vmstat 命令详解

文章目录 一、前言二、使用说明2.1 vmstat [delay/count/d/D/t/w]2.2.vm模式的字段 一、前言 vmstat(VirtualMeomoryStatistics,虚拟内存统计)是一个不错的 Linux/Unix 监控工具,在性能测试中除了top外也是比较常用的工具之一,它可以监控操作…

算法 -【螺旋矩阵】

螺旋矩阵 题目示例1示例2 分析代码 题目 一个 m 行 n 列的矩阵 matrix ,请按照顺时针螺旋顺序 ,返回矩阵中的所有元素。 示例1 输入:matrix [[1,2,3],[4,5,6],[7,8,9]] 输出:[1,2,3,6,9,8,7,4,5] 示例2 输入:matrix…

JWT基于Cookie的会话保持,并解决CSRF问题的方案

使用JWT进行浏览器接口请求,在使用Cookie进行会话保持传递Token时,可能会存在 CSRF 漏洞问题,同时也要避免在产生XSS漏洞时泄漏Token问题,如下图在尽可能避免CSRF和保护Token方面设计了方案。 要点解释如下: 将JWT存入…

DAY12_VUE基本用法详细版

目录 0 HBuilderX酷黑主题修改注释颜色1 VUE1.1 VUE介绍1.2 Vue优点1.3 VUE入门案例1.3.1 导入JS文件1.3.2 VUE入门案例 1.4 VUE基本用法1.4.1 v-cloak属性1.4.2 v-text指令1.4.3 v-html指令1.4.4 v-pre指令1.4.5 v-once指令1.4.6 v-model指令1.4.7 MVVM思想 1.5 事件绑定1.5.1…

Centos6安装PyTorch要求的更高版本gcc

文章目录 CentOS自带版本安装gcc 4的版本1. 获取devtoolset-8的yum源2. 安装gcc3. 版本检查和切换版本 常见问题1. 找不到包audit*.rpm包2. 找不到libcgroup-0.40.rc1-27.el6_10.x86_64.rpm 的包4. cc: fatal error: Killed signal terminated program cc1plus5. pybind11/pybi…

如何使用Fastapi上传文件?先从请求体数据讲起

文章目录 1、请求体数据2、form表单数据3、小文件上传1.单文件上传2.多文件上传 4、大文件上传1.单文件上传2.多文件上传 1、请求体数据 前面我们讲到,get请求中,我们将请求数据放在url中,其实是非常不安全的,我们更愿意将请求数…

【C语言】linux内核ipoib模块 - ipoib_ib_handle_tx_wc

一、中文注释 这个函数是用来处理 Infiniband 设备在传输完成时的回调。该回调负责释放发送队列中的缓冲区并更新网络设备统计信息。 static void ipoib_ib_handle_tx_wc(struct net_device *dev, struct ib_wc *wc) {// 通过net_device结构体获取私有数据结构struct ipoib_d…

网络安全之内容安全

内容安全 攻击可能只是一个点,防御需要全方面进行 IAE引擎 DFI和DPI技术--- 深度检测技术 DPI --- 深度包检测技术--- 主要针对完整的数据包(数据包分片,分段需要重组),之后对 数据包的内容进行识别。(应用…

S32 Design Studio PE工具配置TMR

配置步骤 配置内容 生成的配置结构体如下,在Generated_Code路径下的lpTmr.c文件和lpTmr.h文件。 /*! lpTmr1 configuration structure */ const lptmr_config_t lpTmr1_config0 {.workMode LPTMR_WORKMODE_PULSECOUNTER,.dmaRequest false,.interruptEnable tr…

数据抽取平台pydatax介绍--实现和项目使用

数据抽取平台pydatax实现过程中,有2个关键点: 1、是否能在python3中调用执行datax任务,自己测试了一下可以,代码如下: 这个str1就是配置的shell文件 try:result os.popen(str1).read() except Exception as …

git忽略某些文件(夹)更改方法

概述 在项目中,常有需要忽略的文件、文件夹提交到代码仓库中,在此做个笔录。 一、在项目根目录内新建文本文件,并重命名为.gitignore,该文件语法如下 # 以#开始的行,被视为注释. # 忽略掉所有文件名是 a.txt的文件. a.txt # 忽略所有生成的 java文件, *.java # a.j…

数据结构:栈和队列与栈实现队列(C语言版)

目录 前言 1.栈 1.1 栈的概念及结构 1.2 栈的底层数据结构选择 1.2 数据结构设计代码(栈的实现) 1.3 接口函数实现代码 (1)初始化栈 (2)销毁栈 (3)压栈 (4&…

【MQ05】异常消息处理

异常消息处理 上节课我们已经学习到了消息的持久化和确认相关的内容。但是,光有这些还不行,如果我们的消费者出现问题了,无法确认,或者直接报错产生异常了,这些消息要怎么处理呢?直接丢弃?这就是…

ImportError: urllib3 v2.0 only supports OpenSSL 1.1.1+, currently the ‘ssl‘报错解决

安装labelme出错了 根据爆栈的提示信息,我在cmd运行以下命令之后一切正常了,解决了问题! pip install urllib31.26.6参考网址:ImportError: urllib3 v2.0 only supports OpenSSL 1.1.1, currently the ‘ssl’ module is compile…