Python运维之多进程!!

本节的快速导航目录如下喔!!!

一、创建进程的类Process

二、进程并发控制之Semaphore

三、进程同步之Lock

四、进程同步之Event

五、进程优先队列Queue

六、多进程之进程池Pool

七、多进程之数据交换Pipe


一、创建进程的类Process

multiprocessing模块提供了一个创建进程的类Process,创建进程有两种方法:

  • 创建一个类的实例Process,并指定任务目标函数
  • 自定义一个类并继承Process类,重写init()方法和run方法

第一种方法:

 import os
 import time
 from multiprocessing import Process
 ​
 # 字进程要执行的代码:数据累加耗时函数
 def task_process(delay):
     num = 0
     for i in range(delay * 10000000):
         num += i
     print(f"进程pid为{os.getpid()},执行完成")
 ​
 ​
 if __name__ == '__main__':
     print('父进程pid为 %s.' % os.getpid())
     t0 = time.time()
     # 单进程执行
     task_process(3)
     task_process(3)
     t1 = time.time()
     print(f"顺序执行耗时{t1 - t0}")
     p0 = Process(target=task_process, args=(3,))
     p1 = Process(target=task_process, args=(4,))
     t2 = time.time()
     # 双进程并行执行
     p0.start();p1.start()
     p0.join();p1.join()
     t3 = time.time()
     print(f"多进程并发执行耗时{t3 - t2}")
 # 发现多进程执行相同的操作次数耗时更少

第二种方法(重写方法):

 # -*- coding: UTF-8 -*-
 import os
 import time
 from multiprocessing import Process
 ​
 class MyProcess(Process):
     def __init__(self, delay):
         super().__init__()
         self.delay = delay
 ​
     # 子进程要执行代码
     def run(self):
         num = 0
         for i in range(self.delay * 10000000):
             num += i
         print(f"进程pid为{os.getpid()},执行完成")
 ​
 if __name__ == '__main__':
     print('父进程pid为 %s.' % os.getpid())
     p0 = MyProcess(3)
     p1 = MyProcess(3)
     t0 = time.time()
     p0.start()
     p1.start()
     p0.join()
     p1.join()
     t1 = time.time()
     print(f"多进程并发执行耗时{t1 - t0}")

Process类的构造函数原型:

 class multiprocessing.Process(group=None,target=None,name=None,args=(),kwargs={},*,daemon=None)
 ​
 '''参数说明如下:
  Target:表示调用对象,一般为函数,也可以为类。
  Args:表示调用对象的位置参数元组
  Kwargs:为进程调用对象的字典
  Name:为进程的别名
  Group:参数不使用,可忽略
 '''

类提供的常用方法如下:

  • is_alive():返回进程是否是激活的
  • join([timeout])阻塞进程,直到进程执行完成或超时或进程被终止
  • run():代表进程执行的任务函数,可被重写
  • start()激活进程
  • terminate():终止进程

其属性如下:

  • authkey:字节码,进程的准密钥
  • daemon父进程终止后自动终止且不能产生新进程,必须在start()之前设置
  • exicode:退出码,进程在运行时为None,如果为-N,就表示被信号N结束
  • name:获取进程名称
  • pid:进程ID

例子:不设置daemon属性:

 # -*- coding: UTF-8 -*-
 import os
 import time
 from multiprocessing import Process
 ​
 # 子进程要执行的代码
 def task_process(delay):
     print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} 子进程执行开始")
     print(f"sleep {delay}s")
     time.sleep(delay)
     print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} 子进程执行结束")
 ​
 if __name__ == '__main__':
     print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} 父进程执行开始")
     p0 = Process(target=task_process,args=(3,))
     p0.start()
     print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} 父进程执行结束")
 ​
 '''
 2024-05-08 20:04:09 父进程执行开始
 2024-05-08 20:04:09 父进程执行结束
 2024-05-08 20:04:10 子进程执行开始
 sleep 3s
 2024-05-08 20:04:13 子进程执行结束
 '''
