c++ 共享内存_Python3.8多进程之共享内存
最近發了個宏愿想寫一個做企業金融研究的Python框架。拖出Python一看已經更新到了3.8,于是就發現了Python 3.8里新出現的模塊:multiprocessing.shared_memory。
隨手寫了個測試。生成一個240MB大小的pandas.DataFrame,然后轉換成numpy.recarray。這個DataFarme里包括了datetime,整型和字符串類型的列。使用numpy.recarray的目的是為了保存dtype,這樣才能在子進程中正確從共享內存里讀數據。 我在子進程中簡單地使用numpy.nansum來做計算。第一種方法是使用共享內存,第二種方法是直接將numpy.recarray作為參數傳遞給子進程。 下圖為測試代碼的輸出。
可以看出,使用共享內存的第一種方法只使用了可以忽略不計的內存,并且2秒結束戰斗。傳參數的方法使用了1.8GB的內存,并且慢得要命,花費200多秒。當然這跟我使用的測試機是一臺2017年的12寸MacBook 4-core i5 8G RAM(已停產)有可能,不過側面也說明在數據足夠大的時候,盡量避免沒必要的復制和傳遞還是很有效的。
測試代碼如下:
from multiprocessing.shared_memory import SharedMemory from multiprocessing.managers import SharedMemoryManager from concurrent.futures import ProcessPoolExecutor, as_completed from multiprocessing import current_process, cpu_count from datetime import datetime import numpy as np import pandas as pd import tracemalloc import timedef work_with_shared_memory(shm_name, shape, dtype):print(f'With SharedMemory: {current_process()=}')# Locate the shared memory by its nameshm = SharedMemory(shm_name)# Create the np.recarray from the buffer of the shared memorynp_array = np.recarray(shape=shape, dtype=dtype, buf=shm.buf)return np.nansum(np_array.val)def work_no_shared_memory(np_array: np.recarray):print(f'No SharedMemory: {current_process()=}')# Without shared memory, the np_array is copied into the child processreturn np.nansum(np_array.val)if __name__ == "__main__":# Make a large data frame with date, float and character columnsa = [(datetime.today(), 1, 'string'),(datetime.today(), np.nan, 'abc'),] * 5000000df = pd.DataFrame(a, columns=['date', 'val', 'character_col'])# Convert into numpy recarray to preserve the dtypesnp_array = df.to_records(index=False)del dfshape, dtype = np_array.shape, np_array.dtypeprint(f"np_array's size={np_array.nbytes/1e6}MB")# With shared memory# Start tracking memory usagetracemalloc.start()start_time = time.time()with SharedMemoryManager() as smm:# Create a shared memory of size np_arry.nbytesshm = smm.SharedMemory(np_array.nbytes)# Create a np.recarray using the buffer of shmshm_np_array = np.recarray(shape=shape, dtype=dtype, buf=shm.buf)# Copy the data into the shared memorynp.copyto(shm_np_array, np_array)# Spawn some processes to do some workwith ProcessPoolExecutor(cpu_count()) as exe:fs = [exe.submit(work_with_shared_memory, shm.name, shape, dtype)for _ in range(cpu_count())]for _ in as_completed(fs):pass# Check memory usagecurrent, peak = tracemalloc.get_traced_memory()print(f"Current memory usage {current/1e6}MB; Peak: {peak/1e6}MB")print(f'Time elapsed: {time.time()-start_time:.2f}s')tracemalloc.stop()# Without shared memorytracemalloc.start()start_time = time.time()with ProcessPoolExecutor(cpu_count()) as exe:fs = [exe.submit(work_no_shared_memory, np_array)for _ in range(cpu_count())]for _ in as_completed(fs):pass# Check memory usagecurrent, peak = tracemalloc.get_traced_memory()print(f"Current memory usage {current/1e6}MB; Peak: {peak/1e6}MB")print(f'Time elapsed: {time.time()-start_time:.2f}s')tracemalloc.stop()值得一提的是,numpy.ndarray的dtype一定不能是object,不然子進程訪問共享內存的時候一定segfault,但如果在主進程里訪問共享內存就沒事。
補充更新一下,上面的測試代碼work_with_shared_memory 函數里不能解引用np_array,比如print(np_array),不然會segfault。使用np_array.val和np_array.date則沒有問題則是因為這兩個column的dtype不是object。而np_array.character_col的dtype在這個代碼里是object。
解決這個問題的辦法也很簡單,(踩坑無數次后),在to_records()里指定dtype。
np_array = df.to_records(index=False,column_dtypes={'character_col': 'S6'})這里我們指定character_col為長度為6的字符串。 如果是unicode的話,可以將S6換成U6。 超出指定長度的字符串則會被truncate。
這樣就不會有segfault了。重點就是不能有object的dtype。
總結
以上是生活随笔為你收集整理的c++ 共享内存_Python3.8多进程之共享内存的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: b311b853路由器怎样WiF复位路由
- 下一篇: 微信怎么改实名认证身份