python操作mongodb底层封装并验证pymongo是否应该关闭连接

一、pymongo简介

github地址:https://github.com/mongodb/mongo-python-driver

mymongo安装命令:pip install pymongo==4.7.2

mymongo接口文档:PyMongo 4.7.2 Documentation

PyMongo发行版包含Python与MongoDB数据库交互的工具。bson包是用于Python的bson格式的实现。pymongo包是MongoDB的原生Python驱动程序。gridfs包是pymongo之上的一个gridfs实现。pymongo是python与mongodb交互的首选方式。

二、封装接口

import uuid
from datetime import datetime, timezone
from pydantic import BaseModel,ConfigDict
from pycommon.utils.mongo.mongo_collection_pool import mongo_collection_pool
from pymongo.cursor import Cursor
from pymongo.collection import Collection

class BaseDbService:
    def __init__(self, mongo_settings: MongoSettings, encryptor_key: str) -> None:
        self.mongo_settings = mongo_settings
        self.encryptor_key = encryptor_key

    def create(self, data: T, user_id: str, tenant_id: str = None, collection_name: str = None) -> str:
        collection: Collection = mongo_collection_pool.get_collection(self.mongo_settings, collection_name, tenant_id)
        if not get_attr_value(data, 'id'):
            set_attr_value(data, 'id', f"{uuid.uuid1()}")
        set_attr_value(data, 'created_by', user_id)
        set_attr_value(data, 'created_time', datetime.now(timezone.utc))
        request = data.model_dump(by_alias = True)

        id = collection.insert_one(request).inserted_id
        logger.log_info(f'{type(data).__name__} id {id} is created')
        return id
    
    def create_list(self, datas: list[T], user_id: str, tenant_id: str = None, collection_name: str = None) -> list[str]:
        collection = mongo_collection_pool.get_collection(self.mongo_settings, collection_name, tenant_id)
        request_list = []
        for data in datas:
            if not get_attr_value(data, 'id'):
                set_attr_value(data, 'id', f"{uuid.uuid1()}")
            set_attr_value(data, 'created_by', user_id)
            set_attr_value(data, 'created_time', datetime.now(timezone.utc))
            request = data.model_dump(by_alias = True)
            
            request_list.append(request)

        ids = collection.insert_many(request_list).inserted_ids
        logger.log_info(f'{type(data).__name__} ids {ids} are created')
        return ids

    def update(self, data: T, user_id: str, tenant_id: str = None, collection_name: str = None):
        if not get_attr_value(data, 'id'):
            raise ValueError(f'{type(data).__name__} id is null or empty when update, user_id={user_id}, tenant_id={tenant_id}, collection_name={collection_name}')
        
        set_attr_value(data, 'updated_by', user_id)
        set_attr_value(data, 'updated_time', datetime.now(timezone.utc))
        collection = mongo_collection_pool.get_collection(self.mongo_settings, collection_name, tenant_id)
        # response is dict type
        original_data = collection.find_one(data.id)
        if original_data:
            if "createdBy" in original_data:
                set_attr_value(data, "created_by", original_data["createdBy"])
            if "createdTime" in original_data:
                set_attr_value(data, "created_time", original_data["createdTime"])
            request = data.model_dump(by_alias = True)
            collection.find_one_and_replace({"_id": data.id}, request)
            logger.log_info(f'{type(data).__name__} id {id} is updated')
        else:
            logger.log_info(f'{type(data).__name__} id {id} does not exist when update')

    def get(self, id: str, responseType: TResponse, tenant_id: str = None, collection_name: str = None) -> TResponse:
        if not id:
            logger.log_info(f'{type(responseType).__name__} id is null or empty when get')
            return None

        collection = mongo_collection_pool.get_collection(self.mongo_settings, collection_name, tenant_id)
        result = collection.find_one(id)
        return responseType.model_validate(result)
    
    def get_list(self, ids: list[str], entityType: T, tenant_id: str = None, collection_name: str = None) -> list[T]:
        if not ids:
            logger.log_info(f'{type(entityType).__name__} ids is null or empty when get_list')
            return None

        collection = mongo_collection_pool.get_collection(self.mongo_settings, collection_name, tenant_id)
        response = list(collection.find({"_id": { '$in': ids }}))
        if not response:
            return []
        result = []
        for item in response:
            result_item = entityType.model_validate(item)
            result.append(result_item)
        return result

