Python 中整洁的并行输出

原文:https://bernsteinbear.com/blog/python-parallel-output/
代码:https://gist.github.com/tekknolagi/4bee494a6e4483e4d849559ba53d067b

Python 并行输出

使用进程和锁并行输出多个任务的状态。
在这里插入图片描述

注:以下代码在linux下可用,windows下可能要进行修改。

假设你有一个程序,它对列表进行一些处理:

def log(repo_name, *args):
    print(f"{repo_name}:", *args)

def randsleep():
    import random
    import time
    time.sleep(random.randint(1, 5))

def func(repo_name):
    log(repo_name, "Starting")
    randsleep()  # Can be substituted for actual work
    log(repo_name, "Installing")
    randsleep()
    log(repo_name, "Building")
    randsleep()
    log(repo_name, "Instrumenting")
    randsleep()
    log(repo_name, "Running tests")
    randsleep()
    log(repo_name, f"Result in {repo_name}.json")

repos = ["repoA", "repoB", "repoC", "repoD"]
for repo in repos:
    func(repo)

这很好。它有效。有点吵,但有效。但随后你发现了一件好事:你的程序是数据并行。也就是说,您可以并行处理:
在这里插入图片描述

import multiprocessing

# ...

with multiprocessing.Pool() as pool:
    pool.map(func, repos, chunksize=1)

不幸的是,输出有点笨拙。虽然每行仍然很好输出一个 repo,但它正在左右喷出行,并且这些行是混合的。
在这里插入图片描述

幸运的是,StackOverflow 用户 Leedehai是终端专业用户,知道如何在控制台中一次重写多行。我们可以根据自己的需要调整这个答案:

def fill_output():
    to_fill = num_lines - len(last_output_per_process)
    for _ in range(to_fill):
        print()

def clean_up():
    for _ in range(num_lines):
        print("\x1b[1A\x1b[2K", end="")  # move up cursor and delete whole line

def log(repo_name, *args):
    with terminal_lock:
        last_output_per_process[repo_name] = " ".join(str(arg) for arg in args)
        clean_up()
        sorted_lines = last_output_per_process.items()
        for repo_name, last_line in sorted_lines:
            print(f"{repo_name}: {last_line}")
        fill_output()

def func(repo_name):
    # ...
    with terminal_lock:
        del last_output_per_process[repo_name]

# ...

repos = ["repoA", "repoB", "repoC", "repoD"]
num_procs = multiprocessing.cpu_count()
num_lines = min(len(repos), num_procs)
with multiprocessing.Manager() as manager:
    last_output_per_process = manager.dict()
    terminal_lock = manager.Lock()
    fill_output()
    with multiprocessing.Pool() as pool:
        pool.map(func, repos, chunksize=1)
    clean_up()

在这里插入图片描述

这会将每个项目的状态(一次一行)打印到终端。它将按项目添加到的 last_output_per_process 顺序打印,但您可以通过(例如)按字母数字排序来更改它: sorted(last_output_per_process.items())

请注意,我们必须锁定数据结构和终端输出,以避免事情被破坏;它们在过程之间共享(pickled,via Manager )。

如果日志输出有多行长,或者其他人正在用 stdout / stderr (也许是流浪的 print )搞砸,我不确定这会做什么。如果您发现或有整洁的解决方案,请写信。

这种技术对于任何具有线程和锁的编程语言来说可能是相当可移植的。关键的区别在于这些实现应该使用线程而不是进程;我做进程是因为它是 Python。

最终版

import multiprocessing
import random
import time


class Logger:
    def __init__(self, num_lines, last_output_per_process, terminal_lock):
        self.num_lines = num_lines
        self.last_output_per_process = last_output_per_process
        self.terminal_lock = terminal_lock

    def fill_output(self):
        to_fill = self.num_lines - len(self.last_output_per_process)
        for _ in range(to_fill):
            print()

    def clean_up(self):
        for _ in range(self.num_lines):
            print("\x1b[1A\x1b[2K", end="")  # move up cursor and delete whole line

    def log(self, repo_name, *args):
        with self.terminal_lock:
            self.last_output_per_process[repo_name] = " ".join(str(arg) for arg in args)
            self.clean_up()
            sorted_lines = self.last_output_per_process.items()
            for repo_name, last_line in sorted_lines:
                print(f"{repo_name}: {last_line}")
            self.fill_output()

    def done(self, repo_name):
        with self.terminal_lock:
            del self.last_output_per_process[repo_name]


class MultiprocessingLogger(Logger):
    def __init__(self, num_lines, manager):
        super().__init__(num_lines, manager.dict(), manager.Lock())


