Python中的多进程、多线程、协程

Python中的多线程、多进程、协程

一、概述

1. 多线程Thread (threading):

  • 优点:同一个进程中可以启动多个线程,充分利用IO时,cpu进行等待的时间
  • 缺点:相对于进程,多线程只能并发执行,不能利用多CPU,相对于协程,多线程的启动数目有限 ,占用内存资源,并且有线程切换的时间开销
  • 使用场景IO密集型计算、同时运行的任务数据要求不多

2. 多进程Process(multiprocessing):

  • 优点:可以利用多核CPU进行并行计算
  • 缺点:占用资源最多,可启动的数目比线程少
  • 使用场景CPU密集型计算

3. 协程Coroutine(asyncio):

  • 优点:内存开销最少、启动协程的数量最多
  • 缺点:支持的库有限制 (request对用的能使用协程的库为:aiohttp)、代码实现复杂
  • 使用场景IO密集型计算、超多任务运行、有现成的库支持的场景

4. Python比C/C++/Java速度慢的原因:

  1. Python是动态类型语言,边解释边执行。
  2. GIL,无法利用多核CPU并发 执行。

5. GIL

GIL名为:全局解释器锁(Global Interpreter Lock 缩写为: GIL),是计算机程序设计语言解释器用于同步线程的一种机制,它使得任何时刻仅有一个线程在执行,即便在多核心处理器上,使用GIL的解释器也只允许同一时刻执行一个线程。这是为了解决多线程之间数据完整性的状态同步问题。

6. 创建工具代码:

这个代码是一个基本的爬虫代码,创建这个代码的目的时为了下面实现多线程或多进程的时候让代码显的更加简洁,进而能更清晰的观察到多线程或多进程的实现方式。

utils.blog_spider.py

import requests
from bs4 import BeautifulSoup

urls = [
    f'https://www.cnblogs.com/#p{page}'
    for page in range(1, 50 + 1)
]

# 请求url,得到html
def craw(url):
    r = requests.get(url)
    # print(url, len(r.text))
    return r.text

# 解析html
def parse(html):
    soup = BeautifulSoup(html, "html.parser")
    links = soup.find_all("a", class_="post-item-title")
    return [(link['href'], link.get_text()) for link in links]


if __name__ == '__main__':
    for restult in parse(craw(urls[2])):
        print(restult)

二、多线程

1. 创建多线程的一般流程:

  1. 准备一个函数

    def my_func(a, b):
    	doing(a, b)
    
  2. 创建一个线程

    import threading  # 导入线程包
    t = threading.Threading(target=my_func, args=(100, 200))  # 第一个参数为一个函数名,第二个参数时传入函数的参数
    
  3. 启动线程

    t.start()
    
  4. 等待结束

    t.join()
    

代码示例:

# 多线程爬虫
import threading
import time

import utils.blog_spider as bg


def signal_thread():
    for url in bg.urls:
        bg.craw(url)


def multi_thread():
    threads = []
    for url in bg.urls:
        threads.append(threading.Thread(target=bg.craw, args=(url,)))  # 添加线程
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()


if __name__ == '__main__':
    start = time.time()
    signal_thread()
    end = time.time()
    print("signal_thread cosst :", end - start, "seconds")

    start = time.time()
    multi_thread()
    end = time.time()
    print("multi_thread cosst :", end - start, "seconds")

输出结果:

在这里插入图片描述

在这里插入图片描述

2. 多线程之间的数据通信queue.Queue

queue.Queue可以用于多线程之间、线程安全的数据通信。

具体流程如下 :

  1. 导入队列库

    import queue
    
  2. 创建Queue

    q = queue.Queue()
    
  3. 添加元素

    q.put(item)
    
  4. 获取元素

    item = q.get()
    
  5. 查询状态

    # 查看元素的多少
    q.qsize()
                   
    # 判断是否为空
    q.empty()
                   
    # 判断是否已满
    q.full()
    

代码示例:

# 多线程数据通信
import queue
import utils.blog_spider as bg
import time
import random
import threading


def do_craw(url_queue: queue.Queue, html_queue: queue.Queue):
    while True:
        url = url_queue.get()
        html = bg.craw(url)
        html_queue.put(html)   # 加入队列
        print(threading.current_thread().name, f"craw{url}", "url_queue.size=", url_queue.qsize())
        time.sleep(random.randint(1, 2))  # 进行随机休眠


