python 使用进程池并发执行 SQL 语句

        这段代码使用了 Python 的 multiprocessing 模块来实现真正的并行处理,绕过 Python 的全局解释器锁(GIL)限制,从而在多核 CPU 上并发执行多个 SQL 语句。

from pyhive import hive
import multiprocessing

# 建立连接
conn = hive.Connection(host="localhost", port=10000, username="your_username", password="your_password")

# SQL 语句列表
sql_statements = [
    "INSERT INTO table1 VALUES (1, 'value1')",
    "INSERT INTO table1 VALUES (2, 'value2')",
    "INSERT INTO table1 VALUES (3, 'value3')"
]

# 定义执行函数
def execute_sql(sql):
    with conn.cursor() as cursor:
        cursor.execute(sql)

# 确保多进程代码只在主进程中执行
if __name__ == '__main__':

    # 使用进程池并发执行
    with multiprocessing.Pool() as pool:
        pool.map(execute_sql, sql_statements)

    # 关闭连接
    conn.close()

1. 导入模块

from pyhive import hive
import multiprocessing
  • pyhive: 这是用于连接和操作 Hive 数据库的 Python 库。hive.Connection 用于建立与 Hive 数据库的连接。
  • multiprocessing: 这是 Python 的标准库,用于创建和管理进程。通过 multiprocessing,我们可以绕过 Python 的 GIL(全局解释器锁)限制,实现真正的并行处理。

2. 建立数据库连接

conn = hive.Connection(host="localhost", port=10000, username="your_username", password="your_password")
  • 这里我们使用 hive.Connection 建立一个到 Hive 数据库的连接。
  • 参数
    • host: HiveServer2 的主机地址,通常是 localhost 或 HiveServer2 运行的服务器 IP。
    • port: HiveServer2 的端口号,默认是 10000
    • username: 连接 Hive 使用的用户名。
    • password: 连接 Hive 使用的密码。

这个连接对象 conn 将在后续的代码中用于创建游标(cursor),并通过游标执行 SQL 语句。

3. 定义 SQL 语句列表

sql_statements = [
    "INSERT INTO table1 VALUES (1, 'value1')",
    "INSERT INTO table1 VALUES (2, 'value2')",
    "INSERT INTO table1 VALUES (3, 'value3')"
]
  • 这里定义了一个包含多个 SQL 语句的列表 sql_statements。每个语句都是一个插入操作,将数据插入到 Hive 表 table1 中。
  • 你可以根据实际需求修改这些 SQL 语句。

4. 定义执行函数

def execute_sql(sql):
    with conn.cursor() as cursor:
        cursor.execute(sql)
  • execute_sql 函数是用于执行单个 SQL 语句的函数。
  • with conn.cursor() as cursor:为当前数据库连接创建一个游标对象 cursor,这个游标用于执行 SQL 语句。
    • cursor.execute(sql):执行传入的 SQL 语句。
  • 这个函数会被进程池中的每个进程调用,每个进程都会独立执行一个 SQL 语句。

5. 使用进程池并发执行

with multiprocessing.Pool() as pool:
    pool.map(execute_sql, sql_statements)
  • multiprocessing.Pool():创建一个进程池。进程池可以管理一组工作进程,并将任务分配给这些进程。
    • 默认情况下,Pool() 会根据系统的 CPU 核心数创建相应数量的工作进程。
    • 你可以通过参数指定池中的进程数量,例如 Pool(4) 表示创建 4 个工作进程。
  • pool.map(execute_sql, sql_statements)
    • pool.map 方法会将 execute_sql 函数应用到 sql_statements 列表中的每个元素上。
    • pool.map 方法会自动将 SQL 语句列表分配给进程池中的工作进程,每个进程独立执行一个 SQL 语句。
    • 这个过程是并行的,多个进程可以同时执行不同的 SQL 语句,从而提高执行效率。

6. 关闭数据库连接

conn.close()
  • 在所有 SQL 语句执行完毕后,我们关闭数据库连接,释放资源。

进程池的工作原理

