想用Python爬小姐姐图片?那你得先搞定分布式进程
導(dǎo)讀:分布式進(jìn)程指的是將Process進(jìn)程分布到多臺(tái)機(jī)器上,充分利用多臺(tái)機(jī)器的性能完成復(fù)雜的任務(wù)。我們可以將這一點(diǎn)應(yīng)用到分布式爬蟲(chóng)的開(kāi)發(fā)中。
作者:范傳輝
如需轉(zhuǎn)載請(qǐng)聯(lián)系大數(shù)據(jù)(ID:hzdashuju)
分布式進(jìn)程在Python中依然要用到multiprocessing模塊。multiprocessing模塊不但支持多進(jìn)程,其中managers子模塊還支持把多進(jìn)程分布到多臺(tái)機(jī)器上。可以寫(xiě)一個(gè)服務(wù)進(jìn)程作為調(diào)度者,將任務(wù)分布到其他多個(gè)進(jìn)程中,依靠網(wǎng)絡(luò)通信進(jìn)行管理。
舉個(gè)例子:在做爬蟲(chóng)程序時(shí),常常會(huì)遇到這樣的場(chǎng)景,我們想抓取某個(gè)網(wǎng)站的所有圖片,如果使用多進(jìn)程的話(huà),一般是一個(gè)進(jìn)程負(fù)責(zé)抓取圖片的鏈接地址,將鏈接地址存放到Queue中,另外的進(jìn)程負(fù)責(zé)從Queue中讀取鏈接地址進(jìn)行下載和存儲(chǔ)到本地。
現(xiàn)在把這個(gè)過(guò)程做成分布式,一臺(tái)機(jī)器上的進(jìn)程負(fù)責(zé)抓取鏈接,其他機(jī)器上的進(jìn)程負(fù)責(zé)下載存儲(chǔ)。那么遇到的主要問(wèn)題是將Queue暴露到網(wǎng)絡(luò)中,讓其他機(jī)器進(jìn)程都可以訪(fǎng)問(wèn),分布式進(jìn)程就是將這一個(gè)過(guò)程進(jìn)行了封裝,我們可以將這個(gè)過(guò)程稱(chēng)為本地隊(duì)列的網(wǎng)絡(luò)化。整體過(guò)程如圖1-24所示。
▲圖1-24 分布式進(jìn)程
要實(shí)現(xiàn)上面例子的功能,創(chuàng)建分布式進(jìn)程需要分為六個(gè)步驟:
建立隊(duì)列Queue,用來(lái)進(jìn)行進(jìn)程間的通信。服務(wù)進(jìn)程創(chuàng)建任務(wù)隊(duì)列task_queue,用來(lái)作為傳遞任務(wù)給任務(wù)進(jìn)程的通道;服務(wù)進(jìn)程創(chuàng)建結(jié)果隊(duì)列result_queue,作為任務(wù)進(jìn)程完成任務(wù)后回復(fù)服務(wù)進(jìn)程的通道。在分布式多進(jìn)程環(huán)境下,必須通過(guò)由Queuemanager獲得的Queue接口來(lái)添加任務(wù)。
把第一步中建立的隊(duì)列在網(wǎng)絡(luò)上注冊(cè),暴露給其他進(jìn)程(主機(jī)),注冊(cè)后獲得網(wǎng)絡(luò)隊(duì)列,相當(dāng)于本地隊(duì)列的映像。
建立一個(gè)對(duì)象(Queuemanager(BaseManager))實(shí)例manager,綁定端口和驗(yàn)證口令。
啟動(dòng)第三步中建立的實(shí)例,即啟動(dòng)管理manager,監(jiān)管信息通道。
通過(guò)管理實(shí)例的方法獲得通過(guò)網(wǎng)絡(luò)訪(fǎng)問(wèn)的Queue對(duì)象,即再把網(wǎng)絡(luò)隊(duì)列實(shí)體化成可以使用的本地隊(duì)列。
創(chuàng)建任務(wù)到“本地”隊(duì)列中,自動(dòng)上傳任務(wù)到網(wǎng)絡(luò)隊(duì)列中,分配給任務(wù)進(jìn)程進(jìn)行處理。
接下來(lái)通過(guò)程序?qū)崿F(xiàn)上面的例子(Linux版),首先編寫(xiě)的是服務(wù)進(jìn)程(taskManager.py),代碼如下:
from?multiprocessing.managers?import?BaseManager
#?第一步:建立task_queue和result_queue,用來(lái)存放任務(wù)和結(jié)果
task_queue=Queue.Queue()
result_queue=Queue.Queue()
class?Queuemanager(BaseManager):
????pass
#?第二步:把創(chuàng)建的兩個(gè)隊(duì)列注冊(cè)在網(wǎng)絡(luò)上,利用register方法,callable參數(shù)關(guān)聯(lián)了Queue對(duì)象,
#?將Queue對(duì)象在網(wǎng)絡(luò)中暴露
Queuemanager.register('get_task_queue',callable=lambda:task_queue)
Queuemanager.register('get_result_queue',callable=lambda:result_queue)
#?第三步:綁定端口8001,設(shè)置驗(yàn)證口令‘qiye’。這個(gè)相當(dāng)于對(duì)象的初始化
manager=Queuemanager(address=('',8001),authkey='qiye')
#?第四步:啟動(dòng)管理,監(jiān)聽(tīng)信息通道
manager.start()
#?第五步:通過(guò)管理實(shí)例的方法獲得通過(guò)網(wǎng)絡(luò)訪(fǎng)問(wèn)的Queue對(duì)象
task=manager.get_task_queue()
result=manager.get_result_queue()
#?第六步:添加任務(wù)
for?url?in?["ImageUrl_"+i?for?i?in?range(10)]:
????print?'put?task?%s?...'?%url
????task.put(url)
#?獲取返回結(jié)果
print?'try?get?result...'
for?i?in?range(10):
????print?'result?is?%s'?%result.get(timeout=10)
#?關(guān)閉管理
manager.shutdown()
任務(wù)進(jìn)程已經(jīng)編寫(xiě)完成,接下來(lái)編寫(xiě)任務(wù)進(jìn)程(taskWorker.py),創(chuàng)建任務(wù)進(jìn)程的步驟相對(duì)較少,需要四個(gè)步驟:
使用QueueManager注冊(cè)用于獲取Queue的方法名稱(chēng),任務(wù)進(jìn)程只能通過(guò)名稱(chēng)來(lái)在網(wǎng)絡(luò)上獲取Queue。
連接服務(wù)器,端口和驗(yàn)證口令注意保持與服務(wù)進(jìn)程中完全一致。
從網(wǎng)絡(luò)上獲取Queue,進(jìn)行本地化。
從task隊(duì)列獲取任務(wù),并把結(jié)果寫(xiě)入result隊(duì)列。
程序taskWorker.py代碼(win/linux版)如下:
import?time
from?multiprocessing.managers?import?BaseManager
#?創(chuàng)建類(lèi)似的QueueManager:
class?QueueManager(BaseManager):
????pass
#?第一步:使用QueueManager注冊(cè)用于獲取Queue的方法名稱(chēng)
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
#?第二步:連接到服務(wù)器:
server_addr?=?'127.0.0.1'
print('Connect?to?server?%s...'?%?server_addr)
#?端口和驗(yàn)證口令注意保持與服務(wù)進(jìn)程完全一致:
m?=?QueueManager(address=(server_addr,?8001),?authkey='qiye')
#?從網(wǎng)絡(luò)連接:
m.connect()
#?第三步:獲取Queue的對(duì)象:
task?=?m.get_task_queue()
result?=?m.get_result_queue()
#?第四步:從task隊(duì)列獲取任務(wù),并把結(jié)果寫(xiě)入result隊(duì)列:
while(not?task.empty()):
????image_url?=?task.get(True,timeout=5)
????print('run?task?download?%s...'?%?image_url)
????time.sleep(1)
????result.put('%s--->success'%image_url)
#?處理結(jié)束:
print('worker?exit.')
最后開(kāi)始運(yùn)行程序,先啟動(dòng)服務(wù)進(jìn)程taskManager.py,運(yùn)行結(jié)果如下:
put?task?ImageUrl_2?...
put?task?ImageUrl_3?...
put?task?ImageUrl_4?...
put?task?ImageUrl_5?...
put?task?ImageUrl_6?...
put?task?ImageUrl_7?...
put?task?ImageUrl_8?...
put?task?ImageUrl_9?...
try?get?result...
接著再啟動(dòng)任務(wù)進(jìn)程taskWorker.py,運(yùn)行結(jié)果如下:
run?task?download?ImageUrl_1...
run?task?download?ImageUrl_2...
run?task?download?ImageUrl_3...
run?task?download?ImageUrl_4...
run?task?download?ImageUrl_5...
run?task?download?ImageUrl_6...
run?task?download?ImageUrl_7...
run?task?download?ImageUrl_8...
run?task?download?ImageUrl_9...
worker?exit.
當(dāng)任務(wù)進(jìn)程運(yùn)行結(jié)束后,服務(wù)進(jìn)程運(yùn)行結(jié)果如下:
result?is?ImageUrl_2--->success
result?is?ImageUrl_3--->success
result?is?ImageUrl_4--->success
result?is?ImageUrl_5--->success
result?is?ImageUrl_6--->success
result?is?ImageUrl_7--->success
result?is?ImageUrl_8--->success
result?is?ImageUrl_9--->success
其實(shí)這就是一個(gè)簡(jiǎn)單但真正的分布式計(jì)算,把代碼稍加改造,啟動(dòng)多個(gè)worker,就可以把任務(wù)分布到幾臺(tái)甚至幾十臺(tái)機(jī)器上,實(shí)現(xiàn)大規(guī)模的分布式爬蟲(chóng)。
注:由于平臺(tái)的特性,創(chuàng)建服務(wù)進(jìn)程的代碼在Linux和Windows上有一些不同,創(chuàng)建工作進(jìn)程的代碼是一致的。
taskManager.py程序在Windows版下的代碼如下:
#?taskManager.py?for?windows
import?Queue
from?multiprocessing.managers?import?BaseManager
from?multiprocessing?import?freeze_support
#?任務(wù)個(gè)數(shù)
task_number?=?10
#?定義收發(fā)隊(duì)列
task_queue?=?Queue.Queue(task_number);
result_queue?=?Queue.Queue(task_number);
def?get_task():
????return?task_queue
def?get_result():
????return?result_queue
#?創(chuàng)建類(lèi)似的QueueManager:
class?QueueManager(BaseManager):
????pass
def?win_run():
????#?Windows下綁定調(diào)用接口不能使用lambda,所以只能先定義函數(shù)再綁定
????QueueManager.register('get_task_queue',callable?=?get_task)
????QueueManager.register('get_result_queue',callable?=?get_result)
????#?綁定端口并設(shè)置驗(yàn)證口令,Windows下需要填寫(xiě)IP地址,Linux下不填默認(rèn)為本地
????manager?=?QueueManager(address?=?('127.0.0.1',8001),authkey?=?'qiye')
????#?啟動(dòng)
????manager.start()
????try:
????????#?通過(guò)網(wǎng)絡(luò)獲取任務(wù)隊(duì)列和結(jié)果隊(duì)列
????????task?=?manager.get_task_queue()
????????result?=?manager.get_result_queue()
????????#?添加任務(wù)
????????for?url?in?["ImageUrl_"+str(i)?for?i?in?range(10)]:
????????????print?'put?task?%s?...'?%url
????????????task.put(url)
????????print?'try?get?result...'
????????for?i?in?range(10):
????????????print?'result?is?%s'?%result.get(timeout=10)
????except:
????????print('Manager?error')
????finally:
????????#?一定要關(guān)閉,否則會(huì)報(bào)管道未關(guān)閉的錯(cuò)誤
????????manager.shutdown()
if?__name__?==?'__main__':
????#?Windows下多進(jìn)程可能會(huì)有問(wèn)題,添加這句可以緩解
????freeze_support()
????win_run()
關(guān)于作者:范傳輝,資深網(wǎng)蟲(chóng),Python開(kāi)發(fā)者,參與開(kāi)發(fā)了多項(xiàng)網(wǎng)絡(luò)應(yīng)用,在實(shí)際開(kāi)發(fā)中積累了豐富的實(shí)戰(zhàn)經(jīng)驗(yàn),并善于總結(jié),貢獻(xiàn)了多篇技術(shù)文章廣受好評(píng)。研究興趣是網(wǎng)絡(luò)安全、爬蟲(chóng)技術(shù)、數(shù)據(jù)分析、驅(qū)動(dòng)開(kāi)發(fā)等技術(shù)。
本文摘編自《Python爬蟲(chóng)開(kāi)發(fā)與項(xiàng)目實(shí)戰(zhàn)》,經(jīng)出版方授權(quán)發(fā)布。
延伸閱讀《Python爬蟲(chóng)開(kāi)發(fā)與項(xiàng)目實(shí)戰(zhàn)》
點(diǎn)擊上圖了解及購(gòu)買(mǎi)
轉(zhuǎn)載請(qǐng)聯(lián)系微信:DoctorData
推薦語(yǔ):零基礎(chǔ)學(xué)習(xí)爬蟲(chóng)技術(shù),從Python和Web前端基礎(chǔ)開(kāi)始講起,由淺入深,包含大量案例,實(shí)用性強(qiáng)。
據(jù)統(tǒng)計(jì),99%的大咖都完成了這個(gè)神操作
▼
更多精彩
在公眾號(hào)后臺(tái)對(duì)話(huà)框輸入以下關(guān)鍵詞
查看更多優(yōu)質(zhì)內(nèi)容!
PPT?|?報(bào)告?|?讀書(shū)?|?書(shū)單?|?干貨?
大數(shù)據(jù)?|?揭秘?|?Python?|?可視化
AI?|?人工智能?|?5G?|?區(qū)塊鏈
機(jī)器學(xué)習(xí)?|?深度學(xué)習(xí)?|?神經(jīng)網(wǎng)絡(luò)
1024?|?段子?|?數(shù)學(xué)?|?高考
猜你想看
深度學(xué)習(xí)高能干貨:手把手教你搭建MXNet框架
手把手教你用OpenCV實(shí)現(xiàn)機(jī)器學(xué)習(xí)最簡(jiǎn)單的k-NN算法(附代碼)
41款實(shí)用工具,數(shù)據(jù)獲取、清洗、建模、可視化都有了
你是怎樣“被平均”的?細(xì)數(shù)統(tǒng)計(jì)數(shù)據(jù)中的那些坑
Q:?你還知道哪些爬蟲(chóng)神技?
歡迎留言與大家分享
覺(jué)得不錯(cuò),請(qǐng)把這篇文章分享給你的朋友
轉(zhuǎn)載 / 投稿請(qǐng)聯(lián)系:baiyu@hzbook.com
更多精彩,請(qǐng)?jiān)诤笈_(tái)點(diǎn)擊“歷史文章”查看
點(diǎn)擊閱讀原文,了解更多
總結(jié)
以上是生活随笔為你收集整理的想用Python爬小姐姐图片?那你得先搞定分布式进程的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 50年后的地球什么样?大数据、AI、量子
- 下一篇: 2个基础操作案例带你入门MySQL