Python3.2+ 的 concurrent.futures 模块,利用 multiprocessing 实现高并发。
From:https://www.cnblogs.com/weihengblog/p/9812110.html
concurrent.futures 官方文檔:https://docs.python.org/3/library/concurrent.futures.html
concurrent.futures: 線程池, 讓你更加高效, 并發(fā)的處理任務(wù):https://www.h3399.cn/201906/703751.html
python 因為其全局解釋器鎖 GIL 而無法通過線程實現(xiàn)真正的平行計算。這個論斷我們不展開,但是有個概念我們要說明,
IO 密集型 vs?計算密集型:
而 concurrent.futures 模塊,可以利用 multiprocessing 實現(xiàn)真正的平行計算。
核心原理是:concurrent.futures 會以子進程的形式,平行的運行多個 python 解釋器,從而令 python 程序可以利用多核 CPU 來提升執(zhí)行速度。由于 子進程 與 主解釋器 相分離,所以他們的全局解釋器鎖也是相互獨立的。每個子進程都能夠完整的使用一個CPU 內(nèi)核。
解釋 2:concurrent.futures 中的 ProcessPoolExecutor類把工作分配給多個Python進程處理,因此,如果需要做CPU密集型處理,使用這個模塊能繞開GIL,利用所有的CPU核心。
其原理是一個ProcessPoolExecutor創(chuàng)建了N個獨立的Python解釋器,N是系統(tǒng)上面可用的CPU核數(shù)。使用方法和ThreadPoolExecutor方法一樣
Python:使用 Future、asyncio 處理并發(fā)
:https://blog.csdn.net/sinat_38682860/article/details/105419842
future 初始 -- 處理并發(fā):https://www.cnblogs.com/zhaof/p/7679529.html
從Python3.2開始,標準庫為我們提供了 concurrent.futures 模塊,它提供了 ThreadPoolExecutor (線程池)和ProcessPoolExecutor (進程池)兩個類。
相比 threading 等模塊,該模塊通過 submit 返回的是一個 future 對象,它是一個未來可期的對象,通過它可以獲悉線程的狀態(tài)主線程(或進程)中可以獲取某一個線程(進程)執(zhí)行的狀態(tài)或者某一個任務(wù)執(zhí)行的狀態(tài)及返回值:
Python 模塊 - Concurrent.futures
從 Python3.2開始,Python?標準庫提供了 concurrent.futures?模塊,為開發(fā)人員提供了啟動異步任務(wù)的高級接口。 它提供了 ThreadPoolExecutor 和 ProcessPoolExecutor 兩個類,實現(xiàn)了對 threading 和 multiprocessing 的更高級的抽象,對編寫 線程池/進程池 提供了直接的支持。?可以將相應(yīng)的 tasks 直接放入線程池/進程池,不需要維護Queue來操心死鎖的問題,線程池/進程池會自動幫我們調(diào)度。
Future總結(jié)
1. python3自帶,python2需要安裝2. Executer對象它是一個抽象類,它提供了異步執(zhí)行的方法,他不能直接使用,但可以通過它的子類ThreadPoolExecuter和ProcessPoolExecuter2.1 Executer.submit(fn, *args, **kwargs)fn: 需要異步執(zhí)行的函數(shù)*args,**kwargs fn 接受的參數(shù)該方法的作用就是提交一個可執(zhí)行的回調(diào) task,它返回一個 Future 對象2.2 map(fn, *iterables, timeout=None, chunksize=1)map(task,URLS) # 返回一個 map()迭代器,這個迭代器中的回調(diào)執(zhí)行返回的結(jié)果是有序的3. Future對象相關(guān)future可以理解為一個在未來完成的操作,這是異步編程的基礎(chǔ)通常情況下我們在遇到IO操作的時候,將會發(fā)生阻塞,cpu不能做其他事情而future的引入幫助我們在這段等待時間可以完成其他的操作3.1 done():如果當前線程已取消/已成功,返回True。3.2 cance():如果當前線程正在執(zhí)行,并且不能取消調(diào)用,返回Flase。否則調(diào)用取消,返回True3.3 running():如果當前的線程正在執(zhí)行,則返回True3.4 result():返回調(diào)用返回的值,如果調(diào)用尚未完成,則此方法等待如果等待超時,會拋出concurrent.futures.TimeoutError如果沒有指定超時時間,則等待無時間限制如果在完成之前,取消了Future,則會引發(fā)CancelledError4. as_completed():在多個Future實例上的迭代器將會被返回這些Future實例由fs完成時產(chǎn)生。由fs返回的任何重復的Future,都會被返回一次。里面保存的都是已經(jīng)執(zhí)行完成的Future對象5. wait():返回一個元祖,元祖包含兩個元素1. 已完成的future集合2. 未完成的future集合初體驗:
# coding=utf-8 from concurrent import futures from concurrent.futures import Future import timedef return_future(msg):time.sleep(3)return msgpool = futures.ThreadPoolExecutor(max_workers=2)t1 = pool.submit(return_future,'hello') t2 = pool.submit(return_future,'world')time.sleep(3) print(t1.done()) # 如果順利完成,則返回True time.sleep(3) print(t2.done())print(t1.result()) # 獲取future的返回值 time.sleep(3) print(t2.result())print("主線程")map(func,* iterables,timeout = None,chunksize = 1 )
# coding=utf-8import time from concurrent.futures import Future,as_completed from concurrent.futures import ThreadPoolExecutor as Pool import requests import timeURLS = ['http://www.baidu.com', 'http://qq.com', 'http://sina.com']def task(url,timeout=10):return requests.get(url=url,timeout=timeout)pool = Pool() result = pool.map(task,URLS)start_time = time.time()# 按照 URLS 的順序返回 for res in result:print("{} {}".format(res.url,len(res.content)))# 無序的 with Pool(max_workers=3) as executer:future_task = [executer.submit(task,url) for url in URLS]for f in as_completed(future_task):if f.done():f_ret = f.result() # f.result()得到task的返回值,requests對象print('%s, done, result: %s, %s' % (str(f), f_ret.url, len(f_ret.content)))print("耗時", time.time() - start_time) print("主線程")Future對象
Future可以理解為一個未來完成的操作
當我們執(zhí)行io操作的時候,在等待返回結(jié)果之前會產(chǎn)生阻塞
cpu不能做其他事情,而Future的引入幫助我們在等待的這段時間可以完成其他操作
模塊方法
concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)
wait() 會返回一個tuple,tuple 會包含兩個集合:已完成的集合?和 未完成的集合。使用 wait() 會獲得更大的自由度,他接受三個參數(shù):FIRST_COMPLETED、FIRST_EXCEPTION、ALL_COMPLETE。默認為 ALL_COMPLETE。
如果采用默認的 ALL_COMPLETED,程序會阻塞直到線程池里面的所有任務(wù)都完成,再執(zhí)行主線程:
from concurrent.futures import Future from concurrent.futures import ThreadPoolExecutor as Pool from concurrent.futures import as_completed, wait import requestsURLS = ['http://www.baidu.com', 'http://qq.com', 'http://sina.com']def task(url, timeout=10):r = requests.get(url=url, timeout=timeout)print(r.status_code)with Pool(max_workers=3) as execute:future_task = [execute.submit(task, url) for url in URLS]for f in future_task:if f.running():print("%s" % (str(f)))"""并且wait還有timeout和return_when兩個參數(shù)return_when有三個常量 (默認是 ALL_COMPLETED)FIRST_COMPLETED 任何一個future_task執(zhí)行完成時/取消時,改函數(shù)返回FIRST_EXCEPTION 任何一個future_task發(fā)生異常時,該函數(shù)返回,如果沒有異常發(fā)生,等同于ALL_COMPLETED ALL_COMPLETED 當所有的future_task執(zhí)行完畢返回。"""results = wait(future_task, return_when="FIRST_COMPLETED") #done = results[0]for d in done:print(d)concurrent.futures.as_completed(fs, timeout=None)
在多個 Future 實例上的迭代器將會被返回,這些 Future 實例由 fs 完成時產(chǎn)生。由 fs 返回的任何重復的 Future,都會被返回一次。里面保存的都是已經(jīng)執(zhí)行完成的 Future 對象。
from concurrent.futures import ThreadPoolExecutor as Pool from concurrent.futures import as_completed import requests import timeURLS = ['http://www.baidu.com', 'http://qq.com', 'http://sina.com']def task(url,timeout=10):return requests.get(url=url,timeout=timeout)with Pool(max_workers=3) as executor:# 創(chuàng)建future任務(wù)future_task = [executor.submit(task,url) for url in URLS]for f in future_task:if f.running():print("%s is running"%str(f))for f in as_completed(future_task):try:ret = f.done()if ret:f_ret = f.result()print('%s, done, result: %s, %s' % (str(f), f_ret.url, len(f_ret.content)))except Exception as e:f.cance()print(e)下面我們將學習?concurrent.futures?模塊中的類。concurrent.futures 基礎(chǔ)模塊是 executor 和 future。
使用示例代碼:
# -*- coding:utf-8 -*-import redis from redis import WatchError from concurrent.futures import ProcessPoolExecutorr = redis.Redis(host='127.0.0.1', port=6379)# 減庫存函數(shù), 循環(huán)直到減庫存完成 # 庫存充足, 減庫存成功, 返回True # 庫存不足, 減庫存失敗, 返回Falsedef reduce_stock():# python中redis事務(wù)是通過pipeline的封裝實現(xiàn)的with r.pipeline() as pipe:while True:try:# watch庫存鍵, multi后如果該key被其他客戶端改變, 事務(wù)操作會拋出WatchError異常pipe.watch('stock:count')count = int(pipe.get('stock:count'))if count > 0: # 有庫存# 事務(wù)開始pipe.multi()pipe.decr('stock:count')# 把命令推送過去# execute返回命令執(zhí)行結(jié)果列表, 這里只有一個decr返回當前值print(pipe.execute()[0])return Trueelse:return Falseexcept WatchError as ex:# 打印WatchError異常, 觀察被watch鎖住的情況print(ex)pipe.unwatch()def worker():while True:# 沒有庫存就退出if not reduce_stock():breakif __name__ == "__main__":# 設(shè)置庫存為100r.set("stock:count", 100)# 多進程模擬多個客戶端提交with ProcessPoolExecutor() as pool:for _ in range(10):pool.submit(worker)concurrent.futures 模塊詳解
1. Executor對象
class?concurrent.futures.Executor
Executor 是一個抽象類,它提供了異步執(zhí)行調(diào)用的方法。它不能直接使用,但可以通過它的兩個子類 ThreadPoolExecutor 或者 ProcessPoolExecutor 進行調(diào)用。
1.1 Executor.submit(fn, *args, **kwargs)
fn:需要異步執(zhí)行的函數(shù)
*args, **kwargs:fn 的參數(shù)
示例代碼:
# -*- coding:utf-8 -*- from concurrent import futuresdef test(num):import timereturn time.ctime(), numwith futures.ThreadPoolExecutor(max_workers=1) as executor:future = executor.submit(test, 1)print(future.result())線程池的基本使用
# coding: utf-8 from concurrent.futures import ThreadPoolExecutor import timedef spider(page):time.sleep(page)print(f"crawl task{page} finished")return pagewith ThreadPoolExecutor(max_workers=5) as t: # 創(chuàng)建一個最大容納數(shù)量為5的線程池task1 = t.submit(spider, 1)task2 = t.submit(spider, 2) # 通過submit提交執(zhí)行的函數(shù)到線程池中task3 = t.submit(spider, 3)print(f"task1: {task1.done()}") # 通過done來判斷線程是否完成print(f"task2: {task2.done()}")print(f"task3: {task3.done()}")time.sleep(2.5)print(f"task1: {task1.done()}")print(f"task2: {task2.done()}")print(f"task3: {task3.done()}")print(task1.result()) # 通過result來獲取返回值使用 with 語句 ,通過 ThreadPoolExecutor 構(gòu)造實例,同時傳入 max_workers 參數(shù)來設(shè)置線程池中最多能同時運行的線程數(shù)目。
使用 submit 函數(shù)來提交線程需要執(zhí)行的任務(wù)到線程池中,并返回該任務(wù)的句柄(類似于文件、畫圖),注意 submit() 不是阻塞的,而是立即返回。
通過使用 done() 方法判斷該任務(wù)是否結(jié)束。上面的例子可以看出,提交任務(wù)后立即判斷任務(wù)狀態(tài),顯示四個任務(wù)都未完成。在延時2.5后,task1 和 task2 執(zhí)行完畢,task3 仍在執(zhí)行中。
使用 result() 方法可以獲取任務(wù)的返回值。
wait(fs, timeout=None, return_when=ALL_COMPLETED)
wait 接受三個參數(shù):
? ? ? ? fs: 表示需要執(zhí)行的序列
? ? ? ? timeout: 等待的最大時間,如果超過這個時間即使線程未執(zhí)行完成也將返回
? ? ? ? return_when:表示wait返回結(jié)果的條件,默認為 ALL_COMPLETED 全部執(zhí)行完成再返回
示例代碼:
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED, ALL_COMPLETED import timedef spider(page):time.sleep(page)print(f"crawl task{page} finished")return pagewith ThreadPoolExecutor(max_workers=5) as t:all_task = [t.submit(spider, page) for page in range(1, 5)]wait(all_task, return_when=FIRST_COMPLETED)print('finished')print(wait(all_task, timeout=2.5))as_completed
上面雖然提供了判斷任務(wù)是否結(jié)束的方法,但是不能在主線程中一直判斷啊。最好的方法是當某個任務(wù)結(jié)束了,就給主線程返回結(jié)果,而不是一直判斷每個任務(wù)是否結(jié)束。
ThreadPoolExecutorThreadPoolExecutor 中 的 as_completed() 就是這樣一個方法,當子線程中的任務(wù)執(zhí)行完后,直接用 result() 獲取返回結(jié)果
# coding: utf-8 from concurrent.futures import ThreadPoolExecutor, as_completed import timedef spider(page):time.sleep(page)print(f"crawl task{page} finished")return pagedef main():with ThreadPoolExecutor(max_workers=5) as t:obj_list = []for page in range(1, 5):obj = t.submit(spider, page)obj_list.append(obj)for future in as_completed(obj_list):data = future.result()print(f"main: {data}")as_completed() 方法是一個生成器,在沒有任務(wù)完成的時候,會一直阻塞,除非設(shè)置了 timeout。
當有某個任務(wù)完成的時候,會 yield 這個任務(wù),就能執(zhí)行 for 循環(huán)下面的語句,然后繼續(xù)阻塞住,循環(huán)到所有的任務(wù)結(jié)束。同時,先完成的任務(wù)會先返回給主線程
map(fn, *iterables, timeout=None)
? ? fn: 第一個參數(shù) fn 是需要線程執(zhí)行的函數(shù);
? ? iterables:第二個參數(shù)接受一個可迭代對象;
? ? timeout: 第三個參數(shù) timeout 跟 wait() 的 timeout 一樣,
?? ? ? ? ? ? ?但由于 map 是返回線程執(zhí)行的結(jié)果,
?? ??? ??? ? ?如果 timeout小于線程執(zhí)行時間會拋異常 TimeoutError。
用法如下:
import time from concurrent.futures import ThreadPoolExecutordef spider(page):time.sleep(page)return pagestart = time.time() executor = ThreadPoolExecutor(max_workers=4)i = 1 for result in executor.map(spider, [2, 3, 1, 4]):print("task{}:{}".format(i, result))i += 1使用 map 方法,無需提前使用 submit 方法,map 方法與 python 高階函數(shù) map 的含義相同,都是將序列中的每個元素都執(zhí)行同一個函數(shù)。
上面的代碼對列表中的每個元素都執(zhí)行 spider() 函數(shù),并分配各線程池。
可以看到執(zhí)行結(jié)果與上面的 as_completed() 方法的結(jié)果不同,輸出順序和列表的順序相同,就算 1s 的任務(wù)先執(zhí)行完成,也會先打印前面提交的任務(wù)返回的結(jié)果。
1.2 Executor.map(func, *iterables, timeout=None)
相當于map(func, *iterables),但是func是異步執(zhí)行。timeout的值可以是int或float,如果操作超時,會返回raisesTimeoutError;如果不指定timeout參數(shù),則不設(shè)置超時間。
func:需要異步執(zhí)行的函數(shù)
*iterables:可迭代對象,如列表等。每一次func執(zhí)行,都會從iterables中取參數(shù)。
timeout:設(shè)置每次異步操作的超時時間
示例代碼:
# -*- coding:utf-8 -*- from concurrent import futuresdef test(num):import timereturn time.ctime(), numdata = [1, 2, 3] with futures.ThreadPoolExecutor(max_workers=1) as executor:for future in executor.map(test, data):print(future)1.3 Executor.shutdown(wait=True)
釋放系統(tǒng)資源,在Executor.submit()或 Executor.map()等異步操作后調(diào)用。使用with語句可以避免顯式調(diào)用此方法。
2. ThreadPoolExecutor對象
ThreadPoolExecutor類是Executor子類,使用線程池執(zhí)行異步調(diào)用.
class concurrent.futures.ThreadPoolExecutor(max_workers),使用 max_workers 數(shù)目的線程池執(zhí)行異步調(diào)用
python3標準庫concurrent.futures比原Thread封裝更高,多線程concurrent.futures.ThreadPoolExecutor,多進程concurrent.futures.ProcessPoolExecutor
利用concurrent.futures.Future來進行各種便捷的數(shù)據(jù)交互,包括處理異常,都在result()中再次拋出。
示例代碼:
import time from concurrent import futures from concurrent.futures import ThreadPoolExecutordef display(args):print(time.strftime('[%H:%M:%S]', time.localtime()), end=' ')print(args)def task(n):"""只是休眠"""display('begin sleep {}s.'.format(n))time.sleep(n)display('ended sleep {}s.'.format(n))def do_many_task_inorder():"""多線程按任務(wù)發(fā)布順序依次等待完成"""tasks = [5, 4, 3, 2, 1]with ThreadPoolExecutor(max_workers=3) as executor:future_list = [executor.submit(task, arg) for arg in tasks]display('非阻塞運行')for future in future_list:display(future)display('統(tǒng)一結(jié)束(有序)')for future in future_list:display(future.result())def do_many_task_disorder():"""多線程執(zhí)行先完成先顯示"""tasks = [5, 4, 3, 2, 1]with ThreadPoolExecutor(max_workers=3) as executor:future_list = [executor.submit(task, arg) for arg in tasks]display('非阻塞運行')for future in future_list:display(future)display('統(tǒng)一結(jié)束(無序)')done_iter = futures.as_completed(future_list) # generatorfor done in done_iter:display(done)if __name__ == '__main__':do_many_task_inorder()do_many_task_disorder()3. ProcessPoolExecutor對象
ThreadPoolExecutor類是Executor子類,使用進程池執(zhí)行異步調(diào)用.
class concurrent.futures.ProcessPoolExecutor(max_workers=None),使用 max_workers數(shù)目的進程池執(zhí)行異步調(diào)用,如果max_workers為None則使用機器的處理器數(shù)目(如4核機器max_worker配置為None時,則使用4個進程進行異步并發(fā))。
示例代碼:
# -*- coding:utf-8 -*- from concurrent import futuresdef test(num):import timereturn time.ctime(), numdef muti_exec(m, n):# m 并發(fā)次數(shù)# n 運行次數(shù)with futures.ProcessPoolExecutor(max_workers=m) as executor: # 多進程# with futures.ThreadPoolExecutor(max_workers=m) as executor: #多線程executor_dict = dict((executor.submit(test, times), times) for times in range(m * n))for future in futures.as_completed(executor_dict):times = executor_dict[future]if future.exception() is not None:print('%r generated an exception: %s' % (times, future.exception()))else:print('RunTimes:%d,Res:%s' % (times, future.result()))if __name__ == '__main__':muti_exec(5, 1)調(diào)度單個任務(wù)
執(zhí)行者類Executor調(diào)度單個任務(wù),使用submit() 函數(shù),然后用返回的 Future 實例等待任務(wù)結(jié)果。
Executor 是一個 Python concurrent.futures?模塊的抽象類。 它不能直接使用,我們需要使用以下具體子類之一 -
- ThreadPoolExecutor:線程池
- ProcessPoolExecutor:進程池
示例代碼:
from concurrent import futures import time import randomdef task(n):time.sleep(random.randint(1, 10))return nexecutor = futures.ThreadPoolExecutor(max_workers=3) future = executor.submit(task, 5) print('future: {}'.format(future)) result = future.result() print('result: {}'.format(result))線程池?和 進程池
如何創(chuàng)建一個 ThreadPoolExecutor 或者 ProcessPoolExecutor?
? ? ? ? 在concurrent.futures模塊及其具體子類Executor的幫助下,可以很容易地創(chuàng)建一個線程池或者進程池。 需要使用我們想要的池中的線程數(shù)構(gòu)造一個ThreadPoolExecutor 或者?ProcessPoolExecutor。 默認情況下,數(shù)字是5。然后可以提交一個任務(wù)到線程池或者進程池。 當submit()任務(wù)時,會返回Future對象。 Future對象有一個名為done()的方法,它告訴Future是否已經(jīng)解決。 有了這個,為這個特定的Future對象設(shè)定了一個值。 當任務(wù)完成時,線程池執(zhí)行器將該值設(shè)置為Future的對象。
線程池 示例代碼:
from concurrent.futures import ThreadPoolExecutor from time import sleepdef task(message):sleep(2)return messagedef main():executor = ThreadPoolExecutor(5)future = executor.submit(task, "Completed")print(future.done())sleep(2)print(future.done())print(future.result())if __name__ == '__main__':main()結(jié)果截圖:
在上面的例子中,一個ThreadPoolExecutor已經(jīng)由5個線程構(gòu)造而成。 然后,在提供消息之前等待2秒的任務(wù)被提交給線程池執(zhí)行器。 從輸出中可以看出,任務(wù)直到2秒才完成,所以第一次調(diào)用done()將返回False。 2秒后,任務(wù)完成,我們通過調(diào)用result()方法得到future的結(jié)果。
進程池 示例代碼:
from concurrent.futures import ProcessPoolExecutor from time import sleepdef task(message):sleep(2)return messagedef main():executor = ProcessPoolExecutor(5)future = executor.submit(task, ("Completed"))print(future.done())sleep(2)print(future.done())print(future.result())if __name__ == '__main__':main()實例化ThreadPoolExecutor 或者?ProcessPoolExecutor? 之?上下文管理器
另一種實例化ThreadPoolExecutor的方法是在上下文管理器的幫助下完成的。 它的工作方式與上例中使用的方法類似。 使用上下文管理器的主要優(yōu)點是它在語法上看起來不錯。 實例化可以在下面的代碼的幫助下完成
示例
以下示例是從 Python 文檔借用的。 在這個例子中,首先必須導入?concurrent.futures?模塊。 然后創(chuàng)建一個名為?load_url()的函數(shù),它將加載請求的url。 然后該函數(shù)用池中的5個線程創(chuàng)建?ThreadPoolExecutor。 ThreadPoolExecutor?已被用作上下文管理器。 我們可以通過調(diào)用?result()方法來獲得?future的結(jié)果。
import concurrent.futures import urllib.requestURLS = ['http://www.foxnews.com/','https://www.yiibai.com/','http://europe.wsj.com/','http://www.bbc.co.uk/','http://some-made-up-domain.com/' ]def load_url(url, timeout):with urllib.request.urlopen(url, timeout=timeout) as conn:return conn.read()with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}for future in concurrent.futures.as_completed(future_to_url):url = future_to_url[future]try:data = future.result()except Exception as exc:print('%r generated an exception: %s' % (url, exc))else:print('%r page is %d bytes' % (url, len(data)))以下將是上面的Python腳本的輸出 -
'http://some-made-up-domain.com/' generated an exception: <urlopen error [Errno 11004] getaddrinfo failed> 'http://www.foxnews.com/' page is 229313 bytes 'http://www.yiibai.com/' page is 168933 bytes 'http://www.bbc.co.uk/' page is 283893 bytes 'http://europe.wsj.com/' page is 938109 bytes進程池:
import concurrent.futures from concurrent.futures import ProcessPoolExecutor import urllib.requestURLS = ['http://www.foxnews.com/','http://www.cnn.com/','http://europe.wsj.com/','http://www.bbc.co.uk/','http://some-made-up-domain.com/']def load_url(url, timeout):with urllib.request.urlopen(url, timeout=timeout) as conn:return conn.read()def main():with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}for future in concurrent.futures.as_completed(future_to_url):url = future_to_url[future]try:data = future.result()except Exception as exc:print('%r generated an exception: %s' % (url, exc))else:print('%r page is %d bytes' % (url, len(data)))if __name__ == '__main__':main()使用 map() 調(diào)度多任務(wù),有序返回
使用map(),多個worker并發(fā)地從輸入迭代器里取數(shù)據(jù),處理,然后按順序返回結(jié)果。
示例代碼:
from concurrent import futures import time import randomdef task(n):time.sleep(random.randint(1, 10))return nexecutor = futures.ThreadPoolExecutor(max_workers=3) results = executor.map(task, range(1, 10)) print('unprocessed results: {}'.format(results)) real_results = list(results) print('real results: {}'.format(real_results))使用 Executor.map() 函數(shù)
Python map()函數(shù)廣泛用于許多任務(wù)。 一個這樣的任務(wù)是對可迭代內(nèi)的每個元素應(yīng)用某個函數(shù)。 同樣,可以將迭代器的所有元素映射到一個函數(shù),并將這些作為獨立作業(yè)提交到ThreadPoolExecutor之外。 考慮下面的Python腳本示例來理解函數(shù)的工作原理。
示例
在下面的示例中,map函數(shù)用于將square()函數(shù)應(yīng)用于values數(shù)組中的每個值。
以下將是上面的Python腳本的輸出 :
進程池:
from concurrent.futures import ProcessPoolExecutor from concurrent.futures import as_completedvalues = [2, 3, 4, 5]def square(n):return n * ndef main():with ProcessPoolExecutor(max_workers=3) as executor:results = executor.map(square, values)for result in results:print(result)if __name__ == '__main__':main()多任務(wù)調(diào)度,無序返回
不斷將任務(wù)submit到executor,返回future列表,使用as_completed無序產(chǎn)生每個任務(wù)的結(jié)果。
示例代碼:
from concurrent import futures import time import randomdef task(n):time.sleep(random.randint(1, 10))return nexecutor = futures.ThreadPoolExecutor(max_workers=3) future_list = [executor.submit(task, i) for i in range(1, 10)] for f in futures.as_completed(future_list):print(f.result())何時使用ProcessPoolExecutor 和 ThreadPoolExecutor ?
現(xiàn)在我們已經(jīng)學習了兩個Executor類 - ThreadPoolExecutor和ProcessPoolExecutor,我們需要知道何時使用哪個執(zhí)行器。需要在受CPU限制的工作負載情況下選擇ProcessPoolExecutor,而在受I/O限制的工作負載情況下則需要選擇ThreadPoolExecutor。
如果使用ProcessPoolExecutor,那么不需要擔心GIL,因為它使用多處理。 而且,與ThreadPoolExecution相比,執(zhí)行時間會更少。
總結(jié)
以上是生活随笔為你收集整理的Python3.2+ 的 concurrent.futures 模块,利用 multiprocessing 实现高并发。的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: frida hook so层、proto
- 下一篇: Lambda 表达式详解~简化匿名内部类