java 消息队列服务_ActiveMQ 消息队列服务
1?ActiveMQ簡介
1.1?ActiveMQ是什么
ActiveMQ是一個消息隊列應用服務器(推送服務器)。支持JMS規范。
1.1.1?JMS概述
全稱:Java Message Service ,即為Java消息服務,是一套java消息服務的API標準。(標準即接口)
實現了JMS標準的系統,稱之為JMS Provider。
1.1.2?消息隊列
1.1.2.1?概念
消息隊列是在消息的傳輸過程中保存消息的容器,提供一種不同進程或者同一進程不同線程直接通訊的方式。
Producer:消息生產者,負責產生和發送消息到Broker;
Broker:消息處理中心。負責消息存儲、確認、重試等,一般其中會包含多個queue;
Consumer:消息消費者,負責從Broker中獲取消息,并進行相應處理;
1.1.2.2?常見消息隊列應用
(1)、ActiveMQ
ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規范的 JMS Provider實現。
(2)、RabbitMQ
RabbitMQ是一個在AMQP基礎上完成的,可復用的企業消息系統。他遵循Mozilla Public License開源協議。開發語言為Erlang。
(3)、RocketMQ
由阿里巴巴定義開發的一套消息隊列應用服務。
1.2?ActiveMQ能做什么
(1)實現兩個不同應用(程序)之間的消息通訊。
(2)實現同一個應用,不同模塊之間的消息通訊。(確保數據發送的穩定性)
1.3?ActiveMQ下載
ActiveMQ下載地址:http://activemq.apache.org/download-archives.html
--可供下載的歷史版本
--說明:
ActiveMQ 5.10.x以上版本必須使用JDK1.8才能正常使用。
ActiveMQ 5.9.x及以下版本使用JDK1.7即可正常使用。
--根據操作系統,選擇下載版本。(本教程下載Linux版本)
1.4?ActiveMQ主要特點
(1)支持多語言、多協議客戶端。語言: Java,C,C++,C#,Ruby,Perl,Python,PHP。應用協議:OpenWire,Stomp REST,WS Notification,XMPP,AMQP
(2)對Spring的支持,ActiveMQ可以很容易整合到Spring的系統里面去。
(3)支持高可用、高性能的集群模式。
2?入門示例
2.1?需求
使用ActiveMQ實現消息隊列模型。
2.2?配置步驟說明
(1)搭建ActiveMQ消息服務器。
(2)創建一個java項目。
(3)創建消息生產者,發送消息。
(4)創建消息消費者,接收消息。
2.3?第一部分:搭建ActiveMQ消息服務器
2.3.1?第一步:下載、上傳至Linux
--說明:確保已經安裝了jdk
2.3.2?第二步:安裝到/usr/local/activemq目錄
(1)解壓到/usr/local目錄下
[root@node07192 ~]# tar -zxvf apache-activemq-5.9.0-bin.tar.gz?-C /usr/local
(2)修改名稱為activemq
[root@node07192 ~]# cd /usr/local/
[root@node07192 local]# mv apache-activemq-5.9.0/ activemq
2.3.3?第三步:啟動ActiveMQ服務器
--說明:ActiveMQ是免安裝軟件,解壓即可啟動服務。
[root@node07192 local]# cd activemq/bin
[root@node07192 bin]# ./activemq start
--查看ActiveMQ啟動狀態
[root@node07192 bin]# ./activemq status
2.3.4?第四步:瀏覽器訪問ActiveMQ管理界面
2.3.4.1?Step1:查看ActiveMQ管理界面的服務端口。在/conf/jetty.xml中
--訪問管理控制臺的服務端口,默認為:8161
[root@node07192 bin]# cd ../conf
[root@node07192 conf]# vim jetty.xml
2.3.4.2?Step2:查看ActiveMQ用戶、密碼。在/conf/users.properties中:
--默認的用戶名、密碼均為amdin
[root@node07192 conf]# vim users.properties
2.3.4.3?Step3:訪問ActiveMQ管理控制臺。地址:http://ip:8161/
--注意:防火墻是沒有配置該服務的端口的。
因此,要訪問該服務,必須在防火墻中配置。
(1)修改防火墻,開放8161端口
[root@node07192 conf]# vim /etc/sysconfig/iptables
(2)重啟防火墻
[root@node07192 conf]# service iptables restart
(3)登錄管理控制臺
--登陸,用戶名、密碼均為admin
--控制臺主界面
--搭建ActiveMQ服務器成功!!!
2.4?第二部分:創建java項目,導入jar包
--導包說明:
ActiveMQ的解壓包中,提供了運行ActiveMQ的所有jar。
--創建項目
2.5?第三部分:創建消息生成者,發送消息
--說明:ActiveMQ是實現了JMS規范的。在實現消息服務的時候,必須基于API接口規范。
2.5.1?JMS常用的API說明
下述API都是接口類型,定義在javax.jms包中,是JMS標準接口定義。ActiveMQ完全實現這一套api標準。
2.5.1.1?ConnectionFactory
鏈接工廠, 用于創建鏈接的工廠類型。
2.5.1.2?Connection
鏈接,用于建立訪問ActiveMQ連接的類型,由鏈接工廠創建。
2.5.1.3?Session
會話, 一次持久有效、有狀態的訪問,由鏈接創建。
2.5.1.4?Destination ?& ?Queue & Topic
目的地, 即本次訪問ActiveMQ消息隊列的地址,由Session會話創建。
(1)interfaceQueue?extends Destination
(2)Queue:隊列模型,只有一個消費者。消息一旦被消費,默認刪除。
(3)Topic:主題訂閱中的消息,會發送給所有的消費者同時處理。
2.5.1.5?Message
消息,在消息傳遞過程中數據載體對象,是所有消息【文本消息TextMessage,對象消息ObjectMessage等】具體類型的頂級接口,可以通過會話創建或通過會話從ActiveMQ服務中獲取。
2.5.1.6?MessageProducer
消息生成者, 在一次有效會話中,用于發送消息給ActiveMQ服務的工具,由Session會話創建。
2.5.1.7?MessageCustomer
消息消費者【消息訂閱者,消息處理者】, 在一次有效會話中,用于ActiveMQ服務中獲取消息的工具,由Session會話創建。
我們定義的消息生產者和消費者,都是基于上面API實現的。
2.5.2?第一步:創建MyProducer類,定義sendMessage方法
package?cn.gzsxt.mq.producer;
import?javax.jms.Connection;
import?javax.jms.ConnectionFactory;
import?javax.jms.Destination;
import?javax.jms.JMSException;
import?javax.jms.Message;
import?javax.jms.MessageProducer;
import?javax.jms.Session;
import?org.apache.activemq.ActiveMQConnectionFactory;
public?class?MyProducer {
// 定義鏈接工廠
ConnectionFactory connectionFactory?= null;
// 定義鏈接
Connection connection?= null;
// 定義會話
Session session?= null;
// 定義目的地
Destination destination?= null;
// 定義消息生成者
MessageProducer producer?= null;
// 定義消息
Message message?= null;
public?void?sendToMQ(){
try{
/*
* 創建鏈接工廠
* ActiveMQConnectionFactory - 由ActiveMQ實現的ConnectionFactory接口實現類.
* 構造方法: public ActiveMQConnectionFactory(String userName, String password, String brokerURL)
* ?userName - 訪問ActiveMQ服務的用戶名,用戶名可以通過jetty-realm.properties配置文件配置.
* ?password - 訪問ActiveMQ服務的密碼,密碼可以通過jetty-realm.properties配置文件配置.
* ?brokerURL - 訪問ActiveMQ服務的路徑地址.路徑結構為-協議名://主機地址:端口號
* ?????此鏈接基于TCP/IP協議.
*/
connectionFactory?= new?ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.23.13:61616");
// 創建鏈接對象
connection?= connectionFactory.createConnection();
// 啟動鏈接
connection.start();
/*
* 創建會話對象
* 方法- connection.createSession(boolean transacted,int?acknowledgeMode);
* ?transacted - 是否使用事務,可選值為true|false
* ?????true - 使用事務,當設置此變量值,則acknowledgeMode參數無效,建議傳遞的acknowledgeMode參數值為
* ?????????Session.SESSION_TRANSACTED
* ?????false - 不使用事務,設置此變量值,則acknowledgeMode參數必須設置.
* ?acknowledgeMode - 消息確認機制,可選值為:
* ?????Session.AUTO_ACKNOWLEDGE - 自動確認消息機制
* ?????Session.CLIENT_ACKNOWLEDGE - 客戶端確認消息機制
* ?????Session.DUPS_OK_ACKNOWLEDGE - 有副本的客戶端確認消息機制
*/
session?= connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 創建目的地,目的地命名即隊列命名,消息消費者需要通過此命名訪問對應的隊列
destination?= session.createQueue("test-mq");
// 創建消息生成者,創建的消息生成者與某目的地對應,即方法參數目的地.
producer?= session.createProducer(destination);
// 創建消息對象,創建一個文本消息,此消息對象中保存要傳遞的文本數據.
message?= session.createTextMessage("hello,activeme");
// 發送消息
producer.send(message);
System.out.println("消息發送成功!");
}catch(Exception e){
e.printStackTrace();
System.out.println("訪問ActiveMQ服務發生錯誤!!");
}finally{
try?{
// 回收消息發送者資源
if(null?!= producer)
producer.close();
} catch?(JMSException e) {
e.printStackTrace();
}
try?{
// 回收會話資源
if(null?!= session)
session.close();
} catch?(JMSException e) {
e.printStackTrace();
}
try?{
// 回收鏈接資源
if(null?!= connection)
connection.close();
} catch?(JMSException e) {
e.printStackTrace();
}
}
}
}
2.5.3?第二步:創建一個測試類MessageTest
--添加junit類庫,快捷鍵ctrl+1
package?cn.gzsxt.mq.test;
import?org.junit.Test;
import?cn.gzsxt.mq.producer.MyProducer;
public?class?MessageTest {
@Test
public?void?sendToMQ(){
MyProducer producer?= new?MyProducer();
producer.sendToMQ();
}
}
2.5.4?第三步:測試
(1)設置防火墻,配置61616端口。注意修改之后重啟防火墻。
(2)測試結果:
--查看控制臺
--查看ActiveMQ管理控制界面
--消息發送成功!!!
2.6?第四部分:創建消息消費者,消費消息
2.6.1?第一步:創建MyConsumer類
package?cn.gzsxt.mq.consumer;
import?javax.jms.Connection;
import?javax.jms.ConnectionFactory;
import?javax.jms.Destination;
import?javax.jms.JMSException;
import?javax.jms.Message;
import?javax.jms.MessageConsumer;
import?javax.jms.Session;
import?javax.jms.TextMessage;
import?org.apache.activemq.ActiveMQConnectionFactory;
/**
* @ClassName:MyConsumer
* @Description: 消息消費者代碼
*/
public?class?MyConsumer {
// 定義鏈接工廠
ConnectionFactory connectionFactory?= null;
// 定義鏈接
Connection connection?= null;
// 定義會話
Session session?= null;
// 定義目的地
Destination destination?= null;
// 定義消息消費者
MessageConsumer consumer?= null;
// 定義消息
Message message?= null;
public?void?recieveFromMQ(){
try{
/*
* 創建鏈接工廠
* ActiveMQConnectionFactory - 由ActiveMQ實現的ConnectionFactory接口實現類.
* 構造方法: public ActiveMQConnectionFactory(String userName, String password, String brokerURL)
* ?userName - 訪問ActiveMQ服務的用戶名,用戶名可以通過jetty-realm.properties配置文件配置.
* ?password - 訪問ActiveMQ服務的密碼,密碼可以通過jetty-realm.properties配置文件配置.
* ?brokerURL - 訪問ActiveMQ服務的路徑地址.路徑結構為-協議名://主機地址:端口號
* ?????此鏈接基于TCP/IP協議.
*/
connectionFactory?= new?ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.23.13:61616");
// 創建鏈接對象
connection?= connectionFactory.createConnection();
// 啟動鏈接
connection.start();
/*
* 創建會話對象
* 方法- connection.createSession(boolean transacted,int?acknowledgeMode);
* ?transacted - 是否使用事務,可選值為true|false
* ?????true - 使用事務,當設置此變量值,則acknowledgeMode參數無效,建議傳遞的acknowledgeMode參數值為
* ?????????Session.SESSION_TRANSACTED
* ?????false - 不使用事務,設置此變量值,則acknowledgeMode參數必須設置.
* ?acknowledgeMode - 消息確認機制,可選值為:
* ?????Session.AUTO_ACKNOWLEDGE - 自動確認消息機制
* ?????Session.CLIENT_ACKNOWLEDGE - 客戶端確認消息機制
* ?????Session.DUPS_OK_ACKNOWLEDGE - 有副本的客戶端確認消息機制
*/
session?= connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 創建目的地,目的地命名即隊列命名,消息消費者需要通過此命名訪問對應的隊列
destination?= session.createQueue("test-mq");
// 創建消息消費者,創建的消息消費者與某目的地對應,即方法參數目的地.
consumer?= session.createConsumer(destination);
// 從ActiveMQ服務中獲取消息
message?= consumer.receive();
TextMessage tMsg?= (TextMessage) message;
System.out.println("從MQ中獲取的消息是:"+tMsg.getText());
}catch(Exception e){
e.printStackTrace();
System.out.println("訪問ActiveMQ服務發生錯誤!!");
}finally{
try?{
// 回收消息消費者資源
if(null?!= consumer)
consumer.close();
} catch?(JMSException e) {
e.printStackTrace();
}
try?{
// 回收會話資源
if(null?!= session)
session.close();
} catch?(JMSException e) {
e.printStackTrace();
}
try?{
// 回收鏈接資源
if(null?!= connection)
connection.close();
} catch?(JMSException e) {
e.printStackTrace();
}
}
}
}
2.6.2?第二步:修改測試類MessageTest,新增測試方法
@Test
public?void?recieveFromMQ(){
MyConsumer consumer?= new?MyConsumer();
consumer.recieveFromMQ();
}
2.6.3?第三步:測試
--查看Eclipse控制臺
--查看ActiveMQ管理控制界面
--消息被消費了,測試成功!!!
3?ActiveMQ監聽器
問題:在前面的示例中,我們發現消費者每次只能消費一條消息。當隊列中有多條消息的時候,我們需要多次運行消費者,才能消費完這些消息。很麻煩!!!!如何解決這個問題呢?我們希望一次將所有的消息全部接收。
答:使用ActiveMQ監聽器來監聽隊列,持續消費消息。
3.1?配置步驟說明
(1)創建一個監聽器對象。
(2)修改消費者代碼,加載監聽器。
3.2?配置步驟
3.2.1?第一步:創建監聽器MyListener類
--說明:自定義監聽器需要實現MessageListener接口
package?cn.gzsxt.mq.listener;
import?javax.jms.JMSException;
import?javax.jms.Message;
import?javax.jms.MessageListener;
import?javax.jms.TextMessage;
public?class?MyListener implements?MessageListener{
@Override
public?void?onMessage(Message message) {
if(null!=message){
TextMessage tMsg?= (TextMessage) message;
try?{
System.out.println("從MQ中獲取的消息是:"+tMsg.getText());
} catch?(JMSException e) {
e.printStackTrace();
}
}
}
}
3.2.2?第二步:修改MyConsumer代碼,加載監聽器
--說明:監聽器需要持續加載,因此消費程序不能結束。
這里我們使用輸入流阻塞消費線程結束。(實際開發中,使用web項目加載)
package?cn.gzsxt.mq.consumer;
import?javax.jms.Connection;
import?javax.jms.ConnectionFactory;
import?javax.jms.Destination;
import?javax.jms.JMSException;
import?javax.jms.Message;
import?javax.jms.MessageConsumer;
import?javax.jms.Session;
import?javax.jms.TextMessage;
import?org.apache.activemq.ActiveMQConnectionFactory;
import?cn.gzsxt.mq.listener.MyListener;
/**
* @ClassName:MyConsumer
* @Description: 消息消費者代碼
*/
public?class?MyConsumer {
// 定義鏈接工廠
ConnectionFactory connectionFactory?= null;
// 定義鏈接
Connection connection?= null;
// 定義會話
Session session?= null;
// 定義目的地
Destination destination?= null;
// 定義消息消費者
MessageConsumer consumer?= null;
// 定義消息
Message message?= null;
public?Message recieveFromMQ(){
try{
/*
* 創建鏈接工廠
* ActiveMQConnectionFactory - 由ActiveMQ實現的ConnectionFactory接口實現類.
* 構造方法: public ActiveMQConnectionFactory(String userName, String password, String brokerURL)
* ?userName - 訪問ActiveMQ服務的用戶名,用戶名可以通過jetty-realm.properties配置文件配置.
* ?password - 訪問ActiveMQ服務的密碼,密碼可以通過jetty-realm.properties配置文件配置.
* ?brokerURL - 訪問ActiveMQ服務的路徑地址.路徑結構為-協議名://主機地址:端口號
* ?????此鏈接基于TCP/IP協議.
*/
connectionFactory?= new?ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.23.13:61616");
// 創建鏈接對象
connection?= connectionFactory.createConnection();
// 啟動鏈接
connection.start();
/*
* 創建會話對象
* 方法- connection.createSession(boolean transacted,int?acknowledgeMode);
* ?transacted - 是否使用事務,可選值為true|false
* ?????true - 使用事務,當設置此變量值,則acknowledgeMode參數無效,建議傳遞的acknowledgeMode參數值為
* ?????????Session.SESSION_TRANSACTED
* ?????false - 不使用事務,設置此變量值,則acknowledgeMode參數必須設置.
* ?acknowledgeMode - 消息確認機制,可選值為:
* ?????Session.AUTO_ACKNOWLEDGE - 自動確認消息機制
* ?????Session.CLIENT_ACKNOWLEDGE - 客戶端確認消息機制
* ?????Session.DUPS_OK_ACKNOWLEDGE - 有副本的客戶端確認消息機制
*/
session?= connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 創建目的地,目的地命名即隊列命名,消息消費者需要通過此命名訪問對應的隊列
destination?= session.createQueue("test-mq");
// 創建消息消費者,創建的消息消費者與某目的地對應,即方法參數目的地.
consumer?= session.createConsumer(destination);
// // 從ActiveMQ服務中獲取消息
// message = consumer.receive();
//
// TextMessage tMsg = (TextMessage) message;
//
// System.out.println("從MQ中獲取的消息是:"+tMsg.getText());
//加載監聽器
consumer.setMessageListener(newMyListener());
//監聽器需要持續加載,這里我們使用輸入流阻塞當前線程結束。
System.in.read();
}catch(Exception e){
e.printStackTrace();
System.out.println("訪問ActiveMQ服務發生錯誤!!");
}finally{
try?{
// 回收消息消費者資源
if(null?!= consumer)
consumer.close();
} catch?(JMSException e) {
e.printStackTrace();
}
try?{
// 回收會話資源
if(null?!= session)
session.close();
} catch?(JMSException e) {
e.printStackTrace();
}
try?{
// 回收鏈接資源
if(null?!= connection)
connection.close();
} catch?(JMSException e) {
e.printStackTrace();
}
}
return?message;
}
}
3.3?測試
(1)多次運行生產者,發送多條消息到隊列中。
(2)運行消費者。觀察結果
--查看Eclipse控制臺,一次消費了3條消息
--查看ActiveMQ管理控制界面,所有消息都被消費了!
--測試成功!!!
4?ActiveMQ消息服務模式
問題:在入門示例中,只能向一個消費者發送消息。但是有一些場景,需求有多個消費者都能接收到消息,比如:美團APP每天的消息推送。該如何實現呢?
答:ActiveMQ是通過不同的服務模式來解決這個問題的。
所以,要搞清楚這個問題,必須知道ActiveMQ有哪些應用模式。
4.1?PTP模式(point to point)
--消息模型
消息生產者生產消息發送到queue中,然后消息消費者從queue中取出并且消費消息。
消息被消費以后,queue中不再有存儲,所以消息消費者不可能消費到已經被消費的消息。
Queue支持存在多個消費者,但是對一個消息而言,只會有一個消費者可以消費、其它的則不能消費此消息了。
當消費者不存在時,消息會一直保存,直到有消費消費
我們的入門示例,就是采用的這種PTP服務模式。
4.2?TOPIC(主題訂閱模式)
--消息模型
消息生產者(發布)將消息發布到topic中,同時有多個消息消費者(訂閱)消費該消息。
和點對點方式不同,發布到topic的消息會被所有訂閱者消費。
當生產者發布消息,不管是否有消費者。都不會保存消息
所以,主題訂閱模式下,一定要先有消息的消費者(訂閱者),后有消息的生產者(發布者)。
我們前面已經實現了PTP模式,下面我們來實現TOPIC模式。
5?Topic模式實現
5.1?配置步驟說明
發表于 2019-08-01 00:00
閱讀 ( 411 )
總結
以上是生活随笔為你收集整理的java 消息队列服务_ActiveMQ 消息队列服务的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java int 传引用吗_Java的参
- 下一篇: 备案费是什么(备案工本费)