生活随笔
收集整理的這篇文章主要介紹了
ZeroMQ之Publish/Subscribe (Java)
小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
前面的文章介紹了比較簡(jiǎn)單的Request/Subscribe模式, 這篇文章介紹更為經(jīng)典的Publish/Subscribe通信模式用來(lái)ZeroMQ的實(shí)現(xiàn),其通信方式如下圖:
客戶(hù)端(subscriber)向服務(wù)器(publisher)訂閱消息,然后服務(wù)器可以將消息推送到所有訂閱了消息的客戶(hù)端,這里也可以理解為廣播吧。。。。
好了,閑話(huà)不多說(shuō)了,直接上用ZeroMQ實(shí)現(xiàn)這種通信模式的代碼吧:
(1)服務(wù)端(publisher):
[java]?view plaincopy
package?pubsub;??????import?org.zeromq.ZMQ;????public?class?Publisher?{??????public?static?void?main(String?args[])?{????????????????ZMQ.Context?context?=?ZMQ.context(1);??//創(chuàng)創(chuàng)建包含一個(gè)I/O線程的context??????????ZMQ.Socket?publisher?=?context.socket(ZMQ.PUB);???//創(chuàng)建一個(gè)publisher類(lèi)型的socket,他可以向所有訂閱的subscriber廣播數(shù)據(jù)????????????????????publisher.bind("tcp://*:5555");??//將當(dāng)前publisher綁定到5555端口上,可以接受subscriber的訂閱????????????????????while?(!Thread.currentThread?().isInterrupted?())?{??????????????String?message?=?"fjs?hello";??//最開(kāi)始可以理解為pub的channel,subscribe需要訂閱fjs這個(gè)channel才能接收到消息??????????????publisher.send(message.getBytes());??????????}????????????publisher.close();??????????context.term();??????}??}??
代碼很簡(jiǎn)單吧,這里publisher來(lái)充當(dāng)服務(wù)端,所有的subscriber都需要建立于publisher的連接。,,。,
(2)客戶(hù)端(subscriber)代碼:
[java]?view plaincopy
package?pubsub;????import?org.zeromq.ZMQ;????public?class?Subscriber?{??????public?static?void?main(String?args[])?{??????????for?(int?j?=?0;?j?<?100;?j++)?{??????????????new?Thread(new?Runnable(){????????????????????public?void?run()?{??????????????????????//?TODO?Auto-generated?method?stub??????????????????????ZMQ.Context?context?=?ZMQ.context(1);??//創(chuàng)建1個(gè)I/O線程的上下文??????????????????????ZMQ.Socket?subscriber?=?context.socket(ZMQ.SUB);?????//創(chuàng)建一個(gè)sub類(lèi)型,也就是subscriber類(lèi)型的socket??????????????????????subscriber.connect("tcp://127.0.0.1:5555");????//與在5555端口監(jiān)聽(tīng)的publisher建立連接??????????????????????subscriber.subscribe("fjs".getBytes());?????//訂閱fjs這個(gè)channel????????????????????????????????????????????for?(int?i?=?0;?i?<?100;?i++)?{??????????????????????????byte[]?message?=?subscriber.recv();??//接收publisher發(fā)送過(guò)來(lái)的消息??????????????????????????System.out.println("receive?:?"?+?new?String(message));??????????????????????}??????????????????????subscriber.close();??????????????????????context.term();??????????????????}????????????????????????????????}).start();??????????}??????????????????????????}??}??
這里需要注意訂閱的channel問(wèn)題,如果這里錯(cuò)了的話(huà),subscriber是不會(huì)受到publisher發(fā)送過(guò)來(lái)的數(shù)據(jù)的
好了,到這里publish/subscribe的實(shí)現(xiàn)就算ok了
轉(zhuǎn)載于:https://www.cnblogs.com/jym-sunshine/p/5441470.html
總結(jié)
以上是生活随笔為你收集整理的ZeroMQ之Publish/Subscribe (Java)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。