MongoDB提供了一种强大的功能,称为Change Streams,它允许应用程序监听数据库中的变更事件,并在数据发生变化时立即做出响应。这在mysql数据库是不具备没有这个功能的。又如:我们在支付环节想一直监听支付回调的状态,就必须定时循环。但是用到Change Streams监听功能,就不要低效率的定时循环做法。
在本文中,我们将探讨如何在Python中使用MongoDB的Change Streams来消费变更事件。
什么是MongoDB Change Streams?
MongoDB Change Streams是一种实时监听数据库集合变更的能力。它们可以用来监听以下类型的事件:
插入(insert)
更新(update)
删除(delete)
更多…
Change Streams可以在应用层提供数据变更的实时通知,这对于需要即时更新用户界面、触发业务流程或同步数据到其他服务的应用程序非常有用。
为什么使用Change Streams?
使用Change Streams而不是轮询机制有以下几个优势:
性能:轮询会不断查询数据库,这可能会导致不必要的负载。Change Streams仅在数据变更时提供通知,从而减少不必要的数据库交互。
实时性:Change Streams提供近乎实时的数据变更通知,这对于需要快速响应的应用程序至关重要。
可靠性:Change Streams保证不会错过任何变更事件,即使在应用程序重启之后也能从最后的位置继续监听。
如何在Python中使用Change Streams?
要在Python中使用Change Streams,您需要使用pymongo库,这是MongoDB的官方Python驱动程序。以下是如何设置和使用Change Streams的基本步骤:
要配置MongoDB的Change Streams以监控特定类型的变更,你可以使用MongoDB的聚合管道(Aggregation Pipeline)功能。通过提供一个或多个管道阶段的数组,你可以控制Change Streams的输出。
##初始化副本集
连接到任何一个MongoDB实例(通常是配置文件中指定的第一个实例),使用MongoDB Shell来初始化副本集:
mongosh #mongo的命令行
use admin
rs.initiate({
_id: "myReplicaSet", #副本集名称
members: [
{ _id: 0, host: "host1:27017" }, #host1要换成对应的公网ip才能访问,除非其它情况
{ _id: 1, host: "host2:27017" },
{ _id: 2, host: "host3:27017" }
]
})
在这里,host1:27017、host2:27017和host3:27017应该替换为您的MongoDB实例的实际IP地址和端口。_id是一个从0开始的唯一标识符,用于区分不同的成员。
在宝塔内如何处理
安装好mongdoDB
本地检查:
加入配置:
目录:
root@iZ2ze50t4ys98i5408heezZ:/www/server/mongodb/bin#
生成配置内
- keyFile:
(base) root@VM-4-7-ubuntu:/www/server/mongodb#
openssl rand -base64 756 > /www/server/mongodb/keyfile
chmod 400 /www/server/mongodb/keyfile - replication:
replSetName: rs0 # rs0跟命令行创建的id名要一致
在下图加入配置
在mongdb命令行创建副本
MongoDB Shell支持命令历史记录功能,您可以使用上下箭头键来检索在Shell中发出的先前命令。
mondosh
rs0 [direct: primary] test> use admin
switched to db admin
rs0 [direct: primary] admin> db.auth("root","ubVvGqDmL7UAdwkG")
{ ok: 1 }
rs0 [direct: primary] admin>
##配置复制xx
var currentConfig = rs.conf();
var newConfig = { _id: "rs0", version: currentConfig.version + 1, members: [ { _id: 0, host: "8.152.219.111:27017" }] };
rs0 [direct: primary] admin> rs.reconfig(newConfig);
{
ok: 1,
'$clusterTime': {
clusterTime: Timestamp({ t: 1733560367, i: 1 }),
signature: {
hash: Binary.createFromBase64('DhedFtGMLmVNqwW7/0tGP91vKoA=', 0),
keyId: Long('7445547217475076102')
}
},
operationTime: Timestamp({ t: 1733560367, i: 1 })
}
rs0 [direct: primary] admin> rs.status() #的输出
然后重启,先kill -9 端口号,启动mongod命令
##使用客户端2种方式测试连接是否成功
以上如果测试通过,表示成功了。
使用代码测试:
monitor_mongo_changes.py
from pymongo import MongoClient
from pymongo.change_stream import ChangeStream
def monitor_mongo_changes(collection_name, database_name):
"""
监控MongoDB文档变更的函数。
:param collection_name: 要监控的MongoDB集合名称
:param database_name: 要监控的MongoDB数据库名称
"""
# 连接到MongoDB
url = "mongodb://root:ub***G@8.7.0.1:27017/esg?authSource=admin&replicaSet=rs0"
client = MongoClient(url,
serverSelectionTimeoutMS=10000) # 设置超时时间为10秒
# 选择数据库和集合
db = client[database_name]
collection = db[collection_name]
# 创建一个管道,用于匹配特定条件的变更
# pipeline = [{'$match': {'operationType': 'update'}}]
pipeline = [
{'$match': {'operationType': {'$in': ['insert', 'update','delete']}}}
]
# 创建Change Stream并传入管道
change_stream = collection.watch(pipeline)
# 监听Change Stream
for change in change_stream:
print("Change detected:", change)
print("end ....")
# 使用函数
monitor_mongo_changes('gress', 'esg')
运行python monitor_mongo_changes.py ,它不会自动退出,一直会监听mongdb数据库esg数据集的修改、插入、删除动作的。
用到的命令集
启动命令:mongod -f /www/server/mongodb/config.conf
查看启动状态:/etc/init.d/mongodb status
查看进程:ps -ef | grep mongo
mongosh #进入mongdo的cli环境