?http://wuzhaohuixy-qq-com.iteye.com/blog/908395
?
1、P2P模型
在P2P模型中,有下列概念:消息隊列(Queue)、發(fā)送者(Sender)、接收者(Receiver)。每個消息都被發(fā)送到一個特定的隊列,接收者從隊列中獲取消息。隊列保留著消息,直到它們被消費或超時。
? 每個消息只有一個消費者(Consumer)(即一旦被消費,消息就不再在消息隊列中)
? 發(fā)送者和接收者之間在時間上沒有依賴性,也就是說當發(fā)送者發(fā)送了消息之后,不管接收者有沒有正在運行,它不會影響到消息被發(fā)送到隊列。
? 接收者在成功接收消息之后需向隊列應(yīng)答成功
如果你希望發(fā)送的每個消息都應(yīng)該被成功處理的話,那么你需要P2P模型。
舉例:
//注冊消息監(jiān)聽器,當有消息發(fā)送過來的時候會調(diào)用onMessage方法(實現(xiàn)MessageListener 接口)
Java代碼 ?
import?javax.ejb.ActivationConfigProperty; ??import?javax.ejb.MessageDriven; ??import?javax.jms.JMSException; ??import?javax.jms.Message; ??import?javax.jms.MessageListener; ??import?javax.jms.TextMessage; ????@MessageDriven(activationConfig={ ??????????????@ActivationConfigProperty(propertyName="destinationType",propertyValue="javax.jms.Queue"), ??????????????@ActivationConfigProperty(propertyName="destination",?propertyValue="queue/myqueue") ??????} ??) ??public?class?QueueMessageBean?implements?MessageListener?{ ????????public?void?onMessage(Message?msg)?{ ??????????????????????????????????????????????????????????????????????TextMessage?txtMsg?=?(TextMessage)msg; ??????????String?s?=?""; ??????????try?{ ??????????????s?=?txtMsg.getText(); ??????????}?catch?(JMSException?e)?{ ??????????????e.printStackTrace(); ??????????} ??????????System.out.println("QueueMessageBean接收到了消息:"?+?s); ??????} ??} ????import?javax.jms.Message; ??import?javax.jms.MessageProducer; ??import?javax.jms.Queue; ??import?javax.jms.QueueConnection; ??import?javax.jms.QueueConnectionFactory; ??import?javax.jms.QueueSession; ??import?javax.naming.InitialContext; ??????public?class?Test?{ ??????public?static?void?main(String[]?args)?throws?Exception?{ ??????InitialContext?ctx?=?new?InitialContext(); ????????????QueueConnectionFactory?factory?=?(QueueConnectionFactory)?ctx.lookup("QueueConnectionFactory"); ????????????QueueConnection?connection?=?factory.createQueueConnection(); ???????????? ?????? ??????QueueSession?session?=?(QueueSession)?connection.createQueueSession(false,?QueueSession.AUTO_ACKNOWLEDGE); ????????????Queue?queue?=?(Queue)?ctx.lookup("queue/myqueue"); ????????????MessageProducer?sender?=?session.createProducer(queue); ????????????Message?msg?=?session.createTextMessage("消息來了"); ????????????sender.send(queue,?msg); ??????session.close(); ??????connection.close(); ?????????? ??????} ??}?? import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;@MessageDriven(activationConfig={@ActivationConfigProperty(propertyName="destinationType",propertyValue="javax.jms.Queue"),@ActivationConfigProperty(propertyName="destination", propertyValue="queue/myqueue")}
)
public class QueueMessageBean implements MessageListener {public void onMessage(Message msg) {//共有下面幾種消息類型//1 Text//2 Map//3 Object//4 stream//5 byteTextMessage txtMsg = (TextMessage)msg;String s = "";try {s = txtMsg.getText();} catch (JMSException e) {e.printStackTrace();}System.out.println("QueueMessageBean接收到了消息:" + s);}
}
//客戶端調(diào)用
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.naming.InitialContext;public class Test {public static void main(String[] args) throws Exception {InitialContext ctx = new InitialContext();//獲得QueueConnectionFactory對象QueueConnectionFactory factory = (QueueConnectionFactory) ctx.lookup("QueueConnectionFactory");//創(chuàng)建QueueConnection對像 QueueConnection connection = factory.createQueueConnection();//創(chuàng)建會話//arg1:與事物有關(guān),true表示最后提交,false表示自動提交//arg2:表示消息向中間件發(fā)送確認通知,這里采用的是自動通知的類型QueueSession session = (QueueSession) connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);//取得destinationQueue queue = (Queue) ctx.lookup("queue/myqueue");//消息生產(chǎn)者MessageProducer sender = session.createProducer(queue);//定義消息Message msg = session.createTextMessage("消息來了");//發(fā)送消息sender.send(queue, msg);session.close();connection.close();}
}
2、Pub/Sub模式
在Pub/Sub模型中,有下列概念: 主題(Topic)、發(fā)布者(Publisher)、訂閱者(Subscriber)。客戶端將消息發(fā)送到主題。多個發(fā)布者將消息發(fā)送到Topic,系統(tǒng)將這些消息傳遞給多個訂閱者。
? 每個消息可以有多個消費者
? 發(fā)布者和訂閱者之間有時間上的依賴性。針對某個主題(Topic)的訂閱者,它必須創(chuàng)建一個訂閱之后,才能消費發(fā)布者的消息,而且,為了消費消息,訂閱者必須保持運行的狀態(tài)。
當然,為了緩和這種嚴格的時間相關(guān)性,JMS允許訂閱者創(chuàng)建一個可持久化的訂閱。這樣,即使訂閱者沒有被激活(運行),它也能接收到發(fā)布者的消息。
如果你希望發(fā)送的消息可以不被做任何處理、或者被一個消費者處理、或者可以被多個消費者處理的話,那么可以采用Pub/Sub模型。
//注冊消息監(jiān)聽器,當有消息發(fā)送過來的時候會調(diào)用onMessage方法(實現(xiàn)MessageListener 接口)
Java代碼 ?
import?javax.ejb.ActivationConfigProperty; ??import?javax.ejb.MessageDriven; ??import?javax.jms.JMSException; ??import?javax.jms.Message; ??import?javax.jms.MessageListener; ??import?javax.jms.TextMessage; ????@MessageDriven(activationConfig={ ??????????????@ActivationConfigProperty(propertyName="destinationType",propertyValue="javax.jms.Topic"), ??????????????@ActivationConfigProperty(propertyName="destination",?propertyValue="topic/myTopic") ??????} ??) ??public?class?TopicMessageBean?implements?MessageListener?{ ????????public?void?onMessage(Message?msg)?{ ??????????????????????????????????????????????????????????????????????TextMessage?txtMsg?=?(TextMessage)msg; ??????????String?s?=?""; ??????????try?{ ??????????????s?=?txtMsg.getText(); ??????????}?catch?(JMSException?e)?{ ??????????????e.printStackTrace(); ??????????} ??????????System.out.println("TopicMessageBean接收到了消息:"?+?s); ??????} ??} ??????import?javax.jms.MessageProducer; ??import?javax.jms.Topic; ??import?javax.jms.TopicConnection; ??import?javax.jms.TopicConnectionFactory; ??import?javax.jms.TopicSession; ??import?javax.naming.InitialContext; ??????public?class?Test?{ ??????public?static?void?main(String[]?args)?throws?Exception?{ ??????InitialContext?ctx?=?new?InitialContext(); ????????????TopicConnectionFactory?factory?=?(TopicConnectionFactory)?ctx.lookup("TopicConnectionFactory"); ????????????TopicConnection?connection?=?factory.createTopicConnection(); ???????????? ?????? ??????TopicSession?session?=?(TopicSession)?connection.createTopicSession(false,?TopicSession.AUTO_ACKNOWLEDGE); ????????????Topic?queue?=?(Topic)?ctx.lookup("topic/myTopic"); ????????????MessageProducer?publisher?=?session.createProducer(queue); ????????????Message?msg?=?session.createTextMessage("消息來了"); ????????????publisher.send(queue,?msg); ??????session.close(); ??????connection.close(); ?????????? ??????} ??}?? import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;@MessageDriven(activationConfig={@ActivationConfigProperty(propertyName="destinationType",propertyValue="javax.jms.Topic"),@ActivationConfigProperty(propertyName="destination", propertyValue="topic/myTopic")}
)
public class TopicMessageBean implements MessageListener {public void onMessage(Message msg) {//共有下面幾種消息類型//1 Text//2 Map//3 Object//4 stream//5 byteTextMessage txtMsg = (TextMessage)msg;String s = "";try {s = txtMsg.getText();} catch (JMSException e) {e.printStackTrace();}System.out.println("TopicMessageBean接收到了消息:" + s);}
}//客戶端測試
import javax.jms.MessageProducer;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.naming.InitialContext;public class Test {public static void main(String[] args) throws Exception {InitialContext ctx = new InitialContext();//獲得QueueConnectionFactory對象TopicConnectionFactory factory = (TopicConnectionFactory) ctx.lookup("TopicConnectionFactory");//創(chuàng)建QueueConnection對像 TopicConnection connection = factory.createTopicConnection();//創(chuàng)建會話//arg1:與事物有關(guān),true表示最后提交,false表示自動提交//arg2:表示消息向中間件發(fā)送確認通知,這里采用的是自動通知的類型TopicSession session = (TopicSession) connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);//取得destinationTopic queue = (Topic) ctx.lookup("topic/myTopic");//消息生產(chǎn)者MessageProducer publisher = session.createProducer(queue);//定義消息Message msg = session.createTextMessage("消息來了");//發(fā)送消息publisher.send(queue, msg);session.close();connection.close();}
}
二種模型的實現(xiàn)結(jié)果:對于p2p模型的每個消息只能有一個消費者? 如果我們定義二個消息接受者的Bean那么只能有一端會接收到消息。當你把部署在Jboss中的消息接收Bean去掉以后,然后發(fā)送消息 此時消息在隊列中,一旦你重新部署他會立刻就接收到剛剛發(fā)送的消息所以它沒有時間的依賴性, pub/sub模型可以有多個消費者 在這個模型中如果我們定義多個接收消息的Bean當我們在客戶端發(fā)送消息的時候二個bean都會接收到消息,所以他有多個消費者 但是如果你把Jboss部署中的消息接收bean去掉之后,發(fā)送消息。然后在重新部署,那么消息也無法接收到,所以說他有時間的依賴性。
//代碼中幾個概念的理解
Connection Factory
創(chuàng)建Connection對象的工廠,針對兩種不同的JMS消息模型,分別有QueueConnectionFactory和TopicConnectionFactory兩種。可以通過JNDI來查找ConnectionFactory對象。
Destination
Destination的意思是消息生產(chǎn)者的消息發(fā)送目標或者說消息消費者的消息來源。對于消息生產(chǎn)者來說,它的Destination是某個隊列(Queue)或某個主題(Topic);對于消息消費者來說,它的Destination也是某個隊列或主題(即消息來源)。
所以,Destination實際上就是兩種類型的對象:Queue、Topic。
可以通過JNDI來查找Destination。
Connection:
Connection表示在客戶端和JMS系統(tǒng)之間建立的鏈接(對TCP/IP socket的包裝)。Connection可以產(chǎn)生一個或多個Session。跟ConnectionFactory一樣,Connection也有兩種類型:QueueConnection和TopicConnection。
Session:
Session是我們操作消息的接口。可以通過session創(chuàng)建生產(chǎn)者、消費者、消息等。Session提供了事務(wù)的功能。當我們需要使用session發(fā)送/接收多個消息時,可以將這些發(fā)送/接收動作放到一個事務(wù)中。同樣,也分QueueSession和TopicSession。
消息生產(chǎn)者:
消息生產(chǎn)者由Session創(chuàng)建,并用于將消息發(fā)送到Destination。同樣,消息生產(chǎn)者分兩種類型:QueueSender和TopicPublisher。可以調(diào)用消息生產(chǎn)者的方法(send或publish方法)發(fā)送消息!
消息消費者:
消息消費者由Session創(chuàng)建,用于接收被發(fā)送到Destination的消息。兩種類型:QueueReceiver和TopicSubscriber。可分別通過session的createReceiver(Queue)或createSubscriber(Topic)來創(chuàng)建。當然,也可以通過session的createDurableSubscriber方法來創(chuàng)建持久化的訂閱者。
MessageListener:
消息監(jiān)聽器。如果注冊了消息監(jiān)聽器,一旦消息到達,將自動調(diào)用監(jiān)聽器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一種MessageListener。
MDB介紹:
對客戶端來說,message-driven bean就是異步消息的消費者。當消息到達之后,由容器負責調(diào)用MDB。客戶端發(fā)送消息到destination,MDB作為一個MessageListener接收消息。
總結(jié)
以上是生活随笔為你收集整理的JMS的两种消息模型(Point-to-Point(P2P)和Publish/Subscribe(Pub/Sub))应用举例的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。