multiprocessing.Pool 提供了一种方便的方式来并行化执行函数。其工作原理如下:

  1. 创建进程池:当你创建一个 Pool 对象时,会启动多个工作进程(数量可以指定,或默认根据 CPU 核心数决定)。
  2. 任务分配:当你调用 pool.map 时,进程池会将任务(在这里是 execute_sql 函数)分配给空闲的工作进程。
  3. 并行执行:每个工作进程独立执行分配给它的任务,互不干扰。
  4. 结果收集pool.map 会收集所有工作进程的执行结果,并按照原始任务列表的顺序返回结果。

为什么使用进程池而不是线程池?

  1. GIL 限制:Python 的全局解释器锁(GIL)限制了多线程的并行执行能力,尤其是在 CPU 密集型任务中,多线程并不能充分利用多核 CPU。
  2. 进程并行multiprocessing 模块通过创建多个进程来绕过 GIL 限制,每个进程都有自己的 Python 解释器和内存空间,因此可以实现真正的并行执行。
  3. 适用场景
    • 线程池:适合 I/O 密集型任务(例如,等待数据库查询结果)。
    • 进程池:适合 CPU 密集型任务(例如,并行计算、数据处理等),或者你需要绕过 GIL 限制时。

注意事项

  1. 数据库连接:在多进程环境中,每个进程都有自己的内存空间,因此每个进程需要独立的数据库连接。在上述代码中,每个进程都通过 conn.cursor() 创建了自己的游标。
  2. 进程开销:创建和销毁进程有一定的开销,因此对于非常短小的任务,进程池可能不会显著提高性能。在这种情况下,可以考虑调整进程池的大小或使用其他优化手段。
  3. 连接池:如果你的程序需要频繁访问数据库,可以考虑使用数据库连接池来复用数据库连接,减少连接建立和关闭的开销。

总结

  • 进程池:通过 multiprocessing.Pool 实现,可以绕过 Python 的 GIL 限制,实现真正的并行处理。
  • 适用场景:适合 CPU 密集型任务或需要并行执行多个独立任务的场景。
  • 代码结构
    • 建立数据库连接。
    • 定义 SQL 语句列表。
    • 定义执行函数 execute_sql
    • 使用进程池并发执行 SQL 语句。
    • 关闭数据库连接。

通过这种方式,你可以充分利用多核 CPU 的优势,并发执行多个 SQL 语句,从而提高程序的执行效率。

解决多进程报错

你遇到的错误是 RuntimeError,这是因为你在使用 multiprocessing 时没有正确地保护代码的入口点。具体来说,在 Windows 系统上(以及其他非 fork 的启动方式),你必须将多进程相关的代码放在 if __name__ == '__main__': 语句块中,以避免子进程在启动时重新导入主模块并执行不必要的代码。

错误原因:

在 Windows 系统中,Python 的 multiprocessing 模块使用 spawn 启动子进程,这意味着子进程会重新导入当前脚本。如果不加以保护,子进程会再次执行主模块中的代码,导致递归创建进程并抛出错误。

解决方案:

你需要将多进程相关的代码放在 if __name__ == '__main__': 语句块中,确保只有主进程会执行这些代码,而子进程不会。

修改后的代码:

 

python

import multiprocessing

data = [
    "1",
    "2",
    "3"
]

# 定义执行函数
def print_str(data):
    print(data)

# 确保多进程代码只在主进程中执行
if __name__ == '__main__':
    # 使用进程池并发执行
    with multiprocessing.Pool() as pool:
        pool.map(print_str, data)

解释:

  • if __name__ == '__main__': 确保了只有在直接运行当前脚本时,才会执行其中的多进程代码。子进程不会执行这个代码块,从而避免了递归创建进程的问题。
  • 在 Windows 系统上,这是使用 multiprocessing 时必须遵循的惯用写法。

其他注意事项:

  • 如果你打算将脚本打包成可执行文件(例如使用 pyinstaller),你还需要调用 multiprocessing.freeze_support(),不过在大多数脚本运行的情况下,这个调用不是必须的。

例如:

 

python

if __name__ == '__main__':
    multiprocessing.freeze_support()  # 如果需要打包成可执行文件,可以加上这行
    with multiprocessing.Pool() as pool:
        pool.map(print_str, data)

参考文档:

你可以参考 Python 官方文档中关于 multiprocessing 的部分,了解更多关于安全导入主模块的信息:

  • multiprocessing — Process-based parallelism — Python 3.13.0 documentation

