java 监听队列_spring+activemq实战之配置监听多队列实现不同队列消息消费
摘選:https://my.oschina.net/u/3613230/blog/1457227
摘要: 最近在項目開發中需要用到activemq。使用時發現,在同一個項目的point-to-point模式下配置多個隊列後,消息生產者只能往一個隊列中發消息,或者往多個隊列發送相同的消息,而且監聽器只能監聽一個隊列,這樣配置多個隊列就沒有意義了。作者想要實現的是:配置多個隊列,生產者可以往不同的隊列中發送不同的消息,監聽器消費時可以根據消息所屬的隊列進行相應的業務處理。在網上搜了一下,發現都是單個隊列和單個監聽的例子;研究之後發現這個需求是可以實現的。廢話不多說,直接上代碼:
項目結構截圖
maven所需依賴:
<!--
  Maven build descriptor for the springmq demo web application.
  The element markup was lost when this listing was extracted; it is reconstructed
  here from the surviving values and the ${...} property references.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.gxf</groupId>
  <artifactId>springmq</artifactId>
  <packaging>war</packaging>
  <version>0.0.1-SNAPSHOT</version>
  <name>springmq Maven Webapp</name>
  <url>http://maven.apache.org</url>

  <!-- Shared version numbers referenced by the dependencies below. -->
  <properties>
    <springframework>4.1.8.RELEASE</springframework>
    <javax.servlet>3.1.0</javax.servlet>
  </properties>

  <dependencies>
    <!-- Unit testing -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.10</version>
      <scope>test</scope>
    </dependency>

    <!-- JSTL for the JSP views -->
    <dependency>
      <groupId>jstl</groupId>
      <artifactId>jstl</artifactId>
      <version>1.2</version>
    </dependency>

    <!-- Servlet API -->
    <dependency>
      <groupId>javax.servlet</groupId>
      <artifactId>javax.servlet-api</artifactId>
      <version>${javax.servlet}</version>
    </dependency>

    <!-- Spring -->
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-core</artifactId>
      <version>${springframework}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-context</artifactId>
      <version>${springframework}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-tx</artifactId>
      <version>${springframework}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-webmvc</artifactId>
      <version>${springframework}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-jms</artifactId>
      <version>${springframework}</version>
    </dependency>

    <!-- XBean, needed for the amq: namespace in the Spring XML configuration -->
    <dependency>
      <groupId>org.apache.xbean</groupId>
      <artifactId>xbean-spring</artifactId>
      <version>3.16</version>
    </dependency>

    <!-- ActiveMQ -->
    <!-- NOTE(review): activemq-core is 5.7.0 while activemq-pool is 5.14.3; confirm that
         mixing these two release lines is intended. -->
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-core</artifactId>
      <version>5.7.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-pool</artifactId>
      <version>5.14.3</version>
    </dependency>
  </dependencies>

  <build>
    <finalName>springmq</finalName>
  </build>
