ActiveMQ消息传递的两种方式
1.什么是ActiveMQ?
ActiveMQ是apache提供的開源的,實現消息傳遞的一個中間插件,可以和spring整合,是目前最流行的開源消息總線,ActiveMQ是一個完全支持JMS1.1和J2EE 1.4規范的 JMS Provider實現。較相似的還有rabbitMQ和kafka等,都是最為消息傳遞的插件
2.ActiveMQ傳遞消息的兩種方式
前提:需要引入activemq的jar包
點對點方式(PTP):一個消費者對應一個生產者
發布/訂閱模式(Publish/Sub):一個生產者產生消息發送后,可以被多個消費者進行接收。
JMS定義了五種消息正文格式,以及消息的調用類型,允許發送和接收一些不同類型的數據,提供現有消息格式的一些級別的兼容性。
StreamMessage:--JAVA原始的數據流
TextMessage:一個字符串對象
ObjectMessage:一個系列化的java對象
BytesMessage:一個字節對象
MapMessage:key/value方式的鍵值對
(1)點對點的方式(PTP)
即:一個消息的生產者對應一個消費者
生產者(Producer)實現步驟:
第一步:創建一個ConnectionFactory對象,將服務端activemq的 ip 和 port 作為構造參數傳遞
第二步:通過第一步創建的工廠對象獲得連接對象Connection
第三步:開啟連接,直接調用connection對象的start方法即可
第四步:創建一個Session對象,通過connection對象創建
第五步:通過Session對象創建一個Destination對象(該對象有兩種方式:topic和quene),這里使用quene
第六步:通過Session對象創建一個生產者Producer對象
第七步:創建Message對象,這里使用TextMessage對象,設置消息內容
第八步:使用創建的生產者對象Producer發送消息
第九步:關閉資源(Producer對象,Connection對象,Session對象)
@Testpublic void testQueueProducer() throws Exception {// 第一步:創建ConnectionFactory對象,需要指定服務端ip及端口號。//brokerURL服務器的ip及端口號ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://ip地址:61616");// 第二步:使用ConnectionFactory對象創建一個Connection對象。Connection connection = connectionFactory.createConnection();// 第三步:開啟連接,調用Connection對象的start方法。 connection.start();// 第四步:使用Connection對象創建一個Session對象。//第一個參數:是否開啟事務。true:開啟事務,第二個參數忽略。//第二個參數:當第一個參數為false時,才有意義。消息的應答模式。1、自動應答2、手動應答。一般是自動應答。Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 第五步:使用Session對象創建一個Destination對象(topic、queue),此處創建一個Queue對象。//參數:隊列的名稱。Queue queue = session.createQueue("test-queue");// 第六步:使用Session對象創建一個Producer對象。MessageProducer producer = session.createProducer(queue);// 第七步:創建一個Message對象,創建一個TextMessage對象。/*TextMessage message = new ActiveMQTextMessage();message.setText("hello activeMq,this is my first test.");*/TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first test.");// 第八步:使用Producer對象發送消息。 producer.send(textMessage);// 第九步:關閉資源。 producer.close();session.close();connection.close();}消費者實現:
第一步:創建一個ConnectionFactory對象,將服務端activemq的 ip 和 port 作為構造參數傳遞
第二步:通過第一步創建的工廠對象獲得連接對象Connection
第三步:開啟連接,直接調用connection對象的start方法即可
第四步:創建一個Session對象,通過connection對象創建
第五步:創建一個Destination對象,使用quene,需要和生產者的quene一致
第六步:創建一個消費者對象
第七步:接收消息
第八步:打印接收的消息
第九步:關閉資源
消費者的代碼:
@Testpublic void testQueueConsumer() throws Exception {// 第一步:創建一個ConnectionFactory對象。ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");// 第二步:從ConnectionFactory對象中獲得一個Connection對象。Connection connection = connectionFactory.createConnection();// 第三步:開啟連接。調用Connection對象的start方法。 connection.start();// 第四步:使用Connection對象創建一個Session對象。Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 第五步:使用Session對象創建一個Destination對象。和發送端保持一致queue,并且隊列的名稱一致。Queue queue = session.createQueue("test-queue");// 第六步:使用Session對象創建一個Consumer對象。MessageConsumer consumer = session.createConsumer(queue);// 第七步:接收消息。consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {try {TextMessage textMessage = (TextMessage) message;String text = null;//取消息的內容text = textMessage.getText();// 第八步:打印消息。 System.out.println(text);} catch (JMSException e) {e.printStackTrace();}}});//等待鍵盤輸入 System.in.read();// 第九步:關閉資源 consumer.close();session.close();connection.close();} View Code(2)訂閱發布方式傳遞消息:Topic??
補充:由于topic傳遞消息的特點是,一個生產者可以有多個消費者,生產者生產的消息在沒有被消費者消費之前,并不會將消息持久化到activemq的服務端,發送的消息會自動消失。所以 測試的時候需要先創建消費者對象,然后在發送消息,防止消息丟失。
生產者實現步驟:
步驟和PTP的方式完全一樣,不同的是在創建Destination對象的時候,需要創建topic對象
直接上代碼:
@Testpublic void testTopicProducer() throws Exception {// 第一步:創建ConnectionFactory對象,需要指定服務端ip及端口號。// brokerURL服務器的ip及端口號ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://IP地址:61616");// 第二步:使用ConnectionFactory對象創建一個Connection對象。Connection connection = connectionFactory.createConnection();// 第三步:開啟連接,調用Connection對象的start方法。 connection.start();// 第四步:使用Connection對象創建一個Session對象。// 第一個參數:是否開啟事務。true:開啟事務,第二個參數忽略。// 第二個參數:當第一個參數為false時,才有意義。消息的應答模式。1、自動應答2、手動應答。一般是自動應答。Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 第五步:使用Session對象創建一個Destination對象(topic、queue),此處創建一個topic對象。// 參數:話題的名稱。Topic topic = session.createTopic("test-topic");// 第六步:使用Session對象創建一個Producer對象。MessageProducer producer = session.createProducer(topic);// 第七步:創建一個Message對象,創建一個TextMessage對象。/** TextMessage message = new ActiveMQTextMessage(); message.setText(* "hello activeMq,this is my first test.");*/TextMessage textMessage = session.createTextMessage("hello activeMq,this is my topic test");// 第八步:使用Producer對象發送消息。 producer.send(textMessage);// 第九步:關閉資源。 producer.close();session.close();connection.close();} View Code消費者實現的步驟:
步驟和PTP消費者實現的步驟一樣,唯一不同的是在創建Destination對象的時候,創建topic對象,同時要和發布訂閱的生產者的topic一致
消費者代碼:
@Testpublic void testTopicConsumer() throws Exception {// 第一步:創建一個ConnectionFactory對象。ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://IP地址:61616");// 第二步:從ConnectionFactory對象中獲得一個Connection對象。Connection connection = connectionFactory.createConnection();// 第三步:開啟連接。調用Connection對象的start方法。 connection.start();// 第四步:使用Connection對象創建一個Session對象。Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 第五步:使用Session對象創建一個Destination對象。和發送端保持一致topic,并且話題的名稱一致。Topic topic = session.createTopic("test-topic");// 第六步:使用Session對象創建一個Consumer對象。MessageConsumer consumer = session.createConsumer(topic);// 第七步:接收消息。consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {try {TextMessage textMessage = (TextMessage) message;String text = null;// 取消息的內容text = textMessage.getText();// 第八步:打印消息。 System.out.println(text);} catch (JMSException e) {e.printStackTrace();}}});System.out.println("topic的消費端03。。。。。");// 等待鍵盤輸入 System.in.read();// 第九步:關閉資源 consumer.close();session.close();connection.close();} View Code總結:兩種傳遞消息的方式的異同
相同點:實現步驟基本一樣,大同小異
不同點:PTP傳遞消息的方法,消息的生產者發送以后,消息會持久化在activemq的服務端,如果該消息給消費者消費,在服務端持久化的消息也就同時被刪除。
發布訂閱傳遞消息的方法:消息的生產者發送消息以后,如果沒有消費者消費,消息不會持久化在activemq的客戶端,會立即消失。如果創建的消息被消費,會的activemq的服務端顯示消息相關內容。這一點和PTP剛好相反。
注意:發布訂閱傳遞消息的方式:也是可以實現消息持久化在服務端的,需要消費者首先在activemq的服務端訂閱消息(注冊),將消費者客戶端的ID(作為唯一標識,因為可以有多個消費者)和消息的ID傳遞給服務端即可。
?
轉載于:https://www.cnblogs.com/shuai-server/p/8966299.html
總結
以上是生活随笔為你收集整理的ActiveMQ消息传递的两种方式的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: [转].Net实现本地化简易教程
- 下一篇: linux虚拟机时间不准的问题