目录
multiprocessing模块
线程的开发
threading模块
setDaemon
死锁
线程间的通信
multiprocessing模块
运行python的时候,我们都是在创建并运行一个进程,(linux中一个进程可以fork一个子进程,并让这个子进程exec另外一个程序)。在python中,我们通过标准库中的subprocess包来fork一个子进程,并且运行一个外部的程序。subprocess包中定义有数个创建子进程的函数,这些函数分别以不同的方式创建子进程,所欲我们可以根据需要来从中选取一个使用。另外subprocess还提供了一些管理标准流(standard stream)和管道(pipe)的工具,从而在进程间使用文本通信。
multiprocessing 包同时提供本地和远程并发,使用子进程代替线程,有效避免 Global Interpreter Lock 带来的影响。
from multiprocessing import Process
import os
from time import sleep, time
def test1(name):
print("当前进程的ID", os.getpid())
print("父进程的ID", os.getppid())
print("当前进程的名字:", name)
sleep(3)
if __name__ == '__main__':
start = time()
# 创建多个子进程,并且把这些子进程放入列表中
process_list = []
print("主进程的ID", os.getpid())
for i in range(10):
p = Process(target=test1, args=('process-%s' % i,))
p.start()
process_list.append(p)
类包装:自定义一个Process进程类,该类中的run函数由一个子进程调用执行
from multiprocessing import Process
import os
from time import sleep, time
# 自定义一个进程类 继承Process类
class MyProcess(Process):
def __init__(self, name):
Process.__init__(self)
self.name = name
def run(self):
print("当前进程的ID", os.getpid())
print("父进程的ID", os.getppid())
print("当前进程的名字:", self.name)
sleep(3)
if __name__ == '__main__':
print("主进程ID", os.getpid())
start = time()
process_list = []
for i in range(10):
p = MyProcess("process-%s" % i)
p.start()
process_list.append(p)
for p in process_list:
p.join()
end = time() - start
print(end)
线程的开发
Python 的标准库提供了两个模块:thread 和 threading,thread 是低级模块,threading 是高级模块,对_thread进行了封装。绝大多数情况下,我们只需要使用threading 这个高级模块。
多线程概念
多线程使得系统可以在单独的进程中执行并发任务。虽然进程也可以在独立的内存空间中并发执行,但是其系统开销会比较大。生成一个新进程必须为其分配独立的地址空间,并维护其代码段、堆栈段和数据段等,这种开销是巨大的。另外,进程间的通信实现也不方便。在程序功能日益复杂的时候,需要有更好的系统模型来满足要求,线程由此产生了。 线程是“轻量级”的,一个进程中的线程使用同样的地址空间,且共享许多资源。启动线程的时间远远小于启动进程的时间和空间,而且,线程间的切换也要比进程间的切换快得多。由于使用同样的地址空间,所以线程之间的数据通信比较方便,一个进程下的线程之间可以直接使用彼此的数据。当然,这种方便性也会带来一些问
题,特别是同步问题。 多线程对于那些I/O受限的程序特别适用。其实使用多线程的一个重要目的,就是最大化地利用CPU的资源。当某一线程在等待I/O的时候,另外一个线程可以占用CPU资源。如最简单的GUI程序,一般需要有一个任务支持前台界面的交互,还要有一个任务支持后台的处理。
多线程运行的作用:
- 使用线程可以把占据长时间的程序中的任务放到后台去处理。
- 用户界面可以更加吸引人,比如用户点击了一个按钮去触发某些事件的处理,可以弹出一个进度条来显示处理的进度。
- 程序的运行速度可能加快。
- 在一些等待的任务实现上如用户输入、文件读写和网络收发数据等。在这种情况下我们可以释放一些珍贵的资源如内存占用。
线程可以分为内核线程和用户线程,内核线程由操作系统内核创建和撤销;用户线程不需要内核支持而在用户程序中实现的线程。
创建线程
Python3 通过两个标准库 _thread 和 threading 提供对线程的支持。
threading模块
import time
def say():
print("这是单线程!")
time.sleep(1)
if __name__ == "__main__":
start = time.time()
for i in range(5):
say()
end = time.time()
print(f'使用时间:{end - start}')
import threading
import time
def say():
print("这是多线程")
time.sleep(1)
if __name__ == "__main__":
'''入口'''
start = time.time()
for i in range(5):
# 通过threading下的Thread方法创建线程
t = threading.Thread(target=say)
t.start() # 启动线程
end = time.time()
print(f'使用时间:{end - start}')
可以明显看出使用了多线程并发的操作,花费时间要短很多。
程序是通过threading模块下的Thread的类,去实例化一个此对象,并调用方法,实现生成多线程,target参数表示线程需要执行的方法,通过对象的start的方法,开启线程。
import threading
from time import sleep,ctime
def sing():
for i in range(3):
print("唱歌...%d"%i)
sleep(1)
def dance():
for i in range(3):
print("跳舞...%d"%i)
sleep(1)
if __name__ == '__main__':
print('---开始---:%s'%ctime())
t1 = threading.Thread(target=sing)
t2 = threading.Thread(target=dance)
t1.start()
t2.start()
sleep(5)
print('---结束---:%s'%ctime())
join方法
该方法将等待,一直到它调用的线程终止,它的名字表示调用的线程会一直等待,直到指定的线程加入它。join所完成的工作就是线程同步,即主线程任务结束之后,进入阻塞状态,一直等待其他的子线程执行结束之后,主线程再终止。
from threading import Thread
from time import sleep, time
def run(name):
print("Threading:{} start".format(name))
sleep(3)
print("Threading:{} end".format(name))
if __name__ == '__main__':
# 开始时间
start = time()
# 创建线程列表
t_list = []
# 循环创建线程
for i in range(10):
t = Thread(target=run, args=('t{}'.format(i),))
t.start()
t_list.append(t)
# 等待线程结束
for t in t_list:
t.join()
# 计算使用时间
end = time() - start
print(end)
setDaemon
将线程声明为守护线程,在start() 方法调用之前设置,这个方法和join是相反的。
有时候我们需要的是 只要主线程完成了,不管子线程是否完成,都要和主线程一起退出,
这时就可以用setDaemon方法。
from threading import Thread
import time
def foo():
print(123)
time.sleep(1)
print("end123")
def bar():
print(456)
time.sleep(3)
print("end456")
t1 = Thread(target=foo)
t2 = Thread(target=bar)
t1.daemon = True
t1.start()
t2.start()
print("~~~")
同步锁与GIL的关系
GIL本质是一把互斥锁,但GIL锁住的是解释器级别的数据,同步锁,锁的是解释器以外的共享资源,例如:硬盘上的文件 控制台,对于这种不属于解释器的数据资源就应该自己加锁处理 。
GIL 的作用是:对于一个解释器,只能有一个thread在执行bytecode。所以每时每刻只有一条bytecode在被执行一个thread。GIL保证了bytecode 这层面上是thread safe的。
死锁
在多线程程序中,死锁问题很大一部分是由于线程同时获取多个锁造成的。
在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁。
产生死锁的四个必要条件:
- 互斥条件:一个资源每次只能被一个线程使用。
- 请求与保持条件:一个线程因请求资源而阻塞时,对已获得的资源保持不放。
- 不剥夺条件:线程已获得的资源,在末使用完之前,不能强行剥夺。
- 循环等待条件:若干线程之间形成一种头尾相接的循环等待资源关系。
from threading import Thread,Lock
from time import sleep
class Task1(Thread):
def run(self):
while True:
if lock1.acquire():
print("------Task 1 -----")
sleep(0.5)
lock2.release()
class Task2(Thread):
def run(self):
while True:
if lock2.acquire():
print("------Task 2 -----")
sleep(0.5)
lock3.release()
class Task3(Thread):
def run(self):
while True:
if lock3.acquire():
print("------Task 3 -----")
sleep(0.5)
lock1.release()
lock1 = Lock()
#创建另外一把锁
lock2 = Lock()
lock2.acquire()
lock3 = Lock()
lock3.acquire()
t1 = Task1()
t2 = Task2()
t3 = Task3()
t1.start()
t2.start()
t3.start()
线程间的通信
在加锁的情况下,程序就变成了串行,也就是单线程,而有时,我们在不用考虑数据安全时,不用加锁,程序就变成了并行,也就是多线程。为了避免业务开启过多的线程时。我们就可以通过信号量(Semaphore)来设置指定个数的线程。
from threading import Thread, BoundedSemaphore
from time import sleep
def an_jian(num):
semapshore.acquire()
print('第{}个人安检完成!'.format(num))
sleep(2)
semapshore.release()
if __name__ == '__main__':
semapshore = BoundedSemaphore(3)
for i in range(20):
thread = Thread(target=an_jian, args=(i,))
thread.start()