Python MySQL 进阶用法详解

Python MySQL 进阶用法详解

1. 使用连接池

使用 DBUtils 实现连接池管理:

from dbutils.pooled_db import PooledDB
import pymysql

class DBConnectionPool:
    _pool = None
    
    @staticmethod
    def get_pool():
        if DBConnectionPool._pool is None:
            DBConnectionPool._pool = PooledDB(
                creator=pymysql,        # 使用pymysql作为连接器
                maxconnections=10,      # 连接池最大连接数
                mincached=2,           # 初始化时创建的空闲连接数
                maxcached=5,           # 连接池最大空闲连接数
                maxshared=3,           # 共享连接数
                blocking=True,         # 连接数达到最大时是否阻塞
                maxusage=None,         # 一个连接最多被使用的次数
                setsession=[],         # 开始会话前执行的命令
                ping=0,                # ping MySQL服务端确保连接有效
                host='localhost',
                port=3306,
                user='root',
                password='123456',
                database='test',
                charset='utf8mb4',
                cursorclass=pymysql.cursors.DictCursor
            )
        return DBConnectionPool._pool

    @staticmethod
    def get_conn():
        return DBConnectionPool.get_pool().connection()

# 使用示例
class UserDAO:
    def get_user(self, user_id):
        with DBConnectionPool.get_conn() as conn:
            with conn.cursor() as cursor:
                sql = "SELECT * FROM users WHERE id = %s"
                cursor.execute(sql, (user_id,))
                return cursor.fetchone()

2. 实现简单的 ORM 映射

from typing import Dict, Any, Type, TypeVar
from datetime import datetime

T = TypeVar('T', bound='BaseModel')

class Field:
    def __init__(self, field_type: str, primary_key: bool = False):
        self.field_type = field_type
        self.primary_key = primary_key

class BaseModel:
    _table_name: str = ''
    _fields: Dict[str, Field] = {}
    
    def __init__(self, **kwargs):
        for key, value in kwargs.items():
            setattr(self, key, value)
    
    @classmethod
    def from_db_dict(cls: Type[T], db_dict: Dict[str, Any]) -> T:
        """从数据库字典创建对象"""
        return cls(**db_dict)
    
    def to_db_dict(self) -> Dict[str, Any]:
        """转换为数据库字典"""
        result = {}
        for key in self._fields:
            if hasattr(self, key):
                result[key] = getattr(self, key)
        return result

class User(BaseModel):
    _table_name = 'users'
    _fields = {
        'id': Field('BIGINT', primary_key=True),
        'username': Field('VARCHAR(50)'),
        'password': Field('VARCHAR(100)'),
        'age': Field('INT'),
        'create_at': Field('TIMESTAMP')
    }
    
    def __init__(self, username: str, password: str, age: int, 
                 id: int = None, create_at: datetime = None):
        self.id = id
        self.username = username
        self.password = password
        self.age = age
        self.create_at = create_at

class UserRepository:
    def __init__(self, db_pool):
        self.db_pool = db_pool
    
    def save(self, user: User) -> User:
        with self.db_pool.get_conn() as conn:
            with conn.cursor() as cursor:
                if user.id is None:
                    # Insert
                    sql = """INSERT INTO users (username, password, age) 
                            VALUES (%s, %s, %s)"""
                    cursor.execute(sql, (user.username, user.password, user.age))
                    user.id = cursor.lastrowid
                else:
                    # Update
                    sql = """UPDATE users SET username=%s, password=%s, age=%s 
                            WHERE id=%s"""
                    cursor.execute(sql, (user.username, user.password, 
                                      user.age, user.id))
                conn.commit()
                return user

# 使用示例
user = User(username="张三", password="123456", age=25)
repo = UserRepository(DBConnectionPool())
saved_user = repo.save(user)

3. 读写分离实现

from enum import Enum
from typing import List
import random

class DBType(Enum):
    MASTER = "master"
    SLAVE = "slave"

class DBConfig:
    def __init__(self, host: str, port: int, db_type: DBType):
        self.host = host
        self.port = port
        self.db_type = db_type

