pymysql是python操作mysql的标准库,可以通过pip install快速导入pymysql包操作数据库
使用pymysql操作mysql
简单demo
import pymysql
connect = pymysql.connect(
host="localhost",
port=3306,
user="root",
password="root",
database="my_database",
# charset="utf8mb4"
)
cursor = connect.cursor()
# 查询语句1
sql = "select * from user where name = %(name)s"
ret = cursor.execute(sql, {"name": "ls"})
# 查询语句2
sql = "select * from user where name = %s"
ret = cursor.execute(sql, "ls")
print(ret)
result = cursor.fetchall()
print("result", result)
cursor.close()
connect.close()
自定义SqlHelper
import pymysql
class MySQLClient(object):
def __init__(self, **kwargs):
self.conn = pymysql.connect(
**kwargs
)
self.cursor = self.conn.cursor()
def query(self, sql, *args):
try:
rowcount = self.cursor.execute(sql, *args)
return rowcount
except Exception as e:
raise e
def update(self, sql, *args):
self.cursor.execute(sql, *args)
self.conn.commit()
def insert(self, sql, *args):
self.cursor.execute(sql, *args)
self.conn.commit()
def fetch_one(self, sql, *args):
self.query(sql, *args)
result = self.cursor.fetchone()
return result
def fetch_all(self, sql, *args):
self.query(sql, *args)
result = self.cursor.fetchone()
return result
def close(self):
self.cursor.close()
self.conn.close()
config = {
"host": "localhost",
"port": 3306,
"user": "root",
"password": "root",
"database": "my_database",
}
mysql_client = MySQLClient(**config)
sql = "select * from user where name=%s"
ret = mysql_client.fetch_one(sql, "ls")
print(ret)
# mysql_client.close()
借助DButils创建数据库连接池
DButils模块可以通过创建数据库连接池,提升数据库操作性能;
实现思路:
- 定义SqlHelper类
- 通过
__init__
方法定义pool=PoolDB(**kwargs),_local=threading.local()- 定义
__enter__
获取connection与cursor和__exit__
关闭connection与cursor,可支持with 上下文操作- 为了保证每次获取的connection与cursor不会将之前的覆盖掉,引入threading.local进行保存;self._local = {thread_id: {“stack”: [(connection, cursor)]}}
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pymysql
from dbutils.pooled_db import PooledDB
from threading import local
class SqlHelper(object):
def __init__(self):
self.pool = PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=5, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=1, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
# maxcached=5, # 链接池中最多闲置的链接,0和None不限制
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表
host='localhost',
port=3306,
user='root',
password='root',
database='my_database',
charset='utf8'
)
self._local = local()
def open(self):
connection = self.pool.connection()
cursor = connection.cursor()
return connection, cursor
def close(self, cursor, conn):
cursor.close()
conn.close()
def __enter__(self):
conn, cursor = self.open()
rv = getattr(self._local, "stack", None)
if not rv:
self._local.stack = [(conn, cursor)]
else:
self._local.stack.append((conn, cursor))
return cursor
def __exit__(self, exc_type, exc_val, exc_tb):
rv = getattr(self._local, "stack", None)
if not rv:
# del self._local.stack
return
elif len(rv) == 1:
conn, cursor = rv[-1]
# del self._local.stack
return
else:
conn, cursor = rv.pop()
cursor.close()
conn.close()
def fetchone(self, sql, *args):
conn, cursor = self.open(self)
try:
rowcount = cursor.execute(sql, *args)
ret = cursor.fetchone()
return ret
except Exception as e:
raise
def fetchall(self, sql, *args):
conn, cursor = self.open(self)
try:
rowcount = cursor.execute(sql, *args)
ret = cursor.fetchall()
return ret
except Exception as e:
raise
db = SqlHelper()
sql = "select * from user"
with db as c1:
ret = c1.execute(sql)
print(ret)
with db as c2:
ret = c2.execute(sql)
print(ret)
使用DButils的另一种写法
使用这种写法,每次都实例化SqlHelper,保证每次获取的connection和cursor不被覆盖
#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""
1. 定义全局变量POOL=pooledDB(**kwargs)
2. 每次用到db就实例化一次
"""
import pymysql
from dbutils.pooled_db import PooledDB
from threading import local
pool = PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=0, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=1, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
# maxcached=5, # 链接池中最多闲置的链接,0和None不限制
blocking=False, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表
host='localhost',
port=3306,
user='root',
password='root',
database='my_database',
charset='utf8'
)
class SqlHelper(object):
def __init__(self):
self.conn = None
self.cursor = None
def open(self):
self.connection = pool.connection()
self.cursor = self.connection.cursor()
return self.connection, self.cursor
def close(self):
self.cursor.close()
self.conn.close()
def __enter__(self):
self.conn, self.cursor = self.open()
return self.cursor
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
db = SqlHelper()
sql = "select * from user"
with db as c1:
ret = c1.execute(sql)
print("c1.cursor: ", db.cursor)
print(ret)
with db as c2:
ret = c2.execute(sql)
print("c2.cursor: ", db.cursor) # 一个实例对象是可以多次调用enter方法的,但db.cursor发生了改变,即上一次的连接丢了
print(ret)
print(type(c1), type(c2))
print(c1 is c2) # false
print("c1.cursor: ", db.cursor) # c2.cursor将c1.cursor覆盖了