Python MySQL 进阶用法详解
1. 使用连接池
使用 DBUtils 实现连接池管理:
from dbutils.pooled_db import PooledDB
import pymysql
class DBConnectionPool:
    """Holder for a single, lazily created DBUtils ``PooledDB`` instance.

    The pool is built on the first call to :meth:`get_pool` and cached on the
    class, so every caller shares the same pool object.
    """

    # Cached PooledDB instance; stays None until get_pool() is first called.
    _pool = None

    @staticmethod
    def get_pool():
        """Return the shared pool, creating it on first use.

        Returns:
            The cached ``PooledDB`` instance.
        """
        if DBConnectionPool._pool is None:
            DBConnectionPool._pool = PooledDB(
                creator=pymysql,    # DB-API module used to open connections
                maxconnections=10,  # maximum number of connections in the pool
                mincached=2,        # idle connections created at initialisation
                maxcached=5,        # maximum number of idle connections kept
                maxshared=3,        # number of shared connections
                blocking=True,      # block when the connection limit is reached
                maxusage=None,      # maximum number of reuses of one connection
                setsession=[],      # commands executed before a session starts
                # Ping the server whenever a connection is fetched from the
                # pool. The previous value 0 meant "never ping", which
                # defeated the purpose of this setting (validating that the
                # connection is still alive).
                ping=1,
                host='localhost',
                port=3306,
                user='root',
                password='123456',
                database='test',
                charset='utf8mb4',
                cursorclass=pymysql.cursors.DictCursor,
            )
        return DBConnectionPool._pool

    @staticmethod
    def get_conn():
        """Return a connection obtained from the shared pool.

        Returns:
            The object returned by ``PooledDB.connection()``.
        """
        return DBConnectionPool.get_pool().connection()
# Usage example
class UserDAO:
    """Data-access object that reads rows from the ``users`` table."""

    def get_user(self, user_id):
        """Fetch a single row of ``users`` by its ``id``.

        Args:
            user_id: Value bound to the ``id`` placeholder of the query.

        Returns:
            Whatever ``cursor.fetchone()`` yields for the query.
        """
        query = "SELECT * FROM users WHERE id = %s"
        params = (user_id,)
        with DBConnectionPool.get_conn() as conn, conn.cursor() as cursor:
            cursor.execute(query, params)
            row = cursor.fetchone()
        return row
2. 实现简单的 ORM 映射
from typing import Dict, Any, Type, TypeVar
from datetime import datetime
T = TypeVar('T', bound='BaseModel')
class Field:
    """Description of one table column: its SQL type and primary-key flag."""

    def __init__(self, field_type: str, primary_key: bool = False):
        """Record the column metadata.

        Args:
            field_type: SQL type text of the column, e.g. ``'VARCHAR(50)'``.
            primary_key: Whether the column is the primary key.
        """
        self.field_type, self.primary_key = field_type, primary_key
class BaseModel:
    """Minimal base class for objects mapped to a database table.

    Subclasses set ``_table_name`` and ``_fields``; ``_fields`` decides which
    attributes are exported by :meth:`to_db_dict`.
    """

    # Name of the mapped table (empty on the base class).
    _table_name: str = ''
    # Column name -> Field description (empty on the base class).
    _fields: Dict[str, Field] = {}

    def __init__(self, **kwargs):
        """Assign every keyword argument as an instance attribute."""
        for attr_name in kwargs:
            setattr(self, attr_name, kwargs[attr_name])

    @classmethod
    def from_db_dict(cls: Type[T], db_dict: Dict[str, Any]) -> T:
        """Build an instance by passing the row dictionary as keyword arguments."""
        instance = cls(**db_dict)
        return instance

    def to_db_dict(self) -> Dict[str, Any]:
        """Return the attributes named in ``_fields`` that exist on the object."""
        return {
            column: getattr(self, column)
            for column in self._fields
            if hasattr(self, column)
        }
class User(BaseModel):
    """Model for one row of the ``users`` table."""

    _table_name = 'users'
    _fields = dict(
        id=Field('BIGINT', primary_key=True),
        username=Field('VARCHAR(50)'),
        password=Field('VARCHAR(100)'),
        age=Field('INT'),
        create_at=Field('TIMESTAMP'),
    )

    def __init__(self, username: str, password: str, age: int,
                 id: int = None, create_at: datetime = None):
        """Create a user object.

        Args:
            username: Value for the ``username`` attribute.
            password: Value for the ``password`` attribute.
            age: Value for the ``age`` attribute.
            id: Primary key; ``None`` when not provided.
            create_at: Creation timestamp; ``None`` when not provided.
        """
        # BaseModel.__init__ assigns each keyword as an instance attribute,
        # in the order the keywords are listed here.
        super().__init__(
            id=id,
            username=username,
            password=password,
            age=age,
            create_at=create_at,
        )
