从变更到通知:使用Python和MongoDB Change Streams实现即时事件监听

MongoDB提供了一种强大的功能,称为Change Streams,它允许应用程序监听数据库中的变更事件,并在数据发生变化时立即做出响应。这在mysql数据库是不具备没有这个功能的。又如:我们在支付环节想一直监听支付回调的状态,就必须定时循环。但是用到Change Streams监听功能,就不要低效率的定时循环做法。
在本文中,我们将探讨如何在Python中使用MongoDB的Change Streams来消费变更事件。
在这里插入图片描述

什么是MongoDB Change Streams?
MongoDB Change Streams是一种实时监听数据库集合变更的能力。它们可以用来监听以下类型的事件:

插入(insert)
更新(update)
删除(delete)
更多…
Change Streams可以在应用层提供数据变更的实时通知,这对于需要即时更新用户界面、触发业务流程或同步数据到其他服务的应用程序非常有用。

为什么使用Change Streams?
使用Change Streams而不是轮询机制有以下几个优势:

性能:轮询会不断查询数据库,这可能会导致不必要的负载。Change Streams仅在数据变更时提供通知,从而减少不必要的数据库交互。
实时性:Change Streams提供近乎实时的数据变更通知,这对于需要快速响应的应用程序至关重要。
可靠性:Change Streams保证不会错过任何变更事件,即使在应用程序重启之后也能从最后的位置继续监听。
如何在Python中使用Change Streams?
要在Python中使用Change Streams,您需要使用pymongo库,这是MongoDB的官方Python驱动程序。以下是如何设置和使用Change Streams的基本步骤:

要配置MongoDB的Change Streams以监控特定类型的变更,你可以使用MongoDB的聚合管道(Aggregation Pipeline)功能。通过提供一个或多个管道阶段的数组,你可以控制Change Streams的输出。

##初始化副本集
连接到任何一个MongoDB实例(通常是配置文件中指定的第一个实例),使用MongoDB Shell来初始化副本集:

mongosh   #mongo的命令行
use admin
rs.initiate({
  _id: "myReplicaSet",   #副本集名称
  members: [
    { _id: 0, host: "host1:27017" },  #host1要换成对应的公网ip才能访问,除非其它情况
    { _id: 1, host: "host2:27017" },
    { _id: 2, host: "host3:27017" }
  ]
})

在这里,host1:27017、host2:27017和host3:27017应该替换为您的MongoDB实例的实际IP地址和端口。_id是一个从0开始的唯一标识符,用于区分不同的成员。

在宝塔内如何处理
安装好mongdoDB
在这里插入图片描述
本地检查:
在这里插入图片描述
加入配置:
目录:
root@iZ2ze50t4ys98i5408heezZ:/www/server/mongodb/bin#

生成配置内

  • keyFile:
    (base) root@VM-4-7-ubuntu:/www/server/mongodb#
    openssl rand -base64 756 > /www/server/mongodb/keyfile
    chmod 400 /www/server/mongodb/keyfile
  • replication:
    replSetName: rs0 # rs0跟命令行创建的id名要一致

在下图加入配置
在这里插入图片描述

在mongdb命令行创建副本

在这里插入图片描述
MongoDB Shell支持命令历史记录功能,您可以使用上下箭头键来检索在Shell中发出的先前命令。

mondosh
rs0 [direct: primary] test> use admin
switched to db admin
rs0 [direct: primary] admin> db.auth("root","ubVvGqDmL7UAdwkG")
{ ok: 1 }
rs0 [direct: primary] admin>
##配置复制xx
var currentConfig = rs.conf();
var newConfig = { _id: "rs0", version: currentConfig.version + 1, members: [ { _id: 0, host: "8.152.219.111:27017" }] };
rs0 [direct: primary] admin> rs.reconfig(newConfig);
{
ok: 1,
'$clusterTime': {
clusterTime: Timestamp({ t: 1733560367, i: 1 }),
signature: {
hash: Binary.createFromBase64('DhedFtGMLmVNqwW7/0tGP91vKoA=', 0),
keyId: Long('7445547217475076102')
}
},
operationTime: Timestamp({ t: 1733560367, i: 1 })
}
rs0 [direct: primary] admin> rs.status() #的输出

在这里插入图片描述

然后重启,先kill -9 端口号,启动mongod命令

##使用客户端2种方式测试连接是否成功
在这里插入图片描述
在这里插入图片描述

以上如果测试通过,表示成功了。

使用代码测试:

monitor_mongo_changes.py

from pymongo import MongoClient
from pymongo.change_stream import ChangeStream