'''
没有使用p0.join()来阻塞进程,父进程并没有等待子进程运行完成就打印了退出信息,程序依旧会等待子进程运行完成
'''

例子:设置daemon属性:

 # 在上述代码主程序中添加
 p0.deamon = True
 ​
 '''
 2024-05-08 20:07:11 父进程执行开始
 2024-05-08 20:07:11 父进程执行结束
 '''
 # 程序并没有等待子进程结束而结束,主要主程序结束,程序即退出

二、进程并发控制之Semaphore

semaphore用来控制对共享资源的访问数量,可以控制同一时刻并发的进程数

 # 实例:多进程同步控制
 import multiprocessing
 import time
 ​
 def worker(s,i):
     s.acquire()                         print(time.strftime('%H:%M:%S'),multiprocessing.current_process().name+"获得锁运行");
     time.sleep(i)
     print(time.strftime('%H:%M:%S'), multiprocessing.current_process().name +"释放锁运行");
     s.release()
 ​
 if __name__ == '__main__':
     s = multiprocessing.Semaphore(2)    # 同一时刻只有两个进程在执行
     for i in range(6):
         p = multiprocessing.Process(target=worker(s,2))
         p.start()

三、进程同步之Lock

多进程的目的是并发执行,提高资源利用率,从而提高效率,但有时候我们需要在某一时间只能有一个进程访问某个共享资源,这种情况就需要使用锁Lock

 # 多个进程输出信息,不加锁:
 import multiprocessing
 import time
 ​
 def task1():
     n = 5
     while n > 1:
         print(f"{time.strftime('%H:%M:%S')}task1 输出信息")
         time.sleep(1)
         n -= 1
 ​
 def task2():
     n = 5
     while n > 1:
         print(f"{time.strftime('%H:%M:%S')}task2 输出信息")
         time.sleep(1)
         n -= 1
 ​
 def task3():
     n = 5
     while n > 1:
         print(f"{time.strftime('%H:%M:%S')}task3 输出信息")
         time.sleep(1)
         n -= 1
 ​
 if __name__ == '__main__':
     p1 = multiprocessing.Process(target=task1)
     p2 = multiprocessing.Process(target=task2)
     p3 = multiprocessing.Process(target=task3)
     p1.start();p2.start();p3.start()

未使用锁,生成三个子进程,每个进程都打印自己的信息。下面使用锁:

 import multiprocessing
 import time
 ​
 def task1(lock):
     with lock:      # 也可使用上下文关键字加锁
         n = 5
         while n > 1:
             print(f"{time.strftime('%H:%M:%S')}task1 输出信息")
             time.sleep(1)
             n -= 1
 ​
 def task2(lock):
     lock.acquire()
     n = 5
     while n > 1:
         print(f"{time.strftime('%H:%M:%S')}task2 输出信息")
         time.sleep(1)
         n -= 1
     lock.release()
 ​
 def task3(lock):
     lock.acquire()  # 调用前后加锁
     n = 5
     while n > 1:
         print(f"{time.strftime('%H:%M:%S')}task3 输出信息")
         time.sleep(1)
         n -= 1
     lock.release()
 ​
 if __name__ == '__main__':
     lock = multiprocessing.Lock()       # 1、实例化一个锁
     p1 = multiprocessing.Process(target=task1,args=(lock,))
     p2 = multiprocessing.Process(target=task2,args=(lock,))
     p3 = multiprocessing.Process(target=task3,args=(lock,))
     p1.start()
     p2.start()
     p3.start()

可以发现,同一时刻只有一个进程在输出信息。

四、进程同步之Event

Event用来实现进程之间同步通信

import multiprocessing
import time

def with_for_event(e):
    e.wait()
    time.sleep(1)
    # 唤醒后清除Event状态,为后续继续等待
    e.clear()
    print(f"{time.strftime('%H:%M:%S')}:进程A:我们是兄弟,我等你...")
    e.wait()
    print(f"{time.strftime('%H:%M:%S')}:进程A:好的,是兄弟一起走")

