ActiveMQ的queue以及topic两种消息处理机制分析
轉自: http://itindex.net/detail/50057-activemq-queue-topic
?
上一期介紹了我們項目要用到activeMQ來作為jms總線,并且給大家介紹了activeMQ的集群和高可用部署方案,本期給大家再介紹下,如何根據自己的項目需求,更好地使用activeMQ的兩種消息處理模式。
?
???????
?
1??? queue與topic的技術特點對比
?
???????????? topic??????????????????????????????????????????????????????????????????? queue
?
| ????? | ????? topic | |
| 概要 | Publish Subscribe messaging 發布訂閱消息 | Point-to-Point 點對點 | 
| 有無狀態 | topic數據默認不落地,是無狀態的。 | Queue數據默認會在mq服務器上以文件形式保存,比如Active MQ一般保存在$AMQ_HOME\data\kr-store\data下面。也可以配置成DB存儲。 | 
| 完整性保障 | 并不保證publisher發布的每條數據,Subscriber都能接受到。 | Queue保證每條數據都能被receiver接收。 | 
| 消息是否會丟失 | 一般來說publisher發布消息到某一個topic時,只有正在監聽該topic地址的sub能夠接收到消息;如果沒有sub在監聽,該topic就丟失了。 | Sender發送消息到目標Queue,receiver可以異步接收這個Queue上的消息。Queue上的消息如果暫時沒有receiver來取,也不會丟失。 | 
| 消息發布接收策略 | 一對多的消息發布接收策略,監聽同一個topic地址的多個sub都能收到publisher發送的消息。Sub接收完通知mq服務器 | 一對一的消息發布接收策略,一個sender發送的消息,只能有一個receiver接收。receiver接收完后,通知mq服務器已接收,mq服務器對queue里的消息采取刪除或其他操作。 | 
?
????????? Topic和queue的最大區別在于topic是以廣播的形式,通知所有在線監聽的客戶端有新的消息,沒有監聽的客戶端將收不到消息;而queue則是以點對點的形式通知多個處于監聽狀態的客戶端中的一個。
?
?
?
2??? topic和queue方式的消息處理效率比較
?
??????? 通過增加監聽客戶端的并發數來驗證,topic的消息推送,是否會因為監聽客戶端的并發上升而出現明顯的下降,測試環境的服務器為ci環境的ActiveMQ,客戶端為我的本機。
?
??????? 從實測的結果來看,topic方式發送的消息,發送和接收的效率,在一個訂閱者和100個訂閱者的前提下沒有明顯差異,但在500個訂閱者(線程)并發的 前提下,效率差異很明顯(由于500線程并發的情況下,我本機的cpu占用率已高達70-90%,所以無法確認是我本機測試造成的性能瓶頸還是topic 消息發送方式存在性能瓶頸,造成效率下降如此明顯)。
?
??????? Topic方式發送的消息與queue方式發送的消息,發送和接收的效率,在一個訂閱者和100個訂閱者的前提下沒有明顯差異,但在500個訂閱者并發的前提下,topic方式的效率明顯低于queue。
?
??????? Queue方式發送的消息,在一個訂閱者、100個訂閱者和500個訂閱者的前提下,發送和接收的效率沒有明顯變化。
?
Topic實測數據:
?
?
?
| 
 | 發送者發送的消息總數 | 所有訂閱者接收到消息的總數 | 消息發送和接收平均耗時 | 
| 單訂閱者 | 100 | 100 | 101ms | 
| 100訂閱者 | 100 | 10000 | 103ms | 
| 500訂閱者 | 100 | 50000 | 14162ms | 
?
?
?
Queue實測數據:
?
?
?
| 
 | 發送者發送的消息總數 | 所有訂閱者接收到消息的總數 | 消息發送和接收平均耗時 | 