class FakeLock:
    def __enter__(self):
        pass

    def __exit__(self, exc_type, exc_value, traceback):
        pass


class SingleProcessLogger(Logger):
    def __init__(self, num_lines):
        super().__init__(num_lines, {}, FakeLock())


def randsleep():
    time.sleep(random.randint(1, 2) / random.randint(1, 5))


def func(repo_name):
    logger.log(repo_name, "Starting")
    randsleep()
    logger.log(repo_name, "Installing")
    randsleep()
    logger.log(repo_name, "Building")
    randsleep()
    logger.log(repo_name, "Instrumenting")
    randsleep()
    logger.log(repo_name, "Running tests")
    randsleep()
    logger.log(repo_name, f"Result in {repo_name}.json")
    randsleep()
    logger.done(repo_name)


def multi_process_demo():
    ascii_uppercase = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    repos = [f"repo{letter}" for letter in ascii_uppercase]
    num_procs = multiprocessing.cpu_count()
    num_lines = min(len(repos), num_procs)
    with multiprocessing.Manager() as manager:
        global logger
        logger = MultiprocessingLogger(num_lines, manager)
        # Make space for our output
        logger.fill_output()
        with multiprocessing.Pool(num_procs) as pool:
            pool.map(func, repos, chunksize=1)
        logger.clean_up()


def single_process_demo():
    repo = "repoA"
    num_lines = 1
    global logger
    logger = SingleProcessLogger(num_lines)
    logger.fill_output()
    func(repo)
    logger.clean_up()

if __name__ == "__main__":
    multi_process_demo()
    # single_process_demo()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/560587.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Tcpdump -r 解析pcap文件

当我们使用命令抓包后,想在命令行直接读取筛选怎么办?-r参数就支持了这个 当你使用 tcpdump 的 -r 选项读取一个之前捕获的数据包文件,并想要筛选指定 IP 地址和端口的包时,你可以在命令中直接加入过滤表达式。这些过滤表达式可以…

数据可视化(六):Pandas爬取NBA球队排名、爬取历年中国人口数据、爬取中国大学排名、爬取sina股票数据、绘制精美函数图像

Tips:"分享是快乐的源泉💧,在我的博客里,不仅有知识的海洋🌊,还有满满的正能量加持💪,快来和我一起分享这份快乐吧😊! 喜欢我的博客的话,记得…

基于ThinkPHP框架开发的的站长在线工具箱网站PHP源码(可以作为流量站)

这是一套基于ThinkPHP框架开发的站长在线工具箱网站PHP源码,包含了多种在线工具,可以作为流量站使用。 项 目 地 址 : runruncode.com/php/19742.html 部署教程: 环境要求: - PHP版本需要大于等于7.2.5 - MySQL版…

element-ui合计逻辑踩坑

element-ui合计逻辑踩坑 1.快速实现一个合计 ​ Element UI所提供的el-table中提供了方便快捷的合计逻辑实现: ​ https://element.eleme.cn/#/zh-CN/component/table ​ 此实现方法在官方文档中介绍详细,此处不多赘述。 ​ 这里需要注意&#xff0c…

设备连接IoT云平台指南

一、简介 设备与IoT云间的通讯协议包含了MQTT,LwM2M/CoAP,HTTP/HTTP2,Modbus,OPC-UA,OPC-DA。而我们设备端与云端通讯主要用的协议是MQTT。那么设备端与IoT云间是如何创建通信的呢?以连接华为云IoT平台为例…

React中redux、react-redux、@reduxjs/toolkit状态管理库的使用方式

效果 下载依赖 npm install redux react-redux reduxjs/toolkit --save在src目录下创建文件 创建index.ts文件 import { configureStore } from reduxjs/toolkit import userSlice from ./userReducerconst store configureStore({reducer: {user: userSlice.reducer} }) //…

java实现识别图片上的文字(OCR识别身份证等证件信息)

利用第三方jar包,实现识别图片上的文字。第三方支持地址:Spire.OCR for Java | 专业的图文识别组件,用以读取图片格式中的文本Spire.OCR for Java 是专为 Java 开发者设计的强大OCR库,提供高效的文字识别功能,能够从图…

储存器的专有名词辨析

位:存放一个二进制位字节:8位存放一个二进制数储存单元:一个八位的储存器,叫做一个储存单元储存单元地址:储存单元唯一的固定编号储存单元数据:存放于储存单元的数字储存单元容量:一排能储存单元…

imx6ull设备树

