Python多进程multiprocessing共享数据Value(包括常用的整数、字符串)、列表及字典以及Queue
1. 共享Value、列表以及字典
import multiprocessing import ctypes import timedef process_write(int_data, str_data, list_data, dict_data):i = 1while True:int_data.value = istr_data.value = 'str' + str(i)list_data[0] = idict_data['dict0'] = iprint('write, index: ', i)i += 1def process_read(int_data, str_data, list_data, dict_data):while True:print('read data: ', int_data.value, str_data.value, list_data[0], dict_data['dict0'])if __name__=='__main__':int_data = multiprocessing.Manager().Value(ctypes.c_int, 0)str_data = multiprocessing.Manager().Value(ctypes.c_char_p, 'str0')list_data = multiprocessing.Manager().list()list_data.append(0)dict_data = multiprocessing.Manager().dict()dict_data['dict0'] = 0p1 = multiprocessing.Process(target=process_write, args=(int_data, str_data, list_data, dict_data,))p2 = multiprocessing.Process(target=process_read, args=(int_data, str_data, list_data, dict_data,))p1.start()p2.start()start = time.time()while time.time() - start < 1:passp1.terminate()p2.terminate()執行結果部分截圖如下圖。
?
2. Queue
數據先存先取,所以最后取到的數據可能是先前的數據,數據不是實時的,代碼和執行結果如下圖。
import multiprocessing import ctypes import timedef process_write(queue):i = 1while True:queue.put(i)print('put data: ', i)i += 1def process_read(queue):while True:time.sleep(0.2)print('read data: ', queue.get())#print('read data: ', queue.get_nowait())print('get data wait end\n')if __name__=='__main__':queue = multiprocessing.Manager().Queue()p1 = multiprocessing.Process(target=process_write, args=(queue,))p2 = multiprocessing.Process(target=process_read, args=(queue,))p1.start()p2.start()start = time.time()while time.time() - start < 5:passp1.terminate()p2.terminate()每次從queue中get數據時,都得等待queue有數據,也就是先執行put,再執行get。如果queue為空,直接使用get_nowait()方法取數據,那么拋異常,如下圖。
?
3. 總結
3.1 對于先進先出的需求,取完前面的數據后,才能取后面的數據,這種場景使用Queue類,而且適用于更新數據進程比獲取數據進程快,否則獲取數據進程需要等待更新數據進程。
3.2 對于一般進程間共享數據來說,使用multiprocessing.Manager().Value和multiprocessing.Manager().list()和multiprocessing.Manager().dict()即可。
3.3 Value傳遞其它類型的參數對應表。
?
?
?
附錄 使用multiprocessing.Value而不是multiprocessing.Manager().Value引起的問題
import multiprocessing import ctypes import timedef process_write(val):i = 1while True:val.value = ("str" + str(i)).encode('utf-8')print('write index: ', i)i += 1def process_read(val):while True:print('read data: ', val.value)if __name__=='__main__':val = multiprocessing.Value(ctypes.c_char_p, "str0".encode('utf-8'))p1 = multiprocessing.Process(target=process_write, args=(val,))p2 = multiprocessing.Process(target=process_read, args=(val,))p1.start()p2.start()start = time.time()while time.time() - start < 1:passp1.terminate()p2.terminate()附錄1?錯誤:bytes or integer address expected instead of str instance
原因及解決方案:使用Value傳遞參數,和C函數接口有關,涉及類型轉換。將字符串編碼為utf-8,在字符串后面加上 .encode('utf-8'),如下圖。
?
?
附錄2 寫進程正常工作,但是程序沒有拋異常就強制停止
通過定位,發現是使用val.value獲取數據有誤,思考過后,可能是覺得對象讀寫發生沖突,于是想著使用copy包的copy方法進行克隆,然后再使用,結果報錯如下。
經過搜索,才發現直接使用multiprocessing.Value是線程不安全的,需要使用multiprocessing.Manager().Value才行,所以更改即可。
?
?
總結
以上是生活随笔為你收集整理的Python多进程multiprocessing共享数据Value(包括常用的整数、字符串)、列表及字典以及Queue的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Python多线程threading和多
- 下一篇: CentOS搭建Redis-cluste