class DBRouter:
    def __init__(self):
        self.master_config = DBConfig("master.mysql", 3306, DBType.MASTER)
        self.slave_configs: List[DBConfig] = [
            DBConfig("slave1.mysql", 3306, DBType.SLAVE),
            DBConfig("slave2.mysql", 3306, DBType.SLAVE),
        ]
        
        # 创建连接池
        self.master_pool = self._create_pool(self.master_config)
        self.slave_pools = [self._create_pool(cfg) for cfg in self.slave_configs]
    
    def _create_pool(self, config: DBConfig):
        return PooledDB(
            creator=pymysql,
            maxconnections=10,
            host=config.host,
            port=config.port,
            user='root',
            password='123456',
            database='test',
            charset='utf8mb4'
        )
    
    def get_connection(self, for_write: bool = False):
        if for_write:
            return self.master_pool.connection()
        # 随机选择一个从库
        slave_pool = random.choice(self.slave_pools)
        return slave_pool.connection()

class UserService:
    def __init__(self, db_router: DBRouter):
        self.db_router = db_router
    
    def get_user(self, user_id: int):
        # 读操作从从库获取
        with self.db_router.get_connection(for_write=False) as conn:
            with conn.cursor() as cursor:
                sql = "SELECT * FROM users WHERE id = %s"
                cursor.execute(sql, (user_id,))
                return cursor.fetchone()
    
    def create_user(self, user: User):
        # 写操作使用主库
        with self.db_router.get_connection(for_write=True) as conn:
            with conn.cursor() as cursor:
                sql = """INSERT INTO users (username, password, age) 
                        VALUES (%s, %s, %s)"""
                cursor.execute(sql, (user.username, user.password, user.age))
                conn.commit()
                return cursor.lastrowid

4. 分库分表实现

from hashlib import md5
from typing import Tuple

class ShardingConfig:
    DB_COUNT = 2      # 数据库数量
    TABLE_COUNT = 4   # 每个库中的表数量

