pynng 超快速上手,但不保熟
生活随笔
收集整理的這篇文章主要介紹了
pynng 超快速上手,但不保熟
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
nng 是 zeromq 的后輩,nanomsg 的 下一代的消息處理庫。
主要就是快,上手超快,消息收發也超快。
本機 python 3.9 + pynng 0.7.1 本地環回測試 10000次1G數據,只需要1秒就可以完成。
nng相比zeromq的目前發現優勢,鏈接狀態全自動維護,自動重連,掉線重連能快速恢復,數據發送不完整時能自動無限重發(默認情況),報頭能全定義從而能支持http協議,速度飛快。
以下4個簡單完整例子
_try_push_pull.py
_try_pair_pair.py
_try_pub_sub.py
_try_rep_req.py
pynng安裝
pip install -U pynng_try_push_pull.py
from pynng import nng from multiprocessing import Process import numpy as np import pickledef proc_send_data():'''push端只管發數據:return:'''sock = nng.Push0(dial='tcp://127.0.0.1:8855')for i in range(10000):data = np.random.uniform([128, 4, 512, 512])data = pickle.dumps(data)sock.send(data)sock.close()def proc_recv_data():'''pull端只管收數據:return:'''sock = nng.Pull0(listen='tcp://127.0.0.1:8855')for i in range(10000):r = sock.recv(block=True)data = pickle.loads(r)print(i, np.mean(data))sock.close()if __name__ == '__main__':p1 = Process(target=proc_recv_data, daemon=True)p2 = Process(target=proc_send_data, daemon=True)print('start')p1.start()p2.start()p1.join()p2.join()print('success')_try_pair_pair.py
from pynng import nng from pynng.exceptions import TryAgain, Timeout from multiprocessing import Process import numpy as np import pickle import randomdef proc_send_data():'''pair端想發就發,想收就收如果發消息前,對端已經關掉了,那么將會卡在send函數那里,直到對端重新打開,或send_timeout超時:return:'''sock = nng.Pair0(dial='tcp://127.0.0.1:8855', recv_timeout=500, send_timeout=500)for i in range(10000):# 發送1G的數據。128*4*512*512*8/1024/1024=1024MBdata = np.random.uniform([128, 4, 512, 512])data = pickle.dumps(data)try:sock.send(data)except Timeout:# 如果對面端提前關了,那么這里會超時,這里直接跳出breakif random.random() < 0.5:try:r = sock.recv(block=False)print(r)except TryAgain:passprint('end1')sock.close()def proc_recv_data():'''pair端想發就發,想收就收:return:'''sock = nng.Pair0(listen='tcp://127.0.0.1:8855', recv_timeout=500, send_timeout=500)for i in range(10000):try:r = sock.recv(block=False)data = pickle.loads(r)print(i, np.mean(data))except TryAgain:# 如果沒有消息passif random.random() < 0.5:try:sock.send(bytes([1]))except Timeout:# 如果對面端提前關了,那么這里會超時,這里直接跳出breakprint('end2')sock.close()if __name__ == '__main__':p1 = Process(target=proc_recv_data, daemon=True)p2 = Process(target=proc_send_data, daemon=True)p1.start()p2.start()p1.join()p2.join()print('success')_try_pub_sub.py
from pynng import nng from multiprocessing import Process import numpy as np import pickle from pynng.exceptions import TryAgaindef proc_send_data():'''pub端 和push端相似,只管發數據:return:'''sock = nng.Pub0(listen='tcp://127.0.0.1:8855')for i in range(10000):data = np.random.uniform([128, 4, 512, 512])data = pickle.dumps(data)sock.send(data)sock.close()def proc_recv_data():'''sub端 和pull端相似,只管收數據。可以設定訂閱標題和取消訂閱標題。首次使用時,要設定為接收空標題,不然啥也收不到:return:'''sock = nng.Sub0(dial='tcp://127.0.0.1:8855')# 設定接受空標題sock.subscribe('')for i in range(10000):try:r = sock.recv(block=False)data = pickle.loads(r)print(i, np.mean(data))except TryAgain:print()sock.close()if __name__ == '__main__':p1 = Process(target=proc_recv_data, daemon=True)p2 = Process(target=proc_send_data, daemon=True)p1.start()p2.start()p1.join()p2.join()print('success')_try_rep_req.py
from pynng import nng from multiprocessing import Process import numpy as np import pickledef proc_send_data():'''req端一開始只能發送數據,并且發送完數據后,必定要接收數據:return:'''sock = nng.Req0(dial='tcp://127.0.0.1:8855')for i in range(10000):data = np.random.uniform([128, 4, 512, 512])data = pickle.dumps(data)sock.send(data)sock.recv()sock.close()def proc_recv_data():'''rep端一開始只能接收數據,并且接收完數據后,必定要發送一個長度不為0的數據:return:'''sock = nng.Rep0(listen='tcp://127.0.0.1:8855')for i in range(10000):r = sock.recv(block=True)data = pickle.loads(r)print(i, np.mean(data))# rep發送的數據長度不能為0sock.send(bytes([0]))sock.close()if __name__ == '__main__':p1 = Process(target=proc_recv_data, daemon=True)p2 = Process(target=proc_send_data, daemon=True)p1.start()p2.start()p1.join()p2.join()print('success')總結
以上是生活随笔為你收集整理的pynng 超快速上手,但不保熟的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 解决“System clock has
- 下一篇: 液压系统管路流速推荐表_管径与流速推荐表