postgresql|数据库|利用sqlparse和psycopg2库批量按顺序执行SQL语句(psyconpg2新优化版本)

一、

旧版批量执行SQL脚本的python文件缺点,优点,以及更新内容

书接上回,postgresql|数据库开发|python的psycopg2库按指定顺序批量执行SQL文件(可离线化部署)_python sql psycopg2-CSDN博客

这个python脚本写了很久了,最近开始实际使用,发现了很多问题,问题主要集中在以下几点:

1、

SQL语句解析不太标准,遇到;分号就也当SQL语句执行,导致很多不必要的错误,并且很多时候不能有效区分多行SQL,因此,本文计划使用sqlparse库来做更准确的SQL语句解析

2、

批量的SQL文件跑完后,并没有一个比较详细的总结报告,遇到问题不太好排查,因此,本文对此做了优化,当一个目录内的SQL文件都执行完毕后,给一个相对详细的报告

确保即使某些 SQL 文件执行失败,程序也会继续尝试执行其余的文件,并最终给出一个详细的执行报告,帮助用户了解哪些文件和语句被执行以及哪些文件遇到了问题。这样,用户不仅能知道哪些文件未能成功处理,还能清楚地看到哪些文件及其内部的语句已经成功应用到了数据库中。

3、

SQL文件排序使用正则表达式处理,以改善SQL文件排序有时候不正确的问题

4、

所有的指定目录下的SQL文件都执行,但,执行失败的SQL文件仍留在原文件夹,需要将失败的SQL文件拿出来,手动处理有问题的SQL文件

5、

SQL语句执行输出太多,成功的SQL语句不应该输出到控制台,导致脚本执行情况并不是一目了然,因此,对脚本输出进行优化,以方便SQL脚本的调试

优点:

1、

以SQL文件为单位,每一个SQL文件内的SQL语句要么全部执行成功,如果中间有任何错误就回滚,对于数据库的数据安全是有一定的保障的

2、

该工具执行迅速,效率非常高,但对于锁表的情况,现在暂时没有什么想法

二、

优化后的python脚本源码

import os
import re
import sys
import json
import shutil
import psycopg2
from psycopg2 import sql, OperationalError, ProgrammingError
import sqlparse

def print_colored_text(text, color_code):
    """打印带有颜色的文本"""
    print(f"\033[{color_code}m{text}\033[0m")

# 定义颜色代码
COLORS = {
    'BLACK': 30,
    'RED': 31,
    'GREEN': 32,
    'YELLOW': 33,
    'BLUE': 34,
    'MAGENTA': 35,
    'CYAN': 36,
    'WHITE': 37
}

def find_sql_files(path):
    """查找指定路径下的所有 .sql 文件,并按文件名中第一个出现的数字升序排序返回列表"""
    sql_files = [
        os.path.join(root, file)
        for root, _, files in os.walk(path)
        for file in files
        if file.endswith('.sql')
    ]

    def extract_first_number(filename):
        """从文件名中提取第一个出现的数字序列并转换为整数,用于排序"""
        match = re.search(r'\d+', os.path.basename(filename))
        return int(match.group()) if match else float('inf')  # 如果没有匹配到数字,则排到最后

    # 使用 sorted 函数并指定 key 参数来实现升序排序,仅依据第一个数字
    sorted_sql_files = sorted(sql_files, key=extract_first_number)

    return sorted_sql_files