class UserRepository:
    """Saves ``User`` objects using connections from an injected pool object."""

    def __init__(self, db_pool):
        """Keep a reference to the pool.

        Args:
            db_pool: Object whose ``get_conn()`` returns a connection usable
                as a context manager.
        """
        self.db_pool = db_pool

    def save(self, user: User) -> User:
        """Insert the user when it has no id, otherwise update the row by id.

        After an insert, ``user.id`` is set from ``cursor.lastrowid``.

        Args:
            user: The user to persist.

        Returns:
            The same ``user`` object.
        """
        with self.db_pool.get_conn() as conn, conn.cursor() as cursor:
            creating = user.id is None
            if creating:
                statement = ("INSERT INTO users (username, password, age)\n"
                             "VALUES (%s, %s, %s)")
                args = (user.username, user.password, user.age)
            else:
                statement = ("UPDATE users SET username=%s, password=%s, age=%s\n"
                             "WHERE id=%s")
                args = (user.username, user.password, user.age, user.id)

            cursor.execute(statement, args)
            if creating:
                user.id = cursor.lastrowid
            conn.commit()
        return user
# Usage example: build a user without an id and persist it via the repository.
user = User(username="张三", password="123456", age=25)
# A DBConnectionPool instance is passed in; UserRepository only calls
# get_conn() on it.
repo = UserRepository(DBConnectionPool())
# save() returns the same user object it was given.
saved_user = repo.save(user)
3. 读写分离实现
from enum import Enum
from typing import List
import random
class DBType(Enum):
    """Kind of database node: master or slave."""

    MASTER = 'master'
    SLAVE = 'slave'
class DBConfig:
    """Host, port and node type of one database server."""

    def __init__(self, host: str, port: int, db_type: DBType):
        """Store the endpoint description.

        Args:
            host: Host name of the server.
            port: TCP port of the server.
            db_type: Whether the node is a master or a slave.
        """
        self.host, self.port, self.db_type = host, port, db_type
class DBRouter:
    """Hand out connections from the master pool or from one of the slave pools.

    Writes always go to the master. Reads go to a randomly chosen slave, or to
    the master when no slave pool is configured.
    """

    def __init__(self, master_config: "DBConfig" = None,
                 slave_configs: "List[DBConfig]" = None):
        """Create one connection pool per configured node.

        Args:
            master_config: Endpoint of the master. Defaults to
                ``master.mysql:3306``.
            slave_configs: Endpoints of the slaves. Defaults to
                ``slave1.mysql:3306`` and ``slave2.mysql:3306``. An empty
                sequence makes reads use the master as well.
        """
        if master_config is None:
            master_config = DBConfig("master.mysql", 3306, DBType.MASTER)
        if slave_configs is None:
            slave_configs = [
                DBConfig("slave1.mysql", 3306, DBType.SLAVE),
                DBConfig("slave2.mysql", 3306, DBType.SLAVE),
            ]
        self.master_config = master_config
        # Copy so later changes to the caller's sequence do not affect us.
        self.slave_configs: List[DBConfig] = list(slave_configs)

        # Create the connection pools.
        self.master_pool = self._create_pool(self.master_config)
        self.slave_pools = [self._create_pool(cfg) for cfg in self.slave_configs]

    def _create_pool(self, config: "DBConfig"):
        """Build a ``PooledDB`` pool for the given endpoint.

        Args:
            config: Endpoint whose ``host`` and ``port`` are used.

        Returns:
            A new ``PooledDB`` instance.
        """
        return PooledDB(
            creator=pymysql,
            maxconnections=10,
            host=config.host,
            port=config.port,
            user='root',
            password='123456',
            database='test',
            charset='utf8mb4',
        )

    def get_connection(self, for_write: bool = False):
        """Return a pooled connection suitable for the requested operation.

        Args:
            for_write: ``True`` to get a master connection; ``False`` to get
                a connection from a randomly chosen slave.

        Returns:
            The object returned by the selected pool's ``connection()``.
        """
        # random.choice raises IndexError on an empty sequence, so reads fall
        # back to the master when there is no slave pool.
        if for_write or not self.slave_pools:
            return self.master_pool.connection()
        # Pick one slave at random.
        return random.choice(self.slave_pools).connection()