def do_parse(html_queue: queue.Queue, fout):
    while True:
        html = html_queue.get()  # 从队列中取数据
        results = bg.parse(html)
        for result in results:
            fout.write(str(result) + '\n')
        print(threading.current_thread().name, "results.size=", len(results), "html_queue.size=", html_queue.qsize())
        time.sleep(random.randint(1, 2))


if __name__ == '__main__':
    url_queue = queue.Queue()
    html_queue = queue.Queue()
    for url in bg.urls:
        url_queue.put(url)
    for idx in range(3):
        t = threading.Thread(target=do_craw, args=(url_queue, html_queue), name=f"craw{idx}")
        t.start()
    fout = open("data.txt", "w")
    for idx in range(2):
        t = threading.Thread(target=do_parse, args=(html_queue, fout), name=f"parse{idx}")
        t.start()

在这里插入图片描述

3. 线程安全

由于线程的执行随时会切换,这会造成不可预料的结果,出现线程不安全的情况。

Lock用于解决线程安全问题,对线程进行加锁,这样会使得该线程运行结束之后在切换线程。

用法1:try-finally模式

import threading
lock = threading.Lock()   # 设置线程锁
lock.acquire()  # 获得锁
try:
    # do something
finally:
    lock.realse()   # 释放锁

用法2:with模式

import threading
lock = threading.Lock()
with lock:
    # do something

sleep语句一定会导致当前线程阻塞,会进行线程的切换(加锁则不会进行进程的切换)。

代码示例:

# 线程安全
import threading
import time

lock = threading.Lock()


class Account:
    def __init__(self, balance):
        self.balance = balance


def draw(account, amount):
    with lock:
        if account.balance >= amount:
            time.sleep(0.1)
            print(threading.current_thread().name, "取钱成功")
            account.balance -= amount
            print(threading.current_thread().name, "余额", account.balance)
        else:
            print(threading.current_thread().name, "取钱失败,余额不足")


if __name__ == '__main__':
    account = Account(1000)
    ta = threading.Thread(name="ta", target=draw, args=(account, 800))
    tb = threading.Thread(name='tb', target=draw, args=(account, 800))

    ta.start()
    tb.start()

在这里插入图片描述

4. 线程池

线程池中有一些线程,新来的任务放在任务队列中,在线程池中的线程空闲的时候会自动处理任务队列里的任务。

使用线程池的好处:

  1. 提升性能
    因为减去了大量新建、终止线程的开销,重用了线程资源;
  2. 使用场景
    适合处理突发性大量请求或需要大量线程完成任务,但实际任务处理时间较短。
  3. 防御功能
    能有效避免系统因为创建线程过多,而导致系统负荷过大相应变慢的问题。
  4. 代码优势
    使用线程池的语法比自己新建线程执行线程更加简洁。

使用方法:

  1. 用法1map函数,注意map结果和入参是顺序对应的

    from concurrent.futures import ThreadPoolExecutor, as_completed
    with ThreadPoolExecutor() as pool:
        results = pool.map(craw, urls)
        for result in results:
            print(result)
    
  2. 用法2future模式更强大,注意如果用as_complete方法,这样的线程执行顺序是不固定的,但是相应的效率会更高。(as_completed方法是不管哪个线程先执行完了,都会直接返回,不用按照顺序返回)

    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(craw, url) for url in urls]
        for future in futures:
            print(future.result())
        for future in as_complated(futures):
            print(future.result())
    

代码示例:

import concurrent.futures
import utils.blog_spider as bg

# 进行爬取html
with concurrent.futures.ThreadPoolExecutor() as pool:
    htmls = pool.map(bg.craw, bg.urls)  # 加入线程池
    htmls = list(zip(bg.urls, htmls))
    for url, html in htmls:
        print(url, len(html))
    print("craw over")

# 进行解析html
with concurrent.futures.ThreadPoolExecutor() as pool:
    futures = {}
    for url, html in htmls:
        future = pool.submit(bg.parse, html)  # 加入线程池
        futures[future] = url
    for future, url in futures.items():
        print(url, future.result())

    # 顺序不定
    # for future in concurrent.futures.as_completed(futures):
    #     url = futures[future]
    #     print(url, future.result())

三、多进程