def execute_sql_file(db_config, sql_file_path, script_dir):
    """执行指定的 SQL 文件,并在遇到错误时移动文件"""
    conn = None
    cursor = None
    successful_statements = []  # 新增: 记录成功的SQL语句
    try:
        conn = psycopg2.connect(**db_config)
        cursor = conn.cursor()
        with open(sql_file_path, 'r', encoding='utf-8') as file:
            parsed_statements = sqlparse.split(file.read())
            for raw_stmt in parsed_statements:
                stmt = raw_stmt.strip()
                if not stmt:  # 忽略空语句
                    continue
                
                statements = sqlparse.parse(stmt)
                for statement in statements:
                    if statement.tokens:  # 确保有非空的SQL语句
                        stmt_str = str(statement).strip(';').strip()
                        if stmt_str:  # 忽略空语句
                           # print(stmt_str)
                           # print_colored_text('<<<<<<<=======<<<<<++++++++<<<<<<上面这条语句将要执行了===----========*****', COLORS['CYAN'])
                            try:
                                cursor.execute(stmt_str)
                                successful_statements.append(stmt_str)  # 添加到成功语句列表
                                #print_colored_text('执行成功!', COLORS['GREEN'])
                            except (Exception, psycopg2.DatabaseError) as e:
                                print(f"执行失败的语句是:")
                                print(f"{stmt_str}")
                                print("=========这是第一个分隔符===================")
                                print("报错详细信息是:")
                                print(e)
                                print("===========这是第二个分隔符===================")
                                print(f"执行的文件名称是:")
                                print(f"{sql_file_path}")
                                conn.rollback()  # 回滚事务
                                move_failed_file(sql_file_path, script_dir)
                                return False, successful_statements
        conn.commit()
        print_colored_text(f"Executed SQL file successfully: {sql_file_path}", COLORS['RED'])
        return True, successful_statements
    except (OperationalError, ProgrammingError) as error:
        print(f"Database error occurred while executing SQL file {sql_file_path}: {error}")
        if conn:
            conn.rollback()
        return False, successful_statements
    finally:
        if cursor:
            cursor.close()
        if conn:
            conn.close()

def move_failed_file(src, dst_dir):
    """将失败的SQL文件移动到指定的目标目录"""
    try:
        os.makedirs(dst_dir, exist_ok=True)
        dst = os.path.join(dst_dir, os.path.basename(src))
        print(f"Moving failed SQL file to {dst}")
        shutil.move(src, dst)
    except Exception as e:
        print(f"Failed to move the file {src}: {e}")

def main():
    if len(sys.argv) != 3:
        print("Usage: python script.py <user> <path_to_search>")
        sys.exit(1)

    user = sys.argv[1]
    search_path = sys.argv[2]
    script_dir = os.path.dirname(os.path.abspath(__file__))  # 获取脚本所在目录

    try:
        with open('test.json', 'r', encoding='utf-8') as f:
            params = json.load(f)
            db_config = params.get('db_config', {})
            required_keys = ['dbname', 'user', 'password', 'host', 'port']
            if not all(key in db_config for key in required_keys):
                print("Error: Missing required database configuration in test.json")
                sys.exit(1)
            db_config['user'] = user
    except FileNotFoundError:
        print("Error: test.json not found")
        sys.exit(1)
    except json.JSONDecodeError:
        print("Error: test.json is not a valid JSON file")
        sys.exit(1)

    sql_files = list(find_sql_files(search_path))
    print(f"Found {len(sql_files)} SQL files: {sql_files}")
    print_colored_text(f'|||||二十秒后开始执行{sql_files},以打印出来的顺序依次执行SQL文件|||||||||||', COLORS['MAGENTA'])

    failed_files = []
    executed_statements = []
    successful_files = []  # 新增: 记录成功的SQL文件

    if sql_files:
        for sql_file in sql_files:
            print_colored_text(f'这个文件将要执行: {sql_file}\n\n', COLORS['YELLOW'])
            success, statements = execute_sql_file(db_config, sql_file, script_dir)
            if not success:
                print(f"Failed to execute SQL file: {sql_file}. Continuing with the next file...")
                failed_files.append(sql_file)
            else:
                executed_statements.extend(statements)
                successful_files.append(sql_file)  # 添加到成功文件列表
        # 打印总结信息
        if failed_files:
            print_colored_text("\nThe following SQL files failed to execute:", COLORS['RED'])
            for failed_file in failed_files:
                print(f"- {failed_file}")
        else:
            print_colored_text("\nAll SQL files executed successfully.", COLORS['GREEN'])