整体思路是创建BaseDbService实例时指定目标数据库,进行具体的CRUD操作时再指定表名以及查询结果要转换成的类型(pymongo查询出结果后默认是dict类型),这样底层的数据库服务就能操作任意库的所有表了,而且最终获取的数据是具体类型的对象。

实际上用户大致会按照下述方式使用BaseDbService:

if __name__ == '__main__':
    mongo_settings = MongoSettings(connection_string="mongodb://localhost:27017/", database_name="test-activity")
    db_service = BaseDbService(mongo_settings, "bWRHdzkzVDFJbWNB")

    collection_name = "pycommon"
    tenant_id = "test-tenant-id"
    result = db_service.get("74bae4fe-13ec-11ef-be60-7404f152b5a1", ActivityQueryItem, tenant_id, collection_name)

三、数据库连接池机制 

第二步中对数据库表操作之前都要先执行mongo_collection_pool.get_collection(self.mongo_settings, collection_name, tenant_id)来获取collection的连接,mongo_collection_pool是一个单例对象,内部有一个缓存机制来存储所有已经操作过的client、database和collection,连接从不关闭,这样可以保证连接不会重复的创建销毁从而提升性能。

看到这里有些细心的朋友可能会提问连接从不关闭,不会造成当前用户过多导致新用户无法连接的情况吗?本节先展示数据库连接池,大家的疑问咱们在下边有验证

from pymongo import MongoClient
from pymongo.database import Database
from pymongo.collection import Collection
from pycommon.models.mongo.mongo_settings import MongoSettings

class MongoCollectionPool:
    def __init__(self):
        self.mongo_client_cache = {}
        self.mongo_database_cache = {}
        self.mongo_collection_cache = {}
    
    def get_collection(self, mongo_settings: MongoSettings, collection_name: str, tenant_id: str) -> Collection:
        client_cache_key = mongo_settings.connection_string
        database_name = mongo_settings.database_name
        database_cache_key = client_cache_key + "_" + mongo_settings.database_name
        collection_name = collection_name + "_" + tenant_id if tenant_id else collection_name
        collection_cache_key = client_cache_key + "_" + mongo_settings.database_name + "_" + collection_name

        mongo_client: MongoClient = None
        if client_cache_key in self.mongo_client_cache:
            mongo_client = self.mongo_client_cache[client_cache_key]
        else:
            mongo_client = MongoClient(mongo_settings.connection_string)
            self.mongo_client_cache[client_cache_key] = mongo_client

        database: Database = None
        if database_cache_key in self.mongo_database_cache:
            database = self.mongo_database_cache[database_cache_key]
        else:
            database = mongo_client[database_name]
            self.mongo_database_cache[database_cache_key] = database

        if collection_cache_key in self.mongo_collection_cache:
            return self.mongo_collection_cache[collection_cache_key]
        else:
            collection = database[collection_name]
            self.mongo_collection_cache[collection_cache_key] = collection
            return collection

# 单例对象
mongo_collection_pool = MongoCollectionPool()
from typing import Optional
from pycommon.models.base_core_model import BaseCoreModel

class MongoSettings(BaseCoreModel):
    connection_string: Optional[str] = None
    database_name: Optional[str] = None

四、mongodb数据库连接是否需要手动关闭

在判断是否需要手动关闭连接时,先要了解连接是在什么时机开启的(这样我们就能知道做哪些操作会创建新的连接),以及对象释放时是否会自动关闭(这样我们就可以根据不同的项目类型来设计不同的架构)。

为了查看mongodb当前的数据库状态,我们需要用到mongosh工具(我这里用的是mongosh-2.2.6-win32-x64,大家需要的话我可以私发到个人邮箱)。

mongosh安装方式:解压后把bin目录下的两个文件拷贝到C:\Windows\System32。

