python多线程_python多线程:控制线程数量
背景
前段時間學習了python的多線程爬蟲,當時爬取一個圖片網站,開啟多線程后,并沒有限制線程的數量,也就是說,如果下載1000張圖片,會一次性開啟1000個子線程同時進行下載? ? ?
現在希望控制線程數量:例如每次只下載5張,當下載完成后再下載另外5張,直至全部完成??
查了一些資料,發現在python中,threading 模塊有提供 Semaphore類 和 BoundedSemaphore?類來限制線程數
詳細說明可以看看下面幾篇文章,寫的很棒:
https://docs.python.org/3.5/library/threading.html?highlight=threading#semaphore-objects
https://www.liujiangblog.com/course/python/79
?https://my.oschina.net/u/3524921/blog/920303
?https://zhuanlan.zhihu.com/p/34159519 ???
官網給出例子如下:信號量通常用于保護容量有限的資源,例如數據庫服務器。在資源大小固定的任何情況下,都應使用有界信號量。在產生任何工作線程之前,您的主線程將初始化信號量:
maxconnections = 5# ...pool_sema = BoundedSemaphore(value=maxconnections)產生后,工作線程在需要連接到服務器時會調用信號量的獲取和釋放方法:with pool_sema: conn = connectdb() try: # ... use connection ... finally: conn.close()改造之前的多線程爬蟲
首先貼出原來的代碼# -*- coding:utf-8 -*-import requestsfrom requests.exceptions import RequestExceptionimport os, timeimport refrom lxml import etreeimport threadinglock = threading.Lock()def get_html(url): """ 定義一個方法,用于獲取一個url頁面的響應內容 :param url: 要訪問的url :return: 響應內容 """ response = requests.get(url, timeout=10) # print(response.status_code) try: if response.status_code == 200: # print(response.text) return response.text else: return None except RequestException: print("請求失敗") # return Nonedef parse_html(html_text): """ 定義一個方法,用于解析頁面內容,提取圖片url :param html_text: :return:一個頁面的圖片url集合 """ html = etree.HTML(html_text) if len(html) > 0: img_src = html.xpath("//img[@class='photothumb lazy']/@data-original") # 元素提取方法 # print(img_src) return img_src else: print("解析頁面元素失敗")def get_image_pages(url): """ 獲取所查詢圖片結果的所有頁碼 :param url: 查詢圖片url :return: 總頁碼數 """ html_text = get_html(url) # 獲取搜索url響應內容 # print(html_text) if html_text is not None: html = etree.HTML(html_text) # 生成XPath解析對象 last_page = html.xpath("//div[@class='pages']//a[last()]/@href") # 提取最后一頁所在href鏈接 print(last_page) if last_page: max_page = re.compile(r'(\d+)', re.S).search(last_page[0]).group() # 使用正則表達式提取鏈接中的頁碼數字 print(max_page) print(type(max_page)) return int(max_page) # 將字符串頁碼轉為整數并返回 else: print("暫無數據") return None else: print("查詢結果失敗")def get_all_image_url(page_number): """ 獲取所有圖片的下載url :param page_number: 爬取頁碼 :return: 所有圖片url的集合 """ base_url = 'https://imgbin.com/free-png/naruto/' image_urls = [] x = 1 # 定義一個標識,用于給每個圖片url編號,從1遞增 for i in range(1, page_number): url = base_url + str(i) # 根據頁碼遍歷請求url try: html = get_html(url) # 解析每個頁面的內容 if html: data = parse_html(html) # 提取頁面中的圖片url # print(data) # time.sleep(3) if data: for j in data: image_urls.append({ 'name': x, 'value': j }) x += 1 # 每提取一個圖片url,標識x增加1 except RequestException as f: print("遇到錯誤:", f) continue # print(image_urls) return image_urlsdef get_image_content(url): """請求圖片url,返回二進制內容""" # print("正在下載", url) try: r = requests.get(url, timeout=15) if r.status_code == 200: return r.content return None except RequestException: return Nonedef main(url, image_name): """ 主函數:實現下載圖片功能 :param url: 圖片url :param image_name: 圖片名稱 :return: """ semaphore.acquire() # 加鎖,限制線程數 print('當前子線程: {}'.format(threading.current_thread().name)) save_path = os.path.dirname(os.path.abspath('.')) + '/pics/' try: file_path = '{0}/{1}.jpg'.format(save_path, image_name) if not os.path.exists(file_path): # 判斷是否存在文件,不存在則爬取 with open(file_path, 'wb') as f: f.write(get_image_content(url)) f.close() print('第{}個文件保存成功'.format(image_name)) else: print("第{}個文件已存在".format(image_name)) semaphore.release() # 解鎖imgbin-多線程-重寫run方法.py except FileNotFoundError as f: print("第{}個文件下載時遇到錯誤,url為:{}:".format(image_name, url)) print("報錯:", f) raise except TypeError as e: print("第{}個文件下載時遇到錯誤,url為:{}:".format(image_name, url)) print("報錯:", e)class MyThread(threading.Thread): """繼承Thread類重寫run方法創建新進程""" def __init__(self, func, args): """ :param func: run方法中要調用的函數名 :param args: func函數所需的參數 """ threading.Thread.__init__(self) self.func = func self.args = args def run(self): print('當前子線程: {}'.format(threading.current_thread().name)) self.func(self.args[0], self.args[1]) # 調用func函數 # 因為這里的func函數其實是上述的main()函數,它需要2個參數;args傳入的是個參數元組,拆解開來傳入if __name__ == '__main__': start = time.time() print('這是主線程:{}'.format(threading.current_thread().name)) urls = get_all_image_url(5) # 獲取所有圖片url列表 thread_list = [] # 定義一個列表,向里面追加線程 semaphore = threading.BoundedSemaphore(5) # 或使用Semaphore方法 for t in urls: # print(i) m = MyThread(main, (t["value"], t["name"])) # 調用MyThread類,得到一個實例 thread_list.append(m) for m in thread_list: m.start() # 調用start()方法,開始執行 for m in thread_list: m.join() # 子線程調用join()方法,使主線程等待子線程運行完畢之后才退出 end = time.time() print(end-start) # get_image_pages("https://imgbin.com/free-png/Naruto")將代碼進行改造??1、下面的第8、9行表示調用 threading 的 BoundedSemaphore類,初始化信號量為5,把結果賦給變量 pool_semaif __name__ == '__main__': start = time.time() print('這是主線程:{}'.format(threading.current_thread().name)) urls = get_all_image_url(5) # 獲取所有圖片url列表 thread_list = [] # 定義一個列表,向里面追加線程 max_connections = 5 # 定義最大線程數 pool_sema = threading.BoundedSemaphore(max_connections) # 或使用Semaphore方法 for t in urls: # print(i) m = MyThread(main, (t["value"], t["name"])) # 調用MyThread類,得到一個實例 thread_list.append(m) for m in thread_list: m.start() # 調用start()方法,開始執行 for m in thread_list: m.join() # 子線程調用join()方法,使主線程等待子線程運行完畢之后才退出 end = time.time() print(end-start)2、修改main()函數??
(1)方法一:通過with語句實現,第9行添加 with pool_sema:?
使用 with 語句來獲得一個鎖、條件變量或信號量,相當于調用 acquire();離開 with 塊后,會自動調用 release()
def main(url, image_name): """ 主函數:實現下載圖片功能 :param url: 圖片url :param image_name: 圖片名稱 :return: """ with pool_sema: print('當前子線程: {}'.format(threading.current_thread().name)) save_path = os.path.dirname(os.path.abspath('.')) + '/pics/' try: file_path = '{0}/{1}.jpg'.format(save_path, image_name) if not os.path.exists(file_path): # 判斷是否存在文件,不存在則爬取 with open(file_path, 'wb') as f: f.write(get_image_content(url)) f.close() print('第{}個文件保存成功'.format(image_name)) else: print("第{}個文件已存在".format(image_name)) except FileNotFoundError as f: print("第{}個文件下載時遇到錯誤,url為:{}:".format(image_name, url)) print("報錯:", f) raise except TypeError as e: print("第{}個文件下載時遇到錯誤,url為:{}:".format(image_name, url)) print("報錯:", e)(2)方法二:直接使用acquire()?和 release()下面的第8行調用?acquire(),第24行調用 release()def main(url, image_name): """ 主函數:實現下載圖片功能 :param url: 圖片url :param image_name: 圖片名稱 :return: """ pool_sema.acquire() # 加鎖,限制線程數 # with pool_sema: print('當前子線程: {}'.format(threading.current_thread().name)) save_path = os.path.dirname(os.path.abspath('.')) + '/pics/' try: file_path = '{0}/{1}.jpg'.format(save_path, image_name) if not os.path.exists(file_path): # 判斷是否存在文件,不存在則爬取 with open(file_path, 'wb') as f: f.write(get_image_content(url)) f.close() print('第{}個文件保存成功'.format(image_name)) else: print("第{}個文件已存在".format(image_name)) pool_sema.release() # 解鎖imgbin-多線程-重寫run方法.py except FileNotFoundError as f: print("第{}個文件下載時遇到錯誤,url為:{}:".format(image_name, url)) print("報錯:", f) raise except TypeError as e: print("第{}個文件下載時遇到錯誤,url為:{}:".format(image_name, url)) print("報錯:", e)最終效果是一樣的,每次啟用5個線程,完成后再啟動下一批
喜歡記得來一個
總結
以上是生活随笔為你收集整理的python多线程_python多线程:控制线程数量的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: xshell最多支持4个_中集拉钢卷专用
- 下一篇: 浏览器崩溃_字节跳动程序员28岁身价上亿