#        print_colored_text("\nThe following SQL statements were executed successfully:", COLORS['GREEN'])
#        for stmt in executed_statements:
#            print(f"- {stmt}")
        print_colored_text("\nThe following SQL files were executed successfully:", COLORS['GREEN'])
        for successful_file in successful_files:
            print(f"- {successful_file}")

    else:
        print("No SQL files found in the specified path.")

if __name__ == "__main__":
    main()

该python脚本依赖于python3环境,libpq.so,pyscopg库,sqlparse库,这些库什么的都可以离线安装,相关文件都已放到百度网盘内了

通过网盘分享的文件:批量执行SQL语句项目
链接: https://pan.baidu.com/s/1zCAL78hp2-92NdjIHCjM0w?pwd=gkw1 提取码: gkw1 

 python3.zip 文件里都是rpm包,适用于centos7,解压后,怎么安装就不在这多说了
其中,里面的sqlpaser.tar.gz 需要解压到/usr/local/lib/python3.6目录下,如果没有python3.6目录,建立即可

psycopg2.gz这个文件里的内容解压到/usr/lib64/python3.6目录下

解压完毕后,需要执行python3 -V 命令,以激活sqlparse库

最终目录如下所示即可:

[root@centos7 ~]# ls /usr/lib64/python3.6/site-packages/
psycopg2  psycopg2-2.8.6-py3.6.egg-info  __pycache__  README.txt
[root@centos7 ~]# ls /usr/local/lib/python3.6/site-packages/
sqlparse  sqlparse-0.4.4.dist-info

三、

重点代码解析

1、

代码编程结构

本次代码编写仍然是延续上个版本,主要功能封装为方法,在main方法统一集中调用

主要是一个main主方法+4个功能方法,分别是控制台颜色渲染方法,SQL文件搜寻方法,SQL文件调用psyconpg2,sqlparse库逐行执行SQL语句方法,移动执行失败SQL文件到脚本当前目录方法,整体调用main方法

2、

控制台颜色渲染方法

该方法主要是使用了字典,形参调用形式,例如该方法的调用:

print_colored_text(f"Executed SQL file successfully: {sql_file_path}", COLORS['RED'])

没什么好说的,主要就是字典的应用是难点 

3、

SQL文件搜寻方法

def find_sql_files(path):
    """查找指定路径下的所有 .sql 文件,并按文件名中第一个出现的数字升序排序返回列表"""
    sql_files = [
        os.path.join(root, file)
        for root, _, files in os.walk(path)
        for file in files
        if file.endswith('.sql')
    ]

    def extract_first_number(filename):
        """从文件名中提取第一个出现的数字序列并转换为整数,用于排序"""
        match = re.search(r'\d+', os.path.basename(filename))
        return int(match.group()) if match else float('inf')  # 如果没有匹配到数字,则排到最后

    # 使用 sorted 函数并指定 key 参数来实现升序排序,仅依据第一个数字
    sorted_sql_files = sorted(sql_files, key=extract_first_number)

    return sorted_sql_files

这一段代码难点主要在方法嵌套,首先迭代寻找指定的路径下所有的SQL文件,然后通过子方法,调用正则表达式重新排序,主要是以数字开始的文件

这里需要注意,搜寻到的SQL文件是返回一个列表

4、

SQL文件调用psyconpg2,sqlparse库逐行执行SQL语句方法

该方法主要是业务逻辑实现,主要逻辑是以单个SQL文件为整体事务,如果某个SQL文件有错误,事务回滚,并调用移动执行失败SQL文件到脚本当前目录方法;如果该SQL文件顺利执行完成,则提交事务

这样做的目的是保护数据库,避免不必要的数据混乱

无论如何,当一个SQL文件执行完毕,都会将数据库连接关闭,游标关闭

编写的时候考虑了一下,还是需要捕获错误并将详细错误信息打印,并收集失败的SQL文件和成功的SQL文件,在main方法内将要调用这两个列表,list

5、

移动执行失败SQL文件到脚本当前目录方法