def wait_for_event_timeout(e, t):
    e.wait()
    time.sleep(1)
    e.clear()
    print(f"{time.strftime('%H:%M:%S')}:进程B:好吧,最多等你{t}秒")
    e.wait(t)
    print(f"{time.strftime('%H:%M:%S')}:进程B:我要继续往前走了")

if __name__ == '__main__':
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(target=with_for_event,args=(e,))
    w2 = multiprocessing.Process(target=wait_for_event_timeout,args=(e,5))
    w1.start()
    w2.start()
    # 主进程发话
    print(f"{time.strftime('%H:%M:%S')}:主进程:谁等我下,我需要8秒时间")
    # 唤醒等待的进程
    e.set()
    time.sleep(8)
    print(f"{time.strftime('%H:%M:%S')}:好的,我赶上了")
    # 再次唤醒等待的进程
    e.set()
    w1.join()
    w2.join()
    print(f"{time.strftime('%H:%M:%S')}:主进程:退出")

上述定义了两个函数,一个用于等待事件的发生;一个用于等待事件发生并设置超时时间。主进程调用事件的set()方法唤醒等待事件的进程,事件唤醒后调用claer()方法清除事件的状态并重新等待,以此达到进程同步的控制。

五、进程优先队列Queue

  • 多进程安全:Queue是为多进程环境设计的,确保在多进程之间进行数据传递时的安全性。
  • put方法
    • 用于向队列中插入数据
    • 有两个可选参数:blocked timeout
    • 默认情况下,blocked 设置为 True
    • 如果 timeout 是一个正值,并且队列已满,put 方法会阻塞直到队列有空间或者超时
    • 超时会抛出 Queue.Full 异常。
    • 如果 blocked 设置为 False 且队列已满,会立即抛出 Queue.Full 异常。
  • get方法
    • 用于从队列中读取并删除一个元素
    • 同样有两个可选参数:blockedtimeout
    • 默认情况下,blocked 设置为 True
    • 如果 timeout 是一个正值,并且在指定时间内队列中没有元素可取,会抛出 Queue.Empty 异常。
    • 如果block设置为False则有两种情况:
      • 如果队列为空,会立即抛出 Queue.Empty 异常。
      • 如果队列中有值可用,会立即返回该值。

这些特性确保了在多进程环境中,队列操作的同步性和数据的一致性。

# 实现消费者-生产者模式
from multiprocessing import Process,Queue
import time


def ProducerA(q):
    count = 1
    while True:
        q.put(f"冷饮 {count}")
        print(f"{time.strftime('%H:%M:%S')} A 放入:[冷饮 {count}]")
        count +=1
        time.sleep(1)

def ConsumerB(q):
    while True:
        print(f"{time.strftime('%H:%M:%S')} B 放入:[取出 {q.get()}]")
        time.sleep(5)

if __name__ == '__main__':
    q = Queue(maxsize=5)
    p = Process(target=ProducerA,args=(q,))
    c = Process(target=ConsumerB,args=(q,))
    c.start()
    p.start()
    c.join()
    p.join()

上述代码定义了生产者函数和消费者函数,设置其队列的最大容量是5,生产者不停的生产冷饮,消费者就不停的取出冷饮消费,当队列满时,生产者等待,当队列空时,消费者等待。

六、多进程之进程池Pool

进程池(Pool)按需创建进程,池满则请求排队等待

import multiprocessing
import time

def task(name):
    print(f"{time.strftime('%H:%M:%S')}: {name} 开始执行")
    time.sleep(3)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    for i in range(10):
        # 维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
        pool.apply_async(func=task, args=(i,))
    pool.close()
    pool.join()

从结果来看,同一时刻只有三个进程在执行,使用Pool实现了对进程并发数的控制

七、多进程之数据交换Pipe

在Python的多进程编程中,multiprocessing.Pipe() 方法可以创建一个管道,用于进程间的数据传输。