def monitor_mongo_changes(collection_name, database_name):
    """
    监控MongoDB文档变更的函数。

    :param collection_name: 要监控的MongoDB集合名称
    :param database_name: 要监控的MongoDB数据库名称
    """
    # 连接到MongoDB
    url = "mongodb://root:ub***G@8.7.0.1:27017/esg?authSource=admin&replicaSet=rs0"
    client = MongoClient(url,
                         serverSelectionTimeoutMS=10000)  # 设置超时时间为10秒


    # 选择数据库和集合
    db = client[database_name]
    collection = db[collection_name]

    # 创建一个管道,用于匹配特定条件的变更
    # pipeline = [{'$match': {'operationType': 'update'}}]
    pipeline = [
        {'$match': {'operationType': {'$in': ['insert', 'update','delete']}}}
    ]
    # 创建Change Stream并传入管道
    change_stream = collection.watch(pipeline)

    # 监听Change Stream
    for change in change_stream:
        print("Change detected:", change)
    print("end ....")

# 使用函数
monitor_mongo_changes('gress', 'esg')

运行python monitor_mongo_changes.py ,它不会自动退出,一直会监听mongdb数据库esg数据集的修改、插入、删除动作的。

在这里插入图片描述

用到的命令集

启动命令:mongod -f /www/server/mongodb/config.conf
查看启动状态:/etc/init.d/mongodb status
查看进程:ps -ef | grep mongo
mongosh    #进入mongdo的cli环境

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/932196.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

决策树:ID3、C4.5和CART特征选择方式

1 前言 该文章主要目的是记录ID3、C4.5和CART特征选择方式,这里只对决策树进行简单介绍。 决策树(Decision Tree)算法是一种有监督学习算法,它利用分类的思想,根据数据的特征构建数学模型,从而达到数据的筛…

2023 年“泰迪杯”数据分析技能赛B 题企业财务数据分析与造假识别

2023 年“泰迪杯”数据分析技能赛B 题企业财务数据分析与造假识别 一、背景 财务数据是指企业经营活动和财务结果的数据记录,反映了企业的财务状况 与经营成果。对行业、企业的财务数据进行分析,就是要评价其过去的经营业绩、 衡量现在的财务状况、预测…

UE5.5 Geometry库平面切割原理分析