该方法没什么特别的,主要是调用此方法的形式,在main方法内,这里比较绕,当时编写的时候考虑了很久

6、

main方法

整体逻辑封装都在此方法内

    user = sys.argv[1]
    search_path = sys.argv[2]
    script_dir = os.path.dirname(os.path.abspath(__file__))  # 获取脚本所在目录

__file__ 是一个非常有用的内置变量,它帮助你在编写脚本时能够动态地确定文件的位置,而不需要硬编码路径。这对于提高代码的可移植性和维护性非常有帮助。如果你在开发过程中遇到需要根据脚本位置来定位其他文件的情况,说实话这个变量我是差点给忘记掉的

那么,为什么要有user这个变量呢?考虑到很多时候并不是有高权限的数据库账号

其它的就没什么好说的了

四、

数据库信息文件json

有需要实践的同学按实际情况填写此json文件

{
    "db_config": {
        "dbname": "postgres",
        "user": "postgres",
        "password": "xxxxx",
        "host": "192.168.xxx.xx",
        "port": "1543"
    }
}

四、

执行方式和执行结果示例

[root@centos7 ~]# python3 test7.py postgres ./
Found 2 SQL files: ['./12323.sql', './33333.sql']
|||||二十秒后开始执行['./12323.sql', './33333.sql'],以打印出来的顺序依次执行SQL文件|||||||||||
这个文件将要执行: ./12323.sql


执行失败的语句是:
-- 插入部门数据 ;
INSERT INTO 
dept VALUES (10, '战略部', '咸阳')
=========这是第一个分隔符===================
报错详细信息是:
duplicate key value violates unique constraint "dept_pkey"
DETAIL:  Key (deptno)=(10) already exists.

===========这是第二个分隔符===================
执行的文件名称是:
./12323.sql
Moving failed SQL file to /root/12323.sql
Failed to execute SQL file: ./12323.sql. Continuing with the next file...
这个文件将要执行: ./33333.sql


执行失败的语句是:
-- 插入部门数据 ;
INSERT INTO 
dept VALUES (10, '战略部', '咸阳')
=========这是第一个分隔符===================
报错详细信息是:
duplicate key value violates unique constraint "dept_pkey"
DETAIL:  Key (deptno)=(10) already exists.

===========这是第二个分隔符===================
执行的文件名称是:
./33333.sql
Moving failed SQL file to /root/33333.sql
Failed to execute SQL file: ./33333.sql. Continuing with the next file...

The following SQL files failed to execute:
- ./12323.sql
- ./33333.sql

The following SQL files were executed successfully:

全失败的:

有失败有成功的情况:

[root@centos7 ~]# python3 test7.py postgres ./
Found 2 SQL files: ['./12323.sql', './33333.sql']
|||||二十秒后开始执行['./12323.sql', './33333.sql'],以打印出来的顺序依次执行SQL文件|||||||||||
这个文件将要执行: ./12323.sql


Executed SQL file successfully: ./12323.sql
这个文件将要执行: ./33333.sql


执行失败的语句是:
-- 插入部门数据 ;
INSERT INTO 
dept VALUES (10, '战略部', '咸阳')
=========这是第一个分隔符===================
报错详细信息是:
duplicate key value violates unique constraint "dept_pkey"
DETAIL:  Key (deptno)=(10) already exists.

===========这是第二个分隔符===================
执行的文件名称是:
./33333.sql
Moving failed SQL file to /root/33333.sql
Failed to execute SQL file: ./33333.sql. Continuing with the next file...

The following SQL files failed to execute:
- ./33333.sql

The following SQL files were executed successfully:
- ./12323.sql

全部成功的情况:

[root@centos7 ~]# python3 test7.py postgres ./
Found 1 SQL files: ['./12323.sql']
|||||二十秒后开始执行['./12323.sql'],以打印出来的顺序依次执行SQL文件|||||||||||
这个文件将要执行: ./12323.sql


Executed SQL file successfully: ./12323.sql

All SQL files executed successfully.

