python (高级消息队列)普通、进程、进程池的消息队列
一、普通消息隊列
from queue import Queue?
 這個是普通的隊列模式,類似于普通列表,先進先出模式,get方法會阻塞請求,直到有數(shù)據(jù)get出來為止。
二、多進程消息隊列
from multiprocessing.Queue import Queue(各子進程共有)
 這個是多進程并發(fā)的Queue隊列,用于解決多進程間的通信問題。普通Queue實現(xiàn)不了。例如來跑多進程對一批IP列表進行運算,運算后的結(jié)果都存到Queue隊列里面,這個就必須使用multiprocessing提供的Queue來實現(xiàn)。
創(chuàng)建
import multiprocessing queue = multiprocessing.Queue(隊列長度)?
因為進程間不共享全局變量,所以使用Queue進行數(shù)據(jù)通信,可以在父進程中創(chuàng)建兩個字進程,一個往Queue里寫數(shù)據(jù),一個從Queue里取出數(shù)據(jù)。?
import multiprocessing import timedef write_queue(queue):# 循環(huán)寫入數(shù)據(jù)for i in range(10):if queue.full():print("隊列已滿!")break# 向隊列中放入消息queue.put(i)print(i)time.sleep(0.5)def read_queue(queue):# 循環(huán)讀取隊列消息while True:# 隊列為空,停止讀取if queue.empty():print("---隊列已空---")break# 讀取消息并輸出result = queue.get()print(result)if __name__ == '__main__':# 創(chuàng)建消息隊列queue = multiprocessing.Queue(3)# 創(chuàng)建子進程p1 = multiprocessing.Process(target=write_queue, args=(queue,))p1.start()# 等待p1寫數(shù)據(jù)進程執(zhí)行結(jié)束后,再往下執(zhí)行p1.join()p1 = multiprocessing.Process(target=read_queue, args=(queue,))p1.start()三、進程池消息隊列
? ? ? ? 初始化Pool時,可以指定一個最大進程數(shù),當(dāng)有新的請求提交到Pool中時,如果池還沒有滿,那么就會創(chuàng)建一個新的進程用來執(zhí)行該請求;但如果池中的進程數(shù)已經(jīng)達到指定的最大值,那么該請求就會等待,直到池中有進程結(jié)束,才會用之前的進程來執(zhí)行新的任務(wù)。?
創(chuàng)建
import multiprocessing pool = multiprocessing.Pool(最大進程數(shù))?
進程池內(nèi)通信?
 創(chuàng)建進程池內(nèi)Queue消息隊列通信?
 import multiprocessing?
 Queue:queue = multiprocessing.Manager().Queue()?
 例:?
 import multiprocessing?
 import time
事例:
def write_data(queue):?# for循環(huán) 向消息隊列中寫入值?for i in range(5):?# 添加消息?queue.put(i)?print(i)?time.sleep(0.2)?print(“隊列已滿~”)def read_data(queue):# 循環(huán)讀取數(shù)據(jù)while True:# 判斷隊列是否為空if queue.qsize() == 0:print("隊列為空~")break# 從隊列中讀取數(shù)據(jù)result = queue.get()print(result)if __name__ == '__main__':# 創(chuàng)建進程池pool = multiprocessing.Pool(2)# 創(chuàng)建進程池隊列queue = multiprocessing.Manager().Queue()# 在進程池中的進程間進行通信# 使用線程池同步的方式,先寫后讀# pool.apply(write_data, (queue, ))# pool.apply(read_data, (queue, ))# apply_async() 返回ApplyResult 對象result = pool.apply_async(write_data, (queue, ))# ApplyResult對象的wait() 方法,表示后續(xù)進程必須等待當(dāng)前進程執(zhí)行完再繼續(xù)result.wait()pool.apply_async(read_data, (queue, ))pool.close()# 異步后,主線程不再等待子進程執(zhí)行結(jié)束,再結(jié)束# join() 后,表示主線程會等待子進程執(zhí)行結(jié)束后,再結(jié)束pool.join()?
?
新人創(chuàng)作打卡挑戰(zhàn)賽發(fā)博客就能抽獎!定制產(chǎn)品紅包拿不停!總結(jié)
以上是生活随笔為你收集整理的python (高级消息队列)普通、进程、进程池的消息队列的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: C/C++:uint64_t 转为cha
- 下一篇: Linux系统编程——僵尸的模拟以及僵尸
