线程、进程池与线程池
文章目录
- 线程、进程池与线程池
- 一、线程
- 二、线程的相关操作
- 2.1创建线程的两种方式
- 2.2线程的其他操作
- 2.3死锁现象和递归锁
- 2.4条件
- 2.5定时器
- 2.6 队列与堆栈
- 三、进程池与线程池
一、线程
线程是指cpu上实际执行计算的单位,而进程是将计算所需资源的一个集合(内存分配释放,资源的申请销毁等等)。举个例子,进程好比是车间,而线程则是车间的流水线,流水线是实际的生产工具,而车间是集合生产资源的地方。
线程的特点以及进程线程的区别:
- 当一个进程被创建以后,默认会带着一个线程,称之为主线程
- 一个进程中可以创建多条线程,各个线程间共享资源;而父进程创建子进程以后,子进程会获得父进程的数据副本,但是父进程与子进程间数据是隔离开的
- 线程创建切换的速度快,创建消耗小,而进程创建切换的速度慢,创建消耗大
- cpu的并发本质是在多个线程之间进行切换
- 多进程之间使用不同的地址空间,而一个进程中的线程共用进程的地址空间
何时使用多进程?何时使用多线程?
(cpu多核情况下)若执行的任务是cup密集型的(大量计算任务),则使用多进程。因为python中的多线程无法使用多核cpu的优势,为了充分发挥多核cpu的计算力则需要使用多进程。
若执行任务是I/O密集型的(如网络通信等等),则使用多线程。I/O密集型任务涉及大量的I/O操作,而多个进程间的来回切换对比于多个线程之间而言消耗更多时间与资源。
多线程工作的具体例子:你打开一个文字处理软件,该软件需要监听键盘输入、处理文字信息、将内容保存至硬盘。这三件事情都涉及操作内存中的同一块数据,显然使用多进程并不合适,所以使用多线程的方式,开启三个线程同时处理三个任务,相互配合完成文字处理软件的功能。
二、线程的相关操作
2.1创建线程的两种方式
开启线程的方式与进程相似,这里就不详细介绍了。
#方式一
#currentThread方法可以查看当前运行线程的相关信息
from threading import Thread,currentThread
import time
def task():
print('%s is running' %currentThread().name)
time.sleep(3)
print('%s is done' %currentThread().name)
#由于线程间共享主进程的资源,创建子线程不会导入主线程的代码,所以下方代码可以不写
#if __name__ == '__main__':
t=Thread(target=task,name='子线程')
t.start()
print('主')
#方式二
from threading import Thread
import time
class Mythread(Thread):
def run(self):
print('%s is running' %self.name)
time.sleep(3)
print('%s is done' %self.name)
#if __name__ == '__main__':
t=Mythread(name='子线程')
t.start()
print('主')
同样的主进程(主线程)需要等其所有的子线程运行结束以后才会结束。
2.2线程的其他操作
由于多个线程共享主进程的内存空间,所以多个线程的pid号和主进程是相同的
from threading import Thread
import os
class Mythread(Thread):
def run(self):
print('%s的pid:%s' %(self.name,os.getpid()))
for i in range(5):
t=Mythread(name='子线程%d'%i)
t.start()
print('主进程的pid:%s'%os.getpid())
运行结果:
线程中的join、is_alive、name属性方法的使用和进程相同,这边不在介绍了。
threading.currentThread()该方法可以查看当前线程的相关信息
threading.enumerate()该方法可以返回一个包含正在运行线程的列表
threading.activeCount()该方法可以返回正在运行线程的数量
守护线程的创建方式和守护进程相同,但是守护线程需要主进程内的所有子线程都运行完以后才会结束,而守护进程则是主进程的代码运行结束就会结束了。
补充一句:上面说的守护进程/线程的结束是指主进程代码结束/主进程线程结束时守护进程/线程还没运行结束会被强制结束。
from threading import Thread
import time
def foo():
print(123)
time.sleep(2)
print("end123")
def bar():
print(456)
time.sleep(3)
print("end456")
t1=Thread(target=foo)
t2=Thread(target=bar)
t1.daemon=True
t1.start()
t2.start()
print('主进程运行结束')
运行结果:
123
456
主进程运行结束
end123
end456
线程的互斥锁使用方式与进程相同
from threading import Thread,Lock
import time
mutex=Lock()
x=100
def task():
global x
mutex.acquire()
temp=x
time.sleep(0.1)
x=temp-1
mutex.release()
if __name__ == '__main__':
t_l=[]
for i in range(100):
t=Thread(target=task)
t_l.append(t)
t.start()
for t in t_l:
t.join()
stop=time.time()
print(x)
2.3死锁现象和递归锁
进程与线程都有死锁现象(进程中忘了介绍,在这补充),死锁现象是指两个或以上的进程/线程争抢资源时出现相互等待的情况。举个通俗点的例子就是甲乙两人一人在厨房,一人在厕所,甲现在想上厕所需要等乙出来,而乙现在想去厨房又需要等甲出来。两人相互等待对方停滞在原地的行为就是死锁现象。
from threading import Thread,Lock
import time
mutexA=Lock()
mutexB=Lock()
class Mythread1(Thread):
def run(self):
self.f1()
def f1(self):
mutexA.acquire()
print('%s 锁上厕所' %self.name)
time.sleep(1)
mutexB.acquire()
print('%s 锁上厨房' %self.name)
mutexA.release()
print('%s 开锁厕所' % self.name)
mutexB.release()
print('%s 开锁厨房' % self.name)
class Mythread2(Thread):
def run(self):
self.f2()
def f2(self):
mutexB.acquire()
print('%s 锁上厨房' % self.name)
time.sleep(1)
mutexA.acquire()
print('%s 锁上厨房' % self.name)
mutexB.release()
print('%s 开锁厕所' % self.name)
mutexA.release()
print('%s 开锁厨房' % self.name)
t1=Mythread1(name='线程1')
t2=Mythread2(name='线程2')
t1.start()
t2.start()
上述代码中线程1拿到A锁后等待1s,期间线程2拿到B锁,线程1与线程2相互等待对方释放锁从而停滞在了原地。为了解决这个问题可以将锁改为递归锁。
递归锁和互斥锁的区别在于互斥锁的使用者每次只能申请上锁一次,而递归锁的使用者可以多次申请上锁。互斥锁和递归锁同样要求只有使用者上的所有锁释放以后其他使用者才能使用。
from threading import Thread,RLock
import time
#递归锁相当于一把锁可以锁多个资源
mutexA=RLock()
class Mythread1(Thread):
def run(self):
self.f1()
def f1(self):
mutexA.acquire()
print('%s 锁上厕所' %self.name)
time.sleep(1)
mutexA.acquire()
print('%s 锁上厨房' %self.name)
mutexA.release()
print('%s 开锁厕所' % self.name)
mutexA.release()
print('%s 开锁厨房' % self.name)
class Mythread2(Thread):
def run(self):
self.f2()
def f2(self):
mutexA.acquire()
print('%s 锁上厨房' % self.name)
time.sleep(1)
mutexA.acquire()
print('%s 锁上厨房' % self.name)
mutexA.release()
print('%s 开锁厕所' % self.name)
mutexA.release()
print('%s 开锁厨房' % self.name)
t1=Mythread1(name='线程1')
t2=Mythread2(name='线程2')
t1.start()
t2.start()
运行结果:
线程1 锁上厕所
线程1 锁上厨房
线程1 开锁厕所
线程1 开锁厨房
线程2 锁上厨房
线程2 锁上厨房
线程2 开锁厕所
线程2 开锁厨房
线程的信号量和事件使用方法和进程相同,这里不在介绍了,只要注意导入方式需从multiprocessing变为threading
2.4条件
条件:使用线程只有满足某个条件时才能被释放。
from threading import Thread,Condition
def user_input():
while True:
if input("输入1退出")=='1':
return True
def work():
con.acquire()
con.wait_for(user_input)
con.release()
con=Condition()
t=Thread(target=work)
t.start()
from threading import Thread,Condition
import time
def work():
con.acquire()
print(11111111111111111111111111111111)
con.wait()
con.release()
con=Condition()
t=Thread(target=work)
t.start()
con.acquire()
time.sleep(3)
#该函数配合con.wait()使用,可以随机唤醒一个等待中的线程
con.notify()
con.release()
2.5定时器
#1s后打印hello
from threading import Timer
def p():
print('hello')
t=Timer(1,p)
t.start()
2.6 队列与堆栈
queue中一些方法的具体使用方式和进程中介绍的Queue一样,这里就不再介绍了。
import queue
q=queue.Queue(2)
#入队列
q.put(1)
#出队列
q.get()
#含有优先级的队列
qq=queue.PriorityQueue(2)
#元组第一个是优先级,数字越小优先级越高(也可以是字母的比较)
qq.put((30,50))
qq.put((10,40))
print(qq.get())
print(qq.get())
#堆栈
qqq=queue.LifeQueue(2)
q.put(1)
q.get()
三、进程池与线程池
池的功能是限制启动的进程数或线程数。当并发的任务数远远超过了计算机的承受能力时,即无法一次性开启过多的进程数或线程数时,就应该用池将开启的进程数或线程数限制在计算机可承受的范围内。
#使用Pool的同步调用的进程池
from multiprocessing import Pool
import os,time
def work(n):
print('%s run' %os.getpid())
time.sleep(3)
return n**2
if __name__ == '__main__':
p=Pool(3)
res_l=[]
for i in range(4):
#apply为同步调用(一个进程执行完后才执行下一个进程),res为进程的返回值
res=p.apply(work,args=(i,))
res_l.append(res)
print(res_l)
#使用Pool的异步调用的进程池
from multiprocessing import Pool
import os,time
def work(n):
print('%s run' %os.getpid())
time.sleep(3)
return n**2
def parse(res):
time.sleep(1)
print('返回值:%s'%(res**2))
if __name__ == '__main__':
p=Pool(3)
for i in range(4):
#apply_async为异步调用,主进程提交申请后会继续执行下方的代码
#当子进程运行结束后主进程才会调用回调函数处理子进程的返回值
#res返回的是子进程的对象
res=p.apply_async(work,args=(i,),callback=parse)
#下面两行功能是等待子进程运行结束
p.close()
p.join()
#concurrent.futures模块提供了高度封装的异步调用的进程池/线程池
#ProcessPoolExecutor/ThreadPoolExecutor表示进程池/线程池
#若想使用同步调用的进程池可以使用multiprocessing.Pool模块
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
from multiprocessing import current_process
import time
def task(n):
print('%s run...' %current_process().name)
time.sleep(5)
#进程池只支持返回picklable的对象
return n**2
def parse(future):
time.sleep(1)
#获取返回值
res=future.result()
print('%s 处理了 %s' %(current_process().name,res))
if __name__ == '__main__':
# 开启进程池,限制启动进程数为4(默认为cpu核数,而线程池限制数默认为cpu核数*5)
pool=ProcessPoolExecutor(4)
for i in range(1,5):
#向进程池提交进程(开启进程)
future=pool.submit(task,i)
#回调函数,当子进程运行结束后会执行回调函数处理子进程的返回值
#进程池中是主进程执行回调函数,线程池中是空闲的子线程执行回调函数
future.add_done_callback(parse)
#关闭进程池(无法在向池中提交进程)
#如果wait=True则主进程阻塞等待子进程运行结束后在执行下方代码,反则则会立即结束所有子进程
pool.shutdown(wait=True)
print('主',current_process().name)