The following SQL files were executed successfully:
- ./12323.sql

 

🆗,这个SQL文件批量执行的小工具就介绍到这了,欢迎各位大拿指正错误!!!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/953143.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Node.js——http 模块(二)

个人简介 &#x1f440;个人主页&#xff1a; 前端杂货铺 &#x1f64b;‍♂️学习方向&#xff1a; 主攻前端方向&#xff0c;正逐渐往全干发展 &#x1f4c3;个人状态&#xff1a; 研发工程师&#xff0c;现效力于中国工业软件事业 &#x1f680;人生格言&#xff1a; 积跬步…

基于element UI el-dropdown打造表格操作列的“更多⌵”上下文关联菜单

<template><div :class"$options.name"><el-table :data"tableData"><el-table-column type"index" label"序号" width"60" /><!-- 主要列 BEGIN---------------------------------------- --&g…

javascrip基础语法

为什么学习 JavaScript? JavaScript 是 web 开发人员必须学习的 3 门语言中的一门&#xff1a; HTML 定义了网页的内容CSS 描述了网页的布局JavaScript 控制了网页的行为 1. JavaScript 输出 1.1 console.log()&#xff1a;用于将信息输出到浏览器控制台&#xff0c;例如con…

大语言模型预训练、微调、RLHF

转发&#xff0c;如有侵权&#xff0c;请联系删除&#xff1a; 1.【LLM】3&#xff1a;从零开始训练大语言模型&#xff08;预训练、微调、RLHF&#xff09; 2.老婆饼里没有老婆&#xff0c;RLHF里也没有真正的RL 3.【大模型微调】一文掌握7种大模型微调的方法 4.基于 Qwen2.…

django基于Python的校园个人闲置物品换购平台

Django 基于 Python 的校园个人闲置物品换购平台 一、平台概述 Django 基于 Python 的校园个人闲置物品换购平台是专为校园师生打造的一个便捷、环保且充满活力的线上交易场所。它借助 Django 这一强大的 Python Web 开发框架&#xff0c;整合了校园内丰富的闲置物品资源&…

abap安装cl_json类

文章来自 SAP根据源码导入/ui2/cl_json类 - pikeduo - 博客园 新建一个se38程序&#xff0c;把源码放到里&#xff0c;源码如下 *----------------------------------------------------------------------* * CLASS zcl_json DEFINITION *----------------------------…

[OPEN SQL] ORDER BY排序数据

本次操作使用的数据库表为SFLIGHT&#xff0c;其字段内容如下所示 航班(SFLIGHT) 该数据库表中的部分值如下所示 OPEN SQL中的ORDER BY语句用于对数据库表中的数据进行排序 在查询数据的时候使用ORDER BY语句&#xff0c;则查询出来的结果会按照ORDER BY指定的字段进行排序 排序…

STM32F103ZET6战舰版单片机开发板PCB文件 电路原理图

资料下载地址&#xff1a;STM32战舰版单片机开发板PCB文件 电路原理图 1、原理图 2、PCB 3、板子介绍 一、核心芯片与性能 核心芯片&#xff1a;STM32F103ZET6&#xff0c;这是一款基于ARM Cortex-M3内核的高性能单片机。处理器频率&#xff1a;高达72MHz&#xff0c;确保了…

An FPGA-based SoC System——RISC-V On PYNQ项目复现

本文参考&#xff1a; &#x1f449; 1️⃣ 原始工程 &#x1f449; 2️⃣ 原始工程复现教程 &#x1f449; 3️⃣ RISCV工具链安装教程 1.准备工作 &#x1f447;下面以LOCATION代表本地源存储库的安装目录&#xff0c;以home/xilinx代表在PYNQ-Z2开发板上的目录 ❗ 下载Vivad…

GAN的应用

5、GAN的应用 ​ GANs是一个强大的生成模型&#xff0c;它可以使用随机向量生成逼真的样本。我们既不需要知道明确的真实数据分布&#xff0c;也不需要任何数学假设。这些优点使得GANs被广泛应用于图像处理、计算机视觉、序列数据等领域。上图是基于GANs的实际应用场景对不同G…

