前言
SQLAlchemy 是一个强大的 Python SQL 工具包和对象关系映射(ORM)系统,是业内比较流行的ORM,设计非常优雅。随着其2.0版本的发布,SQLAlchemy 引入了原生的异步支持,这极大地增强了其在处理高并发和异步I/O场景下的能力。通过结合像greenlet、gevent这样的协程库,SQLAlchemy 使得异步数据库操作成为可能,从而提高了应用程序的性能和响应速度。
这里我将基于SQLAlchemy的异步支持,封装一些常用的增删改查(CRUD)操作到 https://github.com/HuiDBK/py-tools 中,以便在项目开发中更加便捷地使用。
Github: https://github.com/sqlalchemy/sqlalchemy
2.0文档:https://docs.sqlalchemy.org/en/20/index.html
简单使用
封装前,先简单介绍下如何使用 SQLAIchemy。
具体细节可以参考官网文档:https://docs.sqlalchemy.org/en/20/orm/quickstart.html
安装依赖
pip install sqlalchemy[asyncio]==2.0.20
pip install aiomysql==0.2.0
这里安装了 sqlalchemy 2.0版本,以及 aiomysql 异步数据库驱动,进行演示。
创建异步数据库引擎
from sqlalchemy.ext.asyncio import create_async_engine
# db_uri = "{protocol}://{user}:{password}@{host}:{port}/{db}"
db_engine = create_async_engine("mysql+aiomysql://root:123456@127.0.0.1:3306/demo")
声明数据库表映射模型
from sqlalchemy import String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class BaseOrmTable(DeclarativeBase):
"""SQLAlchemy Base ORM Model"""
__abstract__ = True
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True, comment="主键ID")
class UserTable(BaseOrmTable):
"""用户表"""
__tablename__ = "user"
username: Mapped[str] = mapped_column(String(30), default="", comment="用户昵称")
password: Mapped[str] = mapped_column(String(30), default="", comment="用户密码")
phone: Mapped[str] = mapped_column(String(11), default="", comment="手机号")
email: Mapped[str] = mapped_column(String(30), default="", comment="邮箱")
avatar: Mapped[str] = mapped_column(String(100), default="", comment="头像")
简单db操作
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
# db_uri = "{protocol}://{user}:{password}@{host}:{port}/{db}"
db_engine = create_async_engine("mysql+aiomysql://root:123456@127.0.0.1:3306/hui-demo")
Session = async_sessionmaker(db_engine)
async def create_tables():
# 根据映射创建库表
async with db_engine.begin() as conn:
await conn.run_sync(BaseOrmTable.metadata.create_all)
async def main():
await create_tables()
async with Session.begin() as session:
# 添加用户
new_user = UserTable(username='hui', email='huidbk@163.com')
session.add(new_user)
await session.flush() # 刷新table 对象属性,获取新增的id
print(new_user.id)
print("add user", new_user.__dict__)
# 获取用户
user = await session.get(UserTable, new_user.id)
print("get user", user.__dict__)
# 更新用户
user.email = 'hui@163.com'
await session.merge(user)
print("updated user", user.__dict__)
# 删除用户
await session.delete(user)
if __name__ == '__main__':
# 运行主函数
asyncio.run(main())
常用DB操作封装
SQLAlchemyManager
class SQLAlchemyManager(metaclass=SingletonMetaCls):
DB_URL_TEMPLATE = "{protocol}://{user}:{password}@{host}:{port}/{db}"
def __init__(
self,
host: str = "localhost",
port: int = 3306,
user: str = "",
password: str = "",
db_name: str = "",
pool_size: int = 30,
pool_pre_ping: bool = True,
pool_recycle: int = 600,
log: Union[logging.Logger] = None,
):
self.host = host
self.port = port
self.user = user
self.password = password
self.db_name = db_name
self.pool_size = pool_size
self.pool_pre_ping = pool_pre_ping
self.pool_recycle = pool_recycle
self.log = log or logger
self.db_engine: AsyncEngine = None
self.async_session_maker: async_sessionmaker = None
def get_db_url(self, protocol: str = "mysql+aiomysql"):
db_url = self.DB_URL_TEMPLATE.format(
protocol=protocol, user=self.user, password=self.password, host=self.host, port=self.port, db=self.db_name
)
return db_url
def init_db_engine(self, protocol: str):
"""
初始化db引擎
Args:
protocol: 驱动协议类型
Returns:
self.db_engine
"""
db_url = self.get_db_url(protocol=protocol)
self.log.info(f"init_db_engine => {db_url}")
self.db_engine = create_async_engine(
url=db_url, pool_size=self.pool_size, pool_pre_ping=self.pool_pre_ping, pool_recycle=self.pool_recycle
)
self.async_session_maker = async_sessionmaker(bind=self.db_engine, expire_on_commit=False)
return self.db_engine
def init_mysql_engine(self, protocol: str = "mysql+aiomysql"):
"""
初始化mysql引擎
Args:
protocol: 驱动协议类型
Returns:
self.db_engine
"""
return self.init_db_engine(protocol=protocol)
SQLAlchemyManager 主要封装一些数据库账户配置信息、连接池信息。
pool_size(连接池大小): 指定连接池中允许保持的最大连接数。当应用程序需要访问数据库时,连接池会维护一定数量的数据库连接,以便快速地响应请求。通常情况下,pool_size 的值应该根据应用程序的并发访问量和数据库的性能来进行调整。
pool_pre_ping(预检查连接): 指定是否在数据库连接被使用前对连接进行预检查。预检查可以确保连接处于活动状态,并且可以自动重新连接到数据库服务器,以防止连接由于长时间空闲而失效。启用预检查可以提高应用程序对数据库的可靠性和稳定性。
pool_recycle(连接回收时间): 指定数据库连接在被重新使用之前的最大空闲时间。当连接空闲时间超过 pool_recycle 设置的值时,连接将被关闭并重新创建,以防止连接长时间处于空闲状态而导致的连接问题。pool_recycle 的值通常设置为一个较小的时间间隔,以确保连接能够及时地得到回收和重建,从而提高连接的健壮性和性能。
init_db_engine
方法则是初始化数据库引擎,内部根据数据库配置信息
- 构造异步的数据库引擎 db_engine
- 维护一个 async_session_maker 数据库会话工厂
BaseORMTable 映射库表封装
from datetime import datetime
from sqlalchemy import func
from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class BaseOrmTable(AsyncAttrs, DeclarativeBase):
"""SQLAlchemy Base ORM Model"""
__abstract__ = True
id: Mapped[int] = mapped_column(primary_key=True, comment="主键ID")
def __repr__(self):
return str(self.to_dict())
def to_dict(self, alias_dict: dict = None, exclude_none=True) -> dict:
"""
数据库模型转成字典
Args:
alias_dict: 字段别名字典 eg: {"id": "user_id"}, 把id名称替换成 user_id
exclude_none: 默认排查None值
Returns: dict
"""
alias_dict = alias_dict or {}
if exclude_none:
return {
alias_dict.get(c.name, c.name): getattr(self, c.name)
for c in self.__table__.columns if getattr(self, c.name) is not None
}
else:
return {
alias_dict.get(c.name, c.name): getattr(self, c.name, None)
for c in self.__table__.columns
}
class TimestampColumns(AsyncAttrs, DeclarativeBase):
"""时间戳相关列"""
__abstract__ = True
created_at: Mapped[datetime] = mapped_column(default=datetime.now, comment="创建时间")
updated_at: Mapped[datetime] = mapped_column(default=datetime.now, onupdate=datetime.now, comment="更新时间")
deleted_at: Mapped[datetime] = mapped_column(nullable=True, comment="删除时间")
class BaseOrmTableWithTS(BaseOrmTable, TimestampColumns):
__abstract__ = True
创建一些基础的 ORM 类,以便后续的映射类可以继承并且共享一些公有属性和方法。
-
BaseOrmTable
类:- 定义了一个基础的 ORM 模型类,继承了
AsyncAttrs
和DeclarativeBase
。这样做使得BaseOrmTable
类具有了异步属性访问的能力,为异步编程提供便利,特别是在异步环境中访问具有延迟加载或者异步加载特性的属性。 - 提供了一个
to_dict
方法,用于将数据库模型转换为字典。它支持通过参数alias_dict
指定字段别名,并且可以选择是否排除值为 None 的属性。
- 定义了一个基础的 ORM 模型类,继承了
-
TimestampColumns
类:- 定义了一个包含时间戳相关列的抽象基类。这些列通常在很多数据库表中都会有,用于记录数据的创建时间、更新时间和删除时间。
- 这些列被设置为默认值,比如
created_at
和updated_at
默认使用datetime.now
函数来自动记录当前时间,deleted_at
则允许为空,用于标记数据的删除时间(可用作于逻辑删除)
-
BaseOrmTableWithTS
类:- 继承了
BaseOrmTable
和TimestampColumns
,实际上是一个组合类,集成了基础的 ORM 功能和时间戳相关的列。 - 这个类进一步封装了
BaseOrmTable
和TimestampColumns
,使得后续的映射类只需要继承这个类,就能够拥有基础的 ORM 功能和时间戳相关的列。
- 继承了
通过这种封装,你可以在后续的数据库映射类中更加专注于业务逻辑的实现,而不需要重复编写基础的 ORM 功能和时间戳相关的列,提高了代码的重用性和可维护性。
DBManager 数据库通用操作封装
前置封装说明
from typing import Any, List, Type, TypeVar, Union
from py_tools.connections.db.mysql import BaseOrmTable
from py_tools.meta_cls import SingletonMetaCls
# 泛指 BaseOrmTable 所有子类实例对象类型
T_BaseOrmTable = TypeVar("T_BaseOrmTable", bound=BaseOrmTable)
T_Hints = TypeVar("T_Hints") # 用于修复被装饰的函数参数提示,让IDE有类型提示
def with_session(method) -> T_Hints:
"""
兼容事务会话
Args:
method: orm 的 crud
Notes:
方法中没有带事务连接则,则构造
Returns:
"""
@functools.wraps(method)
async def wrapper(db_manager, *args, **kwargs):
session = kwargs.get("session") or None
if session:
return await method(db_manager, *args, **kwargs)
else:
async with db_manager.transaction() as session:
kwargs["session"] = session
return await method(db_manager, *args, **kwargs)
return wrapper
这里我提供了一个 with_session 装饰器,用于在需要数据库会话(事务)的数据库操作方法中自动开启事务,由于 sqlaichemy 官方推荐每个数据库操作都手动开启事务会话(自动提交),装饰器的设计没有时则构造,有则共享,这样不但可以减少冗余 async with db_manager.transaction() as session 的代码,也可以兼容多个操作共享同一个 session 有问题时进行事务回滚。
由于给方法加了通用的装饰器导致一些版本的IDE无法识别方法真实的签名,使用时会出现不知道方法的入参是什么,对于开发者来说是极其不方便的。
使用 typing 的 TypeVar 自定义类型来构造一个通用的泛型来当作函数返回的类型,进而修复。
from typing import TypeVar
T_Hints = TypeVar("T_Hints") # 用于修复被装饰的函数参数提示,让IDE有类型提示
def with_session(method) -> T_Hints:
...
这里PyCharm 2023.2.4 版本升级到 2024.1 就有提示了,IDE修复了,可以不用 T_Hints 了。
一些旧版本构造 sqlaichemy 的库表对象时也会出现不知道类对象属性入参提示,升级到最新版本都解决了。
from contextlib import asynccontextmanager
class DBManager(metaclass=SingletonMetaCls):
DB_CLIENT: SQLAlchemyManager = None
orm_table: Type[BaseOrmTable] = None
@classmethod
def init_db_client(cls, db_client: SQLAlchemyManager):
cls.DB_CLIENT = db_client
return cls.DB_CLIENT
@classmethod
@asynccontextmanager
async def transaction(cls):
"""事务上下文管理器"""
async with cls.DB_CLIENT.async_session_maker.begin() as session:
yield session
@classmethod
@asynccontextmanager
async def connection(cls) -> AsyncIterator[AsyncConnection]:
"""数据库引擎连接上下文管理器"""
async with cls.DB_CLIENT.db_engine.begin() as conn:
yield conn
- init_db_client 方法用于初始化数据库客户端(引擎)。
- transaction 则是简单的通过 contextlib 中 asynccontextmanager 封装一个异步的上下文管理器方便简洁的开启一个数据库会话(事务)进行数据库相关操作。
- connection 数据库引擎连接上下文管理器。
- orm_table 是具体继承 DBManager 的子类进行指定的,用于操作具体的库表(orm_table)。
- DBManager 通过 SingletonMetaCls 元类实现单例模式。具体单例模式可以了解 https://juejin.cn/post/7272006755265380367 这篇文章有详细的介绍。
DB添加操作封装
class DBManager(metaclass=SingletonMetaCls):
DB_CLIENT: SQLAlchemyManager = None
orm_table: Type[BaseOrmTable] = None
@with_session
async def bulk_add(
self,
table_objs: List[Union[T_BaseOrmTable, dict]],
*,
orm_table: Type[BaseOrmTable] = None,
flush: bool = False,
session: AsyncSession = None
) -> List[T_BaseOrmTable]:
"""
批量插入
Args:
table_objs: orm映射类实例列表
eg.[UserTable(username="hui", age=18), ...] or [{"username": "hui", "age": 18}, ...]
orm_table: orm表映射类
flush: 刷新对象状态,默认不刷新
session: 数据库会话对象,如果为 None,则通过装饰器在方法内部开启新的事务
Returns:
成功插入的对象列表
"""
orm_table = orm_table or self.orm_table
if all(isinstance(table_obj, dict) for table_obj in table_objs):
# 字典列表转成orm映射类实例列表处理
table_objs = [orm_table(**table_obj) for table_obj in table_objs]
session.add_all(table_objs)
if flush:
await session.flush(table_objs)
return table_objs
@with_session
async def add(
self,
table_obj: [T_BaseOrmTable, dict],
*,
orm_table: Type[BaseOrmTable] = None,
session: AsyncSession = None
) -> int:
"""
插入一条数据
Args:
table_obj: orm映射类实例对象, eg. UserTable(username="hui", age=18) or {"username": "hui", "age": 18}
orm_table: orm表映射类
session: 数据库会话对象,如果为 None,则通过装饰器在方法内部开启新的事务
Returns: 新增的id
table_obj.id
"""
orm_table = orm_table or self.orm_table
if isinstance(table_obj, dict):
table_obj = orm_table(**table_obj)
session.add(table_obj)
await session.flush(objects=[table_obj]) # 刷新对象状态,获取新增的id
return table_obj.id
这里就是用 session.add 与 add_all 方法封装了数据库添加、批量添加的操作,封装的点主要在于除了 orm_table 实例对象入参还支持字典入参,内部还是转换成库表映射类实例来操作,最后通过 session.flush 方法,单个添加返回新增的主键id,批量添加则是返回实例对象列表。
设计的方法中有一个 * 号是参数的分隔符,它的作用是将其前面的参数声明为位置参数,而将 * 后面的参数声明为关键字参数,* 号后面的参数入参只能使用关键字形式的入参,我在很多的开源库中都看到了这样的设计,可以把一些函数语义连贯、常用必传的参数设置为位置参数,其他的则是关键字参数。这样可以明确参数的作用、提高函数的可读性、防止参数错误等。
具体看下使用案例:
import asyncio
from sqlalchemy import String
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from py_tools.connections.db.mysql import BaseOrmTableWithTS, BaseOrmTable, DBManager, SQLAlchemyManager
class UserTable(BaseOrmTableWithTS):
"""用户表"""
__tablename__ = "user"
username: Mapped[str] = mapped_column(String(30), default="", comment="用户昵称")
password: Mapped[str] = mapped_column(String(30), default="", comment="用户密码")
phone: Mapped[str] = mapped_column(String(11), default="", comment="手机号")
email: Mapped[str] = mapped_column(String(30), default="", comment="邮箱")
avatar: Mapped[str] = mapped_column(String(100), default="", comment="头像")
async def create_tables():
# 根据映射创建库表(异步)
# async with db_engine.begin() as conn:
# await conn.run_sync(BaseOrmTable.metadata.create_all)
async with DBManager.connection() as conn:
await conn.run_sync(BaseOrmTable.metadata.create_all)
async def init_orm_manager():
db_client = SQLAlchemyManager(
host="127.0.0.1",
port=3306,
user="root",
password="123456",
db_name="hui-demo",
)
db_client.init_mysql_engine()
DBManager().init_db_client(db_client)
async def manager_crud():
user = {"username": "hui", "email": "huidbk.163.com"}
user_id = await DBManager().add(table_obj=user, orm_table=UserTable)
print("user_id", user_id)
users = [
{"username": "zack", "email": "zack.163.com"},
{"username": "wang", "email": "wang.163.com"}
]
add_users = await DBManager().bulk_add(table_objs=users, orm_table=UserTable)
add_user_ids = [user.id for user in add_users]
print("add_user_ids", add_user_ids)
async def main():
await create_tables()
# await normal_crud()
await init_orm_manager()
await manager_crud()
if __name__ == '__main__':
# 运行主函数
asyncio.run(main())
在程序启动时初始化好DBManager 的 DB_CLIENT 就可以直接使用封装的方法,主要就是 DB_CLIENT 作为类属性,后面DBManager 实例与子类实例对象都可以共享这个数据库引擎。但我这里还是不推荐上面的写法,DBManager 是一些通用的DB操作,而具体一些业务操作还是单独封装一些DB业务Manager类来进行会比较好,更利于扩展维护与复用。
class UserManager(DBManager):
orm_table = UserTable
async def get_name_by_email(self, email):
username = await self.query_one(cols=["username"], conds=[self.orm_table.email == email], flat=True)
return username
async def manager_crud():
# demo 2 (推荐)
user = UserTable(username="hui-test01", email="hui-test01.163.com")
user_id = await UserManager().add(table_obj=user)
print("user_id", user_id)
users = [
UserTable(username="hui-test02", email="hui-test02.163.com"),
UserTable(username="hui-test03", email="hui-test03.163.com"),
]
add_users = await UserManager().bulk_add(table_objs=users)
add_user_ids = [user.id for user in add_users]
print("add_user_ids", add_user_ids)
username = await UserManager().get_name_by_email(email="huidbk.163.com")
print("username", username)
>>> out
user_id 4
add_user_ids [5, 6]
username hui
这里 UserManager 单独封装的 get_name_by_email 的方法就是业务中常用查询操作通过邮件获取用户名称,这里就是举一个简单的例子,具体DB业务具体封装而不是全部写在逻辑层,这样别人要用的时候就不用重新组织条件参数、上下文,而是简单传递业务参数进行复用获取数据。
UserManager 调用 add、bulk_add 等方法时也不用像 DBManager 指定 orm_table 参数,使用起来更简洁。具体是因为 UserManager 类指定了 类属性 orm_table = UserTable,再封装时有一句 orm_table = orm_table or self.orm_table 意思就是优先选择入参的orm_table,没有则是 self.orm_table (具体实例对象的orm_table)。这样写也体现出 封装、继承的灵活性。
这里也引出了另一个封装方法 query_one 查询单条数据。由于介绍了一些Demo如果把所有的封装方法混合到一起篇幅就太长,故而我准备分成三篇进行分别介绍,这样也更好阅读。
- SQLAIchemy 异步DBManager封装-01入门理解
- SQLAIchemy 异步DBManager封装-02熟悉掌握
- SQLAIchemy 异步DBManager封装-03得心应手
Github源代码
源代码已上传到了Github,里面也有具体的使用Demo,欢迎大家一起体验、贡献。
HuiDBK/py-tools: 打造 Python 开发常用的工具,让Coding变得更简单 (github.com)