class UserService:
    """User operations that read via slave connections and write via the master."""

    def __init__(self, db_router: DBRouter):
        """Keep a reference to the router that supplies connections."""
        self.db_router = db_router

    def get_user(self, user_id: int):
        """Read one row of ``users`` by id through a read connection.

        Args:
            user_id: Value bound to the ``id`` placeholder.

        Returns:
            Whatever ``cursor.fetchone()`` yields for the query.
        """
        query = "SELECT * FROM users WHERE id = %s"
        # Reads are requested with for_write=False.
        read_conn = self.db_router.get_connection(for_write=False)
        with read_conn as conn, conn.cursor() as cursor:
            cursor.execute(query, (user_id,))
            row = cursor.fetchone()
        return row

    def create_user(self, user: User):
        """Insert a user through a write connection and commit.

        Args:
            user: Object providing ``username``, ``password`` and ``age``.

        Returns:
            ``cursor.lastrowid`` read after the commit.
        """
        statement = ("INSERT INTO users (username, password, age)\n"
                     "VALUES (%s, %s, %s)")
        values = (user.username, user.password, user.age)
        # Writes are requested with for_write=True.
        write_conn = self.db_router.get_connection(for_write=True)
        with write_conn as conn, conn.cursor() as cursor:
            cursor.execute(statement, values)
            conn.commit()
            new_id = cursor.lastrowid
        return new_id
4. 分库分表实现
from hashlib import md5
from typing import Optional, Tuple
class ShardingConfig:
    """Constants that define the sharding layout."""

    # Number of databases.
    DB_COUNT = 2
    # Number of tables inside each database.
    TABLE_COUNT = 4
class ShardingRouter:
    """Map a sharding key to a database/table index and open shard connections."""

    @staticmethod
    def get_db_table(user_id: int) -> Tuple[int, int]:
        """Compute the shard location for a user id.

        The id is converted to text and hashed with MD5. The hash modulo
        ``ShardingConfig.DB_COUNT`` selects the database; the quotient modulo
        ``ShardingConfig.TABLE_COUNT`` selects the table.

        Args:
            user_id: The sharding key.

        Returns:
            ``(db_index, table_index)``.
        """
        hash_val = int(md5(str(user_id).encode()).hexdigest(), 16)
        # divmod yields hash_val // DB_COUNT and hash_val % DB_COUNT together.
        quotient, db_index = divmod(hash_val, ShardingConfig.DB_COUNT)
        table_index = quotient % ShardingConfig.TABLE_COUNT
        return db_index, table_index

    def get_connection(self, db_index: int):
        """Open a connection to the database shard with the given index.

        Simplified: a new connection is opened on every call; a real
        implementation should keep one connection pool per shard.

        Args:
            db_index: Index of the shard; used in the host and database names.

        Returns:
            A new ``pymysql`` connection.
        """
        config = {
            'host': f'mysql{db_index}.example.com',
            'port': 3306,
            'user': 'root',
            'password': '123456',
            'database': f'test_{db_index}',
            # Same charset as the pooled connections elsewhere in this file,
            # so non-ASCII text does not depend on the driver's default.
            'charset': 'utf8mb4',
            # Rows are returned as dicts, matching the Optional[Dict] return
            # annotation of ShardingUserRepository.get_user.
            'cursorclass': pymysql.cursors.DictCursor,
        }
        return pymysql.connect(**config)
class ShardingUserRepository:
    """Reads and writes users in sharded ``users_<n>`` tables."""

    def __init__(self):
        """Create the router used to locate shards and open connections."""
        self.router = ShardingRouter()

    def get_user(self, user_id: int) -> Optional[Dict]:
        """Fetch a user row from the shard selected by ``user_id``.

        Args:
            user_id: Id of the user; also the sharding key.

        Returns:
            Whatever ``cursor.fetchone()`` yields for the query.
        """
        db_index, table_index = self.router.get_db_table(user_id)
        with self.router.get_connection(db_index) as conn:
            with conn.cursor() as cursor:
                # table_index is an int computed by the router, so formatting
                # it into the table name is safe; the id stays parameterized.
                sql = f"SELECT * FROM users_{table_index} WHERE id = %s"
                cursor.execute(sql, (user_id,))
                return cursor.fetchone()

    def create_user(self, user: User) -> int:
        """Insert a user into its shard and commit.

        When ``user.id`` is set, the row is routed by id and the id is written
        explicitly, so ``get_user(user.id)`` looks in the same shard.

        When ``user.id`` is ``None``, the row is routed by username as before.
        NOTE(review): such rows are stored on a shard chosen from the
        username, while ``get_user`` selects the shard from the id, so they
        are generally not found by ``get_user``. Supplying an id (e.g. from a
        global id generator) avoids this.

        Args:
            user: Object providing ``id``, ``username``, ``password``, ``age``.

        Returns:
            ``user.id`` when it was supplied, otherwise ``cursor.lastrowid``.
        """
        has_id = user.id is not None
        # get_db_table converts its argument to text before hashing, so a
        # username hashes exactly as the previous inline md5 code did.
        shard_key = user.id if has_id else user.username
        db_index, table_index = self.router.get_db_table(shard_key)

        if has_id:
            sql = (f"INSERT INTO users_{table_index}\n"
                   "(id, username, password, age) VALUES (%s, %s, %s, %s)")
            params = (user.id, user.username, user.password, user.age)
        else:
            sql = (f"INSERT INTO users_{table_index}\n"
                   "(username, password, age) VALUES (%s, %s, %s)")
            params = (user.username, user.password, user.age)

        with self.router.get_connection(db_index) as conn:
            with conn.cursor() as cursor:
                cursor.execute(sql, params)
                conn.commit()
                return user.id if has_id else cursor.lastrowid