| 單訂閱者 | 100 | 100 | 96ms | 
| 100訂閱者 | 100 | 100 | 96ms | 
| 500訂閱者 | 100 | 100 | 100ms | 
?
?
?
3???? topic方式的消息處理示例
?
3.1???? 通過客戶端代碼調用來發送一個topic的消息:
?
import javax.jms.Connection;
?
import javax.jms.ConnectionFactory;
?
import javax.jms.DeliveryMode;
?
import javax.jms.Destination;
?
import javax.jms.MessageProducer;
?
import javax.jms.Session;
?
import javax.jms.TextMessage;
?
?
?
import org.apache.activemq.ActiveMQConnection;
?
import org.apache.activemq.ActiveMQConnectionFactory;
?
?
?
publicclass SendTopic {
?
??? privatestaticfinalint SEND_NUMBER = 5;
?
??? publicstaticvoid sendMessage(Session session, MessageProducer producer)
?
??????????? throws Exception {
?
?? ????? for ( int i = 1; i <= SEND_NUMBER; i++) {
?
??????????? TextMessage message = session
?
??????????????????? .createTextMessage("ActiveMq發送的消息" + i);
?
??????????? //發送消息到目的地方
?
??????????? System. out.println("發送消息:" + "ActiveMq 發送的消息" + i);
?
??????????? producer.send(message);
?
??????? }
?
??? }
?
???
?
??? publicstaticvoid main(String[] args) {
?
??????? // ConnectionFactory:連接工廠,JMS用它創建連接
?
??????? ConnectionFactory connectionFactory;
?
??????? // Connection:JMS客戶端到JMS Provider的連接
?
??????? Connection connection = null;
?
??????? // Session:一個發送或接收消息的線程
?
??????? Session session;
?
??????? // Destination:消息的目的地;消息發送給誰.
?
??????? Destination destination;
?
??????? // MessageProducer:消息發送者
?
??????? MessageProducer producer;
?
??????? // TextMessage message;
?
??????? //構造ConnectionFactory實例對象,此處采用ActiveMq的實現jar
?
??????? connectionFactory = new ActiveMQConnectionFactory(
?
??????????????? ActiveMQConnection. DEFAULT_USER,
?
??????????????? ActiveMQConnection. DEFAULT_PASSWORD,
?
??????????????? "tcp://10.20.8.198:61616");
?
??????? try {
?
??????????? //構造從工廠得到連接對象
?
??????????? connection = connectionFactory.createConnection();
?
??????????? //啟動
?
??????????? connection.start();
?
??????????? //獲取操作連接
?
??????????? session = connection.createSession( true, Session. AUTO_ACKNOWLEDGE);
?
??????????? //獲取session注意參數值FirstTopic是一個服務器的topic(與queue消息的發送相比,這里是唯一的不同)
?
??????????? destination = session.createTopic("FirstTopic");
?
??????? ????//得到消息生成者【發送者】
?
??????????? producer = session.createProducer(destination);
?
??????????? //設置不持久化,此處學習,實際根據項目決定
?
??????????? producer.setDeliveryMode(DeliveryMode. PERSISTENT);
?
??????????? //構造消息,此處寫死,項目就是參數,或者方法獲取
?
??????????? sendMessage(session, producer);
?
??????????? session.commit();
?
??????? } catch (Exception e) {
?
??????????? e.printStackTrace();
?
??????? } finally {
?
??????????? try {
?
??????????????? if ( null != connection)
?
??????????????????? connection.close();
?
??????????? } catch (Throwable ignore) {
?
??????????? }
?
??????? }
?
??? }
?
}
?
?
?
3.2???? 啟動多個客戶端監聽來接收topic的消息:
?
publicclass ReceiveTopic implements Runnable {
?
????? private StringthreadName;
?
?
?
????? ReceiveTopic(String threadName) {
?
?????????? this.threadName = threadName;
?
????? }
?
?
?
????? publicvoid run() {
?
?????????? // ConnectionFactory:連接工廠,JMS用它創建連接
?
?????????? ConnectionFactory connectionFactory;
?
?????????? // Connection:JMS客戶端到JMS Provider的連接
?
?????????? Connection connection = null;
?
?????????? // Session:一個發送或接收消息的線程
?
?????????? Session session;
?
?????????? // Destination:消息的目的地;消息發送給誰.
?
?????????? Destination destination;
?
?????????? //消費者,消息接收者
?
?????????? MessageConsumer consumer;
?
?????????? connectionFactory = new ActiveMQConnectionFactory(
?
????????????????????? ActiveMQConnection. DEFAULT_USER,
?
????????????????????? ActiveMQConnection. DEFAULT_PASSWORD,"tcp://10.20.8.198:61616");
?
?????????? try {
?
???????????????? //構造從工廠得到連接對象
?
???????????????? connection = connectionFactory.createConnection();
?
???????????????? //啟動
?
???????????????? connection.start();
?
???????????????? //獲取操作連接,默認自動向服務器發送接收成功的響應
?
???????????????? session = connection.createSession( false, Session. AUTO_ACKNOWLEDGE);
?
???????????????? //獲取session注意參數值FirstTopic是一個服務器的topic
?
???????????????? destination = session.createTopic("FirstTopic");
?
???????????????? consumer = session.createConsumer(destination);
?
???????????????? while ( true) {
?
????????????????????? //設置接收者接收消息的時間,為了便于測試,這里設定為100s
?
????????????????????? TextMessage message = (TextMessage) consumer
?
????????????????????????????????? .receive(100 * 1000);
?
????????????????????? if ( null != message) {
?
??????????????????????????? System. out.println("線程"+threadName+"收到消息:" + message.getText());
?
????????????????????? } else {
?
??????????????????????????? continue;
?
????????????????????? }
?
???????????????? }
?
?????????? } catch (Exception e) {
?
???????????????? e.printStackTrace();
?
?????????? } finally {
?
???????????????? try {
?
????????????????????? if ( null != connection)
?
??????????????????????????? connection.close();
?
???????????????? } catch (Throwable ignore) {
?
???????????????? }
?
?????????? }
?
????? }
?
?
?
????? publicstaticvoid main(String[] args) {
?
????? ????? //這里啟動3個線程來監聽FirstTopic的消息,與queue的方式不一樣三個線程都能收到同樣的消息
?
?????????? ReceiveTopic receive1= new ReceiveTopic("thread1");
?
?????????? ReceiveTopic receive2= new ReceiveTopic("thread2");
?
?????????? ReceiveTopic receive3= new ReceiveTopic("thread3");
?
?????????? Thread thread1= new Thread(receive1);
?
?????????? Thread thread2= new Thread(receive2);
?
?????????? Thread thread3= new Thread(receive3);
?
?????????? thread1.start();
?
?????????? thread2.start();
?
?????????? thread3.start();
?
????? }
?
}
?
?
?
4???? queue方式的消息處理示例
?
總結
以上是生活随笔為你收集整理的ActiveMQ的queue以及topic两种消息处理机制分析的全部內容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: 使用Solr 增加索引以及检索
- 下一篇: webshpere缓存--web.xml
