并发编程.md
操作系統(tǒng)基礎(chǔ)
-
人機(jī)矛盾: CPU利用率低
-
磁帶存儲(chǔ)+批處理:降低數(shù)據(jù)的讀取時(shí)間,提高CPU的利用率
-
多道操作系統(tǒng)------在一個(gè)任務(wù)遇到IO的時(shí)候主動(dòng)讓出CPU,給其他任務(wù)使用
- 由操作系統(tǒng)完成
- 切換要需要時(shí)間
-
分時(shí)操作系統(tǒng)-------給時(shí)間分片,讓多個(gè)任務(wù)輪流使用CPU
- 短作業(yè)優(yōu)先算法
- 先來(lái)先服務(wù)算法
每個(gè)程序分配一個(gè)時(shí)間片,輪轉(zhuǎn)使用CPU,切換需要時(shí)間,降低CPU利用率,提高用戶體驗(yàn)
-
通用操作系統(tǒng)-------分時(shí)操作系統(tǒng) + 多道操作系統(tǒng) + 實(shí)時(shí)操作系統(tǒng)
- 多個(gè)程序一起在計(jì)算機(jī)中執(zhí)行
- 一個(gè)程序如果遇到IO操作,切出去讓出CPU
- 一個(gè)程序沒(méi)有遇到IO,但是時(shí)間片到時(shí)了,切出去讓出CPU
-
操作系統(tǒng)負(fù)責(zé)什么?
調(diào)度進(jìn)程先后執(zhí)行的順序 控制執(zhí)行的時(shí)間等等
資源的分配
進(jìn)程的概念
進(jìn)程:
- 運(yùn)行中的程序
- 是計(jì)算機(jī)中最小的資源分配單位
- 在操作系統(tǒng)中唯一標(biāo)識(shí)符:PID
操作系統(tǒng)調(diào)度進(jìn)程的算法:
- 短作業(yè)優(yōu)先
- 先來(lái)先服務(wù)
- 時(shí)間片輪轉(zhuǎn)
- 多級(jí)反饋算法
并行與并發(fā):
- 并行:并行是指兩者同時(shí)執(zhí)行,比如賽跑,兩個(gè)人都在不停的往前跑;(資源夠用,比如三個(gè)線程,四核的CPU )
- 并發(fā):并發(fā)是指資源有限的情況下,兩者交替輪流使用資源,比如一段路(單核CPU資源)同時(shí)只能過(guò)一個(gè)人,A走一段后,讓給B,B用完繼續(xù)給A ,交替使用,目的是提高效率。
-
就緒(Ready)狀態(tài)
當(dāng)進(jìn)程已分配到除CPU以外的所有必要的資源,只要獲得處理機(jī)便可立即執(zhí)行,這時(shí)的進(jìn)程狀態(tài)稱為就緒狀態(tài)。
-
執(zhí)行/運(yùn)行(Running)
狀態(tài)當(dāng)進(jìn)程已獲得處理機(jī),其程序正在處理機(jī)上執(zhí)行,此時(shí)的進(jìn)程狀態(tài)稱為執(zhí)行狀態(tài)。
-
阻塞(Blocked)狀態(tài)
正在執(zhí)行的進(jìn)程,由于等待某個(gè)事件發(fā)生而無(wú)法執(zhí)行時(shí),便放棄處理機(jī)而處于阻塞狀態(tài)。引起進(jìn)程阻塞的事件可有多種,例如,等待I/O完成、申請(qǐng)緩沖區(qū)不能滿足、等待信件(信號(hào))等。
同步異步:
所謂同步就是一個(gè)任務(wù)的完成需要依賴另外一個(gè)任務(wù)時(shí),只有等待被依賴的任務(wù)完成后,依賴的任務(wù)才能算完成,這是一種可靠的任務(wù)序列。要么成功都成功,失敗都失敗,兩個(gè)任務(wù)的狀態(tài)可以保持一致。
所謂異步是不需要等待被依賴的任務(wù)完成,只是通知被依賴的任務(wù)要完成什么工作,依賴的任務(wù)也立即執(zhí)行,只要自己完成了整個(gè)任務(wù)就算完成了。至于被依賴的任務(wù)最終是否真正完成,依賴它的任務(wù)無(wú)法確定,所以它是不可靠的任務(wù)序列。
在python程序中的進(jìn)程操作
進(jìn)程:
# 創(chuàng)建進(jìn)程 時(shí)間開(kāi)銷大 # 銷毀進(jìn)程 時(shí)間開(kāi)銷大 # 進(jìn)程之間切換 時(shí)間開(kāi)銷大線程:
線程是進(jìn)程的一部分,每個(gè)進(jìn)程中至少有一個(gè)線程 能被CPU調(diào)度的最小單位 一個(gè)進(jìn)程中的多個(gè)線程是可以共享這個(gè)進(jìn)程的數(shù)據(jù)的 —— 數(shù)據(jù)共享 線程的創(chuàng)建、銷毀、切換 開(kāi)銷遠(yuǎn)遠(yuǎn)小于進(jìn)程 —— 開(kāi)銷小進(jìn)程:是計(jì)算機(jī)中最小的資源分配單位(進(jìn)程是負(fù)責(zé)圈資源)
線程:是計(jì)算機(jī)中能被CPU調(diào)度的最小單位 (線程是負(fù)責(zé)執(zhí)行具體代碼的)
os.getpid():獲取當(dāng)前進(jìn)程pid
os.getppid():獲取父級(jí)進(jìn)程pid,可以創(chuàng)建子進(jìn)程,在pycharm中啟動(dòng)的所有py程序都是pycharm的子進(jìn)程
import os import time from multiprocessing import Process # multiprocessing多進(jìn)程模塊Process類 def func():print('start',os.getpid())time.sleep(1)print('end',os.getpid())if __name__ == '__main__':p = Process(target=func) # 將函數(shù)封裝到類,創(chuàng)建一個(gè)要開(kāi)啟func進(jìn)程的對(duì)象p.start() # 異步 調(diào)用開(kāi)啟進(jìn)程的方法 但是并不等待這個(gè)進(jìn)程真的開(kāi)啟print('main :',os.getpid()) #main : 11436 #start 9860 #end 9860 操作系統(tǒng)創(chuàng)建進(jìn)程的方式不同 windows操作系統(tǒng)執(zhí)行開(kāi)啟進(jìn)程的代碼實(shí)際上新的子進(jìn)程需要通過(guò)import父進(jìn)程的代碼來(lái)完成數(shù)據(jù)的導(dǎo)入工作所以有一些內(nèi)容我們只希望在父進(jìn)程中完成,就寫在if __name__ == '__main__':下面 ios linux操作系統(tǒng)創(chuàng)建進(jìn)程 fork,拷貝的方式-
主進(jìn)程和子進(jìn)程之間的關(guān)系
父進(jìn)程會(huì)等待著所有的子進(jìn)程結(jié)束之后才結(jié)束,為了回收資源
- join方法 :阻塞父進(jìn)程,直到對(duì)應(yīng)子進(jìn)程結(jié)束就結(jié)束
補(bǔ)充:Windows開(kāi)啟進(jìn)程,由于創(chuàng)建機(jī)制,必須采用此方式.
print([__name__]) if __name__ == '__main__':# 控制當(dāng)這個(gè)py文件被當(dāng)作腳本直接執(zhí)行的時(shí)候,就執(zhí)行這里面的代碼# 當(dāng)這個(gè)py文件被當(dāng)作模塊導(dǎo)入的時(shí)候,就不執(zhí)行這里面的代碼print('hello hello') # __name__ == '__main__'# 執(zhí)行的文件就是__name__所在的文件 # __name__ == '文件名'# __name__所在的文件被導(dǎo)入執(zhí)行的時(shí)候-
守護(hù)進(jìn)程
隨著主進(jìn)程的代碼結(jié)束而結(jié)束的,所有的子進(jìn)程都必須在主進(jìn)程結(jié)束之前結(jié)束,由主進(jìn)程來(lái)負(fù)責(zé)回收資源
p.daemon = True
其他方法:
p.is_alive() 判斷進(jìn)程是否活著p.terminate() # 可以解釋異步非阻塞, 關(guān)閉需要時(shí)間,并不等到返回結(jié)束進(jìn)程結(jié)果,會(huì)變僵尸 def son1():while True:print('is alive')time.sleep(0.5)if __name__ == '__main__':p = Process(target=son1)p.start() # 異步 非阻塞print(p.is_alive())time.sleep(1)p.terminate() # 異步的 非阻塞print(p.is_alive()) # 進(jìn)程還活著 因?yàn)椴僮飨到y(tǒng)還沒(méi)來(lái)得及關(guān)閉進(jìn)程time.sleep(0.01)print(p.is_alive()) # 操作系統(tǒng)已經(jīng)響應(yīng)了我們要關(guān)閉進(jìn)程的需求,再去檢測(cè)的時(shí)候,得到的結(jié)果是進(jìn)程已經(jīng)結(jié)束了使用面向?qū)ο蠓绞介_(kāi)啟進(jìn)程
import os import time from multiprocessing import Processclass MyProcecss2(Process): #必須繼承Processdef run(self): #必須要有run方法,重寫process的run,start自動(dòng)調(diào)用runwhile True:print('is alive')time.sleep(0.5)class MyProcecss1(Process):def __init__(self,x,y): #傳參數(shù)要定義init函數(shù)self.x = xself.y = ysuper().__init__() #要導(dǎo)入父類的初始化參數(shù)def run(self):print(self.x,self.y,os.getpid())for i in range(5):print('in son2')time.sleep(1)if __name__ == '__main__':mp = MyProcecss1(1,2)mp.daemon = True mp.start()print(mp.is_alive())mp.terminate()# mp2 = MyProcecss2()# mp2.start()# print('main :',os.getpid())# time.sleep(1)Process操作進(jìn)程的方法
# p.start() 開(kāi)啟進(jìn)程 異步非阻塞 # p.terminate() 結(jié)束進(jìn)程 異步非阻塞 # p.join() 同步阻塞 # p.isalive() 獲取當(dāng)前進(jìn)程的狀態(tài) # daemon = True 設(shè)置為守護(hù)進(jìn)程,守護(hù)進(jìn)程永遠(yuǎn)在主進(jìn)程的代碼結(jié)束之后自動(dòng)結(jié)束鎖
# 1.如果在一個(gè)并發(fā)的場(chǎng)景下,涉及到某部分內(nèi)容# 是需要修改一些所有進(jìn)程共享數(shù)據(jù)資源# 需要加鎖來(lái)維護(hù)數(shù)據(jù)的安全 # 2.在數(shù)據(jù)安全的基礎(chǔ)上,才考慮效率問(wèn)題 # 3.同步存在的意義# 數(shù)據(jù)的安全性# 在主進(jìn)程中實(shí)例化 lock = Lock() # 把這把鎖傳遞給子進(jìn)程 # 在子進(jìn)程中 對(duì)需要加鎖的代碼 進(jìn)行 with lock:# with lock相當(dāng)于lock.acquire()和lock.release() # 在進(jìn)程中需要加鎖的場(chǎng)景# 共享的數(shù)據(jù)資源(文件、數(shù)據(jù)庫(kù))# 對(duì)資源進(jìn)行修改、刪除操作 # 加鎖之后能夠保證數(shù)據(jù)的安全性 但是也降低了程序的執(zhí)行效率 mport time import json from multiprocessing import Process,Lockdef search_ticket(user):with open('ticket_count') as f:dic = json.load(f)print('%s查詢結(jié)果 : %s張余票'%(user,dic['count']))def buy_ticket(user,lock):# with lock:# lock.acquire() # 給這段代碼加上一把鎖time.sleep(0.02)with open('ticket_count') as f:dic = json.load(f)if dic['count'] > 0:print('%s買到票了'%(user))dic['count'] -= 1else:print('%s沒(méi)買到票' % (user))time.sleep(0.02)with open('ticket_count','w') as f:json.dump(dic,f)# lock.release() # 給這段代碼解鎖def task(user, lock):search_ticket(user)with lock:buy_ticket(user, lock)if __name__ == '__main__':lock = Lock()for i in range(10):p = Process(target=task,args=('user%s'%i,lock))p.start()進(jìn)程之間通信IPC
進(jìn)程之間的通信 - IPC(inter process communication) 第三方:redis,memcache,kafka,rabbitmq 特點(diǎn):并發(fā)需求,高可用,斷電保存數(shù)據(jù),解耦 from multiprocessing import Queue,Process # 先進(jìn)先出 def func(exp,q):ret = eval(exp)q.put({ret,2,3})q.put(ret*2)q.put(ret*4)if __name__ == '__main__':q = Queue()Process(target=func,args=('1+2+3',q)).start()print(q.get())print(q.get())print(q.get()) # Queue基于 天生就是數(shù)據(jù)安全的# 文件家族的socket pickle lock # pipe 管道(不安全的) = 文件家族的socket pickle # 隊(duì)列 = 管道 + 鎖 # from multiprocessing import Pipe # pip = Pipe() # pip.send() # pip.recv() import queue# from multiprocessing import Queue # q = Queue(5) # q.put(1) # q.put(2) # q.put(3) # q.put(4) # q.put(5) # 當(dāng)隊(duì)列為滿的時(shí)候再向隊(duì)列中放數(shù)據(jù) 隊(duì)列會(huì)阻塞 # print('5555555') # try: # q.put_nowait(6) # 當(dāng)隊(duì)列為滿的時(shí)候再向隊(duì)列中放數(shù)據(jù) 會(huì)報(bào)錯(cuò)并且會(huì)丟失數(shù)據(jù) # except queue.Full: # pass # print('6666666') # # print(q.get()) # print(q.get()) # print(q.get()) # 在隊(duì)列為空的時(shí)候會(huì)發(fā)生阻塞 # print(q.get()) # 在隊(duì)列為空的時(shí)候會(huì)發(fā)生阻塞 # print(q.get()) # 在隊(duì)列為空的時(shí)候會(huì)發(fā)生阻塞 # try: # print(q.get_nowait()) # 在隊(duì)列為空的時(shí)候 直接報(bào)錯(cuò) # except queue.Empty:pass生產(chǎn)者消費(fèi)者模型
什么是生產(chǎn)者消費(fèi)者模型? # 把一個(gè)產(chǎn)生數(shù)據(jù)并且處理數(shù)據(jù)的過(guò)程解耦 # 讓生產(chǎn)的數(shù)據(jù)的過(guò)程和處理數(shù)據(jù)的過(guò)程達(dá)到一個(gè)工作效率上的平衡 # 中間的容器,在多進(jìn)程中我們使用隊(duì)列或者可被join的隊(duì)列,做到控制數(shù)據(jù)的量# 當(dāng)數(shù)據(jù)過(guò)剩的時(shí)候,隊(duì)列的大小會(huì)控制這生產(chǎn)者的行為# 當(dāng)數(shù)據(jù)嚴(yán)重不足的時(shí)候,隊(duì)列會(huì)控制消費(fèi)者的行為# 并且我們還可以通過(guò)定期檢查隊(duì)列中元素的個(gè)數(shù)來(lái)調(diào)節(jié)生產(chǎn)者消費(fèi)者的個(gè)數(shù)第一種方式:
import time import random from multiprocessing import Process,Queuedef producer(q,name,food):for i in range(10):time.sleep(random.random())fd = '%s%s'%(food,i)q.put(fd)print('%s生產(chǎn)了一個(gè)%s'%(name,food))def consumer(q,name):while True:food = q.get()if not food:breaktime.sleep(random.randint(1,3))print('%s吃了%s'%(name,food))def cp(c_count,p_count):q = Queue(10)for i in range(c_count):Process(target=consumer, args=(q, 'alex')).start()p_l = []for i in range(p_count):p1 = Process(target=producer, args=(q, 'wusir', '泔水'))p1.start()p_l.append(p1)for p in p_l:p.join()for i in range(c_count):q.put(None) if __name__ == '__main__':cp(2,3) 流程:消費(fèi)者開(kāi)啟進(jìn)程get,生產(chǎn)者開(kāi)啟進(jìn)程put,加入隊(duì)列,全部結(jié)束后(jion),隊(duì)列put(None),消費(fèi)者get到空終止第二種方式:
import time import random from multiprocessing import JoinableQueue,Processdef producer(q,name,food):for i in range(10):time.sleep(random.random())fd = '%s%s'%(food,i)q.put(fd)print('%s生產(chǎn)了一個(gè)%s'%(name,food))q.join()def consumer(q,name):while True:food = q.get()time.sleep(random.random())print('%s吃了%s'%(name,food))q.task_done()if __name__ == '__main__':jq = JoinableQueue()p =Process(target=producer,args=(jq,'wusir','泔水'))p.start()c = Process(target=consumer,args=(jq,'alex'))c.daemon = Truec.start()p.join()JoinableQueue同樣通過(guò)multiprocessing使用。
創(chuàng)建隊(duì)列的另外一個(gè)類:
? JoinableQueue([maxsize]):這就像是一個(gè)Queue對(duì)象,但隊(duì)列允許項(xiàng)目的使用者通知生成者項(xiàng)目已經(jīng)被成功處理。通知進(jìn)程是使用共享的信號(hào)和條件變量來(lái)實(shí)現(xiàn)的。
參數(shù)介紹:
? maxsize是隊(duì)列中允許最大項(xiàng)數(shù),省略則無(wú)大小限制。
方法介紹:
? JoinableQueue的實(shí)例p除了與Queue對(duì)象相同的方法之外還具有:
? q.task_done():使用者使用此方法發(fā)出信號(hào),表示q.get()的返回項(xiàng)目已經(jīng)被處理。如果調(diào)用此方法的次數(shù)大于從隊(duì)列中刪除項(xiàng)目的數(shù)量,將引發(fā)ValueError異常
? q.join():生產(chǎn)者調(diào)用此方法進(jìn)行阻塞,直到隊(duì)列中所有的項(xiàng)目均被處理。阻塞將持續(xù)到隊(duì)列中的每個(gè)項(xiàng)目均調(diào)用q.task_done()方法為止
進(jìn)程間數(shù)據(jù)共享
from multiprocessing import Manager,Process,Lockdef func(dic,lock):with lock:dic['count'] -= 1if __name__ == '__main__':# m = Manager()with Manager() as m:l = Lock()dic = m.dict({'count':100})p_l = []for i in range(100):p = Process(target=func,args=(dic,l))p.start()p_l.append(p)for p in p_l:p.join()print(dic)mulprocessing中有一個(gè)manager類,封裝了所有和進(jìn)程相關(guān)的 數(shù)據(jù)共享 數(shù)據(jù)傳遞相關(guān)的數(shù)據(jù)類型,但是對(duì)于 字典 列表這一類的數(shù)據(jù)操作的時(shí)候會(huì)產(chǎn)生數(shù)據(jù)不安全, 需要加鎖解決問(wèn)題,并且需要盡量少的使用這種方式.
線程
GIL鎖:全局解釋器鎖,cpython解釋器中特殊的垃圾回收機(jī)制,導(dǎo)致了在同一個(gè)進(jìn)程中多個(gè)線程不能同時(shí)利用多核 —— python的多線程只能是并發(fā)不能是并行
所以使用所線程并不影響高io型的操作,只會(huì)對(duì)高計(jì)算型的程序由效率上的影響
主線程什么時(shí)候結(jié)束?等待所有子線程結(jié)束之后才結(jié)束 主線程如果結(jié)束了,主進(jìn)程也就結(jié)束了 # multiprocessing 是完全仿照這threading的類寫的 from threading import Thread def func():print('start son thread') # 啟動(dòng)線程 start Thread(target=func).start() # 開(kāi)啟多個(gè)子線程 def func(i):print('start son thread',i)time.sleep(1)print('end son thread',i)for i in range(10):Thread(target=func,args=(i,)).start() print('main') # join方法 阻塞 直到子線程執(zhí)行結(jié)束 import time import os from threading import Thread def func(i):print('start son thread',i)time.sleep(1)print('end son thread',i,os.getpid()) t_l = [] for i in range(10):t = Thread(target=func,args=(i,))t.start()t_l.append(t) for t in t_l:t.join() print('子線程執(zhí)行完畢') # 使用面向?qū)ο蟮姆绞絾?dòng)線程 class MyThread(Thread):def __init__(self,i):self.i = isuper().__init__()def run(self):print('start',self.i,self.ident)time.sleep(1)print('end',self.i)for i in range(10):t = MyThread(i)t.start()print(t.ident) #線程id # 線程里的一些其他方法 from threading import current_thread,enumerate,active_count def func(i):t = current_thread() #當(dāng)前線程對(duì)象print('start son thread',i,t.ident)time.sleep(1)print('end son thread',i,os.getpid())t = Thread(target=func,args=(1,)) t.start() print(t.ident) print(current_thread().ident) # 水性楊花 在哪一個(gè)線程里,current_thread()得到的就是這個(gè)當(dāng)前線程的信息 print(enumerate()) #活著的線程列表 print(active_count()) # =====len(enumerate()) terminate 結(jié)束進(jìn)程,在線程中不能從主線程結(jié)束一個(gè)子線程 # 守護(hù)線程 import time from threading import Thread def son1():while True:time.sleep(0.5)print('in son1') def son2():for i in range(5):time.sleep(1)print('in son2') t =Thread(target=son1) t.daemon = True t.start() Thread(target=son2).start() time.sleep(3) # 守護(hù)線程一直等到所有的非守護(hù)線程都結(jié)束之后才結(jié)束 # 除了守護(hù)了主線程的代碼之外也會(huì)守護(hù)子線程線程鎖
即便是線程 即便有GIL 也會(huì)出現(xiàn)數(shù)據(jù)不安全的問(wèn)題
# 1.操作的是全局變量 # 2.做一下操作# += -= *= /+ 先計(jì)算再賦值才容易出現(xiàn)數(shù)據(jù)不安全的問(wèn)題# 包括 lst[0] += 1 dic['key']-=1 a = 0 def add_f(lock):global afor i in range(200000):with lock:a += 1def sub_f(lock):global afor i in range(200000):with lock:a -= 1from threading import Thread,Lock lock = Lock() t1 = Thread(target=add_f,args=(lock,)) t1.start() t2 = Thread(target=sub_f,args=(lock,)) t2.start() t1.join() t2.join() print(a)加鎖會(huì)影響程序的執(zhí)行效率,但是保證了數(shù)據(jù)的安全
互斥鎖是鎖中的一種:在同一個(gè)線程中,不能連續(xù)acquire多次
☆帶鎖的單例模式
import time from threading import Lock class A:__instance = Nonelock = Lock()def __new__(cls, *args, **kwargs):with cls.lock:if not cls.__instance:time.sleep(0.1)cls.__instance = super().__new__(cls)return cls.__instancedef __init__(self,name,age):self.name = nameself.age = agedef func():a = A('alex', 84)print(a)from threading import Thread for i in range(10):t = Thread(target=func)t.start()遞歸鎖
from threading import RLock # rlock = RLock() # rlock.acquire() # print('*'*20) # rlock.acquire() # print('-'*20) # rlock.acquire() # print('*'*20)優(yōu)點(diǎn):在同一個(gè)線程中,可以連續(xù)acuqire多次不會(huì)被鎖住
缺點(diǎn):占用了更多資源
死鎖現(xiàn)象:在某一些線程中出現(xiàn)陷入阻塞并且永遠(yuǎn)無(wú)法結(jié)束阻塞的情況就是死鎖現(xiàn)象
1.多把鎖+交替使用
2.互斥鎖在一個(gè)線程中連續(xù)acquire
避免方法:在一個(gè)線程中只有一把鎖,并且每一次acquire之后都要release
解決方法:可以用遞歸鎖解決,也可以通過(guò)優(yōu)化代碼邏輯解決.
import time from threading import RLock,Thread # noodle_lock = RLock() # fork_lock = RLock() noodle_lock = fork_lock = RLock() print(noodle_lock,fork_lock) def eat1(name,noodle_lock,fork_lock):noodle_lock.acquire()print('%s搶到面了'%name)fork_lock.acquire()print('%s搶到叉子了' % name)print('%s吃了一口面'%name)time.sleep(0.1)fork_lock.release()print('%s放下叉子了' % name)noodle_lock.release()print('%s放下面了' % name)def eat2(name,noodle_lock,fork_lock):fork_lock.acquire()print('%s搶到叉子了' % name)noodle_lock.acquire()print('%s搶到面了'%name)print('%s吃了一口面'%name)time.sleep(0.1)noodle_lock.release()print('%s放下面了' % name)fork_lock.release()print('%s放下叉子了' % name)lst = ['alex','wusir','taibai','yuan'] Thread(target=eat1,args=(lst[0],noodle_lock,fork_lock)).start() Thread(target=eat2,args=(lst[1],noodle_lock,fork_lock)).start() Thread(target=eat1,args=(lst[2],noodle_lock,fork_lock)).start() Thread(target=eat2,args=(lst[3],noodle_lock,fork_lock)).start()互斥鎖解決
import time from threading import Lock,Thread lock = Lock() def eat1(name,noodle_lock,fork_lock):lock.acquire()print('%s搶到面了'%name)print('%s搶到叉子了' % name)print('%s吃了一口面'%name)time.sleep(0.1)print('%s放下叉子了' % name)print('%s放下面了' % name)lock.release()def eat2(name,noodle_lock,fork_lock):lock.acquire()print('%s搶到叉子了' % name)print('%s搶到面了'%name)print('%s吃了一口面'%name)time.sleep(0.1)print('%s放下面了' % name)print('%s放下叉子了' % name)lock.release()lst = ['alex','wusir','taibai','yuan'] Thread(target=eat1,args=(lst[0],noodle_lock,fork_lock)).start() Thread(target=eat2,args=(lst[1],noodle_lock,fork_lock)).start() Thread(target=eat1,args=(lst[2],noodle_lock,fork_lock)).start() Thread(target=eat2,args=(lst[3],noodle_lock,fork_lock)).start()先進(jìn)先出隊(duì)列
from queue import Queue后進(jìn)先出隊(duì)列---棧
from queue import LifoQueue優(yōu)先級(jí)隊(duì)列
自動(dòng)的排序 搶票的用戶級(jí)別 100000 100001 告警級(jí)別 from queue import PriorityQueue pq = PriorityQueue() pq.put((10,'alex')) pq.put((6,'wusir')) pq.put((20,'yuan')) print(pq.get()) print(pq.get())池
預(yù)先的開(kāi)啟固定個(gè)數(shù)的進(jìn)程數(shù),當(dāng)任務(wù)來(lái)臨的時(shí)候,直接提交給已經(jīng)開(kāi)好的進(jìn)程,讓這個(gè)進(jìn)程去執(zhí)行就可以了,節(jié)省了進(jìn)程,線程的開(kāi)啟關(guān)閉的切換時(shí)間,并且減輕了操作系統(tǒng)調(diào)度的負(fù)擔(dān).
開(kāi)啟步驟
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor # 創(chuàng)建一個(gè)池子 tp = ThreadPoolExecutor(池中線程/進(jìn)程的個(gè)數(shù)) # 異步提交任務(wù) ret = tp.submit(函數(shù),參數(shù)1,參數(shù)2....) # 獲取返回值 ret.result() # 在異步的執(zhí)行完所有任務(wù)之后,主線程/主進(jìn)程才開(kāi)始執(zhí)行的代碼 tp.shutdown() 阻塞 直到所有的任務(wù)都執(zhí)行完畢 # 關(guān)閉池之后就不能繼續(xù)提交任務(wù),并且會(huì)阻塞,直到已經(jīng)提交的任務(wù)完成 # map方法 ret = tp.map(func,iterable) #迭代獲取iterable中的內(nèi)容,作為func的參數(shù),讓子線程來(lái)執(zhí)行對(duì)應(yīng)的任務(wù) for i in ret: 每一個(gè)都是任務(wù)的返回值 # 回調(diào)函數(shù) ret.add_done_callback(函數(shù)名) # 要在ret對(duì)應(yīng)的任務(wù)執(zhí)行完畢之后,直接繼續(xù)執(zhí)行add_done_callback綁定的函數(shù)中的內(nèi)容,并且ret的結(jié)果會(huì)作為參數(shù)返回給綁定的函數(shù)帶參數(shù)及返回值
def func(i,name):print('start',os.getpid())time.sleep(random.randint(1,3))print('end', os.getpid())return '%s * %s'%(i,os.getpid()) if __name__ == '__main__':p = ProcessPoolExecutor(5)ret_l = []for i in range(10):ret = p.submit(func,i,'alex')ret_l.append(ret)for ret in ret_l:print('ret-->',ret.result()) # ret.result() 同步阻塞print('main',os.getpid())回調(diào)函數(shù)
import requests from concurrent.futures import ThreadPoolExecutor def get_page(url):res = requests.get(url)return {'url':url,'content':res.text}def parserpage(ret): #必須有參數(shù)dic = ret.result()print(dic) tp = ThreadPoolExecutor(5) url_lst = ['http://www.baidu.com', # 3'http://www.cnblogs.com', # 1'http://www.douban.com', # 1'http://www.tencent.com','http://www.cnblogs.com/Eva-J/articles/8306047.html','http://www.cnblogs.com/Eva-J/articles/7206498.html', ] ret_l = [] for url in url_lst:ret = tp.submit(get_page,url)ret_l.append(ret)ret.add_done_callback(parserpage) 回調(diào)函數(shù)add_done_callback# 執(zhí)行完子線程任務(wù)之后直接調(diào)用對(duì)應(yīng)的回調(diào)函數(shù)# 爬取網(wǎng)頁(yè) 需要等待數(shù)據(jù)傳輸和網(wǎng)絡(luò)上的響應(yīng)高IO的 -- 子線程# 分析網(wǎng)頁(yè) 沒(méi)有什么IO操作 -- 這個(gè)操作沒(méi)必要在子線程完成,交給回調(diào)函數(shù) 是單獨(dú)開(kāi)啟線程進(jìn)程還是池?# 如果只是開(kāi)啟一個(gè)子線程做一件事情,就可以單獨(dú)開(kāi)線程# 有大量的任務(wù)等待程序去做,要達(dá)到一定的并發(fā)數(shù),開(kāi)啟線程池# 根據(jù)你程序的io操作也可以判定是用池還是不用池?# socket的server 大量的阻塞io recv recvfrom socketserver# 爬蟲的時(shí)候 池池的總結(jié)
hreadPoolExecutor中的幾個(gè)常用方法# tp = ThreadPoolExecutor(cpu*5)# obj = tp.submit(需要在子線程執(zhí)行的函數(shù)名,參數(shù))# obj# 1.獲取返回值 obj.result() 是一個(gè)阻塞方法# 2.綁定回調(diào)函數(shù) obj.add_done_callback(子線程執(zhí)行完畢之后要執(zhí)行的代碼對(duì)應(yīng)的函數(shù))# ret = tp.map(需要在子線程執(zhí)行的函數(shù)名,iterable)# 1.迭代ret,總是能得到所有的返回值# shutdown# tp.shutdown()進(jìn)程和線程中的鎖
# 所有在線程中能工作的基本都不能在進(jìn)程中工作 # 在進(jìn)程中能夠使用的基本在線程中也可以使用在多進(jìn)程中啟動(dòng)多線程
在多進(jìn)程里啟動(dòng)多線程 import os from multiprocessing import Process from threading import Threaddef tfunc():print(os.getpid()) def pfunc():print('pfunc-->',os.getpid())Thread(target=tfunc).start()if __name__ == '__main__':Process(target=pfunc).start()協(xié)程
# 協(xié)程:# 用戶級(jí)別的,由我們自己寫的python代碼來(lái)控制切換的# 是操作系統(tǒng)不可見(jiàn)的 # 在Cpython解釋器下 - 協(xié)程和線程都不能利用多核,都是在一個(gè)CPU上輪流執(zhí)行# 由于多線程本身就不能利用多核# 所以即便是開(kāi)啟了多個(gè)線程也只能輪流在一個(gè)CPU上執(zhí)行# 協(xié)程如果把所有任務(wù)的IO操作都規(guī)避掉,只剩下需要使用CPU的操作# 就意味著協(xié)程就可以做到題高CPU利用率的效果 # 多線程和協(xié)程# 線程 切換需要操作系統(tǒng),開(kāi)銷大,操作系統(tǒng)不可控,給操作系統(tǒng)的壓力大# 操作系統(tǒng)對(duì)IO操作的感知更加靈敏# 協(xié)程 切換需要python代碼,開(kāi)銷小,用戶操作可控,完全不會(huì)增加操作系統(tǒng)的壓力# 用戶級(jí)別能夠?qū)O操作的感知比較低gevent模塊開(kāi)啟協(xié)程
import time print('-->',time.sleep) import gevent from gevent import monkey monkey.patch_all() def eat():print('wusir is eating')print('in eat: ')time.sleep(1) #遇到阻塞讓出CPUreturn 'wusir finished eat'def sleep():print('小馬哥 is sleeping')time.sleep(1)print('小馬哥 finished sleep') g_l=[] for i in range(10): # 創(chuàng)造十個(gè)協(xié)程任務(wù)g1 = gevent.spawn(eat) g_l.append(g1) g2 = gevent.spawn(sleep) # 創(chuàng)造一個(gè)協(xié)程任務(wù) g2.join() # 阻塞 直到g1任務(wù)完成為止 gevent.joinall(g_l) #jionall后面加包含gevent對(duì)象的列表 for i in g_l:print(i.value) #value取值asyncio模塊
# 起一個(gè)任務(wù) # async def demo(): # 協(xié)程方法 # print('start') # await asyncio.sleep(1) # 阻塞 # print('end') # # loop = asyncio.get_event_loop() # 創(chuàng)建一個(gè)事件循環(huán) # loop.run_until_complete(demo()) # 把demo任務(wù)丟到事件循環(huán)中去執(zhí)行# 啟動(dòng)多個(gè)任務(wù),并且沒(méi)有返回值 # async def demo(): # 協(xié)程方法 # print('start') # await asyncio.sleep(1) # 阻塞 # print('end') # # loop = asyncio.get_event_loop() # 創(chuàng)建一個(gè)事件循環(huán) # wait_obj = asyncio.wait([demo(),demo(),demo()]) # loop.run_until_complete(wait_obj)# 啟動(dòng)多個(gè)任務(wù)并且有返回值 # async def demo(): # 協(xié)程方法 # print('start') # await asyncio.sleep(1) # 阻塞 # print('end') # return 123 # # loop = asyncio.get_event_loop() # t1 = loop.create_task(demo()) # t2 = loop.create_task(demo()) # tasks = [t1,t2] # wait_obj = asyncio.wait([t1,t2]) # loop.run_until_complete(wait_obj) # for t in tasks: # print(t.result())# 誰(shuí)先回來(lái)先取誰(shuí)的結(jié)果 # import asyncio # async def demo(i): # 協(xié)程方法 # print('start') # await asyncio.sleep(10-i) # 阻塞 # print('end') # return i,123 # # async def main(): # task_l = [] # for i in range(10): # task = asyncio.ensure_future(demo(i)) # task_l.append(task) # for ret in asyncio.as_completed(task_l): # res = await ret # print(res) # # loop = asyncio.get_event_loop() # loop.run_until_complete(main())# import asyncio # # async def get_url(): # reader,writer = await asyncio.open_connection('www.baidu.com',80) # writer.write(b'GET / HTTP/1.1\r\nHOST:www.baidu.com\r\nConnection:close\r\n\r\n') # all_lines = [] # async for line in reader: # data = line.decode() # all_lines.append(data) # html = '\n'.join(all_lines) # return html # # async def main(): # tasks = [] # for url in range(20): # tasks.append(asyncio.ensure_future(get_url())) # for res in asyncio.as_completed(tasks): # result = await res # print(result) # # # if __name__ == '__main__': # loop = asyncio.get_event_loop() # loop.run_until_complete(main()) # 處理一個(gè)任務(wù)轉(zhuǎn)載于:https://www.cnblogs.com/machangwei-8/p/10885890.html
總結(jié)
- 上一篇: 【转载】端口释放
- 下一篇: freenas安装使用和弃坑