文章目录
- threading - 基于线程的并行
- 线程对象
- thread 类
- thread方法
- thread 属性
- 例子
- 锁对象
- 递归锁对象
- 条件对象
- 队列
- Queue对象
- SimpleQueque 对象
- 例子
最近的工作需要用到多线程提升程序的运行效率,以前一直没有机会进行多线程编程,所以一直没有机会学习python的多线程编程。这次只好从头学起,幸好python的多线程编程资料比较全,简单易学,目前,我的程序已经能够实力交互了。
以下是我学习多线程编程时的笔记,从不同的网站和分享文章中拼接而来,与大家分享。
threading - 基于线程的并行
- 主线程:应用程序运行即为主线程。(从程序第一行到最后一行执行完毕,中间遇到子线程的start,子线程去执行它的函数,主线程继续往下执行其他语句)
- 用户线程(子线程):在主线程中可以创建和启动新线程,默认为用户线程(子线程)
- daemon线程:守护线程,优先级别最低,一般为其它线程提供服务。通常,daemon线程体是一个无限循环。如果所有的非daemon线程(主线程以及子线程)都结束了,daemon线程自动就会终止。
线程对象
Thread类代表在独立控制线程运行的活动。有两种方式指定活动:传递一个可调用对象给构造函数或者在子类重载 run() 方法。其它方法不应该在子类被(除了构造函数)重载。换句话说,只能重载这个类的__init__() 和run()方法。
当线程对象一旦被创建,其活动必须通过调用线程的 start() 方法开始。 这会在独立的控制线程中发起调用 run() 方法。一旦线程活动开始,该线程会被认为是 ‘存活的’ 。当它的 run() 方法终结了(不管是正常的还是抛出未被处理的异常),就不是’存活的’。 is_alive() 方法用于检查线程是否存活。
thread 类
class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
- group 应该为 None;为了日后扩展 ThreadGroup 类实现而保留。
- target 是用于 run() 方法调用的可调用对象。默认是 None,表示不需要调用任何方法。
- name 是线程名称。 在默认情况下,会以 “Thread-N” 的形式构造唯一名称,其中 N 为一个较小的十进制数值,或是 “Thread-N (target)” 的形式,其中 “target” 为 target.__name__,如果指定了 target 参数的话。
- args 是用于调用目标函数的参数元组。默认是 ()。
- kwargs 是用于调用目标函数的关键字参数字典。默认是 {}。
- 如果不是 None,daemon 参数将显式地设置该线程是否为守护模式。 如果是 None (默认值),线程将继承当前线程的守护模式属性。
如果子类型重载了构造函数,它一定要确保在做任何事前,先发起调用基类构造器(Thread.__init__())。
thread方法
- start(): 开始线程活动,它在一个线程里最多只能被调用一次。它安排对象的 run() 方法在一个独立的控制线程中被调用。如果同一个线程对象中调用这个方法的次数大于一次,会抛出 RuntimeError。
- run(): 代表线程活动的方法。可以在子类型里重载这个方法。 标准的 run() 方法会对作为 target 参数传递给该对象构造器的可调用对象(如果存在)发起调用,并附带从 args 和 kwargs 参数分别获取的位置和关键字参数。
- join(timeout=None):等待,直到线程终结。这会阻塞调用这个方法的线程,直到被调用 join() 的线程终结 – 不管是正常终结还是抛出未处理异常 – 或者直到发生超时,超时选项是可选的。当 timeout 参数存在而且不是 None 时,它应该是一个用于指定操作超时的以秒为单位的浮点数或者分数。因为 join() 总是返回 None ,所以你一定要在 join() 后调用 is_alive() 才能判断是否发生超时 – 如果线程仍然存活,则 join() 超时。当 timeout 参数不存在或者是 None ,这个操作会阻塞直到线程终结。一个线程可以被 join() 很多次。如果尝试加入当前线程会导致死锁, join() 会引起 RuntimeError 异常。如果尝试 join() 一个尚未开始的线程,也会抛出相同的异常。
- is_alive():返回线程是否存活。当 run() 方法刚开始直到 run() 方法刚结束,这个方法返回 True 。模块函数 enumerate() 返回包含所有存活线程的列表。
thread 属性
- name:只用于识别的字符串。它没有语义。多个线程可以赋予相同的名称。 初始名称由构造函数设置。
- ident:这个线程的 ‘线程标识符’,如果线程尚未开始则为 None 。这是个非零整数。参见 get_ident() 函数。当一个线程退出而另外一个线程被创建,线程标识符会被复用。即使线程退出后,仍可得到标识符。
- daemon:一个布尔值,表示这个线程是否是一个守护线程(True)或不是(False)。 这个值必须在调用 start() 之前设置,否则会引发 RuntimeError 。它的初始值继承自创建线程;
主线程不是一个守护线程,因此所有在主线程中创建的线程默认为 daemon = False。
当没有存活的非守护线程时,整个Python程序才会退出。
例子
python中使用线程,有四个基本步骤:
- 首先从threading库中import Thread类
- 其次创建一个Thread实例
- 然后启动Thread实例
- 最后等待Thread实例执行完成
from threading import Thread # 引入Thread类
from time import sleep, ctime
def func(name, sec):
print('---开始---', name, '时间', ctime())
sleep(sec)
print('***结束***', name, '时间', ctime())
# 创建 Thread 实例
t1 = Thread(target=func, args=('第一个线程', 1))
t2 = Thread(target=func, args=('第二个线程', 2))
# 启动线程运行
t1.start()
t2.start()
# 等待所有线程执行完毕
t1.join() # join() 等待线程终止,要不然一直挂起
t2.join()
自定义线程类,并重写__init__方法和run方法。
## 自定义线程类,并重写__init__方法和run方法
import time
class myThread(Thread):
def __init__(self, threadID, name, counter):
Thread.__init__(self)
self.threadID = threadID
self.name = name
self.counter = counter
def run(self):
print('Starting ' + self.name)
print_time(self.name, self.counter, 5)
print("Exiting " + self.name)
def print_time(threadName, delay, counter):
while counter:
time.sleep(delay)
print(threadName,":", time.ctime(time.time()))
counter -= 1
# 创建新线程
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)
# 开启线程
thread1.start()
thread2.start()
print("Exiting Main Thread")
锁对象
原始锁是一个在锁定时不属于特定线程的同步基元组件。在Python中,它是能用的最低级的同步基元组件,由 _thread 扩展模块直接实现。
原始锁处于 “锁定” 或者 “非锁定” 两种状态之一。
它被创建时为非锁定状态。它有两个基本方法, acquire() 和 release() 。
- 当状态为非锁定时, acquire() 将状态改为锁定并立即返回。当状态是锁定时, acquire() 将阻塞至其他线程调用 release() 将其改为非锁定状态,然后 acquire() 调用重置其为锁定状态并返回。
- release() 只在锁定状态下调用; 它将状态改为非锁定并立即返回。如果尝试释放一个非锁定的锁,则会引发 RuntimeError 异常。
当多个线程在 acquire() 等待状态转变为未锁定被阻塞,然后 release() 重置状态为未锁定时,只有一个线程能继续执行;至于哪个等待线程继续执行没有定义,并且会根据实现而不同。
import threading
class myThread (Thread): #继承父类threading.Thread
def __init__(self, threadID, name, counter):
Thread.__init__(self)
self.threadID = threadID
self.name = name
self.counter = counter
def run(self):
print("Starting " + self.name)
# 获得锁,成功获得锁定后返回True
threadLock.acquire()
print_time(self.name, self.counter, 3)
# 释放锁
threadLock.release()
def print_time(threadName, delay, counter):
while counter:
time.sleep(delay)
print(threadName, time.ctime(time.time()))
counter -= 1
threadLock = threading.Lock()
threads = []
# 创建新线程
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)
# 开启新线程
thread1.start()
thread2.start()
# 添加线程到线程列表
threads.append(thread1)
threads.append(thread2)
# 等待所有线程完成
for t in threads:
t.join()
print("Exiting Main Thread")
递归锁对象
若要锁定锁,线程调用其 acquire() 方法;一旦线程拥有了锁,方法将返回。若要解锁,线程调用 release() 方法。 acquire()/release() 对可以嵌套;只有最终 release() (最外面一对的 release() ) 将锁解开,才能让其他线程继续处理 acquire() 阻塞。
条件对象
条件变量总是与某种类型的锁对象相关联,锁对象可以通过传入获得,或者在缺省的情况下自动创建。当多个条件变量需要共享同一个锁时,传入一个锁很有用。锁是条件对象的一部分,你不必单独地跟踪它。
条件变量遵循 上下文管理协议 :使用 with 语句会在它包围的代码块内获取关联的锁。 acquire() 和 release() 方法也能调用关联锁的相关方法。
其它方法必须在持有关联的锁的情况下调用。 wait() 方法释放锁,然后阻塞直到其它线程调用 notify() 方法或 notify_all() 方法唤醒它。一旦被唤醒, wait() 方法重新获取锁并返回。它也可以指定超时时间。
条件锁的使用:
生成一个条件锁对象 cond = threading.Condition()
上锁cond.acquire()
解锁cond.release()
挂起线程,直到收到一个 notify 通知才会被唤醒cond.wait()
唤醒一个 Condition 的 waiting 池中的线程cond.notify()
唤醒所有 Condition 的 waiting 池中的线程cond.notify_all()
import threading
class Zhou(threading.Thread):
def __init__(self, cond):
super().__init__(name="周杰伦")
self.cond = cond
def run(self):
with self.cond:
# Condition()对象中也实现了__enter__()与__exit__()魔法方法,所以也是可以通过 with 语句调用的
print("{}: 海平面远方开始阴霾, 悲伤要怎么平静纯白".format(self.name))
self.cond.notify() # 唤起
self.cond.wait() # 挂起
print("{}: 你用唇语说你要离开, 那难过无声慢了下来 ".format(self.name))
self.cond.notify()
self.cond.wait()
print("{}: 转身离开, 你有话说不出来 ".format(self.name))
self.cond.notify()
self.cond.wait()
print("{}: 我们的爱, 差异一直存在, 等待竟累积成伤害 ".format(self.name))
self.cond.notify()
self.cond.wait()
print("{}: 蔚蓝的珊瑚海, 错过瞬间苍白 ".format(self.name))
self.cond.notify()
self.cond.wait()
print("{}: 热情不再, 笑容勉强不来, 爱深埋珊瑚海".format(self.name))
class Liang(threading.Thread):
def __init__(self, cond):
super().__init__(name="梁心颐")
self.cond = cond
def run(self):
with self.cond:
# 在调用with cond 之后才能调用 wait 或者 notify 方法
self.cond.wait() # 挂起
print("{}: 我的脸上始终挟带, 一抹浅浅的无奈".format(self.name))
self.cond.notify() # 唤醒
self.cond.wait()
print("{}: 汹涌潮水, 你听明白, 不是浪而是泪海 ".format(self.name))
self.cond.notify()
self.cond.wait()
print("{}: 海鸟跟鱼相爱, 只是一场意外 ".format(self.name))
self.cond.notify()
self.cond.wait()
print("{}: 转身离开, 分手说不出来 ".format(self.name))
self.cond.notify()
self.cond.wait()
print("{}: 当初彼此, 不够成熟坦白".format(self.name))
self.cond.notify()
if __name__ == "__main__":
cond = threading.Condition()
liang = Liang(cond)
zhou = Zhou(cond)
# 这里的启动顺序很重要
liang.start()
zhou.start()
输出结果如下所示:
在Python中,主要采用队列和线程的方法来实现多线程。
队列
queue 模块实现了多生产者、多消费者队列。这特别适用于消息必须安全地在多线程间交换的线程编程。模块中的 Queue 类实现了所有所需的锁定语义。
本模块实现了三种类型的队列,它们的区别仅仅是条目的提取顺序。
- 在 FIFO 队列中,先添加的任务会先被提取。 (先进先出)
- 在 LIFO 队列中,最近添加的条目会先被提取 (类似于一个栈)。 (后进先出)
- 在优先级队列中,条目将保持已排序状态 (使用 heapq 模块) 并且值最小的条目会先被提取。
在内部,这三个类型的队列使用锁来临时阻塞竞争线程;然而,它们并未被设计用于线程的重入性处理。
queue 模块定义了下列类和异常:
- class queue.Queue(maxsize=0)
- class queue.LifoQueue(maxsize=0):LIFO 队列构造函数
- class queue.PriorityQueue(maxsize=0):优先级队列构造函数
- class queue.SimpleQueue:无界的 FIFO 队列构造函数。简单的队列,缺少任务跟踪等高级功能
Queue对象
队列对象 (Queue, LifoQueue, 或者 PriorityQueue) 提供下列描述的公共方法。
- Queue.qsize():返回队列的大致大小。
- Queue.empty():如果队列为空,返回True,否则返回False
- Queue.full():如果队列是满的,返回True,否则返回False
- Queue.put(item, block=True, timeout=None):将 item 加入队列。
- Queue.get(block=True, timeout=None):从队列中移除并返回一个项目
- Queue.task_done():表示前面排队的任务已经被完成。每个 get() 被用于获取一个任务, 后续调用 task_done() 告诉队列,该任务的处理已经完成。
- Queue.join():阻塞至队列中所有的元素都被接收和处理完毕。当一个条目被添加到队列的时候未完成任务的计数将会增加。 每当一个消费者线程调用 task_done() 来表明该条目已被提取且其上的所有工作已完成时未完成计数将会减少。 当未完成计数降为零时,join() 将解除阻塞。
SimpleQueque 对象
SimpleQueue 对象提供下列描述的公共方法。
- SimpleQueue.qsize():返回队列的大致小心。
- SimpleQueue.empty():如果队列为空则返回True,否则返回False。
- SimpleQueue.put(): 将item放入队列。此方法永不阻塞,始终成功。
- SimpleQueue.get(block=True, timeout=None):从队列中移除并返回一个项目。
例子
单线程队列的小应用如下所示。
import queue
q = queue.Queue()
def worker():
while True:
item = q.get()
print(f'Working on {item}')
print(f'Finished {item}')
q.task_done()
# Turn on the worker thread
Thread(target=worker, daemon=True).start()
# Send thirty task requests to the worker
for item in range(30):
q.put(item)
# Block until all tasks are done
q.join()
print('All work completed')
队列线程的思想:首先创建一个全局共享的队列,队列中只存在有限个元素,并将所有的数据逐条加入到队列中,并调用队列的join函数进行等待。之后便可以开启若干线程,线程的任务就是不断的从队列中取数据进行处理就可以了。
import threading
import time
import queue
# 下面来通过多线程来处理Queue里面的任务:
def work(q):
while True:
if q.empty():
return
else:
t = q.get()
print("当前线程sleep {} 秒".format(t))
time.sleep(t)
def main():
q = queue.Queue()
for i in range(5):
q.put(i) # 往队列里生成消息
# 多线程
thread_num = 5
threads = []
for i in range(thread_num):
t = threading.Thread(target=work, args=(q,))
# args需要输出的是一个元组,如果只有一个参数,后面加,表示元组,否则会报错
threads.append(t)
for i in range(thread_num):
threads[i].start()
for i in range(thread_num):
threads[i].join()
if __name__ == "__main__":
start = time.time()
main()
print('耗时:', time.time() - start)