CPU密集型计算线程的自动的切换反而变成了负担,多线程甚至减慢了运行速度。multiprocessing模块就是Python为了解决GIL缺陷引入的一个模块,原理是用多进程在多CPU上并行执行。

多进程multiprocessing知识梳理(对比多线程threading

多线程:

  1. 引入模块

    from threading import Thread
    
  2. 新建

    t = Thread(target=func, args=(100, ))
    
  3. 启动

    t.start()
    
  4. 等待结束

    t.join()
    
  5. 数据通信

    import queue
    q = queue.Queue()
    q.put(item)
    item = q.get()
    
  6. 线程安全加锁

    from threading import Lock
    lock = Lock()
    with lock:
        # do something
    
  7. 池化技术

    from concurrent.futures import ThreadPoolExecutor
    with ThreadPoolExecutor() as executor:
        # 方法1
        results = executor.map(func, [1, 2, 3])
        # 方法2
        future = executor.submit(func, 1)
        result = future.result()
    

多进程:

  1. 引入模块:

    from multiprocessing import Process
    
  2. 新建

    p = process(target=f, args=('bob', ))
    
  3. 启动

    p.start()
    
  4. 等待结束

    p.join()
    
  5. 数据通信

    from multiprocessing import Queue
    q = Queue()
    q.put([42, None, 'hello'])
    item = q.get()
    
  6. 线程安全加锁

    from  multiprocessing import Lock
    lock = Lock()
    with lock:
        # do something
    
  7. 池化技术

    from concurrent.futures import ProcessPoolExecutor
    with ProcessPoolExecutor() as executor:
        # 方法1:
        results = executor.map(func, [1, 2, 3])
        # 方法2:
        future = executor.submit(func, 1)
        result = funture.result()
    

代码示例:

import math
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor  # 导入线程池与进程池
import time

PRIMES = [112272535095293] * 100


def is_prime(n):
    '''
    判断一个数是否是素数
    :param n:
    :return:
    '''
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True


def signal_thread():
    '''
    单线程
    :return:
    '''
    for number in PRIMES:
        is_prime(number)


def multi_thread():
    '''
    多线程
    :return:
    '''
    with ThreadPoolExecutor() as pool:
        pool.map(is_prime, PRIMES)


def multi_process():
    '''
    多进程
    :return:
    '''
    with ProcessPoolExecutor() as pool:
        pool.map(is_prime, PRIMES)


if __name__ == '__main__':
    start = time.time()
    signal_thread()
    end = time.time()
    print("signal_thread, cost:", end - start, "seconds")

    start = time.time()
    multi_thread()
    end = time.time()
    print("multi_thread, cost:", end - start, "seconds")


    start = time.time()
    multi_process()
    end = time.time()
    print("multi_process, cost:", end - start, "seconds")

输出结果:

在这里插入图片描述

四、协程

协程是在单线程内实现并发

  • 核心原理:用一个超级循环(其实就是while Treu)循环
  • 配合IO多路复用原理(IOCPU可以干其他事情)

异步IO库介绍:asyncio

1. 创建协程的一般流程:

  1. 导入库

    import asyncio
    
  2. 获取事件循环

    loop = asyncio.get_enent_loop()
    
  3. 定义协程

    async def myfunc(url):
        await get_url(url)
    
  4. 创建task任务列表

    task = [loop.create_task(myfunc(url)) for url in urls]
    
  5. 执行爬虫事件列表

    loop.run_until_complete(asyncio.wait(tasks))
    

注意:要用在异步IO编程中,依赖的库必须支持异步IO特性,爬虫引用中:requests不支持异步,需要用aiohttp

代码示例:

import asyncio
import aiohttp
import utils.blog_spider as bg   # 该模块是工具代码中的模块
import time


async def async_craw(url):
    print("craw url:", url)
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            result = await resp.text()
            print(f"craw url: {url}, {len(result)}")


loop = asyncio.get_event_loop()
tasks = [loop.create_task(async_craw(url=url))   # 添加任务
         for url in bg.urls]

start = time.time()
loop.run_until_complete(asyncio.wait(tasks))   # 启动协程
end = time.time()
print("use time seconds :", end - start)

在这里插入图片描述

2. 控制协程的并发度

可以使用信号量(semaphore)来控制并发度

实现方式1:

sem = asyncio.Semaphore(10)
# later
async with sem:
    # work with shared resource

实现方式2:

sem = asyncio.Semaphore(10)
# later
await sem.acquire()
try:
    # work with shared resource
finally:
    sem.release()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/611591.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Python写了for i in range(10)却只打印一遍?

题目&#xff1a;定义一个两个参数的重复打印函数&#xff0c;第一个参数指定要打印的字符串&#xff0c;第二个参数指定要重复打印的次数&#xff0c;在主程序中调用该函数&#xff0c;打印10遍你的学号姓名。 为什么调用函数后结果只打印了一遍? 看了题目感觉就很诡异&#…

AS-VJ900实时视频拼接系统产品介绍:两画面视频拼接方法和操作

目录 一、实时视频拼接系统介绍 &#xff08;一&#xff09;实时视频拼接的定义 &#xff08;二&#xff09;无缝拼接 &#xff08;三&#xff09;AS-VJ900功能介绍 1、功能 2、拼接界面介绍 二、拼接前的准备 &#xff08;一&#xff09;摄像机选择 &#xff08;二&a…

169.招式拆解 II(unordered_map)

刷算法题&#xff1a; 第一遍&#xff1a;1.看5分钟&#xff0c;没思路看题解 2.通过题解改进自己的解法&#xff0c;并且要写每行的注释以及自己的思路。 3.思考自己做到了题解的哪一步&#xff0c;下次怎么才能做对(总结方法) 4.整理到自己的自媒体平台。 5.再刷重复的类…

数据中心法

数据中心法是实现词法分析器的结构化方法。通过设计主表和子表分开存储状态转移信息&#xff0c;实现词法分析器的控制逻辑和数据结构分离。 主要解决了状态爆炸、难以维护和复杂性的问题。 状态爆炸是指当状态和转移较多时&#xff0c;单一使用一个表来存储所有的信息的话会导…

Paddle 实现DCGAN

传统GAN 传统的GAN可以看我的这篇文章&#xff1a;Paddle 基于ANN&#xff08;全连接神经网络&#xff09;的GAN&#xff08;生成对抗网络&#xff09;实现-CSDN博客 DCGAN DCGAN是适用于图像生成的GAN&#xff0c;它的特点是&#xff1a; 只采用卷积层和转置卷积层&#x…

如何在 CentOS 上安装并配置 Redis

如何在 CentOS 上安装并配置 Redis 但是太阳&#xff0c;他每时每刻都是夕阳也都是旭日。当他熄灭着走下山去收尽苍凉残照之际&#xff0c;正是他在另一面燃烧着爬上山巅散烈烈朝晖之时。 ——史铁生 环境准备 本教程将在 CentOS 7 或 CentOS 8 上进行。确保你的系统已更新到最…

自托管站点监控工具 Uptime Kuma 搭建与使用

本文首发于只抄博客&#xff0c;欢迎点击原文链接了解更多内容。 前言 Uptime Kuma 是一个类似 Uptime Robot 的站点监控工具&#xff0c;它可以自托管在自己的 Nas 或者 VPS 上&#xff0c;用来监控各类站点、数据库等 监控类型&#xff1a;支持监控 HTTP(s) / TCP / HTTP(s…

Day 43 1049. 最后一块石头的重量 II 494. 目标和 474.一和零

最后一块石头重量Ⅱ 有一堆石头&#xff0c;每块石头的重量都是正整数。 每一回合&#xff0c;从中选出任意两块石头&#xff0c;然后将它们一起粉碎。假设石头的重量分别为 x 和 y&#xff0c;且 x < y。那么粉碎的可能结果如下&#xff1a; 如果 x y&#xff0c;那么两…

【LLM 论文】Step-Back Prompting:先解决更高层次的问题来提高 LLM 推理能力

论文&#xff1a;Take a Step Back: Evoking Reasoning via Abstraction in Large Language Models ⭐⭐⭐⭐ Google DeepMind, ICLR 2024, arXiv:2310.06117 论文速读 该论文受到的启发是&#xff1a;人类再解决一个包含很多细节的具体问题时&#xff0c;先站在更高的层次上解…

【Git】Github创建远程仓库并与本地互联

创建仓库 点击生成新的仓库 创建成功后会生成一个这样的文件 拉取到本地 首先先确保本地安装了git 可以通过终端使用 git --version来查看是否安装好了git 如果显示了版本信息&#xff0c;说明已经安装好了git&#xff0c;这时候我们就可以进入我们想要clone到问目标文件夹 …

计算机系列之算法分析与设计

21、算法分析与设计 算法是对特定问题求解步骤的一种描述。它是指令的有限序列&#xff0c;其中每一条指令标识一个或多个操作。 它具有有穷性、确定性&#xff08;含义确定、输入输出确定&#xff0c;相同输入相同输出&#xff1b;执行路径唯一&#xff09;、可行性、输入&a…

【SAP ME 38】SAP ME发布WebService配置及应用

更多WebService介绍请参照 【SAP ME 28】SAP ME创建开发组件&#xff08;DC&#xff09;webService 致此一个WebService应用发布成功&#xff0c;把wsdl文件提供到第三方系统调用接口&#xff01; 注意&#xff1a; 在SAP ME官方开发中默认对外开放的接口是WebService接口&am…

01、vue+openlayers6实现自定义测量功能(提供源码)

首先先封装一些openlayers的工具函数&#xff0c;如下所示&#xff1a; import VectorSource from ol/source/Vector; import VectorLayer from ol/layer/Vector; import Style from ol/style/Style; import Fill from ol/style/Fill; import Stroke from ol/style/Stroke; im…

Android GPU渲染SurfaceFlinger合成RenderThread的dequeueBuffer/queueBuffer与fence机制(2)

Android GPU渲染SurfaceFlinger合成RenderThread的dequeueBuffer/queueBuffer与fence机制&#xff08;2&#xff09; 计算fps帧率 用 adb shell dumpsys SurfaceFlinger --list 查询当前的SurfaceView&#xff0c;然后有好多行&#xff0c;再把要查询的行内容完整的传给 ad…

题目----力扣--移除链表元素

题目 给你一个链表的头节点 head 和一个整数 val &#xff0c;请你删除链表中所有满足 Node.val val 的节点&#xff0c;并返回 新的头节点 。 示例 1&#xff1a; 输入&#xff1a;head [1,2,6,3,4,5,6], val 6 输出&#xff1a;[1,2,3,4,5]示例 2&#xff1a; 输入&…

智慧公厕:让厕所管理变得更智慧、高效、舒适!

公共厕所是城市的重要组成部分&#xff0c;但常常被忽视。它们的管理和养护往往面临着许多问题&#xff0c;例如卫生状况不佳、环境畏畏缩缩、设施老旧等。为了解决这些问题&#xff0c;智慧公厕应运而生。智慧公厕是一种全方位的应用解决方案&#xff0c;将科技与公共厕所管理…

我在洛杉矶采访到了亚马逊云全球首席信息官CISO(L11)!

在本次洛杉矶举办的亚马逊云Re:Inforce全球安全大会中&#xff0c;小李哥作为亚马逊大中华区开发者社区和自媒体代表&#xff0c;跟着亚马逊云安全产品团队采访了亚马逊云首席信息安全官(CISO)CJ Moses、亚马逊副总裁Eric Brandwine和亚马逊云首席高级安全工程师Becky Weiss。 …

搜索的未来:OpenAI 的 GPT 如何彻底改变行业

搜索的未来&#xff1a;OpenAI 的 GPT 如何彻底改变行业 概述 搜索引擎格局正处于一场革命的风口浪尖&#xff0c;而 OpenAI 的 GPT 处于这场变革的最前沿。最近出现了一种被称为“im-good-gpt-2-chatbot”的神秘聊天机器人&#xff0c;以及基于 ChatGPT 的搜索引擎的传言&am…

【C++ 内存管理】深拷贝和浅拷贝你了解吗?

文章目录 1.深拷贝2.浅拷贝3.深拷贝和浅拷贝 1.深拷贝 &#x1f34e; 深拷⻉: 是对对象的完全独⽴复制&#xff0c;包括对象内部动态分配的资源。在深拷⻉中&#xff0c;不仅复制对象的值&#xff0c;还会复制对象所指向的堆上的数据。 特点&#xff1a; &#x1f427;① 复制对…

DCDC中MOS半桥的自举电容,自举电阻问题

一个免费的翻译英文文章的网站&#xff0c;可以将英文数据手册翻译为中文&#xff08;需要挂梯子&#xff0c;不收费&#xff0c;无广告&#xff0c;不需要注册&#xff09;&#xff0c;链接如下&#xff1a; Google 翻译 翻译效果&#xff1a; // 104电容是0.1uf&#xff1b…