class ShardingRouter:
    @staticmethod
    def get_db_table(user_id: int) -> Tuple[int, int]:
        """获取分库分表位置"""
        # 使用用户ID做hash
        hash_val = int(md5(str(user_id).encode()).hexdigest(), 16)
        db_index = hash_val % ShardingConfig.DB_COUNT
        table_index = (hash_val // ShardingConfig.DB_COUNT) % ShardingConfig.TABLE_COUNT
        return db_index, table_index

    def get_connection(self, db_index: int):
        """获取指定分库的连接"""
        # 这里简化处理,实际应该维护多个连接池
        config = {
            'host': f'mysql{db_index}.example.com',
            'port': 3306,
            'user': 'root',
            'password': '123456',
            'database': f'test_{db_index}'
        }
        return pymysql.connect(**config)

class ShardingUserRepository:
    def __init__(self):
        self.router = ShardingRouter()
    
    def get_user(self, user_id: int) -> Optional[Dict]:
        db_index, table_index = self.router.get_db_table(user_id)
        with self.router.get_connection(db_index) as conn:
            with conn.cursor() as cursor:
                sql = f"SELECT * FROM users_{table_index} WHERE id = %s"
                cursor.execute(sql, (user_id,))
                return cursor.fetchone()
    
    def create_user(self, user: User) -> int:
        # 这里使用用户名作为分片键
        hash_val = int(md5(user.username.encode()).hexdigest(), 16)
        db_index = hash_val % ShardingConfig.DB_COUNT
        table_index = (hash_val // ShardingConfig.DB_COUNT) % ShardingConfig.TABLE_COUNT
        
        with self.router.get_connection(db_index) as conn:
            with conn.cursor() as cursor:
                sql = f"""INSERT INTO users_{table_index} 
                         (username, password, age) VALUES (%s, %s, %s)"""
                cursor.execute(sql, (user.username, user.password, user.age))
                conn.commit()
                return cursor.lastrowid

5. 主从复制配置

5.1 主库配置 (my.cnf)

[mysqld]
server-id = 1
log-bin = mysql-bin
binlog_format = ROW
sync_binlog = 1

5.2 从库配置 (my.cnf)

[mysqld]
server-id = 2
relay-log = slave-relay-bin
read_only = 1

5.3 主从复制设置

在主库执行:

-- 创建复制用户
CREATE USER 'repl'@'%' IDENTIFIED BY 'password';
GRANT REPLICATION SLAVE ON *.* TO 'repl'@'%';
FLUSH PRIVILEGES;

-- 获取主库状态
SHOW MASTER STATUS;

在从库执行:

CHANGE MASTER TO
    MASTER_HOST='master_host',
    MASTER_USER='repl',
    MASTER_PASSWORD='password',
    MASTER_LOG_FILE='mysql-bin.000001',
    MASTER_LOG_POS=123;

-- 启动从库复制
START SLAVE;

-- 查看从库状态
SHOW SLAVE STATUS\G

5.4 Python 监控主从状态

class ReplicationMonitor:
    def __init__(self, master_pool, slave_pool):
        self.master_pool = master_pool
        self.slave_pool = slave_pool
    
    def check_replication_status(self) -> Dict:
        master_status = self._get_master_status()
        slave_status = self._get_slave_status()
        
        return {
            'master': master_status,
            'slave': slave_status,
            'delay': self._calculate_delay(master_status, slave_status)
        }
    
    def _get_master_status(self) -> Dict:
        with self.master_pool.get_conn() as conn:
            with conn.cursor() as cursor:
                cursor.execute("SHOW MASTER STATUS")
                return cursor.fetchone()
    
    def _get_slave_status(self) -> Dict:
        with self.slave_pool.get_conn() as conn:
            with conn.cursor() as cursor:
                cursor.execute("SHOW SLAVE STATUS")
                return cursor.fetchone()
    
    def _calculate_delay(self, master_status: Dict, slave_status: Dict) -> int:
        # 计算主从延迟
        if not master_status or not slave_status:
            return -1
        return slave_status.get('Seconds_Behind_Master', -1)

# 使用示例
monitor = ReplicationMonitor(master_pool, slave_pool)
status = monitor.check_replication_status()
print(f"主从延迟: {status['delay']} 秒")

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/939294.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

LWIP协议:三次握手和四次挥手、TCP/IP模型

一、三次握手:是客户端与服务器建立连接的方式; 1、客户端发送建立TCP连接的请求。seq序列号是由发送端随机生成的,SYN字段置为1表示需要建立TCP连接。(SYN1,seqx,x为随机生成数值);…

Kafka Streams 在监控场景的应用与实践

作者:来自 vivo 互联网服务器团队- Pang Haiyun 介绍 Kafka Streams 的原理架构,常见配置以及在监控场景的应用。 一、背景 在当今大数据时代,实时数据处理变得越来越重要,而监控数据的实时性和可靠性是监控能力建设最重要的一环…

Medium是什么,Medium能干嘛,如何用开通medium会员

1.背景介绍 1.1 什么是medium medium是国外一个内容创作和分享平台。 主要用户来自美国,每月有26万的访问量。 网址: Medium官网 平台注重优质、专业的内容。 这个平台有2点比较吸引人: ① 内容优质、专业 ② 在上面写作,能…

【实验17】不同优化算法的比较分析

目录 1 不同优化算法比较分析-2D可视化实验 1.1 优化算法的实验设定(以函数为例) 1.2 学习率调整优化策略 1.1.2 AdaGrad算法 1.1.2 RMSprop算法 1.3 梯度估计修正优化策略 1.3.1 动量法 1.3.2 Adam算法 1.4 完整代码 1.5 函数 的优化算法比较 2 不同优化算法比较分…

复习打卡大数据篇——Hadoop HDFS 01

目录 1. HDFS简介 2. HDFS基本操作 3. HDFS原理 1. HDFS简介 HDFS概念: HDFS是一个分布式的文件系统。分布式意味着多台机器存储,文件系统,就是用来存储文件、存储数据。是大数据最底层一个服务。 HDFS设计目标: 故障的检测…

Odoo:免费开源ERP的AI技术赋能出海企业电子商务应用介绍

概述 伴随电子商务的持续演进,客户对于便利性、速度以及个性化服务的期许急剧攀升。企业务必要探寻创新之途径,以强化自身运营,并优化购物体验。达成此目标的最为行之有效的方式之一,便是将 AI 呼叫助手融入您的电子商务平台。我们…

基于base32的兑换码算法(思路)

base32编码指的是基于32个可打印字符对任意字节数据进行编码:大写字母A-Z以及数字2-7。 兑换码要求:长度为10个字符 如果将这32个字符依次放到一个base数组中,那么最大的下标就是31。我们将要编码的任意字节数据按照五个bit为一组进行划分,…

前端开发环境(vue)

1. 安装nvm管理nodejs的版本 1. 配置nvm 2. 用npm安装nodejs,选则nodejs版本,这是js的运行环境 3 . 安装npm,这是前端的包管理器 npm是nodejs开发的包管理器,现在下载了nodejs就默认下载npm了,绑在一块了,不用 1. npm的中央仓库 2. npm私服仓库 换库 npm config set r…

第十七章:反射+设计模式

一、反射 1. 反射(Reflection):允许在程序运行状态中,可以获取任意类中的属性和方法,并且可以操作任意对象内部的属 性和方法,这种动态获取类的信息及动态操作对象的属性和方法对应的机制称为反射机制。 2. 类对象 和 类的对象(实…

arduino继电器与电机水泵的使用

首先说一句,真受不了网上的教程,大海里捞金,要不上来了就讲原理,怎么具体使用一句不说,要么炫技来了。 继电器,简单来说把他当开关看,通过小电流控制大电流(原理去看其他视频),要记…

【Java Web】Axios实现前后端数据异步交互

目录 一、Promise概述 二、Promise基本用法 三、async和await关键字 四、Axios介绍 4.1 Axios基本用法 4.2 Axios简化用法之get和post方法 五、Axios拦截器 六、跨域问题处理 一、Promise概述 axios是代替原生的ajax实现前后端数据交互的一套新解决方案,而…

网络编程 03:端口的定义、分类,端口映射,通过 Java 实现了 IP 和端口的信息获取

一、概述 记录时间 [2024-12-19] 前置文章: 网络编程 01:计算机网络概述,网络的作用,网络通信的要素,以及网络通信协议与分层模型 网络编程 02:IP 地址,IP 地址的作用、分类,通过 …

webdriver 反爬虫 (selenium反爬虫) 绕过

1. webdriver 反爬虫原理 爬虫程序可以借助渲染工具从动态网页中获取数据。 在这个过程中,“借助”其实是通过对应的浏览器驱动(即WebDriver)向浏览器发出指令的行为。因此,开发者可以根据客户端是否包含浏览器驱动这一特征来区分…

JAVA 零拷贝技术和主流中间件零拷贝技术应用

目录 介绍Java代码里面有哪些零拷贝技术java 中文件读写方式主要分为什么是FileChannelmmap实现sendfile实现 文件IO实战需求代码编写实战IOTest.java 文件上传阿里云,测试运行代码看耗时为啥带buffer的IO比普通IO性能高?BufferedInputStream为啥性能高点…

系统移植——Linux 内核顶层 Makefile 详解

一、概述 Linux Kernel网上下载的版本很多NXP等有自己对应的版本。需要从网上直接下载就可以。 二、Linux内核初次编译 编译内核之前需要先在 ubuntu 上安装 lzop 库 sudo apt-get install lzop 在 Ubuntu 中 新 建 名 为 “ alientek_linux ” 的 文 件夹 , …

Reactor

文章目录 正确的理解发送double free问题解决 1.把我们的reactor进行拆分2.链接管理3.Reactor的理论 listensock只需要设置_recv_cb,而其他sock,读,写,异常 所以今天写nullptr其实就不太对,添加为空就没办法去响应事件…

【深度学习】 零基础介绍卷积神经网络(CNN)

CNN学习 零基础介绍写个CNN最简单的代码一. 概述二. 搭建CNN1. 输入层2. 卷积层3. 激活层4. 池化层5. 全连接层6. 网络搭建小结7. 损失函数8. 梯度下降9. 反向传播10. 模型评估与正则化11. 尝试搭建自己的第一个CNN 三. 经典CNN结构四. 猫狗识别项目实践1. Paddle实现版本&…

Leetcode打卡:找到稳定山的下标

执行结果:通过 题目: 3258 找到稳定山的下标 有 n 座山排成一列,每座山都有一个高度。给你一个整数数组 height ,其中 height[i] 表示第 i 座山的高度,再给你一个整数 threshold 。 对于下标不为 0 的一座山&#xf…

leetcode刷题日记03——javascript

题目3: 回文数https://leetcode.cn/problems/palindrome-number/ 给你一个整数 x ,如果 x 是一个回文整数,返回 true ;否则,返回 false 。 回文数是指正序(从左向右)和倒序(从右向…

服务器数据恢复—RAIDZ离线硬盘数超过热备盘数导致阵列崩溃的数据恢复案例

服务器存储数据恢复环境: ZFS Storage 7320存储阵列中有32块硬盘。32块硬盘分为4组,每组8块硬盘,共组建了3组RAIDZ,每组raid都配置了热备盘。 服务器存储故障: 服务器存储运行过程中突然崩溃,排除人为误操…