概念 什么是设备树 描述设备树的文件叫DTS,实际上就是在这个DTS文件里面,用树状的结构存储设备之间的关系。在以前这棵树就是设备树。 什么是DTS、DTB、DTC DTS就是我们上面的设备树源码文件、DTB是它的二进制文件、DTC是我们编译DTS的工具&#xff…

echart实现数据传输动态效果

function setDataTransfer(id) {var chart echarts.init(document.getElementById(id)); var items [{level: 1,name: "传感器",label: beijing,value: [20, 10],symbol: "",symbolSize: [30, 30]},{level: 1,symbol: "",name: "物联中心…

imazing64位2.17.6.0新功能介绍以及 iMazing最新版免费激活下载

iMazing for mac是一款可以在苹果电脑Mac os平台上使用的帮助用户管理手机的Mac手机助手,iMazing for mac是能力远超 iTunes 提供的终极的 iOS 设备管理器。IMazing 与你的 iOS 设备 (iPhone、 iPad 或 iPod)相连,使用起来非常的方…

openstack-图形管理 6

安装并配置组件 重启web服务及会话存储服务 图形化登录 删除云主机 使用管理员登录 删除子网网络 删除云主机网络 创建网络 创建云主机 控制节点配置 配置私有网络,配置虚拟子网: 配置ML2插件 配置Linuxbridge(桥接) 配置laye…

ROS仿真小车(二)——添加摄像头雷达传感器

文章目录 前言一、在 Rviz 中显示一个盒状机器人1.1 创建ROS功能包1.2 在 launch 文件中集成 URDF 与 Rviz1.3 在 Rviz 中显示机器人模型1.4 优化 rviz 启动 二、创建一个四轮圆柱状机器人模型2.1 配置urdf和launch文件2.2 URDF优化_xacro2.2.1 配置xacro文件2.2.2 编写 Xacro …

Windows平台下的Oracle 19c补丁升级

Windows平台下的Oracle 19c补丁升级 文章目录 Windows平台下的Oracle 19c补丁升级第一章 概述第二章 安装前备份2.1 软件目录备份2.2 权限备份2.3 备份数据库 第三章 安装前检查3.1 查看数据库版本3.2 升级opatch版本 第四章 安装补丁4.1 设置环境变量4.2 关闭oracle相关服务4.…

c++11 标准模板(STL)本地化库 - 平面类别(std::collate) - 定义字典序比较和字符串的散列(二)

本地化库 本地环境设施包含字符分类和字符串校对、数值、货币及日期/时间格式化和分析,以及消息取得的国际化支持。本地环境设置控制流 I/O 、正则表达式库和 C 标准库的其他组件的行为。 平面类别 定义字典序比较和字符串的散列 std::collate 类 std::collate 封…

VLOOKUP函数使用,为什么会报错“引用有问题”?

VLOOKUP函数的使用非常广泛,在excel2007版之后的软件中,使用VLOOKUP函数也许会遇到这样的场景,明明公式是没有问题的,公式还会报错“引用有问题”。 一、报错场景 输入公式后,回车确认,显示如下报错&…

【论文阅读】Attention is all you need

摘要 主要的序列转换模型是基于复杂的循环或卷积神经网络,其中包括一个编码器和一个解码器。性能最好的模型还通过一种注意力机制将编码器和解码器连接起来。我们提出了一种新的简单的网络架构,Transformer,完全基于注意机制,完全…

javaEE初阶——多线程(五)

T04BF 👋专栏: 算法|JAVA|MySQL|C语言 🫵 小比特 大梦想 此篇文章与大家分享关于多线程的文章第五篇关于 多线程代码案例二 阻塞队列 如果有不足的或者错误的请您指出! 目录 2.阻塞队列2.1常见队列2.2 生产者消费者模型有利于进行解耦合程序进行削峰填谷…

力扣---填充每个节点的下一个右侧节点指针 II

给定一个二叉树: struct Node {int val;Node *left;Node *right;Node *next; } 填充它的每个 next 指针,让这个指针指向其下一个右侧节点。如果找不到下一个右侧节点,则将 next 指针设置为 NULL 。 初始状态下,所有 next 指针都…

大厂面试精华面试刷题

1.自定义unshift实现相同效果 2.数组去重 用vs2019来写这种练习题可以更直观的查看代码执行的效果,最后的代码是控制控制台执行完毕后不自动关闭 use strict;let arr [1, 1, 2, 2, 3, 3, 4, 5, 6, 7, 8, 9, 10] //1.//查重最简单的方法for循环结合splice从数组中…