python多线程执行其他模块的文件_python并发编程--进程线程--其他模块-从菜鸟到老鸟(三)...
concurrent模塊
1、concurrent模塊的介紹
concurrent.futures模塊提供了高度封裝的異步調用接口
ThreadPoolExecutor:線程池,提供異步調用
ProcessPoolExecutor:進程池,提供異步調用
ProcessPoolExecutor?和?ThreadPoolExecutor:兩者都實現相同的接口,該接口由抽象Executor類定義。
2、基本方法
使用_base.Executor
concurrent.futures.thread.ThreadPoolExecutor #線程池
concurrent.futures.process.ProcessPoolExecutor #進程池#構造函數
def __init__(self, max_workers=None, mp_context=None,
initializer=None, initargs=()):
submit(fn, *args, **kwargs)?:異步提交任務
使用submit函數來提交線程需要執行任務(函數名和參數)到線程池中,并返回該任務的句柄(類似于文件、畫圖),注意submit()不是阻塞的,而是立即返回。
map(func, *iterables, timeout=None, chunksize=1)
取代for循環submit的操作
shutdown(wait=True)?:相當于進程池的pool.close()+pool.join()操作
wait=True,等待池內所有任務執行完畢回收完資源后才繼續
wait=False,立即返回,并不會等待池內的任務執行完畢
但不管wait參數為何值,整個程序都會等到所有任務執行完畢
note:submit和map必須在shutdown之前
pool.submit()返回的對象是
concurrent.futures._base.Future類
add_done_callback(self,fn)
cancel(self)
cancelled(self)
done(self)
exception(self,timeout=None)
result(self,timeout=None)
running(self)
set_exception(self,exception)
set_result(self,result)
set_running_or_notify_cancel(self)
result(timeout=None)?:取得結果,通過submit函數返回的任務句柄,使用result()方法可以獲取任務的返回值,查看內部代碼,發現這個方法是阻塞的
done()方法判斷該任務是否結束
add_done_callback(fn)?:回調函數
3、進程池和線程池
池的功能:限制進程數或線程數.
什么時候限制: 當并發的任務數量遠遠大于計算機所能承受的范圍,即無法一次性開啟過多的任務數量 我就應該考慮去限制我進程數或線程數,從保證服務器不崩.
3.1 進程池
from concurrent.futures importProcessPoolExecutorimportosimporttimedeftask(i):print("第"+str(i)+"個在執行任務id:"+str(os.getpid()))
time.sleep(1)if __name__ == '__main__':
start=time.time()
pool= ProcessPoolExecutor(4) #進程池里又4個進程
for i in range(5): #5個任務
pool.submit(task,i)#進程池里當前執行的任務i,池子里的4個進程一次一次執行任務
pool.shutdown()print("耗時:",time.time()-start)
3.2 線程池
from concurrent.futures importThreadPoolExecutorfrom threading importcurrentThreadimporttimedeftask(i):print("第"+str(i)+"個在執行任務id:"+currentThread().name)
time.sleep(1)if __name__ == '__main__':
start=time.time()
pool= ThreadPoolExecutor(4) #進程池里又4個線程
for i in range(5): #5個任務
pool.submit(task,i)#線程池里當前執行的任務i,池子里的4個線程一次一次執行任務
pool.shutdown()print("耗時:",time.time()-start)
其他:done() 、 result()
通過submit函數返回的任務句柄,能夠使用done()方法判斷該任務是否結束
使用result()方法可以獲取任務的返回值,查看內部代碼,發現這個方法是阻塞的
3.4列表+as_compelete模擬先進先出
對于線程,這樣可以模擬執行與結果的先進先出。
但是對于進程會報錯。
importtimefrom concurrent.futures importProcessPoolExecutor,as_completed,ThreadPoolExecutordefget_html(i):
times=1time.sleep(times)print("第 NO.{i} get page {times} finished".format(i=i,times=times))return "第 NO.{i}".format(i=i)
start=time.time()
executor= ThreadPoolExecutor(max_workers=2)#executor = ProcessPoolExecutor(max_workers=2) #進程池會導致后面的all_task報錯
all_task= [executor.submit(get_html,(i)) for i in range(5)]for future inas_completed(all_task):
data=future.result()print("in main:get page {} success".format(data))print('主進程結束--耗時',time.time()-start)
結果:
第 NO.0 get page 1finished
第 NO.1 get page 1finishedinmain:get page 第 NO.0 successin main:get page 第 NO.1success
第 NO.2 get page 1finishedin main:get page 第 NO.2success
第 NO.3 get page 1finishedin main:get page 第 NO.3success
第 NO.4 get page 1finishedin main:get page 第 NO.4success
主進程結束--耗時 3.0034666061401367
結果:
3.4 Map的用法
可以將多個任務一次性的提交給進程、線程池。---備注進程是也不行的,也會報錯。
使用map方法,不需提前使用submit方法,map方法與python標準庫中的map含義相同,都是將序列中的每個元素都執行同一個函數。
from concurrent.futures importThreadPoolExecutor,ProcessPoolExecutorimportos,time,randomdeftask(i):print("第"+str(i)+"個在執行任務id:"+str(os.getpid()))
time.sleep(1)if __name__ == '__main__':
start=time.time()
pool=ProcessPoolExecutor(max_workers=3) #也可以換成ThreadPoolExecutor
pool.map(task,range(1,5)) #map取代了for+submit
pool.shutdown()print("耗時:",time.time()-start)
考慮到結果返回值:
importtimefrom random importrandomfrom concurrent.futures importProcessPoolExecutor,as_completed,ThreadPoolExecutordefget_html(i):
times=1+random()/100time.sleep(times)print("第 NO.{i} get page {times}s finished".format(i=i,times=times))return "第 NO.{i}".format(i=i)
start=time.time()
executor= ThreadPoolExecutor(max_workers=2)#executor = ProcessPoolExecutor(max_workers=2) #進程池會導致后面的executor.map報錯
res=executor.map(get_html, range(5))#for future in res: #直接返回結果,不需要get
print("in main:get page {} success".format(future))print('主進程結束--耗時',time.time()-start)
3.5 同步調用,順序返回
因為我們在循環中每次循環都要調用或這說提交任務,并等待結果。所以其實進程之間是串行的。所以是同步的方式。
from concurrent.futures importProcessPoolExecutorfrom multiprocessing importcurrent_processimporttime
n= 1
deftask(i):globaln
time.sleep(1)print(f'{current_process().name} 在執行任務{i}')
n+=ireturn f'得到 {current_process().name} 任務{i} 的結果'
if __name__ == '__main__':
start=time.time()
pool= ProcessPoolExecutor(2) #進程池里又4個線程
pool_lis =[]for i in range(5): #20個任務
future = pool.submit(task,i)#進程池里當前執行的任務i,池子里的4個線程一次一次執行任務
pool_lis.append(future.result())#等待我執行任務得到的結果,如果一直沒有結果,則阻塞。這里會導致我們所有任務編程了串行
#在這里就引出了下面的pool.shutdown()方法
pool.shutdown(wait=True) #關閉了池的入口,不允許在往里面添加任務了,會等帶所有的任務執行完,結束阻塞
for res inpool_lis:print(res)print(n)#這里肯定是拿到0的
print("主進程---耗時",time.time()-start)#可以用join去解決,等待每一個進程結束后,拿到他的結果
結果:
SpawnProcess-2在執行任務0
SpawnProcess-1在執行任務1
SpawnProcess-2在執行任務2
SpawnProcess-1在執行任務3
SpawnProcess-2在執行任務4
得到 SpawnProcess-2任務0 的結果
得到 SpawnProcess-1任務1 的結果
得到 SpawnProcess-2任務2 的結果
得到 SpawnProcess-1任務3 的結果
得到 SpawnProcess-2任務4 的結果1主進程---耗時 5.575225830078125
同步--所以是串行的。耗時與單進程差不多
3.5 異步調用,順序返回
from concurrent.futures importProcessPoolExecutorfrom multiprocessing importcurrent_processimporttime
n= 1
deftask(i):globaln
time.sleep(1)print(f'{current_process().name} 在執行任務{i}')
n+=ireturn f'得到 {current_process().name} 任務{i} 的結果'
if __name__ == '__main__':
start=time.time()
pool= ProcessPoolExecutor(2) #進程池里又4個線程
pool_lis =[]for i in range(5): #20個任務
future = pool.submit(task,i)#進程池里當前執行的任務i,池子里的4個線程一次一次執行任務
#print(future.result()) # 這是在等待我執行任務得到的結果,如果一直沒有結果,這里會導致我們所有任務編程了串行
#在這里就引出了下面的pool.shutdown()方法
pool_lis.append(future)
pool.shutdown(wait=True) #關閉了池的入口,不允許在往里面添加任務了,會等帶所有的任務執行完,結束阻塞
for p inpool_lis:print(p.result())print(n)#這里肯定是拿到0的
print("主進程---耗時",time.time()-start)#可以用join去解決,等待每一個進程結束后,拿到他的結果
結果:
SpawnProcess-1在執行任務0
SpawnProcess-2在執行任務1
SpawnProcess-1在執行任務2
SpawnProcess-2在執行任務3
SpawnProcess-1在執行任務4
得到 SpawnProcess-1任務0 的結果
得到 SpawnProcess-2任務1 的結果
得到 SpawnProcess-1任務2 的結果
得到 SpawnProcess-2任務3 的結果
得到 SpawnProcess-1任務4 的結果1主進程---耗時 3.2690603733062744
異步結果,有序返回相應結果
3.5 回調函數:
add_done_callback
from multiprocessing importcurrent_processimporttimefrom random importrandomfrom concurrent.futures importProcessPoolExecutordeftask(i):print(f'{current_process().name} 在執行{i}')
time.sleep(1+random())returni#parse 就是一個回調函數
defparse(future):#處理拿到的結果
print(f'{current_process().name} 拿到結果{future.result()} 結束了當前任務')if __name__ == '__main__':
start=time.time()
pool= ProcessPoolExecutor(2)for i in range(5):
future=pool.submit(task,i)'''給當前執行的任務綁定了一個函數,在當前任務結束的時候就會觸發這個函數(稱之為回調函數)
會把future對象作為參數傳給函數
注:這個稱為回調函數,當前任務處理結束了,就回來調parse這個函數'''future.add_done_callback(parse)#add_done_callback (parse) parse是一個回調函數
#add_done_callback () 是對象的一個綁定方法,他的參數就是一個函數
pool.shutdown()print('主線程耗時:',time.time()-start)
結果:
SpawnProcess-1在執行0
SpawnProcess-2在執行1
SpawnProcess-2在執行2
MainProcess 拿到結果1 結束了當前任務
SpawnProcess-1在執行3
MainProcess 拿到結果0 結束了當前任務
SpawnProcess-1在執行4
MainProcess 拿到結果3 結束了當前任務
MainProcess 拿到結果2 結束了當前任務
MainProcess 拿到結果4 結束了當前任務
主線程耗時:4.721129417419434
回調是主進程的,結果是無序的
3.6wait
wait方法可以讓主線程阻塞,直到滿足設定的要求。wait方法接收3個參數,等待的任務序列、超時時間以及等待條件。
等待條件return_when默認為ALL_COMPLETED,表明要等待所有的任務都借宿。
可以看到運行結果中,確實是所有任務都完成了,主線程才打印出main,等待條件還可以設置為FIRST_COMPLETED,表示第一個任務完成就停止等待
from concurrent.futures importThreadPoolExecutor,wait,ALL_COMPLETED,FIRST_COMPLETEDimporttime#參數times用來模擬網絡請求時間
from random importrandomdefget_html(i):
times=1+random()*10time.sleep(times)print("第 NO.{i} get page {times}s finished".format(i=i,times=times))return "第 NO.{i}".format(i=i)
executor= ThreadPoolExecutor(max_workers=2)
urls= range(5)
all_task= [executor.submit(get_html,(url)) for url inurls]
wait(all_task,return_when=ALL_COMPLETED)print("main")
joblib模塊
總結
以上是生活随笔為你收集整理的python多线程执行其他模块的文件_python并发编程--进程线程--其他模块-从菜鸟到老鸟(三)...的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 微信零钱提现要手续费吗 免费额度需要合理
- 下一篇: python中哈希是什么意思_在pyth