一、pymongo简介
github地址:https://github.com/mongodb/mongo-python-driver
mymongo安装命令:pip install pymongo==4.7.2
mymongo接口文档:PyMongo 4.7.2 Documentation
PyMongo发行版包含Python与MongoDB数据库交互的工具。bson包是用于Python的bson格式的实现。pymongo包是MongoDB的原生Python驱动程序。gridfs包是pymongo之上的一个gridfs实现。pymongo是python与mongodb交互的首选方式。
二、封装接口
import uuid
from datetime import datetime, timezone
from pydantic import BaseModel,ConfigDict
from pycommon.utils.mongo.mongo_collection_pool import mongo_collection_pool
from pymongo.cursor import Cursor
from pymongo.collection import Collection
class BaseDbService:
def __init__(self, mongo_settings: MongoSettings, encryptor_key: str) -> None:
self.mongo_settings = mongo_settings
self.encryptor_key = encryptor_key
def create(self, data: T, user_id: str, tenant_id: str = None, collection_name: str = None) -> str:
collection: Collection = mongo_collection_pool.get_collection(self.mongo_settings, collection_name, tenant_id)
if not get_attr_value(data, 'id'):
set_attr_value(data, 'id', f"{uuid.uuid1()}")
set_attr_value(data, 'created_by', user_id)
set_attr_value(data, 'created_time', datetime.now(timezone.utc))
request = data.model_dump(by_alias = True)
id = collection.insert_one(request).inserted_id
logger.log_info(f'{type(data).__name__} id {id} is created')
return id
def create_list(self, datas: list[T], user_id: str, tenant_id: str = None, collection_name: str = None) -> list[str]:
collection = mongo_collection_pool.get_collection(self.mongo_settings, collection_name, tenant_id)
request_list = []
for data in datas:
if not get_attr_value(data, 'id'):
set_attr_value(data, 'id', f"{uuid.uuid1()}")
set_attr_value(data, 'created_by', user_id)
set_attr_value(data, 'created_time', datetime.now(timezone.utc))
request = data.model_dump(by_alias = True)
request_list.append(request)
ids = collection.insert_many(request_list).inserted_ids
logger.log_info(f'{type(data).__name__} ids {ids} are created')
return ids
def update(self, data: T, user_id: str, tenant_id: str = None, collection_name: str = None):
if not get_attr_value(data, 'id'):
raise ValueError(f'{type(data).__name__} id is null or empty when update, user_id={user_id}, tenant_id={tenant_id}, collection_name={collection_name}')
set_attr_value(data, 'updated_by', user_id)
set_attr_value(data, 'updated_time', datetime.now(timezone.utc))
collection = mongo_collection_pool.get_collection(self.mongo_settings, collection_name, tenant_id)
# response is dict type
original_data = collection.find_one(data.id)
if original_data:
if "createdBy" in original_data:
set_attr_value(data, "created_by", original_data["createdBy"])
if "createdTime" in original_data:
set_attr_value(data, "created_time", original_data["createdTime"])
request = data.model_dump(by_alias = True)
collection.find_one_and_replace({"_id": data.id}, request)
logger.log_info(f'{type(data).__name__} id {id} is updated')
else:
logger.log_info(f'{type(data).__name__} id {id} does not exist when update')
def get(self, id: str, responseType: TResponse, tenant_id: str = None, collection_name: str = None) -> TResponse:
if not id:
logger.log_info(f'{type(responseType).__name__} id is null or empty when get')
return None
collection = mongo_collection_pool.get_collection(self.mongo_settings, collection_name, tenant_id)
result = collection.find_one(id)
return responseType.model_validate(result)
def get_list(self, ids: list[str], entityType: T, tenant_id: str = None, collection_name: str = None) -> list[T]:
if not ids:
logger.log_info(f'{type(entityType).__name__} ids is null or empty when get_list')
return None
collection = mongo_collection_pool.get_collection(self.mongo_settings, collection_name, tenant_id)
response = list(collection.find({"_id": { '$in': ids }}))
if not response:
return []
result = []
for item in response:
result_item = entityType.model_validate(item)
result.append(result_item)
return result
整体思路是创建BaseDbService实例时指定目标数据库,进行具体的CRUD操作时再指定表名以及查询结果要转换成的类型(pymongo查询出结果后默认是dict类型),这样底层的数据库服务就能操作任意库的所有表了,而且最终获取的数据是具体类型的对象。
实际上用户大致会按照下述方式使用BaseDbService:
if __name__ == '__main__':
mongo_settings = MongoSettings(connection_string="mongodb://localhost:27017/", database_name="test-activity")
db_service = BaseDbService(mongo_settings, "bWRHdzkzVDFJbWNB")
collection_name = "pycommon"
tenant_id = "test-tenant-id"
result = db_service.get("74bae4fe-13ec-11ef-be60-7404f152b5a1", ActivityQueryItem, tenant_id, collection_name)
三、数据库连接池机制
第二步中对数据库表操作之前都要先执行mongo_collection_pool.get_collection(self.mongo_settings, collection_name, tenant_id)来获取collection的连接,mongo_collection_pool是一个单例对象,内部有一个缓存机制来存储所有已经操作过的client、database和collection,连接从不关闭,这样可以保证连接不会重复的创建销毁从而提升性能。
看到这里有些细心的朋友可能会提问连接从不关闭,不会造成当前用户过多导致新用户无法连接的情况吗?本节先展示数据库连接池,大家的疑问咱们在下边有验证
from pymongo import MongoClient
from pymongo.database import Database
from pymongo.collection import Collection
from pycommon.models.mongo.mongo_settings import MongoSettings
class MongoCollectionPool:
def __init__(self):
self.mongo_client_cache = {}
self.mongo_database_cache = {}
self.mongo_collection_cache = {}
def get_collection(self, mongo_settings: MongoSettings, collection_name: str, tenant_id: str) -> Collection:
client_cache_key = mongo_settings.connection_string
database_name = mongo_settings.database_name
database_cache_key = client_cache_key + "_" + mongo_settings.database_name
collection_name = collection_name + "_" + tenant_id if tenant_id else collection_name
collection_cache_key = client_cache_key + "_" + mongo_settings.database_name + "_" + collection_name
mongo_client: MongoClient = None
if client_cache_key in self.mongo_client_cache:
mongo_client = self.mongo_client_cache[client_cache_key]
else:
mongo_client = MongoClient(mongo_settings.connection_string)
self.mongo_client_cache[client_cache_key] = mongo_client
database: Database = None
if database_cache_key in self.mongo_database_cache:
database = self.mongo_database_cache[database_cache_key]
else:
database = mongo_client[database_name]
self.mongo_database_cache[database_cache_key] = database
if collection_cache_key in self.mongo_collection_cache:
return self.mongo_collection_cache[collection_cache_key]
else:
collection = database[collection_name]
self.mongo_collection_cache[collection_cache_key] = collection
return collection
# 单例对象
mongo_collection_pool = MongoCollectionPool()
from typing import Optional
from pycommon.models.base_core_model import BaseCoreModel
class MongoSettings(BaseCoreModel):
connection_string: Optional[str] = None
database_name: Optional[str] = None
四、mongodb数据库连接是否需要手动关闭
在判断是否需要手动关闭连接时,先要了解连接是在什么时机开启的(这样我们就能知道做哪些操作会创建新的连接),以及对象释放时是否会自动关闭(这样我们就可以根据不同的项目类型来设计不同的架构)。
为了查看mongodb当前的数据库状态,我们需要用到mongosh工具(我这里用的是mongosh-2.2.6-win32-x64,大家需要的话我可以私发到个人邮箱)。
mongosh安装方式:解压后把bin目录下的两个文件拷贝到C:\Windows\System32。
查看mongodb数据库状态:打开cmd -> 输入mongosh -> 输入db.serverStatus().connections
connections中文说明:serverStatus - MongoDB 手册 v 6 。 0
connections英文说明:serverStatus - MongoDB Manual v7.0
测试项目情况:定时任务项目(执行完毕立即退出)
下面是我测试时了解到的情况(详情请看截图):
- mongodb默认的可用线程高达100w
- 创建MongoClient时,active+1、current+2、threaded+2、available-2
- 创建DataBase时,exhaustHello+1
- 创建Collection时,数据库状态无变化
- 执行collectionA.find_one()方法时,current+1、threaded+1、available-1
- 继续执行collectionA.find()时,数据库状态无变化
- 继续执行collectionB.find()时,数据库状态无变化
- 项目结束后数据库状态恢复原状
得出结论:
- mongodb的可用性很强大,对于web服务类型这种常活程序来说只处理一个数据库且并发不高时,手动关闭更好不关也可以接受
- 对于定时任务这种用完即销毁的程序来说不需要考虑关闭连接的问题