python 管道队列_Python多处理 – 管道与队列
> A
Pipe()只能有兩個端點。
> A
Queue()可以有多個生產者和消費者。
什么時候使用它們
如果你需要兩點以上的溝通,使用Queue()。
如果你需要絕對的性能,Pipe()是快得多,因為Queue()是建立在Pipe()之上。
性能基準
讓我們假設您想生成兩個進程,并盡可能快地在它們之間發送消息。這些是在使用Pipe()和Queue()的類似測試之間的拖動比賽的計時結果…這是在運行Ubuntu 11.10和Python 2.7.2的ThinkpadT61。
FYI,我把JoinableQueue()的結果作為獎金;調用queue.task_done()時,JoinableQueue()用于處理任務,它甚至不知道特定任務,它只計算隊列中未完成的任務),因此queue.join()知道工作已完成。
此答案底部的每個代碼…
mpenning@mpenning-T61:~$ python multi_pipe.py
Sending 10000 numbers to Pipe() took 0.0369849205017 seconds
Sending 100000 numbers to Pipe() took 0.328398942947 seconds
Sending 1000000 numbers to Pipe() took 3.17266988754 seconds
mpenning@mpenning-T61:~$ python multi_queue.py
Sending 10000 numbers to Queue() took 0.105256080627 seconds
Sending 100000 numbers to Queue() took 0.980564117432 seconds
Sending 1000000 numbers to Queue() took 10.1611330509 seconds
mpnening@mpenning-T61:~$ python multi_joinablequeue.py
Sending 10000 numbers to JoinableQueue() took 0.172781944275 seconds
Sending 100000 numbers to JoinableQueue() took 1.5714070797 seconds
Sending 1000000 numbers to JoinableQueue() took 15.8527247906 seconds
mpenning@mpenning-T61:~$
總之,Pipe()大約比Queue()快三倍。甚至不要考慮JoinableQueue(),除非你真的必須有好處。
獎金材料2
多處理引入信息流中的細微變化,使調試困難,除非你知道一些快捷方式。例如,您可能有一個腳本在許多條件下通過字典編制索引時工作正常,但很少在某些輸入下失敗。
通常,當整個python進程崩潰時,我們得到失敗的線索;但是,如果多處理函數崩潰,您不會收到未經請求的崩潰回溯到控制臺。跟蹤未知的多處理崩潰是很困難的,沒有關于什么崩潰的過程的線索。
我發現跟蹤多處理崩潰信息的最簡單的方法是將整個多處理函數包裝在try / except中,并使用traceback.print_exc():
import traceback
def reader(args):
try:
# Insert stuff to be multiprocessed here
return args[0]['that']
except:
print "FATAL: reader({0}) exited while multiprocessing".format(args)
traceback.print_exc()
現在,當你發現一個崩潰,你會看到:
FATAL: reader([{'crash', 'this'}]) exited while multiprocessing
Traceback (most recent call last):
File "foo.py", line 19, in __init__
self.run(task_q, result_q)
File "foo.py", line 46, in run
raise ValueError
ValueError
源代碼:
"""
multi_pipe.py
"""
from multiprocessing import Process, Pipe
import time
def reader(pipe):
output_p, input_p = pipe
input_p.close() # We are only reading
while True:
try:
msg = output_p.recv() # Read from the output pipe and do nothing
except EOFError:
break
def writer(count, input_p):
for ii in xrange(0, count):
input_p.send(ii) # Write 'count' numbers into the input pipe
if __name__=='__main__':
for count in [10**4, 10**5, 10**6]:
output_p, input_p = Pipe()
reader_p = Process(target=reader, args=((output_p, input_p),))
reader_p.start() # Launch the reader process
output_p.close() # We no longer need this part of the Pipe()
_start = time.time()
writer(count, input_p) # Send a lot of stuff to reader()
input_p.close() # Ask the reader to stop when it reads EOF
reader_p.join()
print "Sending %s numbers to Pipe() took %s seconds" % (count,
(time.time() - _start))
"""
multi_queue.py
"""
from multiprocessing import Process, Queue
import time
def reader(queue):
while True:
msg = queue.get() # Read from the queue and do nothing
if (msg == 'DONE'):
break
def writer(count, queue):
for ii in xrange(0, count):
queue.put(ii) # Write 'count' numbers into the queue
queue.put('DONE')
if __name__=='__main__':
for count in [10**4, 10**5, 10**6]:
queue = Queue() # reader() reads from queue
# writer() writes to queue
reader_p = Process(target=reader, args=((queue),))
reader_p.daemon = True
reader_p.start() # Launch the reader process
_start = time.time()
writer(count, queue) # Send a lot of stuff to reader()
reader_p.join() # Wait for the reader to finish
print "Sending %s numbers to Queue() took %s seconds" % (count,
(time.time() - _start))
"""
multi_joinablequeue.py
"""
from multiprocessing import Process, JoinableQueue
import time
def reader(queue):
while True:
msg = queue.get() # Read from the queue and do nothing
queue.task_done()
def writer(count, queue):
for ii in xrange(0, count):
queue.put(ii) # Write 'count' numbers into the queue
if __name__=='__main__':
for count in [10**4, 10**5, 10**6]:
queue = JoinableQueue() # reader() reads from queue
# writer() writes to queue
reader_p = Process(target=reader, args=((queue),))
reader_p.daemon = True
reader_p.start() # Launch the reader process
_start = time.time()
writer(count, queue) # Send a lot of stuff to reader()
queue.join() # Wait for the reader to finish
print "Sending %s numbers to JoinableQueue() took %s seconds" % (count,
(time.time() - _start))
總結
以上是生活随笔為你收集整理的python 管道队列_Python多处理 – 管道与队列的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: LINK : warning LNK40
- 下一篇: 周末送新书 | 世界名校数据挖掘经典《斯