ActiveMQ与Spring的整合
生活随笔
收集整理的這篇文章主要介紹了
ActiveMQ与Spring的整合
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
ActiveMQ整合Spring
一.ActiveMQ的點對點模型
pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven build for the ActiveMQ / Spring JMS sample (WAR packaging).
  Attributes on <project> are separated by whitespace so the document is well-formed XML.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.demo</groupId>
  <artifactId>aq-test</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>war</packaging>

  <name>aq-test Maven Webapp</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!-- Spring Framework 5.x requires Java 8 or later, so the compiler level must be at least 1.8. -->
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <!--
      JUnit is declared exactly once; duplicate declarations with different versions/scopes
      make Maven emit warnings and resolve unpredictably. Compile scope is kept so the @Test
      samples compile wherever they are placed; narrow it to <scope>test</scope> if they
      live under src/test/java.
    -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
    </dependency>

    <dependency>
      <groupId>javax.jms</groupId>
      <artifactId>jms-api</artifactId>
      <version>1.1-rev-1</version>
    </dependency>

    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-all</artifactId>
      <version>5.14.5</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.34</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.springframework/spring-aop -->
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-aop</artifactId>
      <version>5.1.3.RELEASE</version>
    </dependency>

    <!-- Provides ClassPathXmlApplicationContext, component scanning and @Service. -->
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-context</artifactId>
      <version>5.1.3.RELEASE</version>
    </dependency>

    <!-- Provides JmsTemplate, CachingConnectionFactory and the JMS message converters. -->
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-jms</artifactId>
      <version>5.1.3.RELEASE</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-pool2 -->
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-pool2</artifactId>
      <version>2.6.0</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-dbcp2 -->
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-dbcp2</artifactId>
      <version>2.1.1</version>
    </dependency>
  </dependencies>

  <build>
    <finalName>aq-test</finalName>
    <pluginManagement>
      <!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.1.0</version>
        </plugin>
        <!-- see http://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_war_packaging -->
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.22.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-war-plugin</artifactId>
          <version>3.2.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>