</project>
-activemq配置文件:activemq.xml
1 <?xml version="1.0" encoding="UTF-8"?>
2
3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4 xmlns:amq="http://activemq.apache.org/schema/core"
5 xmlns:jms="http://www.springframework.org/schema/jms"
6 xmlns:context="http://www.springframework.org/schema/context"
7 xmlns:mvc="http://www.springframework.org/schema/mvc"
8 xsi:schemaLocation="
9 http://www.springframework.org/schema/beans
10 http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
11 http://www.springframework.org/schema/context
12 http://www.springframework.org/schema/context/spring-context-4.1.xsd
13 http://www.springframework.org/schema/mvc
14 http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd
15 http://www.springframework.org/schema/jms
16 http://www.springframework.org/schema/jms/spring-jms-4.1.xsd
17 http://activemq.apache.org/schema/core
18 http://activemq.apache.org/schema/core/activemq-core-5.14.3.xsd"
19 >
20
21
22
23
24
25
26
27
28 class="org.springframework.jms.connection.CachingConnectionFactory">
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
-springmvc配置文件:springmvc.xml
1 <?xml version="1.0" encoding="UTF-8"?>
2
3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4 xmlns:context="http://www.springframework.org/schema/context"
5 xmlns:mvc="http://www.springframework.org/schema/mvc"
6 xsi:schemaLocation="http://www.springframework.org/schema/beans
7 http://www.springframework.org/schema/beans/spring-beans.xsd
8 http://www.springframework.org/schema/context
9 http://www.springframework.org/schema/context/spring-context-4.1.xsd
10 http://www.springframework.org/schema/mvc
11 http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd">
12
13
14
15
16
17
18 value="org.springframework.web.servlet.view.JstlView" />
19
20
21
22
-Controller層 MainHandler.java代碼:
1 packagecom.gxf.handler;2
3 importjava.text.SimpleDateFormat;4 import java.util.*;5
6 importjavax.annotation.Resource;7 importjavax.jms.Destination;8
9
10 importorg.apache.activemq.command.ActiveMQDestination;11 importorg.springframework.stereotype.Controller;12 importorg.springframework.web.bind.annotation.RequestMapping;13 importorg.springframework.web.bind.annotation.RequestMethod;14 importorg.springframework.web.bind.annotation.RequestParam;15 importorg.springframework.web.servlet.ModelAndView;16
17 importcom.gxf.service.ProducerService;18
19
20 /**
21 *22 *@authorstark201723 *24 */
25 @Controller26 public classMainHandler {27
28
29 //隊列名
30 @Resource(name="queueDestination")31 privateDestination queueDestination;32
33
34 //隊列消息生產者
35 @Resource(name="producerService")36 privateProducerService producerService;37
38
39
40 @RequestMapping(value="/main",method=RequestMethod.GET)41 publicString producer(){42
43 return "main";44 }45 /**
46 * 往隊列queue1中發送消息47 *@parammessage48 *@return
49 */
50 @RequestMapping(value="/sendone",method=RequestMethod.POST)51 public String producer(@RequestParam("message") String message) {52
53 /**
54 * 將destination強制轉換為ActiveMQDestination,在ActiveMQDestination對象中,55 * 通過getCompositeDestinations()方法獲取destination隊列數組:queue://queue1 queue://queue256 *57 */
58 ActiveMQDestination activeMQDestination=(ActiveMQDestination) queueDestination;59 /**
60 * 往隊列queue1中發送文本消息61 */
62 System.out.println("往隊列"+activeMQDestination.getCompositeDestinations()[0].getPhysicalName()+"中發送文本消息");63 producerService.sendTxtMessage(activeMQDestination.getCompositeDestinations()[0], message);64 /**
65 * 往隊列queue1中發送MapMessage消息66 */
67 System.out.println("往隊列"+activeMQDestination.getCompositeDestinations()[0].getPhysicalName()+"中發送MapMessage消息");68 producerService.sendMapMessage(activeMQDestination.getCompositeDestinations()[0], message);69
70 //String bb="fdsalfkasdfkljasd;flkajsfd";71 //byte[] b = bb.getBytes();72
73 //producer.sendBytesMessage(demoQueueDestination, b);74
75 //producer.sendMapMessage(mqQueueDestination, message);
76
77 return "main";78 }79 /**
80 * 往消息隊列queue2中發送消息81 *@parammessage82 *@return
83 */
84 @RequestMapping(value="/sendtwo",method=RequestMethod.POST)85 public String producertwo(@RequestParam("message") String message) {86
87
88 /**
89 * 將destination強制轉換為ActiveMQDestination,在ActiveMQDestination對象中,90 * 通過getCompositeDestinations()方法獲取destination隊列數組:queue://queue1 queue://queue291 *92 */
93 ActiveMQDestination activeMQDestination=(ActiveMQDestination) queueDestination;94 /**
95 * 隊列queue2中發送文本消息96 */
97 System.out.println("往隊列"+activeMQDestination.getCompositeDestinations()[1].getPhysicalName()+"中發送文本消息");98 producerService.sendTxtMessage(activeMQDestination.getCompositeDestinations()[1], message);99 /**
100 * 隊列queue2中發送mapMessage消息101 */
102 System.out.println("往隊列"+activeMQDestination.getCompositeDestinations()[1].getPhysicalName()+"中發送文本消息");103 producerService.sendMapMessage(activeMQDestination.getCompositeDestinations()[1], message);104
105 String bb="fdsalfkasdfkljasd;flkajsfd";106 byte[] b =bb.getBytes();107
108 //producer.sendBytesMessage(demoQueueDestination, b);109
110 //producer.sendMapMessage(mqQueueDestination, message);
111
112 return "main";113 }114
115
116
117
118 }
-生產者ProducerService.java代碼:
1 packagecom.gxf.service;2
3 importjava.io.Serializable;4 importjava.util.List;5 importjava.util.Map;6
7 importjavax.annotation.Resource;8 importjavax.jms.BytesMessage;9 importjavax.jms.Destination;10 importjavax.jms.JMSException;11 importjavax.jms.MapMessage;12 importjavax.jms.Message;13 importjavax.jms.Session;14 importjavax.jms.StreamMessage;15
16 importorg.springframework.jms.core.JmsTemplate;17 importorg.springframework.jms.core.MessageCreator;18 importorg.springframework.stereotype.Service;19
20 @Service21 public classProducerService {22
23 @Resource(name = "jmsTemplate")24 privateJmsTemplate jmsTemplate;25
26 /**
27 * 向指定Destination發送text消息28 *29 *@paramdestination30 *@parammessage31 */
32 public void sendTxtMessage(Destination destination, finalString message) {33 if (null ==destination) {34 destination =jmsTemplate.getDefaultDestination();35 }36 jmsTemplate.send(destination, newMessageCreator() {37 public Message createMessage(Session session) throwsJMSException {38 returnsession.createTextMessage(message);39 }40 });41 System.out.println("springJMS send text message...");42 }43
44 /**
45 * 向指定Destination發送map消息46 *47 *@paramdestination48 *@parammessage49 */
50 public void sendMapMessage(Destination destination, finalString message) {51 if (null ==destination) {52 destination =jmsTemplate.getDefaultDestination();53 }54 jmsTemplate.send(destination, newMessageCreator() {55 public Message createMessage(Session session) throwsJMSException {56 MapMessage mapMessage =session.createMapMessage();57 mapMessage.setString("msgId", message);58 returnmapMessage;59 }60 });61 System.out.println("springJMS send map message...");62 }63
64 /**
65 * 向指定Destination發送序列化的對象66 *67 *@paramdestination68 *@paramobject69 * object 必須序列化70 */
71 public void sendObjectMessage(Destination destination, finalSerializable object) {72 if (null ==destination) {73 destination =jmsTemplate.getDefaultDestination();74 }75 jmsTemplate.send(destination, newMessageCreator() {76 public Message createMessage(Session session) throwsJMSException {77 returnsession.createObjectMessage(object);78 }79 });80 System.out.println("springJMS send object message...");81 }82
83 /**
84 * 向指定Destination發送字節消息85 *86 *@paramdestination87 *@parambytes88 */
89 public void sendBytesMessage(Destination destination, final byte[] bytes) {90 if (null ==destination) {91 destination =jmsTemplate.getDefaultDestination();92 }93 jmsTemplate.send(destination, newMessageCreator() {94 public Message createMessage(Session session) throwsJMSException {95 BytesMessage bytesMessage =session.createBytesMessage();96 bytesMessage.writeBytes(bytes);97 returnbytesMessage;98
99 }100 });101 System.out.println("springJMS send bytes message...");102 }103
104 /**
105 * 向默認隊列發送Stream消息106 */
107 public voidsendStreamMessage(Destination destination) {108 jmsTemplate.send(newMessageCreator() {109 public Message createMessage(Session session) throwsJMSException {110 StreamMessage message =session.createStreamMessage();111 message.writeString("stream string");112 message.writeInt(11111);113 returnmessage;114 }115 });116 System.out.println("springJMS send Strem message...");117 }118
119 }
-隊列消息監聽器QueueMessageListener.java代碼:
1 packagecom.gxf.listener;2 importjavax.jms.BytesMessage;3 importjavax.jms.JMSException;4 importjavax.jms.MapMessage;5 importjavax.jms.Message;6 importjavax.jms.MessageListener;7 importjavax.jms.ObjectMessage;8 importjavax.jms.StreamMessage;9 importjavax.jms.TextMessage;10
11 importorg.apache.activemq.advisory.DestinationEvent;12 importorg.apache.activemq.command.ActiveMQDestination;13 importorg.apache.activemq.command.ActiveMQMessage;14 importorg.apache.activemq.command.DestinationInfo;15
16
17 public class QueueMessageListener implementsMessageListener {18
19
20 //當收到消息后,自動調用該方法
21 @Override22 public voidonMessage(Message message) {23 try{24 ActiveMQDestination queues=(ActiveMQDestination)message.getJMSDestination();25
26 /**
27 * 監聽消息隊列queue1中的消息28 */
29 if(queues.getPhysicalName().equalsIgnoreCase("queue1"))30 {31 System.out.println("監聽隊列:"+queues.getPhysicalName()+"消費了消息:");32 //如果是文本消息
33 if (message instanceofTextMessage) {34 TextMessage tm =(TextMessage) message;35 try{36 System.out.println("from get textMessage:\t" +tm.getText());37 } catch(JMSException e) {38 //TODO Auto-generated catch block
39 e.printStackTrace();40 }41 }42
43 //如果是Map消息
44 if (message instanceofMapMessage) {45 MapMessage mm =(MapMessage) message;46 try{47 System.out.println("from get MapMessage:\t" + mm.getString("msgId"));48 } catch(JMSException e) {49 //TODO Auto-generated catch block
50 e.printStackTrace();51 }52 }53 }54 /**
55 * 監聽消息隊列queue2中的消息56 */
57 if(queues.getPhysicalName().equalsIgnoreCase("queue2"))58 {59 System.out.println("監聽隊列:"+queues.getPhysicalName()+"消費了消息:");60 //如果是文本消息
61 if (message instanceofTextMessage) {62 TextMessage tm =(TextMessage) message;63 try{64 System.out.println("from get textMessage:\t" +tm.getText());65 } catch(JMSException e) {66 //TODO Auto-generated catch block
67 e.printStackTrace();68 }69 }70
71 //如果是Map消息
72 if (message instanceofMapMessage) {73 MapMessage mm =(MapMessage) message;74 try{75 System.out.println("from get MapMessage:\t" + mm.getString("msgId"));76 } catch(JMSException e) {77 //TODO Auto-generated catch block
78 e.printStackTrace();79 }80 }81 }82
83 } catch(JMSException e1) {84 //TODO Auto-generated catch block
85 e1.printStackTrace();86 }87
88
89 //如果是Object消息
90 if (message instanceofObjectMessage) {91 ObjectMessage om =(ObjectMessage) message;92 System.out.println("from get ObjectMessage:\t");93 }94
95 //如果是bytes消息
96 if (message instanceofBytesMessage) {97 System.out.println("from get BytesMessage:\t");98 byte[] b = new byte[1024];99 int len = -1;100 BytesMessage bm =(BytesMessage) message;101 try{102 while ((len = bm.readBytes(b)) != -1) {103 System.out.println(new String(b, 0, len));104 }105 } catch(JMSException e) {106 //TODO Auto-generated catch block
107 e.printStackTrace();108 }109 }110
111 //如果是Stream消息
112 if (message instanceofStreamMessage) {113 System.out.println("from get BytesMessage:\t");114 StreamMessage sm =(StreamMessage) message;115 try{116 System.out.println(sm.readString());117 System.out.println(sm.readInt());118 } catch(JMSException e) {119 //TODO Auto-generated catch block
120 e.printStackTrace();121 }122
123 }}124
125 }
-啟動項目訪問main,進行消息發送:
后臺打印往不同隊列發送的消息和監聽到不同隊列中的消息:
隊列queue1發送並消費了14條消息,queue2發送並消費了10條消息:
到此想要的功能需求已實現
總結
以上是生活随笔為你收集整理的java 监听队列_spring+activemq实战之配置监听多队列实现不同队列消息消费的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: mysql数据库程序设_MySQL数据库
- 下一篇: java中为什么要用json_Java中