5. 主从复制配置
5.1 主库配置 (my.cnf)
[mysqld]
server-id = 1
log-bin = mysql-bin
binlog_format = ROW
sync_binlog = 1
5.2 从库配置 (my.cnf)
[mysqld]
server-id = 2
relay-log = slave-relay-bin
read_only = 1
5.3 主从复制设置
在主库执行:
-- Create the replication user
CREATE USER 'repl'@'%' IDENTIFIED BY 'password';
GRANT REPLICATION SLAVE ON *.* TO 'repl'@'%';
FLUSH PRIVILEGES;
-- Show the master's status
SHOW MASTER STATUS;
在从库执行:
CHANGE MASTER TO
MASTER_HOST='master_host',
MASTER_USER='repl',
MASTER_PASSWORD='password',
MASTER_LOG_FILE='mysql-bin.000001',
MASTER_LOG_POS=123;
-- Start replication on the slave
START SLAVE;
-- Show the slave's status
SHOW SLAVE STATUS\G
5.4 Python 监控主从状态
class ReplicationMonitor:
    """Collects master and slave replication status and reports the delay."""

    def __init__(self, master_pool, slave_pool):
        """Keep references to the pools used for the status queries.

        Args:
            master_pool: Object whose ``get_conn()`` returns a connection to
                the master, usable as a context manager.
            slave_pool: Same, for the slave.
        """
        self.master_pool = master_pool
        self.slave_pool = slave_pool

    def check_replication_status(self) -> Dict:
        """Query both servers and summarise the result.

        Returns:
            A dict with keys ``'master'`` and ``'slave'`` (the fetched status
            rows) and ``'delay'`` (see :meth:`_calculate_delay`).
        """
        master_status = self._get_master_status()
        slave_status = self._get_slave_status()
        return {
            'master': master_status,
            'slave': slave_status,
            'delay': self._calculate_delay(master_status, slave_status),
        }

    def _get_master_status(self) -> Dict:
        """Return the first row of ``SHOW MASTER STATUS`` from the master."""
        return self._fetch_status_row(self.master_pool, "SHOW MASTER STATUS")

    def _get_slave_status(self) -> Dict:
        """Return the first row of ``SHOW SLAVE STATUS`` from the slave."""
        return self._fetch_status_row(self.slave_pool, "SHOW SLAVE STATUS")

    @staticmethod
    def _fetch_status_row(pool, statement):
        """Run ``statement`` on a connection from ``pool``; return ``fetchone()``."""
        with pool.get_conn() as conn:
            with conn.cursor() as cursor:
                cursor.execute(statement)
                return cursor.fetchone()

    def _calculate_delay(self, master_status: Dict, slave_status: Dict) -> int:
        """Return the replication delay in seconds, or ``-1`` when unknown.

        Args:
            master_status: Status row of the master; may be empty or ``None``.
            slave_status: Status row of the slave; may be empty or ``None``.

        Returns:
            The ``Seconds_Behind_Master`` value of the slave row, or ``-1``
            when either row is missing or the value is absent or ``None``.
        """
        if not master_status or not slave_status:
            return -1
        delay = slave_status.get('Seconds_Behind_Master')
        # The key can be present with a None value; .get(key, -1) would then
        # return None. Compare with None explicitly so a delay of 0 is kept.
        return -1 if delay is None else delay
# Usage example
# NOTE(review): master_pool and slave_pool are not defined in the visible
# source; ReplicationMonitor calls get_conn() on both, so each must provide it.
monitor = ReplicationMonitor(master_pool, slave_pool)
status = monitor.check_replication_status()
# Print the reported replication delay in seconds.
print(f"主从延迟: {status['delay']} 秒")