ActiviteMq.class:(發送端)
package com.demo;import org.apache.activemq.ActiveMQConnectionFactory; import org.junit.Test;import javax.jms.*;public class ActiviteMq {@Testpublic void testQueueProducer() throws JMSException {//1.創建connectinfactory對象,需要指定服務的IP以及端口號//brokerURL服務器的ip以及端口號ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.20:61616");//2.使用ConnectionFactory創建Connection connection = connectionFactory.createConnection();//3.開啟鏈接,調用connection對象的start的方法connection.start();//4.使用connection對創建一個session對象//[4.1] 第一參數:是否開啟事務//[4.2] 第二參數:當第一個參數為false的時候 才有意義 消息的應答模式//1.自動應答2.手動應答 一般為自動Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//第五步:使用session對象創建一個destination對象(topic,queue) 此處創建一個queue對象//參數:隊列名稱Queue queue = session.createQueue("test-queue2");//第六步 使用session創建一個producer對象MessageProducer producer = session.createProducer(queue);//第七步 創建一個message對象 創建一個textmessage對象TextMessage textMessage = session.createTextMessage("風風光光");//第八步 使用producer對象發送消息producer.send(textMessage);//第九步 關閉資源producer.close();session.close();connection.close();} }ReceiveMsf.class:(接收端)
package com.demo; import org.apache.activemq.ActiveMQConnectionFactory; import org.junit.Test;import javax.jms.*; import java.io.IOException;public class ReceiveMsf {@Testpublic void testQueueConsumer() throws JMSException, IOException {//1.創建connectinfactory對象,需要指定服務的IP以及端口號//brokerURL服務器的ip以及端口號ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.20:61616");//2.使用ConnectionFactory創建Connection connection = connectionFactory.createConnection();//3.開啟鏈接,調用connection對象的start的方法connection.start();//4.使用connection對創建一個session對象//[4.1] 第一參數:是否開啟事務//[4.2] 第二參數:當第一個參數為false的時候 才有意義 消息的應答模式//1.自動應答2.手動應答 一般為自動Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//第五步:使用session對象創建一個destination對象(topic,queue) 此處創建一個queue對象//參數:隊列名稱Queue queue = session.createQueue("test-queue2");// 第六步:使用Session對象創建一個Consumer對象。MessageConsumer consumer = session.createConsumer(queue);consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {try {TextMessage textMessage = (TextMessage) message;String text = null;//取消的內容text = textMessage.getText();//第八步 打印消息System.out.println(text);} catch (JMSException e) {e.printStackTrace();}}});//等待鍵盤輸入 阻塞System.in.read();//第九步 關閉資源consumer.close();session.close();connection.close();} }二.activmq的發布訂閱模型
TopicProducer.class
package com.demo.dingyue;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;

import javax.jms.*;

/**
 * Publish/subscribe example: publishes one text message to the topic {@code huaYuanBaoBao}.
 */
public class TopicProducer {

    /**
     * Connects to the broker, publishes a single {@link TextMessage} to the topic and
     * releases every JMS resource, even when a step fails.
     *
     * @throws JMSException if connecting to the broker or publishing the message fails
     */
    @Test
    public void testTopicProducer() throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.20:61616");
        Connection connection = connectionFactory.createConnection();
        try {
            connection.start();

            // Non-transacted session with automatic acknowledgement.
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic("huaYuanBaoBao");
            MessageProducer producer = session.createProducer(topic);

            TextMessage textMessage = session.createTextMessage("這個是發布訂閱的");
            producer.send(textMessage);

            producer.close();
            session.close();
        } finally {
            // Closing the connection also closes any session/producer still open,
            // so nothing leaks when one of the steps above throws.
            connection.close();
        }
    }
}

TopicCustomer.class:
package com.demo.dingyue;import org.apache.activemq.ActiveMQConnectionFactory; import org.junit.Test;import javax.jms.*; import java.io.IOException;public class TopicCustomer {@Testpublic void testTopicCustomer() throws JMSException, IOException {ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.20:61616");Connection connection = connectionFactory.createConnection();connection.start();Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);Topic topic = session.createTopic("huaYuanBaoBao");MessageConsumer consumer = session.createConsumer(topic);consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {try{TextMessage textMessage = (TextMessage) message;String text = null;//取出消息的內容text= textMessage.getText();System.out.println(text);}catch (Exception e){e.printStackTrace();}}});System.out.println("消費端03");System.in.read();//關閉資源connection.close();consumer.close();session.close();} }和Spring整合:
spring-amq.xml:
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Spring configuration for the ActiveMQ sample: connection factories, JmsTemplate and
  the queue/topic destinations.
  Attributes are separated by whitespace and every schemaLocation entry is a
  whitespace-separated "namespace location" pair, as XML Schema requires.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-3.0.xsd
           http://www.springframework.org/schema/aop
           http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
           http://www.springframework.org/schema/tx
           http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">

    <!-- Picks up the annotated classes in com.demo.spring and enables annotation-driven injection. -->
    <context:component-scan base-package="com.demo.spring"/>

    <!-- Explicit bean so callers can look the sender up by the name "amqSenderService". -->
    <bean id="amqSenderService" class="com.demo.spring.AMQSenderServiceImpl">
        <!--<bean id="user" class="com.demo.spring.User">-->
    </bean>

    <!-- Pooled ActiveMQ connection factory; stop() is invoked when the context is closed. -->
    <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
          destroy-method="stop">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="trustAllPackages" value="true"/>
                <property name="brokerURL">
                    <value>tcp://192.168.1.20:61616</value>
                </property>
            </bean>
        </property>
        <property name="maxConnections" value="100"></property>
    </bean>

    <!-- Caching improves efficiency. -->
    <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="jmsFactory"/>
        <property name="sessionCacheSize" value="1"/>
    </bean>

    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="cachingConnectionFactory"/>
        <property name="messageConverter">
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
        </property>
    </bean>

    <!-- Test queue; the queue name is spring-queue. -->
    <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <!--<constructor-arg index="0" value="spring-queue"/>-->
        <constructor-arg name="name" value="spring-queue"/>
    </bean>

    <!-- Test topic. -->
    <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg index="0" value="spring-topic"/>
    </bean>

</beans>

AMQSenderServiceImpl:
package com.demo.spring;import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Service;import javax.annotation.Resource; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session;@Service public class AMQSenderServiceImpl {private static final Logger logger = LoggerFactory.getLogger(AMQSenderServiceImpl.class);@Resource(name = "jmsTemplate")private JmsTemplate jmsTemplate;//目的地隊列的明證,我們要向這個隊列發送消息@Resource(name = "destinationQueue")private Destination destination;//向特定的隊列發送消息public void sendMsg(final User user) { // final String msg = JSON.toJSONString(mqParamDto);user.setEmail("javaceshi@aa.com");user.setPassword("123456");user.setPhone("123456");user.setSex("M");user.setUsername("javaceshi");try {logger.info("將要向隊列{}發送的消息msg:{}", destination, user);jmsTemplate.send(destination, new MessageCreator() {@Overridepublic Message createMessage(Session session) throws JMSException { // return session.createObjectMessage(user);return session.createTextMessage("2019/1/18message");}});} catch (Exception ex) {logger.error("向隊列{}發送消息失敗,消息為:{}", destination, user);}} }AMQReceiverServiceImpl:
package com.demo.spring; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Service;import javax.annotation.Resource; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session;@Service public class AMQReceiverServiceImpl {private static final Logger logger = LoggerFactory.getLogger(AMQSenderServiceImpl.class);@Resource(name = "jmsTemplate")private JmsTemplate jmsTemplate;//目的地隊列的明證,我們要向這個隊列接收消息@Resource(name = "destinationQueue")private Destination destination;//向特定的隊列接收消息public void receiverMsg(final User user) { //try {Object object = jmsTemplate.receive(destination);User msg = (User) object;System.out.println(msg);} catch (Exception ex) {ex.printStackTrace();}} }測試類:App
package com.demo.spring;import com.demo.spring.User; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext;/*** 主發送類**/ public class App {public static void main( String[] args ){final User user = new User();user.setEmail("javaceshi@aa.com");user.setPassword("123456");user.setPhone("123456");user.setSex("M");user.setUsername("javaceshi");ApplicationContext context = new ClassPathXmlApplicationContext("classpath:spring-amq.xml");AMQSenderServiceImpl sendService = (AMQSenderServiceImpl)context.getBean("amqSenderService");sendService.sendMsg(user); // sendService.send(user);System.out.println("send successfully, please visit http://192.168.1.20:8161/admin to see it");} }轉載于:https://www.cnblogs.com/charlypage/p/10493286.html
總結
以上是生活随笔為你收集整理的ActiveMQ与Spring的整合的全部內容,希望文章能夠幫你解決所遇到的問題。