执行sql 简单示例

import multiprocessing

data = [  ]

# 定义执行函数
def print_str(data):
    print(data)

# 确保多进程代码只在主进程中执行
if __name__ == '__main__':

    data2 = [
        "1",
        "2",
        "3"
    ]

    for i in data2:
        data_str = f"""
        inset into {i}
        """
        data.append(data_str)


    # 使用进程池并发执行
    with multiprocessing.Pool() as pool:
        pool.map(print_str, data)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/904571.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Ajax:请求 响应

Ajax:请求 & 响应 AjaxjQuery的Ajax接口$.get$.post$.ajax PostMan 接口测试getpost Ajax 浏览器中看到的数据,并不是保存在浏览器本地的,而是实时向服务器进行请求的。当服务器接收到请求,就会发回一个响应,此时浏…

ALIGN_ Tuning Multi-mode Token-level Prompt Alignment across Modalities

文章汇总 当前的问题 目前的工作集中于单模提示发现,即一种模态只有一个提示,这可能不足以代表一个类[17]。这个问题在多模态提示学习中更为严重,因为视觉和文本概念及其对齐都需要推断。此外,仅用全局特征来表示图像和标记是不…

Linux-计算机网络-epoll的LT,ET模式

一.epoll的LT和ET模式介绍 epol 对文件描述符有两种操作模式:LT(Level Trigger,电平触发)模式和 ET(EdgeTrigger,边沿触发)模式。LT模式是默认的工作模式。当往epol 内核事件表中注册一个文件描述符上的 EPOLLET 事件时,epoll将以高效的 ET …

新160个crackme - 087-d4ph1-crackme2

