python守护进程进程池_Python3标准库:multiprocessing像线程一样管理进程
Python
Python開發(fā)
Python語言
Python3標(biāo)準(zhǔn)庫:multiprocessing像線程一樣管理進程
1. multiprocessing像線程一樣管理進程
multiprocessing模塊包含一個API,它基于threadingAPI,可以把工作劃分到多個進程。有些情況下,multiprocessing可以作為臨時替換取代threading來利用多個CPU內(nèi)核,相應(yīng)地避免Python全局解釋器鎖所帶來的計算瓶頸。
由于multiprocessing與threading模塊的這種相似性,這里的前幾個例子都是從threading例子修改得來。后面會介紹multiprocessing中有但threading未提供的特性。
1.1?multiprocessing基礎(chǔ)
要創(chuàng)建第二個進程,最簡單的方法是用一個目標(biāo)函數(shù)實例化一個Process對象,然后調(diào)用start()讓它開始工作。
importmultiprocessingdefworker():"""worker function"""
print('Worker')if __name__ == '__main__':
jobs=[]for i in range(5):
p= multiprocessing.Process(target=worker)
jobs.append(p)
p.start()
輸出中單詞“Worker”將打印5次,不過取決于具體的執(zhí)行順序,無法清楚地看出孰先孰后,這是因為每個進程都在競爭訪問輸出流。
大多數(shù)情況下,更有用的做法是,在創(chuàng)建一個進程時提供參數(shù)來告訴它要做什么。與threading不同,要向一個multiprocessing Process傳遞參數(shù),這個參數(shù)必須能夠用pickle串行化。下面這個例子向各個工作進程傳遞一個要打印的數(shù)。
importmultiprocessingdefworker(num):"""thread worker function"""
print('Worker:', num)if __name__ == '__main__':
jobs=[]for i in range(5):
p= multiprocessing.Process(target=worker, args=(i,))
jobs.append(p)
p.start()
現(xiàn)在整數(shù)參數(shù)會包含在各個工作進程打印的消息中。
1.2 可導(dǎo)入的目標(biāo)函數(shù)
threading與multiprocessing例子之間有一個區(qū)別,multiprocessing例子中對main使用了額外的保護。基于啟動新進程的方式,要求子進程能夠?qū)氚繕?biāo)函數(shù)的腳本??梢园褢?yīng)用的主要部分包裝在一個__main_檢查中,確保模塊導(dǎo)入時不會在各個子進程中遞歸地運行。另一種方法是從一個單獨的腳本導(dǎo)入目標(biāo)函數(shù)。
importmultiprocessingdefworker():"""worker function"""
print('Worker')return
if __name__ == '__main__':
jobs=[]for i in range(5):
p=multiprocessing.Process(
target=worker,
)
jobs.append(p)
p.start()
調(diào)用主程序會生成與第一個例子類似的輸出。
1.3 確定當(dāng)前進程
通過傳遞參數(shù)來標(biāo)識或命名進程很麻煩,也沒有必要。每個Process實例都有一個名,可以在創(chuàng)建進程時改變它的默認值。對進程命名對于跟蹤進程很有用,特別是如果應(yīng)用中有多種類型的進程在同時運行。
importmultiprocessingimporttimedefworker():
name=multiprocessing.current_process().nameprint(name, 'Starting')
time.sleep(2)print(name, 'Exiting')defmy_service():
name=multiprocessing.current_process().nameprint(name, 'Starting')
time.sleep(3)print(name, 'Exiting')if __name__ == '__main__':
service=multiprocessing.Process(
name='my_service',
target=my_service,
)
worker_1=multiprocessing.Process(
name='worker 1',
target=worker,
)
worker_2= multiprocessing.Process( #default name
target=worker,
)
worker_1.start()
worker_2.start()
service.start()
調(diào)試輸出中,每行都包含當(dāng)前進程的名。進程名列為Process-3的行對應(yīng)未命名的
進程worker_1。
1.4 守護進程
默認地,在所有子進程退出之前主程序不會退出。有些情況下,可能需要啟動一個后臺進程,它可以一直運行而不阻塞主程序退出,如果一個服務(wù)無法用一種容易的方法中斷進程,或者希望進程工作到一半時中止而不損失或破壞數(shù)據(jù)(例如為一個服務(wù)監(jiān)控工具生成“心跳”的任務(wù)),那么對于這些服務(wù),使用守護進程就很有用。
要標(biāo)志一個進程為守護進程,可以將其daemon屬性設(shè)置為True。默認情況下進程不作為守護進程。
importmultiprocessingimporttimeimportsysdefdaemon():
p=multiprocessing.current_process()print('Starting:', p.name, p.pid)
sys.stdout.flush()
time.sleep(2)print('Exiting :', p.name, p.pid)
sys.stdout.flush()defnon_daemon():
p=multiprocessing.current_process()print('Starting:', p.name, p.pid)
sys.stdout.flush()print('Exiting :', p.name, p.pid)
sys.stdout.flush()if __name__ == '__main__':
d=multiprocessing.Process(
name='daemon',
target=daemon,
)
d.daemon=True
n=multiprocessing.Process(
name='non-daemon',
target=non_daemon,
)
n.daemon=False
d.start()
time.sleep(1)
n.start()
輸出中沒有守護進程的“Exiting”消息,因為在守護進程從其2秒的睡眠時間喚醒之前,所有非守護進程(包括主程序)已經(jīng)退出。
守護進程會在主程序退出之前自動終止,以避免留下“孤”進程繼續(xù)運行。要驗證這一點,可以查找程序運行時打印的進程ID值,然后用一個類似ps的命令檢查該進程。
1.5 等待進程
要等待一共進程完成工作并退出,可以使用join()方法。
importmultiprocessingimporttimedefdaemon():
name=multiprocessing.current_process().nameprint('Starting:', name)
time.sleep(2)print('Exiting :', name)defnon_daemon():
name=multiprocessing.current_process().nameprint('Starting:', name)print('Exiting :', name)if __name__ == '__main__':
d=multiprocessing.Process(
name='daemon',
target=daemon,
)
d.daemon=True
n=multiprocessing.Process(
name='non-daemon',
target=non_daemon,
)
n.daemon=False
d.start()
time.sleep(1)
n.start()
d.join()
n.join()
由于主進程使用join()等待守護進程退出,所以這一次會打印“Exiting”消息。
默認地,join()會無限阻塞??梢韵蜻@個模塊傳入一個超時參數(shù)(這是一個浮點數(shù),表示在進程變?yōu)椴换顒又八却拿霐?shù))。即使進程在這個超時期限內(nèi)沒有完成,join()也會返回。
importmultiprocessingimporttimedefdaemon():
name=multiprocessing.current_process().nameprint('Starting:', name)
time.sleep(2)print('Exiting :', name)defnon_daemon():
name=multiprocessing.current_process().nameprint('Starting:', name)print('Exiting :', name)if __name__ == '__main__':
d=multiprocessing.Process(
name='daemon',
target=daemon,
)
d.daemon=True
n=multiprocessing.Process(
name='non-daemon',
target=non_daemon,
)
n.daemon=False
d.start()
n.start()
d.join(1)print('d.is_alive()', d.is_alive())
n.join()
由于傳入的超時值小于守護進程睡眠的時間,所以join()返回之后這個進程仍"活著"。
1.6 終止進程
盡管最好使用“毒藥”(poison pill)方法向進程發(fā)出信號,告訴它應(yīng)當(dāng)退出,但是如果一個進程看起來經(jīng)掛起或陷入死鎖,那么能夠強制性地將其結(jié)束會很有用。對一個進程對象調(diào)用terminate()會結(jié)束子進程。
importmultiprocessingimporttimedefslow_worker():print('Starting worker')
time.sleep(0.1)print('Finished worker')if __name__ == '__main__':
p= multiprocessing.Process(target=slow_worker)print('BEFORE:', p, p.is_alive())
p.start()print('DURING:', p, p.is_alive())
p.terminate()print('TERMINATED:', p, p.is_alive())
p.join()print('JOINED:', p, p.is_alive())
1.7 進程退出狀態(tài)
進程退出時生成的狀態(tài)碼可以通過exitcode屬性訪問。下表列出了這個屬性的可取值范圍。
退出碼含義==?0
沒有產(chǎn)生錯誤
>?0
進程有一個錯誤,并以該錯誤碼退出
進程以一個-1 * exitcode
importmultiprocessingimportsysimporttimedefexit_error():
sys.exit(1)defexit_ok():return
defreturn_value():return 1
defraises():raise RuntimeError('There was an error!')defterminated():
time.sleep(3)if __name__ == '__main__':
jobs=[]
funcs=[
exit_error,
exit_ok,
return_value,
raises,
terminated,
]for f infuncs:print('Starting process for', f.__name__)
j= multiprocessing.Process(target=f, name=f.__name__)
jobs.append(j)
j.start()
jobs[-1].terminate()for j injobs:
j.join()print('{:>15}.exitcode = {}'.format(j.name, j.exitcode))
產(chǎn)生異常的進程會自動得到exitcode為1。
1.8 日志
調(diào)試并發(fā)問題時,如果能夠訪問multiprocessing所提供對象的內(nèi)部狀態(tài),那么這會很有用??梢允褂靡粋€方便的模塊級函數(shù)啟用日志記錄,名為log_to_stderr()。它使用logging建立一個日志記錄器對象,并增加一個處理器,使日志消息被發(fā)送到標(biāo)準(zhǔn)錯誤通道。
importmultiprocessingimportloggingimportsysdefworker():print('Doing some work')
sys.stdout.flush()if __name__ == '__main__':
multiprocessing.log_to_stderr(logging.DEBUG)
p= multiprocessing.Process(target=worker)
p.start()
p.join()
默認的,日志級別被設(shè)置為NOTSET,即不產(chǎn)生任何消息。通過傳入一個不同的日志級別,可以初始化日志記錄器并指定所需的詳細程度。
若要直接處理日志記錄器(修改其日志級別或增加處理器),可以使用get_logger()。
importmultiprocessingimportloggingimportsysdefworker():print('Doing some work')
sys.stdout.flush()if __name__ == '__main__':
multiprocessing.log_to_stderr()
logger=multiprocessing.get_logger()
logger.setLevel(logging.INFO)
p= multiprocessing.Process(target=worker)
p.start()
p.join()
使用名multiprocessing,還可以通過logging配置文件API來配置日志記錄器。
1.9 派生進程
要在一個單獨的進程中開始工作,盡管最簡單的方法是使用Process并傳人一個目標(biāo)函數(shù),但也可以使用一個定制子類。
importmultiprocessingclassWorker(multiprocessing.Process):defrun(self):print('In {}'.format(self.name))return
if __name__ == '__main__':
jobs=[]for i in range(5):
p=Worker()
jobs.append(p)
p.start()for j injobs:
j.join()
派生類應(yīng)當(dāng)覆蓋run()以完成工作。
1.10 向進程傳遞消息
類似于線程,對于多個進程,一種常見的使用模式是將一個工作劃分到多個工作進程中并行地運行。要想有效地使用多個進程,通常要求它們之間有某種通信,這樣才能分解工作,并完成結(jié)果的聚集。利用multiprocessing完成進程間通信的一種簡單方法是使用一個Queue來回傳遞消息。能夠用pickle串行化的任何對象都可以通過Queue傳遞。
importmultiprocessingclassMyFancyClass:def __init__(self, name):
self.name=namedefdo_something(self):
proc_name=multiprocessing.current_process().nameprint('Doing something fancy in {} for {}!'.format(
proc_name, self.name))defworker(q):
obj=q.get()
obj.do_something()if __name__ == '__main__':
queue=multiprocessing.Queue()
p= multiprocessing.Process(target=worker, args=(queue,))
p.start()
queue.put(MyFancyClass('Fancy Dan'))#Wait for the worker to finish
queue.close()
queue.join_thread()
p.join()
這個小例子只是向一個工作進程傳遞一個消息,然后主進程等待這個工作進程完成。
來看一個更復(fù)雜的例子,這里展示了如何管理多個工作進程,它們都消費一個JoinableQueue的數(shù)據(jù),并把結(jié)果傳遞回父進程。這里使用“毒藥”技術(shù)來停止工作進程。建立具體任務(wù)后,主程序會在作業(yè)隊列中為每個工作進程增加一個“stop”值。當(dāng)一個工作進程遇到這個特定值時,就會退出其處理循環(huán)。主進程使用任務(wù)隊列的join()方法等待所有任務(wù)都完成后才開始處理結(jié)果。
importmultiprocessingimporttimeclassConsumer(multiprocessing.Process):def __init__(self, task_queue, result_queue):
multiprocessing.Process.__init__(self)
self.task_queue=task_queue
self.result_queue=result_queuedefrun(self):
proc_name=self.namewhileTrue:
next_task=self.task_queue.get()if next_task isNone:#Poison pill means shutdown
print('{}: Exiting'.format(proc_name))
self.task_queue.task_done()break
print('{}: {}'.format(proc_name, next_task))
answer=next_task()
self.task_queue.task_done()
self.result_queue.put(answer)classTask:def __init__(self, a, b):
self.a=a
self.b=bdef __call__(self):
time.sleep(0.1) #pretend to take time to do the work
return '{self.a} * {self.b} = {product}'.format(
self=self, product=self.a *self.b)def __str__(self):return '{self.a} * {self.b}'.format(self=self)if __name__ == '__main__':#Establish communication queues
tasks =multiprocessing.JoinableQueue()
results=multiprocessing.Queue()#Start consumers
num_consumers = multiprocessing.cpu_count() * 2
print('Creating {} consumers'.format(num_consumers))
consumers=[
Consumer(tasks, results)for i inrange(num_consumers)
]for w inconsumers:
w.start()#Enqueue jobs
num_jobs = 10
for i inrange(num_jobs):
tasks.put(Task(i, i))#Add a poison pill for each consumer
for i inrange(num_consumers):
tasks.put(None)#Wait for all of the tasks to finish
tasks.join()#Start printing results
whilenum_jobs:
result=results.get()print('Result:', result)
num_jobs-= 1
盡管作業(yè)按順序進入隊列,但它們的執(zhí)行卻是并行的,所以不能保證它們完成的順序。
1.11 進程間信號傳輸
Event類提供了一種簡單的方法,可以在進程之間傳遞狀態(tài)信息。事件可以在設(shè)置狀態(tài)和未設(shè)置狀態(tài)之間切換。通過使用一個可選的超時值,事件對象的用戶可以等待其狀態(tài)從未設(shè)置變?yōu)樵O(shè)置。
importmultiprocessingimporttimedefwait_for_event(e):"""Wait for the event to be set before doing anything"""
print('wait_for_event: starting')
e.wait()print('wait_for_event: e.is_set()->', e.is_set())defwait_for_event_timeout(e, t):"""Wait t seconds and then timeout"""
print('wait_for_event_timeout: starting')
e.wait(t)print('wait_for_event_timeout: e.is_set()->', e.is_set())if __name__ == '__main__':
e=multiprocessing.Event()
w1=multiprocessing.Process(
name='block',
target=wait_for_event,
args=(e,),
)
w1.start()
w2=multiprocessing.Process(
name='nonblock',
target=wait_for_event_timeout,
args=(e, 2),
)
w2.start()print('main: waiting before calling Event.set()')
time.sleep(3)
e.set()print('main: event is set')
wait()到時間時就會返回,而且沒有任何錯誤。調(diào)用者負責(zé)使用is_set()檢查事件的狀態(tài)。
1.12 控制資源訪問
如果需要在多個進程間共享一個資源,那么在這種情況下,可以使用一個Lock來避免訪問沖突。
importmultiprocessingdefworker_with(lock, f):
with lock:
fs= open(f, "a+")
fs.write('Lock acquired via withn')
fs.close()defworker_no_with(lock, f):
lock.acquire()try:
fs= open(f, "a+")
fs.write('Lock acquired directlyn')
fs.close()finally:
lock.release()if __name__ == "__main__":
f= "file.txt"lock=multiprocessing.Lock()
w= multiprocessing.Process(target=worker_with, args=(lock, f))
nw= multiprocessing.Process(target=worker_no_with, args=(lock, f))
w.start()
nw.start()
w.join()
nw.join()
在這個例子中,如果這兩個進程沒有用鎖同步其輸出流訪問,那么打印到控制臺的消息可能會糾結(jié)在一起。
1.13 同步操作
可以用Condition對象來同步一個工作流的各個部分,使其中一些部分并行運行,而另外一些順序運行,即使它們在不同的進程中。
importmultiprocessingimporttimedefstage_1(cond):"""perform first stage of work,
then notify stage_2 to continue"""name=multiprocessing.current_process().nameprint('Starting', name)
with cond:print('{} done and ready for stage 2'.format(name))
cond.notify_all()defstage_2(cond):"""wait for the condition telling us stage_1 is done"""name=multiprocessing.current_process().nameprint('Starting', name)
with cond:
cond.wait()print('{} running'.format(name))if __name__ == '__main__':
condition=multiprocessing.Condition()
s1= multiprocessing.Process(name='s1',
target=stage_1,
args=(condition,))
s2_clients=[
multiprocessing.Process(
name='stage_2[{}]'.format(i),
target=stage_2,
args=(condition,),
)for i in range(1, 3)
]for c ins2_clients:
c.start()
time.sleep(1)
s1.start()
s1.join()for c ins2_clients:
c.join()
在這個例子,兩個進程并行的運行一個作業(yè)的第二階段,但前提是第一階段已經(jīng)完成。
1.14 控制資源的并發(fā)訪問
有時可能需要允許多個工作進程同時訪問一個資源,但要限制總數(shù)。這時候我們就可以使用Semaphore來管理。
importmultiprocessingimporttimedefworker(s, i):
s.acquire()print(multiprocessing.current_process().name + "acquire")
time.sleep(i)print(multiprocessing.current_process().name + "release")
s.release()if __name__ == "__main__":
s= multiprocessing.Semaphore(2)for i in range(5):
p= multiprocessing.Process(target=worker, args=(s, i * 2))
p.start()
1.15 管理共享狀態(tài)
Manager負責(zé)協(xié)調(diào)其所有用戶之間共享的信息狀態(tài)。
importmultiprocessingdefworker(d, key, value):
d[key]=valueif __name__ == '__main__':
mgr=multiprocessing.Manager()
d=mgr.dict()
jobs=[
multiprocessing.Process(
target=worker,
args=(d, i, i * 2),
)for i in range(10)
]for j injobs:
j.start()for j injobs:
j.join()print('Results:', d)
因為這個列表是通過管理器創(chuàng)建的,所以它會由所有進程共享,所有進程都能看到這個列表的更新。除了列表,管理器還支持字典。
1.16 共享命名空間
除了字典和列表,Manager還可以創(chuàng)建一個共享Namespace。
importmultiprocessingdefproducer(ns, event):
ns.value= 'This is the value'event.set()defconsumer(ns, event):try:print('Before event: {}'.format(ns.value))exceptException as err:print('Before event, error:', str(err))
event.wait()print('After event:', ns.value)if __name__ == '__main__':
mgr=multiprocessing.Manager()
namespace=mgr.Namespace()
event=multiprocessing.Event()
p=multiprocessing.Process(
target=producer,
args=(namespace, event),
)
c=multiprocessing.Process(
target=consumer,
args=(namespace, event),
)
c.start()
p.start()
c.join()
p.join()
增加到Namespace的所有命名值對所有接收Namespace實例的客戶都可見。
對命名空間中可變值內(nèi)容的更新不會自動傳播。
importmultiprocessingdefproducer(ns, event):#DOES NOT UPDATE GLOBAL VALUE!
ns.my_list.append('This is the value')
event.set()defconsumer(ns, event):print('Before event:', ns.my_list)
event.wait()print('After event :', ns.my_list)if __name__ == '__main__':
mgr=multiprocessing.Manager()
namespace=mgr.Namespace()
namespace.my_list=[]
event=multiprocessing.Event()
p=multiprocessing.Process(
target=producer,
args=(namespace, event),
)
c=multiprocessing.Process(
target=consumer,
args=(namespace, event),
)
c.start()
p.start()
c.join()
p.join()
要更新這個列表,需要將它再次關(guān)聯(lián)到命名空間對象。
1.17 進程池
有些情況下,所要完成的工作可以分解并獨立地分布到多個工作進程,對于這種簡單的情況,可以用Pool類來管理固定數(shù)目的工作進程。會收集各個作業(yè)的返回值并作為一個列表返回。池(pool)參數(shù)包括進程數(shù)以及啟動任務(wù)進程時要運行的函數(shù)(對每個子進程調(diào)用一次)。
importmultiprocessingdefdo_calculation(data):return data * 2
defstart_process():print('Starting', multiprocessing.current_process().name)if __name__ == '__main__':
inputs= list(range(10))print('Input :', inputs)
builtin_outputs=list(map(do_calculation, inputs))print('Built-in:', builtin_outputs)
pool_size= multiprocessing.cpu_count() * 2pool=multiprocessing.Pool(
processes=pool_size,
initializer=start_process,
)
pool_outputs=pool.map(do_calculation, inputs)
pool.close()#no more tasks
pool.join() #wrap up current tasks
print('Pool :', pool_outputs)
map()方法的結(jié)果在功能上等價于內(nèi)置map()的結(jié)果,只不過各個任務(wù)會并行運行。由于進程池并行地處理輸入,可以用close()和join()使任務(wù)進程與主進程同步,以確保完成適當(dāng)?shù)那謇怼?/p>
默認的,Pool會創(chuàng)建固定數(shù)目的工作進程,并向這些工作進程傳遞作業(yè),直到再沒有更多作業(yè)為止。設(shè)置maxtasksperchild參數(shù)可以告訴池在完成一些任務(wù)之后要重新啟動一個工作進程,來避免長時間運行的工作進程消耗更多的系統(tǒng)資源。
importmultiprocessingdefdo_calculation(data):return data * 2
defstart_process():print('Starting', multiprocessing.current_process().name)if __name__ == '__main__':
inputs= list(range(10))print('Input :', inputs)
builtin_outputs=list(map(do_calculation, inputs))print('Built-in:', builtin_outputs)
pool_size= multiprocessing.cpu_count() * 2pool=multiprocessing.Pool(
processes=pool_size,
initializer=start_process,
maxtasksperchild=2,
)
pool_outputs=pool.map(do_calculation, inputs)
pool.close()#no more tasks
pool.join() #wrap up current tasks
print('Pool :', pool_outputs)
池完成其分配的任務(wù)時,即使并沒有更多工作要做,也會重新啟動工作進程。從下面的輸出可以看到,盡管只有10個任務(wù),而且每個工作進程一次可以完成兩個任務(wù),但是這里創(chuàng)建了8個工作進程。
內(nèi)容來源于網(wǎng)絡(luò),如有侵權(quán)請聯(lián)系客服刪除
總結(jié)
以上是生活随笔為你收集整理的python守护进程进程池_Python3标准库:multiprocessing像线程一样管理进程的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: vb6 datagrid表格垂直居中_W
- 下一篇: 公网ip判断_银行客户生产网和办公网知识