默认情况下,管道是全双工的,即两个进程可以互相发送和接收数据。如果需要设置为半双工(单向传输),可以通过传递参数duplex=False 来实现。

管道对象提供了send() 方法用于发送消息,以及recv() 方法用于接收消息。如果recv()在没有消息的情况下被调用,它会阻塞;如果管道已关闭,recv()会抛出EOFError异常。

import multiprocessing
import time

def task1(pipe):
    for i in range(5):
        str = f"task1-{i}"
        print(f"{time.strftime('%H:%M:%S')}: task1 发送:{str}")
        pipe.send(str)
    time.sleep(2)
    for i in range(5):
        print(f"{time.strftime('%H:%M:%S')}: task1 接收:{ pipe.recv() }")

def task2(pipe):
    for i in range(5):
        print(f"{time.strftime('%H:%M:%S')}: task2 接收:{pipe.recv()}")
    time.sleep(2)
    for i in range(5):
        str = f"task1-{i}"
        print(f"{time.strftime('%H:%M:%S')}: task2 发送:{ str }")

if __name__ == '__main__':
    pipe = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=task1,args=(pipe[0],))
    p2 = multiprocessing.Process(target=task2,args=(pipe[1],))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

首先程序定义了两个子进程函数:task1先发送5条消息,再接收消息;task2先接收消息,再发送消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/608182.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

5.9gunplot绘图堆叠柱状图

gunplot绘图堆叠柱状图 plot"要用的数据(后缀名是.dat)" using 2 t(或者title) 跟着是要命名的属性名称 这个名称可以用.dat里的每列列名,也可以直接在后面跟着定义 plot "data.dat" using 2 t columnheader(2), using 3 t column…

PLC数据采集网关的功能和特点-天拓四方

一、引言 随着工业自动化程度的不断提高,数据在生产线上的作用愈发重要。PLC作为工业自动化的核心设备,其数据采集和处理能力直接影响到整个生产线的效率和稳定性。而PLC数据采集网关,作为连接PLC与外部系统的桥梁,正日益受到人们…

vue3—win7搭建vue3环境

背景 vue3环境要求node.js18.3及以上版本,所以我们需要安装更高版本node.js,然而win7无法支持高版本node.js。下面我介绍一种安装方法。 步骤 1、下载 node-v13.14.0-x64.msi 安装,默认安装即可。安装完成后,进入cmd&#xff0c…

Hibernate认识

一、定义 Hibernate 是一种开源的 Java 对象关系映射 (ORM) 框架,用于将面向对象的领域模型持久化到关系数据库中。它为开发人员提供了一种简便的方法来操作数据库,而无需编写繁琐的 SQL 代码。 ORM(对象关系映射):Ob…

Linux 第二十章

🐶博主主页:ᰔᩚ. 一怀明月ꦿ ❤️‍🔥专栏系列:线性代数,C初学者入门训练,题解C,C的使用文章,「初学」C,linux 🔥座右铭:“不要等到什么都没有了…

提速25倍!MoonBit 新增后端支持,一周内成为热议焦点

MoonBit 新增 JS 后端支持 近期,MoonBit 迎来了重要更新:新增对 JavaScript 后端的支持,为用户带来了前所未有的性能提升。 MoonBit 诞生于2022年,是专为云计算及边缘计算设计的 AI 云原生编程语言及开发者平台。作为一门诞生于 …

Orange3数据可视化(小提琴图)

小提琴图 小提琴图和箱线图类似,用来显示数据分布和概率密度。结合了箱线图和密度图的特征,用来显示数据的分布形状。 输入 数据: 输入数据集 输出 选中的数据: 从图中选中的实例 数据: 增加了一列,显示数据点是否被选中 …

STM32--4G DTU 及 阿里云

模块概述 ATK-IDM750C/IDM751C 是正点原子(ALIENTEK)团队开发的一款高性能 4G Cat1 DTU 产品, 支持移动 4G、联通 4G 和电信 4G 手机卡。它以高速率、低延迟和无线数传作为核心功能, 可快速解决应用场景下的无线数传方案。 它支持 TCP/UDP/HTTP/MQTT/DN…

