一、ActiveMQ安全機制
ActiveMQ是使用jetty部署的,修改密碼需要到相應的配置文件?
配置文件是這個:
在其第123行添加用戶名和密碼,添加配置如下:
<plugins><simpleAuthenticationPlugin><users><authenticationUser username="bhz" password="bhz" groups="users,admins"/></users></simpleAuthenticationPlugin></plugins>
這時候這個需要改為這樣才能進行連接:
//第一步:建立ConnectionFactory工廠對象ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("bhz","bhz","tcp://localhost:61616");
二、ActiveMQ的api的使用
1. Connection的使用
省略
2. Session方法的使用
一旦從ConnectionFactory中獲得一個Connection,必須從Connection中創建一個或多個Session.Session是一個發送或接收消息的線程?
session可以被事務化,也可以不被事務化?
如果使用事務的話,那么需要commit,如下
package test.mq.helloworld;import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;public class Sender {public static void main(String[] agrs) throws Exception{//第一步:建立ConnectionFactory工廠對象ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("bhz","bhz","tcp://localhost:61616");//第二步:通過ConnectionFactory工廠對象我們創建一個Connection對象Connection connection = connectionFactory.createConnection();connection.start();//第三步:通過connection對象創建Session會話,第一個參數為是否開啟事務,第二個參數為簽收模式,一般設置為自動簽收Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);//第四步:通過Session創建Destination對象,queue1可看作是放入隊列的消息名稱,可以自定義Destination desiDestination = session.createQueue("queue1");//第五步:通過session創建消息的生產者或消費者,下面是創建生產者MessageProducer messageProducer = session.createProducer(desiDestination);//第六步:使用MessageProducer的set方法為其設置持久化特性和非持久化特性,后面再詳細介紹messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//這里先設置為非持久化//第七步:通過JMS規范的TextMessage形式創建數據(通過session對象),并用MessageProducer的send方法發送數據for(int i =0; i<5;i++){TextMessage textMessage = session.createTextMessage("這是消息內容,id為"+i);messageProducer.send(textMessage);}session.commit();if(connection!=null){connection.close();}}}
注意session.commit();?
session的簽收模式有三種情況:?
1.?Session.AUTO_ACKNOWLEDGE?:自動簽收?
2.?Session.CLIENT_ACKNOWLEDGE:客戶端通過調用消息(Message)的acknowledge方法簽收消息,在這種情況下,簽收發生在Session層面:簽收一個已消費的消息會自動簽收這個Session所有已消費消息的收條?
3.?Session.DUPS_OK_ACKNOWLEDGE:此選項指示Session不必確保對傳送消息的簽收。它可能引起消息的重復,但是降低了Session的開銷,所以只有客戶端能容忍重復的消息,才可使用?
上面三種模式,推薦使用的是Session.CLIENT_ACKNOWLEDGE?
它的操作如下:
package test.mq.helloworld;import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;public class Receiver {public static void main(String[] agrs) throws Exception{//第一步:建立ConnectionFactory工廠對象ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("bhz","bhz","tcp://localhost:61616");//第二步:通過ConnectionFactory工廠對象我們創建一個Connection對象Connection connection = connectionFactory.createConnection();connection.start();//第三步:通過connection對象創建Session會話,第一個參數為是否開啟事務,第二個參數為簽收模式,一般設置為自動簽收Session session = connection.createSession(Boolean.FALSE,Session.CLIENT_ACKNOWLEDGE);//第四步:通過Session創建Destination對象,queue1可看作是放入隊列的消息名稱,可以自定義Destination desiDestination = session.createQueue("queue1");//第五步:通過session創建消息的生產者或消費者,下面是創建消費者MessageConsumer messageConsumer = session.createConsumer(desiDestination);while(true){TextMessage msg = (TextMessage) messageConsumer.receive();msg.acknowledge();if(msg==null) {break;}System.out.println("收到的內容為"+msg.getText());}if(connection!=null){connection.close();}}}
使用該模式,需要手動的設置接收完畢的信號msg.acknowledge();,這可以當做告訴隊列,這個消息已經接收完畢,可以銷毀了。
3.MessageProducer
?
下面是改造后的代碼:
package test.mq.helloworld;import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;public class Sender {public static void main(String[] agrs) throws Exception{//第一步:建立ConnectionFactory工廠對象ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("bhz","bhz","tcp://localhost:61616");//第二步:通過ConnectionFactory工廠對象我們創建一個Connection對象Connection connection = connectionFactory.createConnection();connection.start();//第三步:通過connection對象創建Session會話,第一個參數為是否開啟事務,第二個參數為簽收模式,一般設置為自動簽收Session session = connection.createSession(Boolean.TRUE,Session.CLIENT_ACKNOWLEDGE);//第四步:通過Session創建Destination對象,queue1可看作是放入隊列的消息名稱,可以自定義Destination desiDestination = session.createQueue("queue1");//第五步:通過session創建消息的生產者或消費者,下面是創建生產者MessageProducer messageProducer = session.createProducer(null);//第六步:使用MessageProducer的set方法為其設置持久化特性和非持久化特性,后面再詳細介紹// messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//這里先設置為非持久化//第七步:通過JMS規范的TextMessage形式創建數據(通過session對象),并用MessageProducer的send方法發送數據for(int i =0; i<5;i++){//第一個參數:目的地//第二個參數:消息文本//第三個參數:接收模式//第四個參數:優先級//第五個參數:消息在消息隊列保存的時間,這里指保存2分鐘TextMessage textMessage = session.createTextMessage("這是消息內容,id為"+i);messageProducer.send(desiDestination,textMessage,DeliveryMode.NON_PERSISTENT, i,1000*60*2);}session.commit();if(connection!=null){connection.close();}}}
需要注意以下內容:?
1. 優先級并不是嚴格遵循的?
2. 消息在隊列中超過保存事件后,還沒有人取,那么就會自動消失
4.MessageConsumer
三、訂閱模式的使用
上面及之前的文章的是p2p模式(點對點模式)?
那么發布訂閱模式又是如何處理的?下面先看發布/訂閱模式相關的含義:?
?
下面是demo案例?
消息生產者
package bhz.mq.pd;import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;public class Publish {private ConnectionFactory factory;private Connection connection;private Session session;private MessageProducer producer;public Publish(){try {factory = new ActiveMQConnectionFactory("bhz","bhz","tcp://localhost:61616");connection = factory.createConnection();connection.start();session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);producer = session.createProducer(null);} catch (JMSException e) {// TODO Auto-generated catch blocke.printStackTrace();}}public void sendMessage() throws Exception{Destination destination = session.createTopic("topic");TextMessage textMessage = session.createTextMessage("我是內容");producer.send(destination, textMessage);}public static void main(String[] agrs) throws Exception{Publish p = new Publish();p.sendMessage();}
}
上面的主題是:topic?
當發布一條消息的時候,便會在后臺topics上顯示?
下面是消費者的代碼,消費者訂閱了topic主題
package bhz.mq.pd;import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;import org.apache.activemq.ActiveMQConnectionFactory;public class Consumer1 {private ConnectionFactory factory;private Connection connection;private Session session;private MessageConsumer consumer;public Consumer1(){try {factory = new ActiveMQConnectionFactory("bhz","bhz","tcp://localhost:61616");connection = factory.createConnection();connection.start();session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);} catch (Exception e) {// TODO Auto-generated catch blocke.printStackTrace();}}public void receive() throws Exception{Destination destionation = session.createTopic("topic");consumer = session.createConsumer(destionation);consumer.setMessageListener(new Listener());}public static void main(String[] agrs) throws Exception{Consumer1 consumer1 = new Consumer1();consumer1.receive();}class Listener implements MessageListener{@Overridepublic void onMessage(Message msg) {// TODO Auto-generated method stubSystem.out.println("這是接收的內容:"+msg);}}
}
這時候只要生產者發布消息,那么消費者就會接收到消息
總結
以上是生活随笔為你收集整理的ActiveMQ(三):ActiveMQ的安全机制、api及订阅模式demo的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。