ActiveMQ的使用
生活随笔
收集整理的這篇文章主要介紹了
ActiveMQ的使用
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
ActiveMQ使用分為兩大塊:生產者和消費者 一、準備 項目導入jar包:activemq-all-5.15.3.jar 并buildpath。 二、生產者 創建連接工廠 ActiveMQConnectionFactory mqf = new ActiveMQConnectionFactory(userName, password, brokerURL); 注: userName是ActiveMQ的用戶名,默認值可以通過 ActiveMQConnection.DEFAULT_USER 獲取; password是ActiveMQ的密碼,默認值可以通過 ActiveMQConnection.DEFAULT_PASSWORD 獲取; brokerURL是ActiveMQ的連接地址,指定格式為:tcp://主機名:61616 獲取連接 Connection connection = mqf.createConnection(); 生成會話 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 生成對應的topic Destination destination = session.createTopic("mytopic"); 創建生產者 MessageProducer producer = session.createProducer(destination); 設置發送消息使用的模式 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 默認是:DeliveryMode.PERSISTENT 生成消息 TextMessage msg = session.createTextMessage("message"); 啟動連接 connection.start(); 發送消息 producer.send(msg); 關閉生產者 producer.close(); 關閉會話 session.close(); 關閉連接 connection.close(); 三、消費者 實現接口 MessageListener、ExceptionListener 并實現onException(JMSException exception)和onMessage(Message message)方法 創建連接工廠 ActiveMQConnectionFactory mqf = new ActiveMQConnectionFactory(userName, password, brokerURL); 具體參數同上 獲取連接 Connection connection = mqf.createConnection(); 生成會話 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 生成對應的topic Destination destination = session.createTopic("mytopic"); 創建消費者 MessageConsumer consumer = session.createConsumer(destination); 啟動連接 connection.start(); 設置消息監聽 consumer.setMessageListener(this); 設置異常監聽 connection.setExceptionListener(this); 實現onMessage方法 該方法有一個參數Message message,這個參數是從ActiveMQ上拿到的消息,可以通過如下方法解析出來: TextMessage tm = (TextMessage)message;
String result = tm.getText(); 關閉消費者 consumer.close(); 關閉會話 session.close(); 關閉連接 connection.close(); 四、例程 生產者實現程序 1 package activemq_test;
2
3 import javax.jms.Connection;
4 import javax.jms.DeliveryMode;
5 import javax.jms.Destination;
6 import javax.jms.JMSException;
7 import javax.jms.MessageProducer;
8 import javax.jms.Session;
9 import javax.jms.TextMessage;
10
11 import org.apache.activemq.ActiveMQConnection;
12 import org.apache.activemq.ActiveMQConnectionFactory;
13
14 public class Producer_tool {
15
16 private final static String userName = ActiveMQConnection.DEFAULT_USER;
17 private final static String password = ActiveMQConnection.DEFAULT_PASSWORD;
18 private final static String brokerURL = "tcp://192.168.0.5:61616";
19 private MessageProducer producer = null;
20 private Connection connection = null;
21 private Session session = null;
22
23 public void initialize() throws JMSException {
24 ActiveMQConnectionFactory mqf = new ActiveMQConnectionFactory(userName, password, brokerURL);
25 connection = mqf.createConnection();
26 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
27 Destination destination = session.createTopic("mytopic");
28 producer = session.createProducer(destination);
29 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
30 }
31
32 public void send(String message) throws JMSException {
33 initialize();
34 TextMessage msg = session.createTextMessage(message);
35 System.out.println("sending message: " + message);
36 connection.start();
37 producer.send(msg);
38 }
39
40 public void close() throws JMSException {
41 if(producer != null) {
42 producer.close();
43 }
44 if(session != null) {
45 session.close();
46 }
47 if(connection != null) {
48 connection.close();
49 }
50 System.out.println("closed");
51 }
52
53 } 生產者主程序 1 package activemq_test;
2 import javax.jms.JMSException;
3 public class Producer_test {
4 public static void main(String[] args) throws JMSException {
5 Producer_tool producer = null;
6 for(int i = 0; i < 10; i++) {
7 producer = new Producer_tool();
8 producer.send("message" + i);
9 producer.close();
10 }
11 }
12 } 消費者實現程序 1 package activemq_test;
2
3 import javax.jms.Connection;
4 import javax.jms.Destination;
5 import javax.jms.ExceptionListener;
6 import javax.jms.JMSException;
7 import javax.jms.Message;
8 import javax.jms.MessageConsumer;
9 import javax.jms.MessageListener;
10 import javax.jms.Session;
11 import javax.jms.TextMessage;
12
13 import org.apache.activemq.ActiveMQConnection;
14 import org.apache.activemq.ActiveMQConnectionFactory;
15
16 public class Consumer_tool implements MessageListener,ExceptionListener{
17
18 private final static String userName = ActiveMQConnection.DEFAULT_USER;
19 private final static String password = ActiveMQConnection.DEFAULT_PASSWORD;
20 private final static String brokerURL = "tcp://192.168.0.5:61616";
21 private Connection connection = null;
22 private Session session = null;
23 private MessageConsumer consumer = null;
24 static boolean isConnection = false;
25
26 public void initialize() throws JMSException {
27 ActiveMQConnectionFactory mqf = new ActiveMQConnectionFactory(userName, password, brokerURL);
28 connection = mqf.createConnection();
29 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
30 Destination destination = session.createTopic("mytopic");
31 consumer = session.createConsumer(destination);
32 }
33
34 public void consumeMessage() throws JMSException {
35 initialize();
36 connection.start();
37 consumer.setMessageListener(this);
38 connection.setExceptionListener(this);
39 isConnection = true;
40 System.out.println("consumer is listening");
41
42 }
43
44 @Override
45 public void onException(JMSException exception) {
46 isConnection = false;
47 }
48
49 @Override
50 public void onMessage(Message message) {
51 if(message instanceof TextMessage) {
52 TextMessage tm = (TextMessage)message;
53 try {
54 System.out.println("consumer received " + tm.getText());
55 } catch (JMSException e) {
56 e.printStackTrace();
57 }
58 }
59 else {
60 System.out.println(message);
61 }
62 }
63
64 public void close() throws JMSException {
65 if(consumer != null) {
66 consumer.close();
67 }
68 if(session != null) {
69 session.close();
70 }
71 if(connection != null) {
72 connection.close();
73 }
74 System.out.println("consumer has closed");
75 }
76 } 消費者主程序 1 package activemq_test;
2 import javax.jms.JMSException;
3 public class Consumer_test {
4 public static void main(String[] args) throws JMSException {
5 Consumer_tool consumer = new Consumer_tool();
6 consumer.consumeMessage();
7 while(Consumer_tool.isConnection) {
8
9 }
10 consumer.close();
11 }
12 }
?
轉載于:https://www.cnblogs.com/xiatianyu/p/9055647.html
總結
以上是生活随笔為你收集整理的ActiveMQ的使用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: HTML/CSS[收藏]
- 下一篇: 文本属性和属性连写