Python—进程、线程、协程
一、線程
線程是操作系統(tǒng)能夠進(jìn)行運(yùn)算調(diào)度的最小單位。它被包含在進(jìn)程之中,是進(jìn)程中的實(shí)際運(yùn)作單位。一條線程指的是進(jìn)程中一個(gè)單一順序的控制流,一個(gè)進(jìn)程中可以并發(fā)多個(gè)線程,每條線程并行執(zhí)行不同的任務(wù)
方法:
start ? ? ? ? ? ?線程準(zhǔn)備就緒,等待CPU調(diào)度
setName ? ? ?設(shè)置線程名稱
getName ? ? ?獲取線程名稱
setDaemon ? 把一個(gè)主進(jìn)程設(shè)置為Daemon線程后,主線程執(zhí)行過(guò)程中,后臺(tái)線程也在進(jìn)行,主線程執(zhí)行完畢后,后臺(tái)線程不論有沒(méi)執(zhí)行完成,都會(huì)停止
join ? ? ? ? ? ? ?逐個(gè)執(zhí)行每個(gè)線程,執(zhí)行完畢后繼續(xù)往下執(zhí)行,該方法使得多線程變得無(wú)意義
run ? ? ? ? ? ? ?線程被cpu調(diào)度后自動(dòng)執(zhí)行線程對(duì)象的run方法
?
threading模塊
線程的兩種調(diào)用方式:
1.直接調(diào)用(常用)
2.繼承式調(diào)用
'''繼承式調(diào)用''' import threading import time class MyThread(threading.Thread):def __init__(self,name):threading.Thread.__init__(self)self.name = namedef run(self):print("Hello %s"%self.name)time.sleep(3)if __name__ == "__main__":t1=MyThread("zhangsan")t2=MyThread("lisi")t1.start()t2.start()?
?
setDaemon線程
?
#!/usr/bin/env python # -*- coding:utf-8 -*- import time import threading def run(n):print('Hello..[%s]\n' % n)time.sleep(2)def main():for i in range(5):t = threading.Thread(target=run,args=[i,])t.start()t.join(1)m = threading.Thread(target=main,args=[]) m.setDaemon(True) #將主線程設(shè)置Daemon設(shè)置為True后,主線程執(zhí)行完成時(shí),其它子線程會(huì)同時(shí)退出,不管是否執(zhí)行完任務(wù) m.start() print("--- done----")?
?
線程鎖Lock
一個(gè)進(jìn)程下可以啟動(dòng)多個(gè)線程,多個(gè)線程共享父進(jìn)程的內(nèi)存空間,每個(gè)線程可以訪問(wèn)同一份數(shù)據(jù),所以當(dāng)多個(gè)線程同時(shí)要修改同一份數(shù)據(jù)時(shí),就會(huì)出現(xiàn)錯(cuò)誤
例如:
?
#!/usr/bin/env python # -*- coding:utf-8 -*- import threading import timenum = 100 #設(shè)置一個(gè)共享變量 def show():global num #在函數(shù)內(nèi)操作函數(shù)外變量,需設(shè)置為全局變量time.sleep(1)num -= 1 list=[] for i in range(100):t = threading.Thread(target=show)t.start()list.append(t)for t in list:t.join() print(num)?
上面的例子在正常執(zhí)行完成后的num的結(jié)果應(yīng)該是0,但實(shí)際上每次的執(zhí)行結(jié)果都不太一樣,因?yàn)楫?dāng)多個(gè)線程同時(shí)要修改同一份數(shù)據(jù)時(shí),就會(huì)出現(xiàn)一些錯(cuò)誤(只有
在python2.x運(yùn)行才會(huì)出現(xiàn)錯(cuò)誤,python3.x中不會(huì)),所以每個(gè)線程在要修改公共數(shù)據(jù)時(shí),為了避免自己在還沒(méi)改完的時(shí)候別人也來(lái)修改此數(shù)據(jù),可以加上線程鎖
來(lái)確保每次修改數(shù)據(jù)時(shí)只有一個(gè)線程在操作。
加鎖代碼
?
#!/usr/bin/env python # -*- coding:utf-8 -*- import threading import timenum = 100 #設(shè)置一個(gè)共享變量 lock=threading.Lock() #生成全局鎖 def show():global num #在函數(shù)內(nèi)操作函數(shù)外變量,需設(shè)置為全局變量time.sleep(1)lock.acquire() #修改前加鎖num -= 1lock.release() #修改后解鎖 list=[] for i in range(100):t = threading.Thread(target=show)t.start()list.append(t)for t in list:t.join()print(num)?
遞歸鎖RLock
就是在一個(gè)大鎖中再包含子鎖
?
#!/usr/bin/env python # -*- coding:utf-8 -*- import threading #遞歸鎖 def run1():lock.acquire() #小鎖global numnum +=1lock.release()return num def run2():lock.acquire() #小鎖global num2num2+=1lock.release()return num2 def run3():lock.acquire() #大鎖res = run1()res2 = run2()lock.release()print(res,res2)if __name__ == '__main__':num,num2 = 0,0lock = threading.RLock() #生成Rlockfor i in range(10):t = threading.Thread(target=run3)t.start()while threading.active_count() != 1:#如果不等于1,說(shuō)明子線程還沒(méi)執(zhí)行完畢pass #打印進(jìn)程數(shù) else:print(num,num2)?
Semaphore
同時(shí)允許一定數(shù)量的線程更改數(shù)據(jù)
?
#!/usr/bin/env python # -*- coding:utf-8 -*- import threading import time def run(n):semaphore.acquire()time.sleep(1)print("run the thread: %s" %n)semaphore.release()if __name__ == '__main__':semaphore = threading.BoundedSemaphore(3) #設(shè)置最多允許3個(gè)線程同時(shí)運(yùn)行for i in range(20):t = threading.Thread(target=run,args=(i,))t.start() while threading.active_count() != 1:pass else:print('----done---')?
event
實(shí)現(xiàn)兩個(gè)或多個(gè)線程間的交互,提供了三個(gè)方法?set、wait、clear,默認(rèn)碰到event.wait 方法時(shí)就會(huì)阻塞。
event.set(),設(shè)定后遇到wait不阻塞
event.clear(),設(shè)定后遇到wait后阻塞
event.isSet(),判斷有沒(méi)有被設(shè)定
?
#!/usr/bin/env python # -*- coding:utf-8 -*- import threading def start():print("---start---1")event.wait() #阻塞print("---start---2")if __name__ == "__main__":event = threading.Event()t = threading.Thread(target=start)t.start()result=input(">>:")if result == "set":event.set() #設(shè)定set,wait不阻塞?
?
二、進(jìn)程
multiprocessing模塊
進(jìn)程調(diào)用
?
#!/usr/bin/env python # -*- coding:utf-8 -*- from multiprocessing import Process import time def start(name):time.sleep(1)print('hello', name)if __name__ == '__main__':p = Process(target=start, args=('zhangsan',))p1 = Process(target=start, args=('lisi',))p.start()p1.start()p.join()?
進(jìn)程間通訊
每個(gè)進(jìn)程都擁有自己的內(nèi)存空間,因此不同進(jìn)程間內(nèi)存是不共享的,要想實(shí)現(xiàn)兩個(gè)進(jìn)程間的數(shù)據(jù)交換,有幾種方法
Queue(隊(duì)列)
?
#!/usr/bin/env python # -*- coding:utf-8 -*- from multiprocessing import Process, Queue def start(q):q.put( 'hello')if __name__ == '__main__':q = Queue()p = Process(target=start, args=(q,))p.start()print(q.get())p.join()?
Pipe(管道,不常用)
把管道的兩頭分別賦給兩個(gè)進(jìn)程,實(shí)現(xiàn)兩個(gè)進(jìn)程的互相通信
?
#!/usr/bin/env python # -*- coding:utf-8 -*- from multiprocessing import Process, Pipedef start(conn):conn.send('hello')#發(fā)送print(conn.recv())#接收conn.close()if __name__ == '__main__':parent_conn, child_conn = Pipe() #生成一個(gè)管道p = Process(target=start, args=(child_conn,))p.start()print(parent_conn.recv())#接收parent_conn.send("11111")#發(fā)送p.join()?
Manager(實(shí)現(xiàn)了進(jìn)程間真正的數(shù)據(jù)共享)
?
#!/usr/bin/env python from multiprocessing import Process, Manager def f(dic, list,i):dic['1'] = 1dic['2'] = 2dic['3'] = 3list.append(i)if __name__ == '__main__':manager = Manager()dic = manager.dict()#通過(guò)manager生成一個(gè)字典list = manager.list(range(5))#通過(guò)manager生成一個(gè)列表p_list = []for i in range(10):p = Process(target=f, args=(dic, list,i))p.start()p_list.append(p)for res in p_list:res.join()print(dic)print(list) #執(zhí)行結(jié)果 ''' {'2': 2, '3': 3, '1': 1} [0, 1, 2, 3, 4, 1, 9, 2, 5, 3, 7, 6, 0, 8, 4] '''?
?
進(jìn)程池
進(jìn)程池內(nèi)部維護(hù)一個(gè)進(jìn)程序列,當(dāng)使用時(shí),則去進(jìn)程池中獲取一個(gè)進(jìn)程,如果進(jìn)程池序列中沒(méi)有可供使用的進(jìn)程,那么程序就會(huì)等待,直到進(jìn)程池中有可用進(jìn)程為止。
進(jìn)程池中有兩個(gè)方法:
1、apply(同步)
2、apply_async(異步)
?
#!/usr/bin/env python # -*- coding:utf-8 -*- from multiprocessing import Process,Pool import timedef Foo(i):time.sleep(1)return i+100def Bar(arg):print('number::',arg)if __name__ == "__main__":pool = Pool(3)#定義一個(gè)進(jìn)程池,里面有3個(gè)進(jìn)程for i in range(10):pool.apply_async(func=Foo, args=(i,),callback=Bar)#pool.apply(func=Foo, args=(i,))pool.close()#關(guān)閉進(jìn)程池pool.join()#進(jìn)程池中進(jìn)程執(zhí)行完畢后再關(guān)閉,(必須先close在join)?
callback是回調(diào)函數(shù),就是在執(zhí)行完Foo方法后會(huì)自動(dòng)執(zhí)行Bar函數(shù),并且自動(dòng)把Foo函數(shù)的返回值作為參數(shù)傳入Bar函數(shù)
?
三、協(xié)程
協(xié)程,又稱微線程,是一種用戶態(tài)的輕量級(jí)線程。協(xié)程能保留上一次調(diào)用時(shí)的狀態(tài),每次過(guò)程重入時(shí),就相當(dāng)于進(jìn)入上一次調(diào)用的狀態(tài),換種說(shuō)法:進(jìn)入上一次離開(kāi)時(shí)所處邏輯流的位置,當(dāng)程序中存在大量不需要CPU的操作時(shí)(IO),適用于協(xié)程。
協(xié)程有極高的執(zhí)行效率,因?yàn)樽映绦蚯袚Q不是線程切換,而是由程序自身控制,因此,沒(méi)有線程切換的開(kāi)銷。
不需要多線程的鎖機(jī)制,因?yàn)橹挥幸粋€(gè)線程,也不存在同時(shí)寫變量沖突,在協(xié)程中控制共享資源不加鎖,只需要判斷狀態(tài)就好了,所以執(zhí)行效率比多線程高很多。
因?yàn)閰f(xié)程是一個(gè)線程執(zhí)行,所以想要利用多核CPU,最簡(jiǎn)單的方法是多進(jìn)程+協(xié)程,這樣既充分利用多核,又充分發(fā)揮協(xié)程的高效率。
那符合什么條件就能稱之為協(xié)程:1、必須在只有一個(gè)單線程里實(shí)現(xiàn)并發(fā) 2、修改共享數(shù)據(jù)不需加鎖 3、用戶程序里自己保存多個(gè)控制流的上下文棧 4、一個(gè)協(xié)程遇到IO操作自動(dòng)切換到其它協(xié)程
python中對(duì)于協(xié)程有兩個(gè)模塊,greenlet和gevent。
?
Greenlet(greenlet的執(zhí)行順序需要我們手動(dòng)控制)
?
#!/usr/bin/env python # -*- coding:utf-8 -*- from greenlet import greenlet def test1():print (11)gr2.switch() #手動(dòng)切換print (22)gr2.switch()def test2():print (33)gr1.switch()print (44)gr1 = greenlet(test1) gr2 = greenlet(test2) gr1.switch()?
gevent(自動(dòng)切換,由于切換是在IO操作時(shí)自動(dòng)完成,所以gevent需要修改Python自帶的一些標(biāo)準(zhǔn)庫(kù),這一過(guò)程在啟動(dòng)時(shí)通過(guò)monkey patch完成)
?
from gevent import monkey; monkey.patch_all() import gevent import timedef foo():print('11')time.sleep(3)print('22')def bar():print('33')print('44')gevent.joinall([gevent.spawn(foo),gevent.spawn(bar), ])?
運(yùn)行結(jié)果:(從結(jié)果可以看出,它們是并發(fā)執(zhí)行的)
11 33 44 22?
參考:
http://www.cnblogs.com/wupeiqi/articles/5040827.html
http://www.cnblogs.com/alex3714/articles/5230609.html
總結(jié)
以上是生活随笔為你收集整理的Python—进程、线程、协程的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: flask + celery实现定时任务
- 下一篇: 线程,进程,协程详细解释