酷开科技AI技术支持,酷开系统根据你的喜好量身定制节目

在当今数字化时代,个性化推荐已成为提升消费者体验的关键因素。酷开科技的智慧AI,为消费者提供了精彩的内容推荐服务,更大地丰富了消费者的娱乐生活。 酷开系统中的AI推荐引擎通过学习消费者的观看习惯和偏好,能够快速识别其兴趣…

Python进行excel处理-01

最近干采购,每个月要对供应商的对账单,对对应的采购订单号和物料编号的价格和数量,是不是和物料管控总表里面的价格数量是不是一致,于是写了一个代码。 从总表里面找到,对账单里对应采购订单和物料编码的数据&#xf…

ACPI高级配置和电源接口规范概览

安全之安全(security)博客目录导读 目录 1 Overview 背景 What is ACPI ACPI初始化 2 Introduction 主要目标 Power management 基本原理 ACPI框架 Target Audience 3 术语 通用术语General ACPI Terminology 全局系统状态定义Global System State Definitions 设…

【全开源】JAVA智慧养老养老护理帮忙代办陪诊陪护小程序APP源码

一、特色功能 护理服务功能: 基础护理:提供日常生活中的基础护理服务,如洗漱、穿衣、进食等,用户可以通过小程序预约,由专业的护理人员上门提供服务。康复护理:针对术后康复或慢性病康复的老年人&#xf…

Ansys Mechanical|内嵌nCode疲劳仿真工具

疲劳分析是分析结构受到周期性载荷作用下,结构应力远小于强度极限情况,甚至结构应力比弹性极限还低的情况下就可能发生破坏的情况。Ansys nCode是国际著名的疲劳耐久性仿真分析软件,其多个版本以前已经可以和Ansys Mechanical进行无缝以进行联…

如果你这样使用电路仿真软件,你就无敌了!

在电子设计领域,电路仿真软件如同一把锋利的宝剑,掌握它,你就能在复杂的电子世界中游刃有余。今天,就让我们一起探讨如何高效利用电路仿真软件,让你在电子设计领域所向披靡! 一、熟悉软件界面与基础操作 …

Java文件与IO操作

1. 文件与IO操作 1.1 文件 什么是文件: 文件,对我们并不陌生,文件是保存数据的地方,比如大家经常使用的word文档,txt文件.excel文件...都是文件。它既可以保存一张图片,也可以保持视频,声音.… 1.1.1 文件流: 1.1.2 常用的文件操作: 创建文件对象相关构造器和方法: 案例&a…

从C向C++16——常见容器2

一.stack容器 1.stack理解 概念: stack是一种先进后出的数据结构,它只有一个出口。 它在C中也叫栈,类似于我们在《数据结构和算法》里面的栈,只不过在C中把其封装成库,我们可以直接使用。 注意:栈中只有…

【因特网中自治系统内部的路由选择,RIP 进程处理 OSPFOSPF(Open Shortest Path First)最短路径优先协议】

文章目录 因特网中自治系统内部的路由选择RIP(Routing Information Protocol)内部网关协议RIP通告(advertisements)RIP: 链路失效和恢复RIP 进程处理OSPF(Open Shortest Path First)最短路径优先协议OSPF “高级” 特性(在RIP中的…

C++ 继承篇

面向对象语言的三大特性:封装,继承和多态 根据目前学到的知识,对于封装的理解,大致有两层: 将数据和方法封装,不想让外面看到用private/protected修饰,想让外面看到用public修饰类型的行为不满…

Mybatis 源码分析

《黑马架构师_源码系列-主流框架&中间件》-- MyBatis (讲师:子慕) * 手写持久层框架-仿写mybatis * Mybatis架构设计&主要组件 * Mybatis如何完成的初始化? * Mybatis如何完成的sql解析及执行? * Mybatis如何设置的参数? * Mybat…