centos9设置静态ip

CentOS 9 默认使用 NetworkManager 管理网络&#xff0c;而nmcli是 NetworkManager 命令行接口的缩写&#xff0c;是一个用来进行网络配置、管理网络连接的命令工具&#xff0c;可以简化网络设置&#xff0c;尤其是在无头&#xff08;没有图形界面&#xff09;环境下。 1、 cd…

Idea日志乱码

问题描述 前提&#xff1a;本人使用windows Idea运行sh文件&#xff0c;指定了utf-8编码&#xff0c;但是运行过程中还是存在中文乱码 Idea的相关配置都已经调整 字体调整为雅黑 文件编码均调整为UTF-8 调整Idea配置文件 但是还是存在乱码&#xff0c;既然Idea相关配置已经…

R4-LSTM学习笔记

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 LSTM-火灾温度预测 导入数据数据可视化设置X、y构建模型调用模型个人总结LSTM 的基本结构细胞状态&#xff08;Cell State&#xff09;LSTM 的优点 导入数据 i…

uniapp实现H5页面内容居中与两边留白,打造类似微信公众号阅读体验

在 UniApp 中&#xff0c;由于需要兼容多端应用&#xff0c;我们通常使用 rpx 作为尺寸单位。然而&#xff0c;在某些情况下&#xff0c;如需要实现内容居中且两边留白时&#xff0c;直接使用 rpx 可能会带来一些限制。这时&#xff0c;我们可以考虑使用 px 或 rem 等单位&…

网工_网络体系结构

2024.01.09&#xff1a;网络工程学习笔记&#xff08;网工老姜&#xff09; 第1节 网络体系结构 1.1 计算机一切皆011.2 网络协议1.3 协议的分层模型1.4 主机1向主机2发送数据过程1.5 本章小结 1.1 计算机一切皆01 在计算机内部&#xff0c;所有的数据最终都是以01的方式存在的…

CI/CD 流水线

CI/CD 流水线 CI 与 CD 的边界CI 持续集成CD&#xff08;持续交付/持续部署&#xff09;自动化流程示例&#xff1a; Jenkins 引入到 CI/CD 流程在本地或服务器上安装 Jenkins。配置 Jenkins 环境流程设计CI 阶段&#xff1a;Jenkins 流水线实现CD 阶段&#xff1a;Jenkins 流水…

编程题-二分查找

题目&#xff1a; 给定一个 n 个元素有序的&#xff08;升序&#xff09;整型数组 nums 和一个目标值 target &#xff0c;写一个函数搜索 nums 中的 target&#xff0c;如果目标值存在返回下标&#xff0c;否则返回 -1 解法一&#xff08;循环遍历查找&#xff09;&#xff…

OOM排查思路

K8S 容器的云原生生态&#xff0c;改变了服务的交付方式&#xff0c;自愈能力和自动扩缩等功能简直不要太好用。 有好的地方咱要夸&#xff0c;不好的地方咱也要说&#xff0c;真正的业务是部署于容器内部&#xff0c;而容器之外&#xff0c;又有一逻辑层 Pod 。 对于容器和…

Github Copilot学习笔记

&#xff08;一&#xff09;Prompt Engineering 利用AI工具生成prompt设计好的prompt结构使用MarkDown语法&#xff0c;按Role, Skills, Constrains, Background, Requirements和Demo这几个维度描述需求。然后收输入提示词&#xff1a;作为 [Role], 拥有 [Skills], 严格遵守 […

在 Rider 中使用 C# 创建 Windows 窗体应用 Winforms

1&#xff0c;创建项目 new solution 创建一个解决方案 2&#xff0c;打开设计器 在 Form1.cs 上右键打开设计器 认识一下 Rider 的界面 参考微软官方的例子&#xff0c;添加如下属性&#xff1a;注&#xff1a;这里 Listbox 的大小设置成 120, 94 失败&#xff0c;默认的是 12…