Python 爬虫进阶六之多进程的用法
python 中的多線程其實并不是真正的多線程,并不能做到充分利用多核 CPU 資源。 如果想要充分利用,在 python 中大部分情況需要使用多進程,那么這個包就叫做 multiprocessing。 借助它,可以輕松完成從單進程到并發執行的轉換。multiprocessing 支持子進程、通信和共享數據、執行不同形式的同步,提供了 Process、Queue、Pipe、Lock 等組件。 那么本節要介紹的內容有:
- Process
- Lock
- Semaphore
- Queue
- Pipe
- Pool
Process
基本使用
在 multiprocessing 中,每一個進程都用一個 Process 類來表示。首先看下它的 API
Process([group [, target [, name [, args [, kwargs]]]]])- target 表示調用對象,你可以傳入方法的名字
- args 表示被調用對象的位置參數元組,比如 target 是函數 a,他有兩個參數 m,n,那么 args 就傳入 (m, n) 即可
- kwargs 表示調用對象的字典
- name 是別名,相當于給這個進程取一個名字
- group 分組,實際上不使用
我們先用一個實例來感受一下:
import multiprocessingdef process(num):print 'Process:', numif __name__ == '__main__':for i in range(5):p = multiprocessing.Process(target=process, args=(i,))p.start()最簡單的創建 Process 的過程如上所示,target 傳入函數名,args 是函數的參數,是元組的形式,如果只有一個參數,那就是長度為 1 的元組。 然后調用 start () 方法即可啟動多個進程了。 另外你還可以通過 cpu_count () 方法還有 active_children () 方法獲取當前機器的 CPU 核心數量以及得到目前所有的運行的進程。 通過一個實例來感受一下:
import multiprocessing import timedef process(num):time.sleep(num)print 'Process:', numif __name__ == '__main__':for i in range(5):p = multiprocessing.Process(target=process, args=(i,))p.start()print('CPU number:' + str(multiprocessing.cpu_count()))for p in multiprocessing.active_children():print('Child process name: ' + p.name + ' id: ' + str(p.pid))print('Process Ended')運行結果:
Process: 0 CPU number:8 Child process name: Process-2 id: 9641 Child process name: Process-4 id: 9643 Child process name: Process-5 id: 9644 Child process name: Process-3 id: 9642 Process Ended Process: 1 Process: 2 Process: 3 Process: 4自定義類
另外你還可以繼承 Process 類,自定義進程類,實現 run 方法即可。 用一個實例來感受一下:
from multiprocessing import Process import timeclass MyProcess(Process):def __init__(self, loop):Process.__init__(self)self.loop = loopdef run(self):for count in range(self.loop):time.sleep(1)print('Pid: ' + str(self.pid) + ' LoopCount: ' + str(count))if __name__ == '__main__':for i in range(2, 5):p = MyProcess(i)p.start()在上面的例子中,我們繼承了 Process 這個類,然后實現了 run 方法。打印出來了進程號和參數。 運行結果:
Pid: 28116 LoopCount: 0 Pid: 28117 LoopCount: 0 Pid: 28118 LoopCount: 0 Pid: 28116 LoopCount: 1 Pid: 28117 LoopCount: 1 Pid: 28118 LoopCount: 1 Pid: 28117 LoopCount: 2 Pid: 28118 LoopCount: 2 Pid: 28118 LoopCount: 3可以看到,三個進程分別打印出了 2、3、4 條結果。 我們可以把一些方法獨立的寫在每個類里封裝好,等用的時候直接初始化一個類運行即可。
deamon
在這里介紹一個屬性,叫做 deamon。每個線程都可以單獨設置它的屬性,如果設置為 True,當父進程結束后,子進程會自動被終止。 用一個實例來感受一下,還是原來的例子,增加了 deamon 屬性:
from multiprocessing import Process import timeclass MyProcess(Process):def __init__(self, loop):Process.__init__(self)self.loop = loopdef run(self):for count in range(self.loop):time.sleep(1)print('Pid: ' + str(self.pid) + ' LoopCount: ' + str(count))if __name__ == '__main__':for i in range(2, 5):p = MyProcess(i)p.daemon = Truep.start()print 'Main process Ended!'在這里,調用的時候增加了設置 deamon,最后的主進程(即父進程)打印輸出了一句話。 運行結果:
Main process Ended!結果很簡單,因為主進程沒有做任何事情,直接輸出一句話結束,所以在這時也直接終止了子進程的運行。 這樣可以有效防止無控制地生成子進程。如果這樣寫了,你在關閉這個主程序運行時,就無需額外擔心子進程有沒有被關閉了。 不過這樣并不是我們想要達到的效果呀,能不能讓所有子進程都執行完了然后再結束呢?那當然是可以的,只需要加入 join () 方法即可。
from multiprocessing import Process import timeclass MyProcess(Process):def __init__(self, loop):Process.__init__(self)self.loop = loopdef run(self):for count in range(self.loop):time.sleep(1)print('Pid: ' + str(self.pid) + ' LoopCount: ' + str(count))if __name__ == '__main__':for i in range(2, 5):p = MyProcess(i)p.daemon = Truep.start()p.join()print 'Main process Ended!'在這里,每個子進程都調用了 join () 方法,這樣父進程(主進程)就會等待子進程執行完畢。 運行結果:
Pid: 29902 LoopCount: 0 Pid: 29902 LoopCount: 1 Pid: 29905 LoopCount: 0 Pid: 29905 LoopCount: 1 Pid: 29905 LoopCount: 2 Pid: 29912 LoopCount: 0 Pid: 29912 LoopCount: 1 Pid: 29912 LoopCount: 2 Pid: 29912 LoopCount: 3 Main process Ended!發現所有子進程都執行完畢之后,父進程最后打印出了結束的結果。
Lock
在上面的一些小實例中,你可能會遇到如下的運行結果:
什么問題?有的輸出錯位了。這是由于并行導致的,兩個進程同時進行了輸出,結果第一個進程的換行沒有來得及輸出,第二個進程就輸出了結果。所以導致這種排版的問題。 那這歸根結底是因為線程同時資源(輸出操作)而導致的。 那怎么來避免這種問題?那自然是在某一時間,只能一個進程輸出,其他進程等待。等剛才那個進程輸出完畢之后,另一個進程再進行輸出。這種現象就叫做 “互斥”。 我們可以通過 Lock 來實現,在一個進程輸出時,加鎖,其他進程等待。等此進程執行結束后,釋放鎖,其他進程可以進行輸出。 我們現用一個實例來感受一下:
首先看一下不加鎖的輸出結果:
```python Pid: 45755 LoopCount: 0 Pid: 45756 LoopCount: 0 Pid: 45757 LoopCount: 0 Pid: 45758 LoopCount: 0 Pid: 45759 LoopCount: 0 Pid: 45755 LoopCount: 1 Pid: 45756 LoopCount: 1 Pid: 45757 LoopCount: 1 Pid: 45758 LoopCount: 1 Pid: 45759 LoopCount: 1 Pid: 45755 LoopCount: 2Pid: 45756 LoopCount: 2Pid: 45757 LoopCount: 2 Pid: 45758 LoopCount: 2 Pid: 45759 LoopCount: 2 Pid: 45756 LoopCount: 3 Pid: 45755 LoopCount: 3 Pid: 45757 LoopCount: 3 Pid: 45758 LoopCount: 3 Pid: 45759 LoopCount: 3 Pid: 45755 LoopCount: 4 Pid: 45756 LoopCount: 4 Pid: 45757 LoopCount: 4 Pid: 45759 LoopCount: 4 Pid: 45758 LoopCount: 4 Pid: 45756 LoopCount: 5 Pid: 45755 LoopCount: 5 Pid: 45757 LoopCount: 5 Pid: 45759 LoopCount: 5 Pid: 45758 LoopCount: 5 Pid: 45756 LoopCount: 6Pid: 45755 LoopCount: 6Pid: 45757 LoopCount: 6 Pid: 45759 LoopCount: 6 Pid: 45758 LoopCount: 6 Pid: 45755 LoopCount: 7Pid: 45756 LoopCount: 7Pid: 45757 LoopCount: 7 Pid: 45758 LoopCount: 7 Pid: 45759 LoopCount: 7 Pid: 45756 LoopCount: 8Pid: 45755 LoopCount: 8Pid: 45757 LoopCount: 8 Pid: 45758 LoopCount: 8Pid: 45759 LoopCount: 8Pid: 45755 LoopCount: 9 Pid: 45756 LoopCount: 9 Pid: 45757 LoopCount: 9 Pid: 45758 LoopCount: 9 Pid: 45759 LoopCount: 9 Pid: 45756 LoopCount: 10 Pid: 45757 LoopCount: 10 Pid: 45758 LoopCount: 10 Pid: 45759 LoopCount: 10 Pid: 45757 LoopCount: 11 Pid: 45758 LoopCount: 11 Pid: 45759 LoopCount: 11 Pid: 45758 LoopCount: 12 Pid: 45759 LoopCount: 12 Pid: 45759 LoopCount: 13 可以看到有些輸出已經造成了影響。 然后我們對其加鎖:```python from multiprocessing import Process, Lock import timeclass MyProcess(Process):def __init__(self, loop, lock):Process.__init__(self)self.loop = loopself.lock = lockdef run(self):for count in range(self.loop):time.sleep(0.1)self.lock.acquire()print('Pid: ' + str(self.pid) + ' LoopCount: ' + str(count))self.lock.release()if __name__ == '__main__':lock = Lock()for i in range(10, 15):p = MyProcess(i, lock)p.start()我們在 print 方法的前后分別添加了獲得鎖和釋放鎖的操作。這樣就能保證在同一時間只有一個 print 操作。 看一下運行結果:
Pid: 45889 LoopCount: 0 Pid: 45890 LoopCount: 0 Pid: 45891 LoopCount: 0 Pid: 45892 LoopCount: 0 Pid: 45893 LoopCount: 0 Pid: 45889 LoopCount: 1 Pid: 45890 LoopCount: 1 Pid: 45891 LoopCount: 1 Pid: 45892 LoopCount: 1 Pid: 45893 LoopCount: 1 Pid: 45889 LoopCount: 2 Pid: 45890 LoopCount: 2 Pid: 45891 LoopCount: 2 Pid: 45892 LoopCount: 2 Pid: 45893 LoopCount: 2 Pid: 45889 LoopCount: 3 Pid: 45890 LoopCount: 3 Pid: 45891 LoopCount: 3 Pid: 45892 LoopCount: 3 Pid: 45893 LoopCount: 3 Pid: 45889 LoopCount: 4 Pid: 45890 LoopCount: 4 Pid: 45891 LoopCount: 4 Pid: 45892 LoopCount: 4 Pid: 45893 LoopCount: 4 Pid: 45889 LoopCount: 5 Pid: 45890 LoopCount: 5 Pid: 45891 LoopCount: 5 Pid: 45892 LoopCount: 5 Pid: 45893 LoopCount: 5 Pid: 45889 LoopCount: 6 Pid: 45890 LoopCount: 6 Pid: 45891 LoopCount: 6 Pid: 45893 LoopCount: 6 Pid: 45892 LoopCount: 6 Pid: 45889 LoopCount: 7 Pid: 45890 LoopCount: 7 Pid: 45891 LoopCount: 7 Pid: 45892 LoopCount: 7 Pid: 45893 LoopCount: 7 Pid: 45889 LoopCount: 8 Pid: 45890 LoopCount: 8 Pid: 45891 LoopCount: 8 Pid: 45892 LoopCount: 8 Pid: 45893 LoopCount: 8 Pid: 45889 LoopCount: 9 Pid: 45890 LoopCount: 9 Pid: 45891 LoopCount: 9 Pid: 45892 LoopCount: 9 Pid: 45893 LoopCount: 9 Pid: 45890 LoopCount: 10 Pid: 45891 LoopCount: 10 Pid: 45892 LoopCount: 10 Pid: 45893 LoopCount: 10 Pid: 45891 LoopCount: 11 Pid: 45892 LoopCount: 11 Pid: 45893 LoopCount: 11 Pid: 45893 LoopCount: 12 Pid: 45892 LoopCount: 12 Pid: 45893 LoopCount: 13嗯,一切都沒問題了。 所以在訪問臨界資源時,使用 Lock 就可以避免進程同時占用資源而導致的一些問題。
Semaphore
信號量,是在進程同步過程中一個比較重要的角色。可以控制臨界資源的數量,保證各個進程之間的互斥和同步。 如果你學過操作系統,那么一定對這方面非常了解,如果你還不了解信號量是什么,可以參考 信號量解析 來了解一下它是做什么的。 那么接下來我們就用一個實例來演示一下進程之間利用 Semaphore 做到同步和互斥,以及控制臨界資源數量。
from multiprocessing import Process, Semaphore, Lock, Queue import timebuffer = Queue(10) empty = Semaphore(2) full = Semaphore(0) lock = Lock()class Consumer(Process):def run(self):global buffer, empty, full, lockwhile True:full.acquire()lock.acquire()buffer.get()print('Consumer pop an element')time.sleep(1)lock.release()empty.release()class Producer(Process):def run(self):global buffer, empty, full, lockwhile True:empty.acquire()lock.acquire()buffer.put(1)print('Producer append an element')time.sleep(1)lock.release()full.release()if __name__ == '__main__':p = Producer()c = Consumer()p.daemon = c.daemon = Truep.start()c.start()p.join()c.join()print 'Ended!'如上代碼實現了注明的生產者和消費者問題,定義了兩個進程類,一個是消費者,一個是生產者。 定義了一個共享隊列,利用了 Queue 數據結構,然后定義了兩個信號量,一個代表緩沖區空余數,一個表示緩沖區占用數。 生產者 Producer 使用 empty.acquire () 方法來占用一個緩沖區位置,然后緩沖區空閑區大小減小 1,接下來進行加鎖,對緩沖區進行操作。然后釋放鎖,然后讓代表占用的緩沖區位置數量 + 1,消費者則相反。 運行結果如下:
Producer append an element Producer append an element Consumer pop an element Consumer pop an element Producer append an element Producer append an element Consumer pop an element Consumer pop an element Producer append an element Producer append an element Consumer pop an element Consumer pop an element Producer append an element Producer append an element可以發現兩個進程在交替運行,生產者先放入緩沖區物品,然后消費者取出,不停地進行循環。 通過上面的例子來體會一下信號量的用法。
Queue
在上面的例子中我們使用了 Queue,可以作為進程通信的共享隊列使用。 在上面的程序中,如果你把 Queue 換成普通的 list,是完全起不到效果的。即使在一個進程中改變了這個 list,在另一個進程也不能獲取到它的狀態。 因此進程間的通信,隊列需要用 Queue。當然這里的隊列指的是 multiprocessing.Queue 依然是用上面那個例子,我們一個進程向隊列中放入數據,然后另一個進程取出數據。
from multiprocessing import Process, Semaphore, Lock, Queue import time from random import randombuffer = Queue(10) empty = Semaphore(2) full = Semaphore(0) lock = Lock()class Consumer(Process):def run(self):global buffer, empty, full, lockwhile True:full.acquire()lock.acquire()print 'Consumer get', buffer.get()time.sleep(1)lock.release()empty.release()class Producer(Process):def run(self):global buffer, empty, full, lockwhile True:empty.acquire()lock.acquire()num = random()print 'Producer put ', numbuffer.put(num)time.sleep(1)lock.release()full.release()if __name__ == '__main__':p = Producer()c = Consumer()p.daemon = c.daemon = Truep.start()c.start()p.join()c.join()print 'Ended!'運行結果:
Producer put 0.719213647437 Producer put 0.44287326683 Consumer get 0.719213647437 Consumer get 0.44287326683 Producer put 0.722859424381 Producer put 0.525321338921 Consumer get 0.722859424381 Consumer get 0.525321338921可以看到生產者放入隊列中數據,然后消費者將數據取出來。 get 方法有兩個參數,blocked 和 timeout,意思為阻塞和超時時間。默認 blocked 是 true,即阻塞式。 當一個隊列為空的時候如果再用 get 取則會阻塞,所以這時候就需要吧 blocked 設置為 false,即非阻塞式,實際上它就會調用 get_nowait () 方法,此時還需要設置一個超時時間,在這么長的時間內還沒有取到隊列元素,那就拋出 Queue.Empty 異常。 當一個隊列為滿的時候如果再用 put 放則會阻塞,所以這時候就需要吧 blocked 設置為 false,即非阻塞式,實際上它就會調用 put_nowait () 方法,此時還需要設置一個超時時間,在這么長的時間內還沒有放進去元素,那就拋出 Queue.Full 異常。 另外隊列中常用的方法 Queue.qsize () 返回隊列的大小 ,不過在 Mac OS 上沒法運行。 原因:
def qsize(self): # Raises NotImplementedError on Mac OSX because of broken sem_getvalue() return self._maxsize - self._sem._semlock._get_value()
Queue.empty () 如果隊列為空,返回 True, 反之 False Queue.full () 如果隊列滿了,返回 True, 反之 False Queue.get ([block [, timeout]]) 獲取隊列,timeout 等待時間 Queue.get_nowait () 相當 Queue.get (False) Queue.put (item) 阻塞式寫入隊列,timeout 等待時間 Queue.put_nowait (item) 相當 Queue.put (item, False)
Pipe
管道,顧名思義,一端發一端收。 Pipe 可以是單向 (half-duplex),也可以是雙向 (duplex)。我們通過 mutiprocessing.Pipe (duplex=False) 創建單向管道 (默認為雙向)。一個進程從 PIPE 一端輸入對象,然后被 PIPE 另一端的進程接收,單向管道只允許管道一端的進程輸入,而雙向管道則允許從兩端輸入。 用一個實例來感受一下:
from multiprocessing import Process, Pipeclass Consumer(Process):def __init__(self, pipe):Process.__init__(self)self.pipe = pipedef run(self):self.pipe.send('Consumer Words')print 'Consumer Received:', self.pipe.recv()class Producer(Process):def __init__(self, pipe):Process.__init__(self)self.pipe = pipedef run(self):print 'Producer Received:', self.pipe.recv()self.pipe.send('Producer Words')if __name__ == '__main__':pipe = Pipe()p = Producer(pipe[0])c = Consumer(pipe[1])p.daemon = c.daemon = Truep.start()c.start()p.join()c.join()print 'Ended!'在這里聲明了一個默認為雙向的管道,然后將管道的兩端分別傳給兩個進程。兩個進程互相收發。觀察一下結果:
Producer Received: Consumer Words Consumer Received: Producer Words Ended!以上是對 pipe 的簡單介紹。
Pool
在利用 Python 進行系統管理的時候,特別是同時操作多個文件目錄,或者遠程控制多臺主機,并行操作可以節約大量的時間。當被操作對象數目不大時,可以直接利用 multiprocessing 中的 Process 動態成生多個進程,十幾個還好,但如果是上百個,上千個目標,手動的去限制進程數量卻又太過繁瑣,此時可以發揮進程池的功效。 Pool 可以提供指定數量的進程,供用戶調用,當有新的請求提交到 pool 中時,如果池還沒有滿,那么就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到規定最大值,那么該請求就會等待,直到池中有進程結束,才會創建新的進程來它。 在這里需要了解阻塞和非阻塞的概念。 阻塞和非阻塞關注的是程序在等待調用結果(消息,返回值)時的狀態。 阻塞即要等到回調結果出來,在有結果之前,當前進程會被掛起。 Pool 的用法有阻塞和非阻塞兩種方式。非阻塞即為添加進程后,不一定非要等到改進程執行完就添加其他進程運行,阻塞則相反。 現用一個實例感受一下非阻塞的用法:
from multiprocessing import Lock, Pool import timedef function(index):print 'Start process: ', indextime.sleep(3)print 'End process', indexif __name__ == '__main__':pool = Pool(processes=3)for i in xrange(4):pool.apply_async(function, (i,))print "Started processes"pool.close()pool.join()print "Subprocess done."在這里利用了 apply_async 方法,即非阻塞。 運行結果:
Started processes Start process: Start process: 0 1 Start process: 2 End processEnd process 0 1 Start process: 3 End process 2 End process 3 Subprocess done.可以發現在這里添加三個進程進去后,立馬就開始執行,不用非要等到某個進程結束后再添加新的進程進去。 下面再看看阻塞的用法:
from multiprocessing import Lock, Pool import timedef function(index):print 'Start process: ', indextime.sleep(3)print 'End process', indexif __name__ == '__main__':pool = Pool(processes=3)for i in xrange(4):pool.apply(function, (i,))print "Started processes"pool.close()pool.join()print "Subprocess done."在這里只需要把 apply_async 改成 apply 即可。 運行結果如下:
Start process: 0 End process 0 Start process: 1 End process 1 Start process: 2 End process 2 Start process: 3 End process 3 Started processes Subprocess done.這樣一來就好理解了吧? 下面對函數進行解釋: apply_async (func [, args [, kwds [, callback]]]) 它是非阻塞,apply (func [, args [, kwds]]) 是阻塞的。 close () 關閉 pool,使其不在接受新的任務。 terminate () 結束工作進程,不在處理未完成的任務。 join () 主進程阻塞,等待子進程的退出, join 方法要在 close 或 terminate 之后使用。 當然每個進程可以在各自的方法返回一個結果。apply 或 apply_async 方法可以拿到這個結果并進一步進行處理。
from multiprocessing import Lock, Pool import timedef function(index):print 'Start process: ', indextime.sleep(3)print 'End process', indexreturn indexif __name__ == '__main__':pool = Pool(processes=3)for i in xrange(4):result = pool.apply_async(function, (i,))print result.get()print "Started processes"pool.close()pool.join()print "Subprocess done."運行結果:
Start process: 0 End process 0 0 Start process: 1 End process 1 1 Start process: 2 End process 2 2 Start process: 3 End process 3 3 Started processes Subprocess done.另外還有一個非常好用的 map 方法。 如果你現在有一堆數據要處理,每一項都需要經過一個方法來處理,那么 map 非常適合。 比如現在你有一個數組,包含了所有的 URL,而現在已經有了一個方法用來抓取每個 URL 內容并解析,那么可以直接在 map 的第一個參數傳入方法名,第二個參數傳入 URL 數組。 現在我們用一個實例來感受一下:
from multiprocessing import Pool import requests from requests.exceptions import ConnectionErrordef scrape(url):try:print requests.get(url)except ConnectionError:print 'Error Occured ', urlfinally:print 'URL ', url, ' Scraped'if __name__ == '__main__':pool = Pool(processes=3)urls = ['https://www.baidu.com','http://www.meituan.com/','http://blog.csdn.net/','http://xxxyxxx.net']pool.map(scrape, urls)在這里初始化一個 Pool,指定進程數為 3,如果不指定,那么會自動根據 CPU 內核來分配進程數。 然后有一個鏈接列表,map 函數可以遍歷每個 URL,然后對其分別執行 scrape 方法。 運行結果:
<Response [403]> URL http://blog.csdn.net/ Scraped <Response [200]> URL https://www.baidu.com Scraped Error Occured http://xxxyxxx.net URL http://xxxyxxx.net Scraped <Response [200]> URL http://www.meituan.com/ Scraped可以看到遍歷就這么輕松地實現了。
參考
https://docs.python.org/2/library/multiprocessing.html http://www.cnblogs.com/vamei/archive/2012/10/12/2721484.html http://www.cnblogs.com/kaituorensheng/p/4445418.html https://my.oschina.net/yangyanxing/blog/296052
總結
以上是生活随笔為你收集整理的Python 爬虫进阶六之多进程的用法的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 爬虫简单上手实战
- 下一篇: GitHub中watch、star、fo