More advanced stuff with JMS and AWS SQS
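As you may know, the SQS in AWS SQS stands for "Simple Queue Service". While working with it recently, I found one of the reasons why it may be called "simple". In two previous posts (here and here) I showed how to use SQS as a JMS queue provider in combination with the Spring Framework. With that basic setup in place, I decided to take it a step further and experiment with the request-response pattern over JMS, making use of the JMS property 'JMSReplyTo' and temporary queues. This rather classic article explains nicely how the pattern works and why it works that way.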
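To make the pattern concrete before bringing in a broker, here is a minimal in-memory sketch. It is not JMS: `Request`, `RequestReplyDemo`, and the use of `BlockingQueue` as a stand-in for queues are assumptions made purely for illustration. The requester creates a private reply queue, attaches it to the request (the role JMSReplyTo plays), and blocks until the responder answers on that queue.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// In-memory stand-in for a JMS message: a text body plus a reply-to destination.
final class Request {
    final String text;
    final BlockingQueue<String> replyTo; // plays the role of the JMSReplyTo header

    Request(String text, BlockingQueue<String> replyTo) {
        this.text = text;
        this.replyTo = replyTo;
    }
}

final class RequestReplyDemo {

    // Requester: create a private (temporary) reply queue, attach it to the
    // request, send the request, then wait for the reply or a timeout.
    static String sendAndReceive(BlockingQueue<Request> requestQueue, String text) {
        BlockingQueue<String> temporaryQueue = new LinkedBlockingQueue<>();
        try {
            requestQueue.put(new Request(text, temporaryQueue));
            return temporaryQueue.poll(5, TimeUnit.SECONDS); // null on timeout
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Responder: take one request and reply with the length of its text
    // on whatever queue the request names as its reply-to destination.
    static void serveOne(BlockingQueue<Request> requestQueue) {
        try {
            Request request = requestQueue.take();
            request.replyTo.put(String.valueOf(request.text.length()));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Wires one responder thread and one requester together for a single exchange.
    static String roundTrip(String text) {
        BlockingQueue<Request> requestQueue = new LinkedBlockingQueue<>();
        Thread server = new Thread(() -> serveOne(requestQueue));
        server.start();
        return sendAndReceive(requestQueue, text);
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("42fdcd4355cc5314")); // prints 16
    }
}
```

The responder never needs to know who is asking: it simply answers on the queue carried by the request. That is why the pattern depends on the provider being able to hand out such private reply queues.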
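To show how it is supposed to work, I first show the setup with Apache ActiveMQ. Let me show the bean that picks up a message from a queue, performs an action on its content, and sends a reply back to the destination named in the JMSReplyTo header. Since I use Spring, this sounds harder than it really is. First the Java code: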
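The listing itself did not survive in this copy of the post. As a stand-in, here is a minimal sketch of what such a service could look like, going by the `onMessage` method name and the `MyMessageService` class name used in the Spring configuration. The interface shown here is a hypothetical local definition so that the sketch compiles on its own, and its single `String` parameter is an assumption. In the project the class would be public and live in the package `net.pascalalma.aws.sqs.requestresponse`.

```java
// Hypothetical local definition of the delegate interface, included only so
// this sketch is self-contained; the method signature is an assumption.
interface ResponsiveTextMessageDelegate {
    String onMessage(String text);
}

// Replies to each incoming text with its length, rendered as a String.
class MyMessageService implements ResponsiveTextMessageDelegate {

    @Override
    public String onMessage(String text) {
        return String.valueOf(text.length());
    }

    public static void main(String[] args) {
        System.out.println(new MyMessageService().onMessage("42fdcd4355cc5314")); // prints 16
    }
}
```

The method deals only in plain strings. Receiving the message, converting it, and sending the return value to the reply destination are left to the listener infrastructure.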
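A pretty simple class, I would say. It implements ResponsiveTextMessageDelegate (details of this interface are described here) and simply returns the length of the content of the incoming message. The Spring Framework takes care of everything else that needs to be done. The Spring configuration for this service looks like this: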
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
                               http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd">

        <context:component-scan base-package="net.pascalalma.aws.sqs.requestresponse"></context:component-scan>
        <context:annotation-config/>

        <!-- ActiveMQ config -->
        <bean id="jmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL">
                <value>tcp://localhost:61616</value>
            </property>
        </bean>

        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <property name="connectionFactory" ref="jmsFactory"/>
        </bean>

        <bean id="requestQueueName" class="java.lang.String">
            <constructor-arg value="DefaultDemoQueue"/>
        </bean>

        <bean id="myMessageService" class="net.pascalalma.aws.sqs.requestresponse.MyMessageService" />

        <bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
            <property name="delegate" ref="myMessageService"/>
            <property name="defaultListenerMethod" value="onMessage"/>
            <property name="messageConverter" ref="messageConverter" />
        </bean>

        <bean id="messageConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter" />

        <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="jmsFactory"/>
            <property name="destinationName" ref="requestQueueName"/>
            <property name="messageListener" ref="messageListener"/>
        </bean>
    </beans>

This is largely the same configuration as described in my previous post. The only difference is that I now use a converter, SimpleMessageConverter, which takes care of converting the returned String into a TextMessage. Without this converter we get the following error:
    java.lang.NoSuchMethodException: net.pascalalma.aws.sqs.requestresponse.MyMessageService.onMessage(org.apache.activemq.command.ActiveMQTextMessage)

Next, we need a service client bean that can "talk" to our service. In Java it could look like this:
    package net.pascalalma.aws.sqs.requestresponse;

    import org.apache.log4j.Logger;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.jms.core.MessageCreator;
    import org.springframework.stereotype.Component;

    import javax.annotation.Resource;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.Session;
    import javax.jms.TextMessage;

    @Component
    public class MyMessageServiceClient {

        final static Logger logger = Logger.getLogger(MyMessageServiceClient.class);

        @Resource
        private JmsTemplate jmsTemplate;

        @Autowired
        private String requestQueueName;

        public String process(final String txt) {
            // Send the request to the queue the server consumes from and
            // block until the reply arrives
            Message response = jmsTemplate.sendAndReceive(requestQueueName, new MessageCreator() {
                public Message createMessage(Session session) throws JMSException {
                    TextMessage message = session.createTextMessage();
                    message.setText(txt);
                    return message;
                }
            });

            String result = null;
            try {
                // sendAndReceive returns null when no reply was received
                if (response != null) {
                    result = ((TextMessage) response).getText();
                }
            } catch (JMSException e) {
                logger.error(e);
            }
            return result;
        }
    }

What we see here is that we use the sendAndReceive method of the jmsTemplate to send the message created in the MessageCreator callback and then wait for the response message. The corresponding Spring configuration for this class is:
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
                               http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd">

        <context:component-scan base-package="net.pascalalma.aws.sqs.requestresponse"></context:component-scan>
        <context:annotation-config/>

        <!-- ActiveMQ config -->
        <bean id="jmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL">
                <value>tcp://localhost:61616</value>
            </property>
        </bean>
        <!-- End ActiveMQ specific -->

        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <property name="connectionFactory" ref="jmsFactory"/>
        </bean>

        <bean id="requestQueueName" class="java.lang.String">
            <constructor-arg value="DefaultDemoQueue"/>
        </bean>

        <bean id="myMessageServiceClient" class="net.pascalalma.aws.sqs.requestresponse.MyMessageServiceClient"/>
    </beans>

What remains is some kind of "container" in which to see these beans in action. For the "server" part I created this main class:
    package net.pascalalma.aws.sqs.requestresponse;

    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;

    public class MessageServiceMain {

        public static void main(String[] args) {
            // Build the application context from the server Spring configuration
            ApplicationContext ctx = new ClassPathXmlApplicationContext(
                    new String[]{"requestresponse/application-context.xml"});
        }
    }

Running this class in your IDE or a terminal simply reads the Spring configuration and instantiates the service beans. The main class for the client has a little more code:
    package net.pascalalma.aws.sqs.requestresponse;

    import org.apache.log4j.Logger;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;

    import java.util.Random;

    public class MessageServiceClientMain {

        final static Logger logger = Logger.getLogger(MessageServiceClientMain.class);

        public static void main(String[] args) {
            // Build the application context from the client Spring configuration
            ApplicationContext ctx = new ClassPathXmlApplicationContext(
                    new String[]{"requestresponse/application-context-client.xml"});

            // Look up the service client bean
            MyMessageServiceClient messageServiceClient =
                    (MyMessageServiceClient) ctx.getBean("myMessageServiceClient");

            // Send successively shorter suffixes of a random hex string to the service.
            // The loop is bounded by the actual length, because the hex string can be
            // shorter than 16 characters when the random long has leading zero digits.
            String random = createRandomString();
            for (int i = 0; i < random.length(); i++) {
                String key = random.substring(i);
                logger.info("Sending to service: " + key);
                logger.info("Sending to service with length: " + key.length());
                String result = messageServiceClient.process(key);
                logger.info("Received from service: " + result);
                logger.info("======================================================");
            }
        }

        private static String createRandomString() {
            Random random = new Random(System.currentTimeMillis());
            long randomLong = random.nextLong();
            return Long.toHexString(randomLong);
        }
    }

Running this class generates messages, sends them to the service, and prints the results received from the service, like this:
    2015-04-20 20:29:14 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(27) - Sending to service: 42fdcd4355cc5314
    2015-04-20 20:29:14 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(28) - Sending to service with length: 16
    2015-04-20 20:29:15 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(30) - Received from service: 16
    2015-04-20 20:29:15 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(31) - ======================================================
    2015-04-20 20:29:15 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(27) - Sending to service: 2fdcd4355cc5314
    2015-04-20 20:29:15 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(28) - Sending to service with length: 15
    2015-04-20 20:29:15 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(30) - Received from service: 15
    2015-04-20 20:29:15 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(31) - ======================================================

So far, so good. Now let's use AWS SQS instead of the local ActiveMQ instance. This is easily done by modifying the configuration of the JmsFactory used in both Spring configs:
    ...
    <bean id="credentialsProviderBean" class="com.amazonaws.auth.DefaultAWSCredentialsProviderChain"/>

    <bean id="connectionFactoryBuilder" class="com.amazon.sqs.javamessaging.SQSConnectionFactory$Builder">
        <property name="regionName" value="eu-west-1"/>
        <property name="numberOfMessagesToPrefetch" value="5"/>
        <property name="awsCredentialsProvider" ref="credentialsProviderBean"/>
    </bean>

    <bean id="jmsFactory" class="com.amazon.sqs.javamessaging.SQSConnectionFactory"
          factory-bean="connectionFactoryBuilder"
          factory-method="build"/>
    ...

Now if we start the "server" application and the "client" application, we get the following output:
    2015-04-25 20:22:49 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(27) - Sending to service: f1db848691a26c85
    2015-04-25 20:22:49 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(28) - Sending to service with length: 16
    Exception in thread "main" org.springframework.jms.UncategorizedJmsException: Uncategorized exception occured during JMS processing; nested exception is javax.jms.JMSException: Unsupported Method
        at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:316)
        at org.springframework.jms.support.JmsAccessor.convertJmsAccessException(JmsAccessor.java:169)
        at org.springframework.jms.core.JmsTemplate.executeLocal(JmsTemplate.java:986)
        at org.springframework.jms.core.JmsTemplate.sendAndReceive(JmsTemplate.java:922)
        at net.pascalalma.aws.sqs.requestresponse.MyMessageServiceClient.process(MyMessageServiceClient.java:29)
        at net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain.main(MessageServiceClientMain.java:29)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
    Caused by: javax.jms.JMSException: Unsupported Method
        at com.amazon.sqs.javamessaging.SQSSession.createTemporaryQueue(SQSSession.java:744)
        at org.springframework.jms.core.JmsTemplate.doSendAndReceive(JmsTemplate.java:946)
        at org.springframework.jms.core.JmsTemplate$12.doInJms(JmsTemplate.java:926)
        at org.springframework.jms.core.JmsTemplate$12.doInJms(JmsTemplate.java:922)
        at org.springframework.jms.core.JmsTemplate.executeLocal(JmsTemplate.java:983)
        ... 8 more

As you can see, we get a stack trace telling us that the JMS method 'createTemporaryQueue' is not supported by SQS!
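The failure mode in the stack trace can be sketched without a broker. The types below (`MiniSession`, `FullSession`, `LimitedSession`, `SendAndReceiveSketch`) are hypothetical and are not the javax.jms or Spring API. The sketch assumes that the reply queue is requested before the request is sent, so a provider that rejects temporary queues fails before anything leaves the client.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, stripped-down session abstraction; not the javax.jms API.
interface MiniSession {
    BlockingQueue<String> createTemporaryQueue();
    void send(String text, BlockingQueue<String> replyTo);
}

// Provider that supports temporary queues. For brevity the "service" is
// inlined: sending a text immediately puts its length on the reply queue.
final class FullSession implements MiniSession {
    public BlockingQueue<String> createTemporaryQueue() {
        return new LinkedBlockingQueue<>();
    }

    public void send(String text, BlockingQueue<String> replyTo) {
        replyTo.add(String.valueOf(text.length()));
    }
}

// Provider that rejects temporary queues, mirroring the "Unsupported Method" error.
final class LimitedSession implements MiniSession {
    int sent = 0; // counts requests that actually went out

    public BlockingQueue<String> createTemporaryQueue() {
        throw new UnsupportedOperationException("Unsupported Method");
    }

    public void send(String text, BlockingQueue<String> replyTo) {
        sent++;
    }
}

final class SendAndReceiveSketch {

    // Request-reply helper: reply queue first, then send, then read the reply.
    static String sendAndReceive(MiniSession session, String text) {
        BlockingQueue<String> replyQueue = session.createTemporaryQueue();
        session.send(text, replyQueue);
        return replyQueue.poll(); // null if no reply is available
    }

    public static void main(String[] args) {
        System.out.println(sendAndReceive(new FullSession(), "f1db848691a26c85")); // prints 16

        LimitedSession limited = new LimitedSession();
        try {
            sendAndReceive(limited, "f1db848691a26c85");
        } catch (UnsupportedOperationException e) {
            // The request was never sent: limited.sent is still 0 here
            System.out.println("failed before sending: " + e.getMessage());
        }
    }
}
```

Under that assumption, the client code itself does nothing wrong. The request-reply helper simply depends on a capability the provider does not offer.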
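So much for JMS support. I guess that is why they call it the Simple Queue Service: only some of the possible JMS methods are implemented ;-). I searched for more information about this, without any luck. I did, however, run into this framework: Nevado JMS. It claims to be a JMS driver for AWS SQS/SNS, so I decided to give it a try. First I added the following dependency to the project's pom: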
    <dependency>
        <groupId>org.skyscreamer</groupId>
        <artifactId>nevado-jms</artifactId>
        <version>1.3.1</version>
    </dependency>

Then I modified the JmsFactory in both Spring configs again, this time to:
    ...
    <bean id="sqsConnectorFactory" class="org.skyscreamer.nevado.jms.connector.amazonaws.AmazonAwsSQSConnectorFactory" />

    <bean id="jmsFactory" class="org.skyscreamer.nevado.jms.NevadoConnectionFactory">
        <property name="sqsConnectorFactory" ref="sqsConnectorFactory" />
        <property name="awsAccessKey" value="${aws.accessKey}" />
        <property name="awsSecretKey" value="${aws.secretKey}" />
    </bean>
    ...

Now when I run the main classes, I get the expected result:
    2015-04-25 20:33:27 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(27) - Sending to service: dad74fbff8e0a2f2
    2015-04-25 20:33:27 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(28) - Sending to service with length: 16
    2015-04-25 20:33:53 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(30) - Received from service: 16
    2015-04-25 20:33:53 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(31) - ======================================================
    2015-04-25 20:33:53 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(27) - Sending to service: ad74fbff8e0a2f2
    2015-04-25 20:33:53 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(28) - Sending to service with length: 15
    2015-04-25 20:34:04 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(30) - Received from service: 15
    2015-04-25 20:34:04 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(31) - ======================================================
    2015-04-25 20:34:04 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(27) - Sending to service: d74fbff8e0a2f2
    2015-04-25 20:34:04 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(28) - Sending to service with length: 14
    2015-04-25 20:34:09 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(30) - Received from service: 14
    2015-04-25 20:34:09 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(31) - ======================================================
    2015-04-25 20:34:09 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(27) - Sending to service: 74fbff8e0a2f2
    2015-04-25 20:34:09 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(28) - Sending to service with length: 13
    2015-04-25 20:34:17 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(30) - Received from service: 13
    2015-04-25 20:34:17 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(31) - ======================================================
    2015-04-25 20:34:17 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(27) - Sending to service: 4fbff8e0a2f2
    2015-04-25 20:34:17 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(28) - Sending to service with length: 12
    2015-04-25 20:34:21 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(30) - Received from service: 12
    2015-04-25 20:34:21 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(31) - ======================================================

So this shows that the more advanced stuff is still possible with the so-called "simple" service, although it takes some help from the local community :-)
Translated from: https://www.javacodegeeks.com/2015/05/more-advanced-stuff-with-jms-and-aws-sqs.html