运行分析 需破解Name和Serial PE分析 ASM程序,32位,无壳 静态分析&动态调试 ida找到关键字符串 INT_PTR __stdcall DialogFunc(HWND hDlg, UINT a2, WPARAM a3, LPARAM a4) {HICON IconA; // eaxint v5; // ediunsigned int v6; // ebxchar v7; // a…

leetcode 303.区域和检索-数组不可变

1.题目要求: 2.题目代码: class NumArray { public:vector<int> array;NumArray(vector<int>& nums) {array nums;}int sumRange(int left, int right) {int sum 0;while(left < right){sum array[left];left;}return sum;} };/*** Your NumArray obje…

【SVM手把手推导】对偶问题应用之支持向量机SVM(Hard Margin)

1. 对偶问题应用之支持向量机SVM 1.1 SVM 设给定数据集&#xff1a; { ( s i , y i ) : y i ∈ { 1 , − 1 } , i 1 , ⋯ , m } \{(\mathbf{s}^i,y^i):y^i\in\{1,-1\},i1,\cdots,m\} {(si,yi):yi∈{1,−1},i1,⋯,m}&#xff0c;我们想要找到一个决策超平面&#xff08;decis…

大数据技术的前景如何?

在当今数字化迅猛发展的时代&#xff0c;大数据技术的前景显得尤为广阔。随着数据量的激增&#xff0c;如何有效利用这些数据成为了各行各业关注的焦点。未来五年&#xff0c;大数据技术的发展趋势可以从市场规模、技术融合、行业应用和政策支持等多个方面进行深入分析。 1. 市…

【STM32】单片机ADC原理详解及应用编程

本篇文章主要详细讲述单片机的ADC原理和编程应用&#xff0c;希望我的分享对你有所帮助&#xff01; 目录 一、STM32ADC概述 1、ADC&#xff08;Analog-to-Digital Converter&#xff0c;模数转换器&#xff09; 2、STM32工作原理 二、STM32ADC编程实战 &#xff08;一&am…

推荐一款全新的视频编辑软件:CapCut剪映国际版

CapCut是一款全新的视频编辑应用程序&#xff0c;提供了各种功能和工具&#xff0c;让用户可以轻松地创建专业级别的视频。这款应用程序非常易于使用&#xff0c;功能强大&#xff0c;可供任何水平的用户使用。 CapCut包含了各种视频编辑工具&#xff0c;可以添加各种特效、滤镜…

提升用户体验优化全攻略

内容概要 用户体验&#xff08;UX&#xff09;在当今数字化时代扮演着举足轻重的角色。良好的用户体验不仅决定了用户对产品的满意度&#xff0c;还有助于提高转化率与客户忠诚度。因此&#xff0c;深入理解用户体验的重要性是每一个设计师和产品经理必须掌握的基础。在这一部…

关于springboot跨域与拦截器的问题

今天写代码的时候遇到的一个问题&#xff0c;在添加自己设置的token拦截器之后&#xff0c;报错&#xff1a; “ERROR Network Error AxiosError: Network Error at XMLHttpRequest.handleError (webpack-internal:///./node_modules/axios/lib/adapters/xhr.js:112:14) at Axi…

SDK和API

什么是SDK&#xff1f; SDK就像是一个超级工具箱&#xff0c;里面装满了各种工具、说明书和配件&#xff0c;帮你快速、方便地完成一项工作。比如&#xff0c;你要搭建一个乐高模型&#xff0c;SDK就是那个包含了所有乐高积木、拼装图纸、甚至一些特殊工具的大盒子。 什么是A…

【错误描述:“L2TP连接尝试失败,因为安全层在初始化与远程计算机的协商时遇到了一个处理错误”】

解决办法&#xff1a; 一、检查并更改网络协议 &#xff08;如果网络协议更改完成&#xff0c;还是链接失败&#xff0c;直接看 第二点&#xff09; 1、打开网络和Internet 设置 2、找到更改适配器选项 3、先择你要链接VPN&#xff0c;右键选择属性&#xff0c;之后选择安…

【网络】2.TCP通信

TCP通信 server1. 创建套接字2. 填充套接字3. 将套接字和监听文件描述符绑定4. 将_listensock设置为监听状态5. 启动服务器accept()函数read()函数 Server启动client1. 创建套接字2. 填充套接字connect()函数 3. 通过文件描述符向服务端发送信息 client启动 server server的启…

【ArcGISPro】宣布推出适用于 ArcGIS 的 AI 助手

此次分享了ESRI正在开发新的“AI 助手”来扩展ArcGIS应用程序&#xff0c;并且使用经过专门培训、提示工程和 LLM的“AI 助手”为这些应用程序提供特定技能。并且以视频的方式展示了如何使用生成式 AI 在ArcGIS应用程序中自动化和加速工作流。最后表示这些 “AI 助手”将在 202…

Ansible基本使用

目录 介绍 安装 inventory-主机清单 分组 子组 modules-模块 command shell script file copy systemd yum get_url yum_repository user mount cron 介绍 ansible是基于python开发的自动化运维工具。架构相对比较简单&#xff0c;仅需通过ssh连接客户机执行…

volatile如何保证可见性和禁止指令重排序?

当线程对volatile修饰的变量进行写操作时&#xff0c;JMM会插入一个写屏障&#xff0c;会强制的将本地内存中的数据写到主内存中 当线程对volatile修饰的变量进行读操作时&#xff0c;JMM会插入一个读屏障&#xff0c;会强制的让本地内存中数据失效&#xff0c;重新到主内存中读…

AUTOSAR_EXP_ARAComAPI的6章笔记(4)

☞返回总目录 相关总结&#xff1a;《AUTOSAR 自适应应用中原始数据流传输的使用方法》总结 6.4 原始数据流传输的使用方法 本章描述了原始数据流&#xff08;RawDataStreams&#xff09;在 AUTOSAR 自适应应用程序中的使用方法。 目前&#xff0c;原始数据流传输在单播 / …

input子系统的框架和重要数据结构详解

#1024程序员节 | 征文# 往期内容 I2C子系统专栏&#xff1a; 专栏地址&#xff1a;IIC子系统_憧憬一下的博客-CSDN博客具体芯片的IIC控制器驱动程序分析&#xff1a;i2c-imx.c-CSDN博客 – 末篇&#xff0c;有往期内容观看顺序 总线和设备树专栏&#xff1a; 专栏地址&#…

【Linux:网络基础】

网络协议&#xff1a; 协议实际上可以称为一种“约定”&#xff0c;通过网络通信中的数据约定&#xff0c;不同主机必须遵循相同的网络协议才可以实现通信。 协议即为通信双方都认识的结构化的数据类型 协议分层 协议的本质也是软件&#xff0c;在设计上为了更好的进行模块…