平面切割--FMeshPlaneCut 平面定义: 面上一个点 法线 算法流程如下 求几何体所有顶点和面的有向距离(Signs) Sign计算: float Sign (VertexPos - PlaneOrigin).Dot(PlaneNormal); 遍历所有几何体所有交叉边, 进行SplitEdge 对于位于切割面两侧的交叉边(Sign…

【计算机学习笔记】GB2312、GBK、Unicode等字符编码的理解

之前编写win32程序时没怎么关注过宽字符到底是个啥东西,最近在编写网络框架又遇到字符相关的问题,所以写一篇文章记录一下(有些部分属于个人理解,如果有错误欢迎指出) 目录 几个常见的编码方式Unicode和UTF-8、UTF-16、…

CSS 快速上手

目录 一. CSS概念 二. CSS语法 1. 基本语法规范 2. CSS的三种引入方式 (1) 行内样式 (2) 内部样式表 (3) 外部样式表 3. CSS选择器 (1) 标签选择器 (2) 类选择器 (3) id选择器 (4) 通配符选择器 (5) 复合选择器 <1> 空格 <2> 没有空格 <3> &q…

【时间之外】IT人求职和创业应知【60】-卡脖子

目录 新闻一&#xff1a;达成合作&#xff0c;将在中国推出生成式人工智能服务 新闻二&#xff1a;机器人新赛道 新闻三&#xff1a;简化用户信息获取流程&#xff0c;提升小程序体验 去年人口出生下降&#xff0c;3年以后&#xff0c;幼儿园要关闭很多&#xff0c;6年以后小…

centos9升级OpenSSH

需求 Centos9系统升级OpenSSH和OpenSSL OpenSSH升级为openssh-9.8p1 OpenSSL默认为OpenSSL-3.2.2&#xff08;根据需求进行升级&#xff09; 将源码包编译为rpm包 查看OpenSSH和OpenSSL版本 ssh -V下载源码包并上传到服务器 openssh最新版本下载地址 wget https://cdn.openb…

node.js中实现GETPOST请求

创建基本的服务器 const express require(express); const indexRouter require(./router); // 引入路由 const app express(); const port 3000; // 挂载路由 app.use(/api, indexRouter); app.listen(port, () > {console.log(Server is running on http://localhost…

shell 条件测试

一、命令执行结果判定 && &#xff1a; 在命令执行后如果没有任何报错时会执行符号后面的动作 || &#xff1a; 在命令执行后有报错执行符号后的动作 [rootlong ~]# a10 [rootlong ~]# echo $a 10 [rootlong ~]# [ $a -gt "5" ] && echo yes || e…

JS中的原型链与继承

原型链的类比 JS中原型链&#xff0c;本质上就是对象之间的关系&#xff0c;通过protoype和[[Prototype]]属性建立起来的连接。这种链条是动态的&#xff0c;可以随时变更。 这个就跟C/C中通过指针建立的关系很相似&#xff0c;比如&#xff0c;通过指针建立一个链表&#xf…

【Linux网络编程】第七弹---构建类似XShell功能的TCP服务器:从TcpServer类到主程序的完整实现

✨个人主页&#xff1a; 熬夜学编程的小林 &#x1f497;系列专栏&#xff1a; 【C语言详解】 【数据结构详解】【C详解】【Linux系统编程】【Linux网络编程】 目录 1、TcpServer.hpp 1.1、TcpServer类基本结构 1.2、 Execute() 2、Command.hpp 2.1、Command类基本结构 …

C语言控制语句与案例

控制语句与案例 1. 选择结构 1.1 if 语句 if 语句用于根据条件执行不同的代码块。最基本的语法形式如下&#xff1a; // 单分支 if (条件) {// 条件为真时执行的代码 }// 双分支 if (条件) {// 条件为真时执行的代码 } else {// 条件为假时执行的代码 }// 多分支 if (条件1…

【分子材料发现】——GAP:催化过程中吸附构型的多模态语言和图学习(数据集处理详解)(二)

Multimodal Language and Graph Learning of Adsorption Configuration in Catalysis https://arxiv.org/abs/2401.07408Paper Data: https://doi.org/10.6084/m9.figshare.27208356.v2 1 Dataset CatBERTa训练的文本字符串输入来源于Open Catalyst 2020 &#xff08;OC20…

SpringBoot自动配置底层核心源码

SpringBoot底层核心源码 一、工程创建二、进一步改造三、自动配置 探究SpringBoot的自动配置原理&#xff0c;我们可以自己写一个启动类的注解。 一、工程创建 首先创建一个工程&#xff0c;工程目录如下&#xff1a; 自定义一个启动函数&#xff1a; package org.springboo…

【Springboot3+vue3】从零到一搭建Springboot3+vue3前后端分离项目之后端环境搭建

【Springboot3vue3】从零到一搭建Springboot3vue3前后端分离项目&#xff0c;整合knef4j和mybaits实现基础用户信息管理 后端环境搭建1.1 环境准备1.2 数据库表准备1.3 SpringBoot3项目创建1.4 MySql环境整合&#xff0c;使用druid连接池1.5 整合mybatis-plus1.5.1 引入mybatie…

【书生大模型实战营】Linux 基础知识-L0G1000

前言&#xff1a;书生大模型实战营是上海人工智能实验室开展的大模型系列实践活动&#xff0c;提供免费算力平台&#xff0c;学员通过闯关式任务&#xff0c;可获得免费算力和存储&#xff0c;助力项目实践。本期是第4期&#xff0c;时间从十一月份开始&#xff0c;持续到十二月…

JS进阶DAY3|事件(二)事件流

目录 一、事件流说明 1.1 事件流概念 1.2 事件捕获阶段 1.3 事件冒泡阶段 二、事件传播的两个阶段说明 2.1 事件捕获 2.2 事件冒泡 3.3 示例代码 三、阻止冒泡 四、事件解绑 4.1 removeEventListener方法 4.2 使用 DOM0 级事件属性 4.3 使用一次性事件监听器 一、…

【AI工具】强大的AI编辑器Cursor详细使用教程

目录 一、下载安装与注册 二、内置模型与配置 三、常用快捷键 四、项目开发与问答 五、注意事项与技巧 参考资料 近日&#xff0c;由四名麻省理工学院&#xff08;MIT&#xff09;本科生共同创立的Anysphere公司宣布&#xff0c;其开发的AI代码编辑器Cursor在成立短短两年…

【AWR软件】AWR 软件添加电磁结构

文章目录 前言步骤 前言 微波虚拟 实验 步骤 project -> add em struture -> new em structure 输入名称&#xff0c;create. 添加端口&#xff1a;add edge port

uni-app登录界面样式

非常简洁的登录、注册界面模板&#xff0c;使用uni-app编写&#xff0c;直接复制粘贴即可&#xff0c;无任何引用&#xff0c;全部公开。 废话不多说&#xff0c;代码如下&#xff1a; login.vue文件 <template><view class"screen"><view class"…