查看mongodb数据库状态:打开cmd -> 输入mongosh -> 输入db.serverStatus().connections

connections中文说明:serverStatus - MongoDB 手册 v 6 。 0

connections英文说明:serverStatus - MongoDB Manual v7.0

测试项目情况:定时任务项目(执行完毕立即退出)

下面是我测试时了解到的情况(详情请看截图):

  • mongodb默认的可用线程高达100w
  • 创建MongoClient时,active+1、current+2、threaded+2、available-2
  • 创建DataBase时,exhaustHello+1
  • 创建Collection时,数据库状态无变化
  • 执行collectionA.find_one()方法时,current+1、threaded+1、available-1
  • 继续执行collectionA.find()时,数据库状态无变化
  • 继续执行collectionB.find()时,数据库状态无变化
  • 项目结束后数据库状态恢复原状

得出结论:

  • mongodb的可用性很强大,对于web服务类型这种常活程序来说只处理一个数据库且并发不高时,手动关闭更好不关也可以接受
  • 对于定时任务这种用完即销毁的程序来说不需要考虑关闭连接的问题

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/667925.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【设计模式深度剖析】【1】【行为型】【模板方法模式】| 以烹饪过程为例加深理解

👈️上一篇:结构型设计模式对比 文章目录 模板方法模式定义英文原话直译如何理解呢? 2个角色类图代码示例 应用优点缺点使用场景 示例解析:以烹饪过程为例类图代码示例 模板方法模式 模板方法模式(Template Method Pattern&…

生信分析进阶4 - 比对结果的FLAG和CIGAR信息含义与BAM文件指定区域提取

BAM文件时存储比对数据的常用格式,可用于短reads和长reads数据。BAM是二进制压缩格式,SAM文件为其纯文本格式,CRAM为BAM的高压缩格式,IO效率相比于BAM略差,但是占用存储空间更小。 1. BAM文件的比对信息 BAM的核心信…

软件测试期末复习

第四章 边界黑盒测试续 4.3边界值设计方法 1.边界值设计方法:故障往往出现在定义域或边界值上。通常边界值分析法是作为对等价类划分法的补充。其测试用例来自等价类的边界。是对输入或输出的边界值进行测试的一种黑盒测试方法。 2.边界值分析法和等价类划分法的…

在Visual Studio2022中同一个项目里写作业,有多个cpp文件会报错

为了省事,在同一个项目里写很多个题目,结果只有一个cpp文件时没出错,写了2个cpp文件再想运行时就出错了; 将不相关的cpp文件移出去 在源文件中对其点击右键,找到“从项目中排除”; 结果如图,剩…

网络原理-三

一、连接管理 建立连接,断开连接 建立连接,TCP有连接的. 客户端执行 socket new Socket(SeverIP,severPort); -> 这个操作就是在建立连接. 上述只是调用socket api,真正建立连接的过程,实在操作系统内核完成的. 内核是怎样完成上述的 " 建立连接 "过程的…

云硬盘的基准性能测试场景

参考来源: 深入浅出云计算-05 | 云硬盘:云上IO到底给不给力 云硬盘的性能等级 当下的云硬盘经过了多次的软硬件迭代,尤其是SSD的迅速发展,吞吐量和随机读写能力等各项性能指标都已经不再是问题了。在现代云计算中,已…

数仓建模—ChatETL

数仓建模—ChatETL 前面我们介绍过ChatBI ,就是让用户通过自然语言对话的方式可以获取到自己想要的数据,然后通过合适的报表展示出来,其实我们可以将其理解为应用层面的技术创新,但是这个实现的前提就是我们底层已经有加工好的大量的数据模型数据表,并且有完善的元数据建…

“论SOA在企业集成架构设计中的应用”必过模板,突击2024软考高项论文

考题部分 企业应用集成(Enterprise Application Integration, EAI)是每个企业都必须要面对的实际问题。面向服务的企业应用集成是一种基于面向服务体系结构(Service-OrientedArchitecture,SOA)的新型企业应用集成技术,强调将企业和组织内部的资源和业务功…

【自动化运维】不要相信人,把所有的东西都交给机器去处理

不积跬步,无以至千里;不积小流,无以成江海。 大家好,我是闲鹤,十多年开发、架构经验,先后在华为、迅雷服役过,也在高校从事教学3年;目前已创业了7年多,主要从事物联网/车…

【网络安全的神秘世界】MySQL

🌝博客主页:泥菩萨 💖专栏:Linux探索之旅 | 网络安全的神秘世界 | 专接本 MySQL MySQL 教程 | 菜鸟教程 (runoob.com) 什么是数据库 数据库(Database)是按照数据结构来组织、存储和管理数据的仓库 在do…

COLING 2024: 复旦发布AoR,层级聚合推理突破大模型复杂推理上限

“三个臭皮匠,顶个诸葛亮?” “一个模型不行,那就再堆一个?” 过去当我们在处理复杂任务的时候,往往会考虑集成策略(Ensembling Strategy),通过多个模型投票的方式,选出…

[手游] Florence逝去的爱弗洛伦斯

图片处理工具箱Hummingbird : Hummingbird使用智能压缩技术来减少文件的大小,支持:jpg、png、webp、svg、gif、gif、css、js、html、mp4、mov,可以设置压缩的同时等比例缩放图片或视频的尺寸。可以拖放文件夹压缩,一次最多可处理1…

茉莉香飘,奶茶丝滑——周末悠闲时光的绝佳伴侣

周末的时光总是格外珍贵,忙碌了一周的我们,终于迎来了难得的闲暇。这时,打开喜欢的综艺,窝在舒适的沙发里,再冲泡一杯香飘飘茉莉味奶茶,一边沉浸在剧情的海洋中,一边品味着香浓丝滑的奶茶&#…

短视频矩阵营销系统V2.3.0

抖音矩阵云混剪系统 源码短视频矩阵营销系统V2.3.0(免授权版)(感觉和上一个版本没什么区别)多平台多账号一站式管理,一键发布作品。智能标题,关键词优化,排名查询,混剪生成原创视频&…

《异常检测——从经典算法到深度学习》29 EasyTSAD: 用于时间序列异常检测模型的工业级基准

《异常检测——从经典算法到深度学习》 0 概论1 基于隔离森林的异常检测算法 2 基于LOF的异常检测算法3 基于One-Class SVM的异常检测算法4 基于高斯概率密度异常检测算法5 Opprentice——异常检测经典算法最终篇6 基于重构概率的 VAE 异常检测7 基于条件VAE异常检测8 Donut: …

p2p文件传输小工具

使用webRTC的相关技术栈可以很轻松的开发一个p2p文件传输工具,这里主要讲下使用datachannel开发的一个文件传输工具client程序的使用 客户端A:需要可以访问公网,运行client的主机 客户端B:可以访问公网,可以和客户端…

go微服务项目“商城项目实战开发”整理第一部环境的准备

文章目录 商城项目实战开发环境的准备01、微服务的简单概述02、为什么要用微服务?03、本次课程微服务的技术栈04、微服务的准备工作05、微服务项目的搭建和目录的划分06、微服务项目实战 - 用户服务-srv的搭建07、微服务项目实战 - 用户服务-api的搭建08、微服务项目实战 - 商…

深度神经网络——什么是梯度提升?

在数据科学竞赛中,梯度提升模型(Gradient Boosting)是一种非常强大的工具,它能够将多个弱学习模型组合起来,形成一个强学习模型。这个过程是通过逐步添加弱学习者来实现的,每个新加入的弱学习者都专注于当前…

sudo命令的隐患-要注意安全使用!!严格管理!!严格控制

前言 众所周知,sudo命令非常方便,而且有一定的优点。比如不需要知道root密码就可以执行一些root的命令。相比于su 必须知道root密码来说,减少了root密码泄露的风险。 但是sudo也是一把非常锋利的双刃剑,需要加以限制,…

Python 关于字符串格式化

在Python中,字符串格式化有以下几种方法: 1.可以使用字符串的str.center(width), str.ljust(width), 和 str.rjust(width)方法来实现字符串的居中、左对齐和右对齐操作。 居中对齐: text "Python" centered_text text.center(10…