Python Threading 多线程编程
寫在篇前
??threading模塊是python多線程處理包,使用該模塊可以很方便的實(shí)現(xiàn)多線程處理任務(wù),本篇文章的基礎(chǔ)是需要掌握進(jìn)程、線程基本概念,對(duì)PV原語(yǔ)、鎖等傳統(tǒng)同步處理方法有一定的了解。另外,threading模塊的實(shí)現(xiàn)是參考java多線程處理方式,并且只實(shí)現(xiàn)了其中的一個(gè)子集。必須說明的是,由于GIL的存在,多線程的應(yīng)用主要用于IO密集型任務(wù),不適合CPU密集型任務(wù),如果要提高CPU的利用率,需要利用協(xié)程或則多進(jìn)程編程。
Threading 方法屬性
- threading.active_count() 返回當(dāng)前活動(dòng)的線程數(shù)
- threading.current_thread() 返回當(dāng)前線程的Thread對(duì)象
- threading.get_ident() 獲取當(dāng)前線程的(唯一)標(biāo)識(shí)符
- threading.enumerate() 返回一個(gè)包含當(dāng)前處于活動(dòng)狀態(tài)Thread 對(duì)象的list
- threading.main_thread() 返回當(dāng)前線程的主線程
- threading.settrace(func) 為每個(gè)線程設(shè)置一個(gè)trace函數(shù),在調(diào)用run方法之前會(huì)被執(zhí)行
- threading.setprofile(func) 同上,為每一個(gè)線程設(shè)置一個(gè)profile函數(shù)
- threading.stack_size([size]) 設(shè)置每個(gè)線程私有棧空間大小,默認(rèn)為0,若不為0不可低于32KB(32768)
- threading.TIMEOUT_MAX 允許線程被堵塞的最長(zhǎng)時(shí)間,按我的理解一般用不上
Thread對(duì)象
?使用Thread一共有兩種方式,同Java多線程模型:
?????(1) 繼承Thread類,并重寫且僅僅重寫__init__和 run方法,特別注意的是重寫__init__方法首先應(yīng)該調(diào)用父類構(gòu)造方法Thread.__init__
?????(2)將一個(gè)函數(shù)傳入到Thread類的構(gòu)造函數(shù)
?Thread對(duì)象的用法以及主要方法介紹,我們用以下例子來說明一下:
#! /usr/bin/python # _*_ coding: utf-8 _*_ __author__ = 'Jeffery' __date__ = '2018/9/15 11:26'import threading import timedef func1(num):time.sleep(3)print(str(num)+':you can if you will')def func2(num):time.sleep(2)print(str(num)+':the devil is in the details')def func3(num):time.sleep(1)print(str(num)+':life is short, u need what?')def main():start = time.time()print('start mian Thread')ts = (threading.Thread(group=None, target=func1, args=(1,)),threading.Thread(group=None, target=func2, args=(2,)),threading.Thread(group=None, target=func3, args=(3,))) # 【重要】創(chuàng)建三個(gè)線程,分別執(zhí)行func1~3for _t in ts:_t.start() # 【重要方法】啟動(dòng)線程# _t.join() # 【重要方法】join方法用來告訴父線程,您要等我子線程執(zhí)行完畢,你再繼續(xù)往下走,這里注釋了,所以你應(yīng)該會(huì)發(fā)現(xiàn)程序輸出 mian Thread ends, cost 0 secsend = time.time()print('mian Thread ends, cost %d secs' % (end-start))if __name__ == '__main__':main()# 以下是執(zhí)行結(jié)果 start mian Thread mian Thread ends, cost 0 secs 3:life is short, u need what? 2:the devil is in the details 1:you can if you willLock對(duì)象
?因?yàn)橘Y源總是有限的,如果多個(gè)線程對(duì)同一個(gè)對(duì)象進(jìn)行操作,則有可能造成資源的爭(zhēng)用,甚至導(dǎo)致死鎖。引入鎖,是一種便捷的解決方式。主要包括兩個(gè)方法,acquire()和release():
acquire(blocking=True)
??當(dāng)阻塞參數(shù)設(shè)置為True(默認(rèn)值)時(shí)調(diào)用,然后就會(huì)進(jìn)入到locked狀態(tài)直到解鎖,返回True。
release()
??釋放占有的鎖,無返回值
#! /usr/bin/python # _*_ coding: utf-8 _*_ __author__ = 'Jeffery' __date__ = '2018/9/15 13:56'import threading import time# 臨界資源 num = 0lock = threading.Lock()def num_add(t_i):global numtime.sleep(3)if lock.acquire(): # 這句話相當(dāng)于 if lock.acquire()num += 1lock.release()print('thread %d set num %d' % (t_i, num))def num_sub(t_i):global numtime.sleep(3)if lock.acquire(): # 這句話相當(dāng)于 if lock.acquire()num -= 1lock.release()print('thread %d set num %d' % (t_i, num))def main():ts = []for i in range(5):t = threading.Thread(target=num_add, args=(i+1,))t.start()ts.append(t)for i in range(5):t = threading.Thread(target=num_sub, args=(5+i+1,))t.start()ts.append(t)for t in ts:t.join()print('end')if __name__ == '__main__':main() # 以下是結(jié)果 thread 4 set num 1 thread 3 set num 2 thread 5 set num 3 thread 1 set num 4 thread 2 set num 5 thread 9 set num 4 thread 10 set num 3 thread 7 set num 2 thread 6 set num 1 thread 8 set num 0 endRLock對(duì)象
??RLock稱之為遞歸鎖,從某種角度來講屬于一種比Lock更安全的鎖,和Lock的區(qū)別在于:在同一線程內(nèi),對(duì)RLock進(jìn)行多次acquire()操作(或則說RLock允許遞歸加鎖),程序不會(huì)阻塞;而如果是Lock那么將會(huì)發(fā)生堵塞。RLock所擁有的方法同上,下面例子在上例基礎(chǔ)上稍微改動(dòng):
#! /usr/bin/python # _*_ coding: utf-8 _*_ __author__ = 'Jeffery' __date__ = '2018/9/15 13:56'import threading import time# 臨界資源 num = 0lock = threading.RLock() # 【重點(diǎn)】這里必須是RLock,不能是Lockdef num_add(t_i):with lock: # 這是屬于上下文管理器的一個(gè)使用用法global numtime.sleep(3)if lock.acquire(): # 這句話相當(dāng)于 if lock.acquire()num += 1lock.release()print('thread %d set num %d' % (t_i, num))def num_sub(t_i):global numtime.sleep(3)if lock.acquire(): # 這句話相當(dāng)于 if lock.acquire()num -= 1lock.release()print('thread %d set num %d' % (t_i, num))def main():ts = []for i in range(5):t = threading.Thread(target=num_add, args=(i+1,))t.start()ts.append(t)for i in range(5):t = threading.Thread(target=num_sub, args=(5+i+1,))t.start()ts.append(t)for t in ts:t.join()print('end')if __name__ == '__main__':main()Semaphore對(duì)象
?信號(hào)量是計(jì)算機(jī)科學(xué)最古老的一種同步原語(yǔ),由荷蘭計(jì)算機(jī)科學(xué)家Edsger W. Dijkstra提出,通常也稱之為PV原語(yǔ)。在對(duì)象內(nèi)部會(huì)維護(hù)一個(gè)計(jì)數(shù)器,這個(gè)計(jì)數(shù)器的初值由Semaphore初始化時(shí)給出,這個(gè)值我們一般稱之為 臨界資源數(shù)量。當(dāng)臨界資源數(shù)量大于0時(shí),線程不會(huì)被阻塞;當(dāng)臨界資源等于0時(shí)再有線程請(qǐng)求,那將會(huì)被阻塞,直到某個(gè)線程釋放臨界資源。主要方法同RLock、Lock,下面用PV原語(yǔ)最(也許吧)經(jīng)典的例子,哲學(xué)家問題來說明一下:
??哲學(xué)家問題:話說有五個(gè)哲學(xué)家,他們的生活方式是交替地進(jìn)行思考和進(jìn)餐。他們共用一張圓桌,分別坐在五張椅子上,在圓桌上有五個(gè)碗和五支筷子,平時(shí)一個(gè)哲學(xué)家進(jìn)行思考,饑餓時(shí)便試圖取用其左、右最靠近他的筷子,只有在他拿到兩支筷子時(shí)才能進(jìn)餐;進(jìn)餐完畢,放下筷子又繼續(xù)思考。請(qǐng)?jiān)O(shè)計(jì)一種方法,使哲學(xué)家不要餓死。
#! /usr/bin/python # _*_ coding: utf-8 _*_ __author__ = 'Jeffery' __date__ = '2018/9/15 15:35'import threading import timedef philosopher(i, chopstick_sema, sema):"""描述哲學(xué)家活動(dòng):type i: int 表示第幾個(gè)哲學(xué)家:return:no return"""while True:think(i + 1)sema.acquire(blocking=True) # 請(qǐng)求吃飯chopstick_sema[i].acquire() # 拿起左邊筷子chopstick_sema[((i + 1) % 5)].acquire() # 拿起右邊筷子eat(i + 1)chopstick_sema[((i + 1) % 5)].release() # 放下右邊筷子chopstick_sema[i].release() # 放下左邊筷子sema.release() # 吃飯完畢think(i + 1)def eat(phil_NO):print('philosopher %d is eating' % phil_NO)time.sleep(1)print('philosopher %d finish eating' % phil_NO)def think(phil_NO):print('philosopher %d is thinking' % phil_NO)time.sleep(2)print('philosopher %d finish thinking' % phil_NO)def main():# 為了避免死鎖,同時(shí)只允許四個(gè)哲學(xué)家吃飯sema = threading.Semaphore(value=4)chopstick_sema = [threading.Semaphore(value=1) for i in range(5)]ts = []for i in range(5):t = threading.Thread(target=philosopher, args=(i, chopstick_sema, sema))t.start()ts.append(t)for t in ts:t.join()if __name__ == '__main__':main()Condition對(duì)象
??Condition對(duì)象總是與某種鎖相關(guān)聯(lián),鎖對(duì)象可以通過構(gòu)造函數(shù)傳入,或者它會(huì)默認(rèn)創(chuàng)建一個(gè)鎖,并且鎖是Condition對(duì)象的一部分,不必單獨(dú)跟蹤它。
acquire()
????嘗試獲取鎖
release()
????釋放已獲得的鎖
wait(timeout=-1)
????主動(dòng)進(jìn)入等待阻塞狀態(tài),直到被其他線程喚醒或則超時(shí)
wait_for(predicate, timeout=None)
????進(jìn)入等待狀態(tài),直到條件被置為True,timeout參數(shù)意思如上,predicate參數(shù)表示一個(gè)callable對(duì)象。
notify()
????喚醒其中一個(gè)線程
notify_all()
?????喚醒所有進(jìn)程
舉個(gè)例子:
假設(shè)有一個(gè)緩沖區(qū)大小為5,要實(shí)現(xiàn)互斥存取數(shù)據(jù),代碼如下: #! /usr/bin/python # _*_ coding: utf-8 _*_ __author__ = 'Jeffery' __date__ = '2018/9/15 16:26'import threading import timecon = threading.Condition(threading.RLock())num = 0class Producer(threading.Thread):"""生產(chǎn)者類"""def __init__(self):"""構(gòu)造函數(shù)"""threading.Thread.__init__(self)def run(self):"""重寫 run方法:return:"""global numwith con:while True:num += 1print('生產(chǎn)一個(gè)產(chǎn)品,現(xiàn)在有產(chǎn)品%d個(gè)' % num)time.sleep(2)if num >= 5:print('緩沖區(qū)已滿')con.notify() # 喚醒等待池con.wait() # 主動(dòng)進(jìn)入等待池,掛起class Consumers(threading.Thread):def __init__(self):threading.Thread.__init__(self)def run(self):global numwith con:while True:num -= 1print("取出一個(gè)產(chǎn)品,還剩%d個(gè)" % num)time.sleep(3)if num <= 0:print('緩沖區(qū)已空')con.notify()con.wait()if __name__ == '__main__':p = Producer()c = Consumers()p.start()c.start()p.join()c.join()Event對(duì)象
?Event,翻譯作事件,個(gè)人覺得該詞是計(jì)算機(jī)編程中的一個(gè)專有名詞。在這里用來線程之間的通信,當(dāng)某一個(gè)event發(fā)生之后,其他線程做出一定的反應(yīng);或者說某一個(gè)線程等待另一個(gè)線程某個(gè)事件的發(fā)生。每一個(gè)Event對(duì)象內(nèi)部維護(hù)一個(gè)flag,表征事件是否已經(jīng)發(fā)生。
is_set()
???? 當(dāng)且僅當(dāng)Event對(duì)象內(nèi)部維護(hù)的flag為True時(shí),返回True,否則返回False
set()
????將flag置為True
clear()
????將flag置為False
wait(timeout=-1)
????當(dāng)flag是False時(shí),阻塞;一旦為True或超時(shí),立刻喚醒
舉個(gè)例子,好比像心意女生表白:
#! /usr/bin/python # _*_ coding: utf-8 _*_ __author__ = 'Jeffery' __date__ = '2018/9/15 14:58'import threading import timeevent = threading.Event()# 假設(shè)你想表白某位心儀女孩 # 你想,表白時(shí)送點(diǎn)花,更有氣氛,于是乎你打電話去花店,叫老板送一束花過來 # 然后再表白def confess_to_girl():print('u r preparing to confess')event.wait()print('start to confess: hi , my loving girl, I....')def flower_delivering():print('flower is delivering, plea wait')time.sleep(5)print('flower comes')event.set()def main():t1 = threading.Thread(target=confess_to_girl)t2 = threading.Thread(target=flower_delivering)t1.start()t2.start()t1.join()t2.join()print('\nstory,maybe new start, maybe ending!')if __name__ == '__main__':main()Timer對(duì)象
?計(jì)時(shí)器,即讓一個(gè)線程在一個(gè)指定的之間之后再開始執(zhí)行,是屬于Thread的一個(gè)封裝子類,故用法基本相似。
def hello():print("hello, world")t = Timer(30.0, hello) # t.cancel() # 在開始之前,還可以使計(jì)時(shí)器停止 t.start() # after 30 seconds, "hello, world" will be printedBarrier對(duì)象
??Barrier,顧名思義就是障礙的意思,只有人多力量大的時(shí)候,才能一起跨過一個(gè)個(gè)障礙,繼續(xù)前行。Barrier則是一個(gè)線程障礙,只有線程數(shù)量達(dá)到指定數(shù)量之后,才能喚醒所有等待進(jìn)程。這個(gè)功能不是很常用,簡(jiǎn)單介紹一下吧,主要方法有:
wait(timeout=None)
????使線程進(jìn)入等待狀態(tài),直到達(dá)到一定數(shù)量,這時(shí)將會(huì)使barrier進(jìn)入broken狀態(tài),從而一齊釋放被堵塞的線程
reset()
????使Barrier恢復(fù)默認(rèn)‘?dāng)r截’狀態(tài)
abort()
????使Barrier進(jìn)入Broken狀態(tài)
parties
????即上面提到的 指定數(shù)量的線程,int類型
n_waiting
????正在等待的線程
broken
????bool類型,指示Barrier是否處于Broken狀態(tài)
總結(jié)
以上是生活随笔為你收集整理的Python Threading 多线程编程的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Pandas处理数据缺失值
- 下一篇: python mutilprocessi