编写一个Python脚本,每天凌晨3点开始备份
脚本具有以下特点
- 不需要安装mysql-client,并且Windows Linux都可以使用
- 支持多个数据库连接的备份
- 每个数据库支持多个表备份
- 日志保存下来,方便第二天早上查看备份结果
首先安装需要的库
pip3 install pymysql loguru
支持多个数据库
可选开始备份时间
备份速率调整
备份有大量数据传输,虽然是在夜间,也要控制速率,防止对服务端压力过大,1000条为limit的查询,可以控制下行带宽在5Mbps以下,不过上百万条数据的表,可能要备份很久。可以凌晨1点就开始备份,或者把1000条延时从0.2调到0.1或更短,或者调整每次查询的limit到5000。
开始备份时的控制台截图:
完整代码(需填入自己的数据库地址、端口、密码)
# author @xuehu96 2024-9-22
import os
import time
from datetime import datetime
import pymysql
from loguru import logger
# author @xuehu96 2024-9-22
def backup_database(host, port, user, password, databases):
logger.info(f"Linking {host} ...")
for db_name in databases:
try:
# 连接到数据库
conn = pymysql.connect(host=host, port=port, user=user, password=password, database=db_name,
charset='utf8mb4')
cursor = conn.cursor()
logger.info(f"Backuping {db_name} ...")
# 创建备份文件夹
backup_folder = f"./backups/{db_name}"
if not os.path.exists(backup_folder):
os.makedirs(backup_folder)
# 获取当前日期
current_date = datetime.now().strftime("%Y-%m-%d")
# 备份数据库
backup_file = f"{backup_folder}/{current_date}.sql"
with open(backup_file, "w", encoding='utf-8') as f:
# 获取所有表名
cursor.execute("SHOW TABLES")
tables = cursor.fetchall()
for table in tables:
table_name = table[0]
logger.info(f"Backing table {table_name} ...")
# 导出表结构
cursor.execute(f"SHOW CREATE TABLE {table_name}")
create_table_sql = cursor.fetchone()[1]
f.write(f"{create_table_sql};\n\n")
# 导出表数据
limit = 1000 # 每次查询1000条记录
offset = 0
while True:
cursor.execute(f"SELECT * FROM {table_name} LIMIT {offset}, {limit}")
rows = cursor.fetchall()
if not rows:
break # 当没有更多数据时退出循环
for row in rows:
placeholders = []
for value in row:
if value is None:
placeholders.append('NULL')
elif isinstance(value, (int, float)):
placeholders.append(f"{value}")
else:
placeholders.append(f"'{value}'")
placeholders = ', '.join(placeholders)
query = f"INSERT INTO {table_name} VALUES ({placeholders});"
f.write(f"{query}\n")
f.write("\n")
offset += limit
time.sleep(0.2) # 每处理一批数据后暂停0.2秒
logger.info(f"Backup table {table_name} successfully, inserted {len(rows)} rows.")
# 关闭连接
cursor.close()
conn.close()
logger.info(f"Backup of {db_name} completed successfully.")
except Exception as e:
logger.error(f"Backup of {db_name} failed: {e}")
finally:
time.sleep(1)
# author @xuehu96 2024-9-22
def main():
logger.add("backup.log")
while True:
now = datetime.now()
if now.hour == 3 and now.minute == 0:
logger.info("Starting backup process.")
# 第一个数据库连接信息
host1 = "192.168.127.12" # 填写数据库IP或域名
port1 = 2049
user1 = "root"
password1 = "XXXXXXXXXXXXXX"
databases1 = ['DB1', 'DB2', 'test', 'test2', 'V5', 'V6', 'V7'] # 备份的数据库列表
# 备份第一个数据库连接
backup_database(host1, port1, user1, password1, databases1)
# 第二个数据库连接信息
host2 = "sh-XXXX.com"
port2 = 2599
user2 = "root"
password2 = "XXXXXXXXXXXXXX"
databases2 = ['ERP', 'DEVICE', 'GATEWAY']
# 备份第二个数据库连接
backup_database(host2, port2, user2, password2, databases2)
logger.info("Backup process completed.")
break
time.sleep(60)
if __name__ == "__main__":
main()
# author @xuehu96 2024-9-22