ActiveMQ入门系列二:入门代码实例(点对点模式)
在上一篇《ActiveMQ入門系列一:認(rèn)識并安裝ActiveMQ(Windows下)》中,大致介紹了ActiveMQ和一些概念,并下載、安裝、啟動他,還訪問了他的控制臺頁面。
這篇,就用代碼實(shí)例說下如何實(shí)現(xiàn)消息的生產(chǎn)和消費(fèi)。
一、理論基礎(chǔ)
同RabbitMQ一樣,ActiveMQ中也是有兩種模式:
- 點(diǎn)對點(diǎn)模式(Point to Point,簡寫為PTP)
- 發(fā)布/訂閱模式(Publish & Subscribe,簡寫為Pub & Sub)
通過上一篇我們知道了制造消息的應(yīng)用叫生產(chǎn)者(Producer),生產(chǎn)者在生產(chǎn)了消息后會發(fā)送消息到目的地(Destination),到達(dá)消費(fèi)和處理消息的應(yīng)用(也就是消費(fèi)者Consumer)。這里的兩種模式就通過對應(yīng)不同的消息目的地(Destination)來實(shí)現(xiàn),PTP對應(yīng)Queue(隊列)、Pub&Sub對應(yīng)Topic(主題)。
今天就詳細(xì)介紹下PTP和Queue,下一篇介紹Pub & Sub和Topic。
在PTP模式的示意圖:
?
- 消息生產(chǎn)者生產(chǎn)消息發(fā)送到queue中,然后消息消費(fèi)者從queue中取出并且消費(fèi)消息。
- 消息被消費(fèi)以后,queue中不再有存儲,所以消息消費(fèi)者不可能消費(fèi)到已經(jīng)被消費(fèi)的消息。
- Queue支持存在多個消費(fèi)者,但是對一個消息而言,只會有一個消費(fèi)者可以消費(fèi)、其它的則不能消費(fèi)此消息了。
- 當(dāng)消費(fèi)者不存在時,消息會一直保存,直到有消費(fèi)消費(fèi)。
在PTP中,代碼實(shí)現(xiàn)有兩種方式:消費(fèi)者主動消費(fèi)和消費(fèi)者監(jiān)聽消費(fèi),下面就分別說下。
二、消費(fèi)者主動消費(fèi)
主動消費(fèi)是最基本也是最簡單的消費(fèi)方式,先上代碼:
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /*** @author JAVA開發(fā)老菜鳥**/ public class Producer {public static final String QUEUE_NAME = "ptp-demo";//隊列名public void producer(String message) throws JMSException {ConnectionFactory factory = null;Connection connection = null;Session session = null;MessageProducer producer = null;try {/*** 1.創(chuàng)建連接工廠* 創(chuàng)建工廠,構(gòu)造方法有三個參數(shù):分別是用戶名、密碼、連接地址* 無參構(gòu)造:有默認(rèn)的連接地址,localhost* 一個參數(shù):無驗證模式,無用戶的認(rèn)證* 三個參數(shù):有認(rèn)證和連接地址,我這里使用三個參數(shù)的構(gòu)造方法*/factory = new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616");/*** 2.創(chuàng)建連接,有兩個方法(我這里使用無參數(shù)的)* 無參數(shù)* 有參數(shù):用戶名、密碼;*/connection = factory.createConnection();/*** 3.啟動連接* 生產(chǎn)者可以不用調(diào)用start()方法啟動,因為在發(fā)送消息的時候回進(jìn)行檢查* 如果未啟動連接,會自動啟動。* 如果有特殊配置,需要配置完成后再啟動連接*/connection.start();/*** 4.用連接創(chuàng)建會話* 有兩個參數(shù):是否需要事務(wù)、消息確認(rèn)機(jī)制* 如果支持事務(wù),對于生產(chǎn)者來說第二個參數(shù)就無效了,這個時候第二個參數(shù)建議傳入Session.SESSION_TRANSACTED* 如果不支持事務(wù),第二個參數(shù)有效且必須傳遞** AUTO_ACKNOWLEDGE:自動確認(rèn),消息處理后自動確認(rèn)(商業(yè)開發(fā)不推薦)* CLIENT_ACKNOWLEDGE:客戶端手動確認(rèn),消費(fèi)者處理后必須手動確認(rèn)* DUPS_OK_ACKNOWLEDGE:有副本的客戶端手動確認(rèn),消息可以多次處理(不建議)*/session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);/*** 5.用會話創(chuàng)建目的地(隊列)、生產(chǎn)者、消息* 隊列名是隊列的唯一標(biāo)記* 創(chuàng)建生產(chǎn)者的時候可以指定目的地,也可以在發(fā)送消息的時候再指定*/Destination destination = session.createQueue(QUEUE_NAME);producer = session.createProducer(destination);TextMessage textMessage = session.createTextMessage(message);/*** 6.生產(chǎn)者發(fā)送消息到目的地*/producer.send(textMessage);System.out.println("消息發(fā)送成功");} catch(Exception ex){throw ex;} finally {/*** 7.釋放資源*/if(producer != null){producer.close();}if(session != null){session.close();}if(connection != null){connection.close();}}}public static void main(String[] args){Producer producer = new Producer();try{producer.producer("hello, activemq");} catch (Exception ex){ex.printStackTrace();}} }
?
?
好,這樣代碼就寫好了,我們來測試下。
1.先運(yùn)行生產(chǎn)者,我發(fā)現(xiàn)報錯了。。。
好吧,原來是我這次沒有啟動ActiveMQ,被自己蠢哭了。。。
啟動ActiveMQ之后,再運(yùn)行生產(chǎn)者,成功了。
去看下控制臺頁面的變化,隊列里面多了個“ptp-demo”隊列,這個就是我們生產(chǎn)者代碼里面的隊列名,并且能看到該隊列的基本情況:
從左到右依次為,有待消費(fèi)的消息1條、消費(fèi)者0個、已經(jīng)發(fā)送的消息1條、已經(jīng)消費(fèi)的消息0條
2.接下來運(yùn)行消費(fèi)者,成功
再去看下控制臺頁面,發(fā)現(xiàn)隊列信息變了,從左到右依次為:有待消費(fèi)的消息0條、消費(fèi)者0個、已經(jīng)發(fā)送的消息1條、已經(jīng)消費(fèi)的消息1條
也就是說,消息真的被消費(fèi)了!
?代碼寫完了,也按照預(yù)期執(zhí)行完了,我們現(xiàn)在再回過頭來分析下消費(fèi)者的代碼,會發(fā)現(xiàn)他在consumer.receive()之后不會再消費(fèi)其他消息了,即便后面再有消息被生產(chǎn)出來也不會再消費(fèi)。也就是說只能在運(yùn)行后消費(fèi)一次消息,這個就是主動消費(fèi)。
如果想要循環(huán)消費(fèi)多次產(chǎn)生的消息的話,怎么辦呢?請用下面的監(jiān)聽消費(fèi)
?
三、消費(fèi)者監(jiān)聽消費(fèi)
還是先上代碼,代碼結(jié)構(gòu)同主動消費(fèi)類似,有細(xì)微差別,具體代碼不貼了,可以到我的GitHub或碼云上獲取源碼
?
?
?
?
執(zhí)行生產(chǎn)者:
?
執(zhí)行消費(fèi)者,消息全部被消費(fèi)了:
?
再執(zhí)行2遍生產(chǎn)者,消息同樣都被消費(fèi)了。?
控制臺頁面多了個隊列,由于監(jiān)聽中的消費(fèi)者沒有關(guān)閉,因此這里能看到消費(fèi)者數(shù)量為1,我執(zhí)行了三遍生產(chǎn)者,因此消息有30條。
還沒完,繼續(xù)...
我們這次先啟動2個消費(fèi)者,然后啟動生產(chǎn)者
兩個生產(chǎn)者分別消費(fèi)了消息0,2,4,6,8和1,3,5,7,9
也就是說兩個消費(fèi)者都監(jiān)聽到了消息,并且activemq自動輪詢兩個監(jiān)聽器發(fā)送消息。
?
好,到這里,ActiveMQ的點(diǎn)對點(diǎn)模式就介紹完了。下一篇介紹發(fā)布訂閱模式,敬請期待
轉(zhuǎn)載于:https://www.cnblogs.com/sam-uncle/p/10988930.html
總結(jié)
以上是生活随笔為你收集整理的ActiveMQ入门系列二:入门代码实例(点对点模式)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: ES Next Arrow funct
- 下一篇: 【学习笔记】斜率优化