python线程通信 消息传递_Python并发编程之线程消息通信机制/任务协调(四)
大家好,并發(fā)編程進(jìn)入第四篇。
本文目錄
前言
Event事件
Condition
Queue隊列
總結(jié)
.前言
前面我已經(jīng)向大家介紹了,如何使用創(chuàng)建線程,啟動線程。相信大家都會有這樣一個想法,線程無非就是創(chuàng)建一下,然后再start()下,實在是太簡單了。
可是要知道,在真實的項目中,實際場景可要我們舉的例子要復(fù)雜的多得多,不同線程的執(zhí)行可能是有順序的,或者說他們的執(zhí)行是有條件的,是要受控制的。如果僅僅依靠前面學(xué)的那點淺薄的知識,是遠(yuǎn)遠(yuǎn)不夠的。
那今天,我們就來探討一下如何控制線程的觸發(fā)執(zhí)行。
要實現(xiàn)對多個線程進(jìn)行控制,其實本質(zhì)上就是消息通信機(jī)制在起作用,利用這個機(jī)制發(fā)送指令,告訴線程,什么時候可以執(zhí)行,什么時候不可以執(zhí)行,執(zhí)行什么內(nèi)容。
經(jīng)過我的總結(jié),線程中通信方法大致有如下三種:
threading.Event
threading.Condition
queue.Queue
先拋出結(jié)論,接下來我們來一一探討下。
.Event事件
Python提供了非常簡單的通信機(jī)制?Threading.Event,通用的條件變量。多個線程可以等待某個事件的發(fā)生,在事件發(fā)生后,所有的線程都會被激活。
關(guān)于Event的使用也超級簡單,就三個函數(shù)
event = threading.Event()
# 重置event,使得所有該event事件都處于待命狀態(tài)
event.clear()
# 等待接收event的指令,決定是否阻塞程序執(zhí)行
event.wait()
# 發(fā)送event指令,使所有設(shè)置該event事件的線程執(zhí)行
event.set()
舉個例子來看下。
import time
import threading
class MyThread(threading.Thread):
def __init__(self, name, event):
super().__init__()
self.name = name
self.event = event
def run(self):
print('Thread: {} start at {}'.format(self.name, time.ctime(time.time())))
# 等待event.set()后,才能往下執(zhí)行
self.event.wait()
print('Thread: {} finish at {}'.format(self.name, time.ctime(time.time())))
threads = []
event = threading.Event()
# 定義五個線程
[threads.append(MyThread(str(i), event)) for i in range(1,5)]
# 重置event,使得event.wait()起到阻塞作用
event.clear()
# 啟動所有線程
[t.start() for t in threads]
print('等待5s...')
time.sleep(5)
print('喚醒所有線程...')
event.set()
執(zhí)行一下,看看結(jié)果
Thread: 1 start at Sun May 13 20:38:08 2018
Thread: 2 start at Sun May 13 20:38:08 2018
Thread: 3 start at Sun May 13 20:38:08 2018
Thread: 4 start at Sun May 13 20:38:08 2018
等待5s...
喚醒所有線程...
Thread: 1 finish at Sun May 13 20:38:13 2018
Thread: 4 finish at Sun May 13 20:38:13 2018
Thread: 2 finish at Sun May 13 20:38:13 2018
Thread: 3 finish at Sun May 13 20:38:13 2018
可見在所有線程都啟動(start())后,并不會執(zhí)行完,而是都在self.event.wait()止住了,需要我們通過event.set()來給所有線程發(fā)送執(zhí)行指令才能往下執(zhí)行。
.Condition
Condition和Event 是類似的,并沒有多大區(qū)別。
同樣,Condition也只需要掌握幾個函數(shù)即可。
cond = threading.Condition()
# 類似lock.acquire()
cond.acquire()
# 類似lock.release()
cond.release()
# 等待指定觸發(fā),同時會釋放對鎖的獲取,直到被notify才重新占有瑣。
cond.wait()
# 發(fā)送指定,觸發(fā)執(zhí)行
cond.notify()
舉個網(wǎng)上一個比較趣的捉迷藏的例子來看看
import threading, time
class Hider(threading.Thread):
def __init__(self, cond, name):
super(Hider, self).__init__()
self.cond = cond
self.name = name
def run(self):
time.sleep(1) ?#確保先運行Seeker中的方法
self.cond.acquire()
print(self.name + ': 我已經(jīng)把眼睛蒙上了')
self.cond.notify()
self.cond.wait()
print(self.name + ': 我找到你了哦 ~_~')
self.cond.notify()
self.cond.release()
print(self.name + ': 我贏了')
class Seeker(threading.Thread):
def __init__(self, cond, name):
super(Seeker, self).__init__()
self.cond = cond
self.name = name
def run(self):
self.cond.acquire()
self.cond.wait()
print(self.name + ': 我已經(jīng)藏好了,你快來找我吧')
self.cond.notify()
self.cond.wait()
self.cond.release()
print(self.name + ': 被你找到了,哎~~~')
cond = threading.Condition()
seeker = Seeker(cond, 'seeker')
hider = Hider(cond, 'hider')
seeker.start()
hider.start()
通過cond來通信,阻塞自己,并使對方執(zhí)行。從而,達(dá)到有順序的執(zhí)行。
看下結(jié)果
hider: ? 我已經(jīng)把眼睛蒙上了
seeker: ?我已經(jīng)藏好了,你快來找我吧
hider: ? 我找到你了 ~_~
hider: ? 我贏了
seeker: ?被你找到了,哎~~~
.Queue隊列
終于到了我們今天的主角了。
從一個線程向另一個線程發(fā)送數(shù)據(jù)最安全的方式可能就是使用 queue 庫中的隊列了。創(chuàng)建一個被多個線程共享的 Queue 對象,這些線程通過使用put()?和?get()?操作來向隊列中添加或者刪除元素。
同樣,對于Queue,我們也只需要掌握幾個函數(shù)即可。
from queue import Queue
# maxsize默認(rèn)為0,不受限
# 一旦>0,而消息數(shù)又達(dá)到限制,q.put()也將阻塞
q = Queue(maxsize=0)
# 阻塞程序,等待隊列消息。
q.get()
# 獲取消息,設(shè)置超時時間
q.get(timeout=5.0)
# 發(fā)送消息
q.put()
# 等待所有的消息都被消費完
q.join()
# 以下三個方法,知道就好,代碼中不要使用
# 查詢當(dāng)前隊列的消息個數(shù)
q.qsize()
# 隊列消息是否都被消費完,True/False
q.empty()
# 檢測隊列里消息是否已滿
q.full()
函數(shù)會比之前的多一些,同時也從另一方面說明了其功能更加豐富。
我來舉個老師點名的例子,大家感受一下。
from queue import Queue
from threading import Thread
import time
class Student(Thread):
def __init__(self, name, queue):
super().__init__()
self.name = name
self.queue = queue
def run(self):
while True:
# 阻塞程序,時刻監(jiān)聽老師,接收消息
msg = self.queue.get()
# 一旦發(fā)現(xiàn)點到自己名字,就趕緊答到
if msg == self.name:
print("{}:到!".format(self.name))
class Teacher:
def __init__(self, queue):
self.queue=queue
def call(self, student_name):
print("老師:{}來了沒?".format(student_name))
# 發(fā)送消息,要點誰的名
self.queue.put(student_name)
queue = Queue()
teacher = Teacher(queue=queue)
s1 = Student(name="小明", queue=queue)
s2 = Student(name="小亮", queue=queue)
s1.start()
s2.start()
print('開始點名~')
teacher.call('小明')
time.sleep(1)
teacher.call('小亮')
運行結(jié)果如下
開始點名~
老師:小明來了沒?
小明:到!
老師:小亮來了沒?
小亮:到!
.總結(jié)
學(xué)習(xí)了以上三種通信方法,我們很容易就能發(fā)現(xiàn)Event?和Condition?是threading模塊原生提供的模塊,原理簡單,功能單一,它能發(fā)送?True?和?False?的指令,所以只能適用于某些簡單的場景中。
而Queue則是比較高級的模塊,它可能發(fā)送任何類型的消息,包括字符串、字典等。其內(nèi)部實現(xiàn)其實也引用了Condition模塊(譬如put和get函數(shù)的阻塞),正是其對Condition進(jìn)行了功能擴(kuò)展,所以功能更加豐富,更能滿足實際應(yīng)用。
總結(jié)
以上是生活随笔為你收集整理的python线程通信 消息传递_Python并发编程之线程消息通信机制/任务协调(四)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: IM即时通讯结合mui 环信
- 下一篇: python3 获取当前日期_pytho