activeMQ使用总结
這幾天在看異步消息,想到了jms,想到了activeMQ。下面是使用記錄:
activeMQ可以直接官網上下到。
理解:activeMQ實現了一個隊列,提供了tcp、socket等多種連接方式,通過java 的JMS進行發送消息和監聽消息。
? ? ? ? ? ? 消息分為兩種:queue(隊列)、topic(主題),隊列是點對點的生產者、消費者模式,主題是發布、訂閱模式。
開發與學習過程:
1、網上查找activeMQ和JMS相關資料,大致了解了上面理解的內容。
2、快速的進行開發,集成spring,因為集成spring把監聽、隊列等jms組件都定義好,更加方便學習。
spring集成jms組件包括:
?jmsTemplate:
<!-- 同步和異步消息jmsTemplate -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
<property name="receiveTimeout" value="10000" />
</bean>
<bean id="asyJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="asyConnectionFactory" />
<property name="receiveTimeout" value="10000" />
</bean>
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
</bean>
<bean id="asyConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616?jms.useAsyncSend=true" />
</bean>
?隊列:
<!-- 消息回復隊列 -->
<bean id="responseQueue" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg value="response_queue"/></bean>
監聽:
<!-- 回復消息監聽器 -->
<bean id="responseQueueListener" class="utry.jms.listener.ResponseQueueListener"/>
<!-- 回復對應的監聽容器 -->
<bean id="responseQueueMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="responseQueue"/>
<property name="messageListener" ref="responseQueueListener"/>
</bean>
發送:
public void sendMsg(String destinationName, Serializable obj,
boolean asyncSend) {
if (asyncSend) {
asyJmsTemplate.convertAndSend(destinationName, obj);
} else {
jmsTemplate.convertAndSend(destinationName, obj);
}
}
3、分析監聽器類別并選取自己需要的監聽器類別:
監聽器是執行消息處理的最終要方法,JMS提供了MessageListener接口,實現MessageListener接口并實現OnMessage方法,即可實現消息的監聽。
但是,這種方式并沒有返回一個處理結束的結果,這是就考慮到了spring提供的SessionAwareMessageListener接口,實現SessionAwareMessageListener接口并實現OnMessage方法,即可實現監聽,但是不同的是,OnMessag有兩個參數TextMessage?message,?Session?session:通過session可是實現回復該線程? ? ? ? ?MessageProducer?producer?=?session.createProducer(destination); ?? ? ? ? Message?textMessage?=?session.createTextMessage("ConsumerSessionAwareMessageListener。。。"); ?producer.send(textMessage); ?
上面這種方式明顯已經可以實現回復信息,但是相對于MessageListenerAdapter還是略顯粗暴,MessageListenerAdapter直接實現了以上兩個接口,可以直接通過它選擇監聽方法、操作方法,綁定回復隊列。然后DefaultMessageListenerContainer設置messageListener為當前adapter就ok了。
<bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter"><property name="delegate" ref="consumerMessageListener"/><property name="defaultListenerMethod" value="receiveMessage"/><property name="defaultResponseDestination" ref="responseQueue"/> </bean><!-- 消息監聽適配器對應的監聽容器 --> <bean id="messageListenerAdapterContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"><property name="connectionFactory" ref="connectionFactory"/><property name="destination" ref="queueDestination"/><property name="messageListener" ref="messageListenerAdapter"/><!-- 使用MessageListenerAdapter來作為消息監聽器 --> </bean> 4、動態添加監聽創建隊列:在真正項目使用時,并不會簡單意義上的在配置文件中固定幾個隊列和幾個監聽。所以要提供接口供開發者進行調用:
注冊監聽:
public void receiveMsg(String destinationName, IMessageListener listener){
Hashtable<String, Class<IMessageListener>> cache=JmsCache.getCache();
if(cache.containsKey(destinationName)){
System.out.println("監聽已經存在");
}else{
cache.put(destinationName, (Class<IMessageListener>) listener.getClass());
System.out.println("注冊監聽");
//監聽容器
? ? DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
? ? //獲取adapter
? ? JmsMessageListenerAdapter messageListener=new JmsMessageListenerAdapter(listener);
? ? MessageListenerAdapter messageListenerAdapter=messageListener.getMessageListenerAdapter();
? ??
? ? //設置監聽容器
? ? jmsContainer.setMessageListener(messageListenerAdapter);
? ? jmsContainer.setConnectionFactory(connectionFactory);
? ? jmsContainer.setDestinationName(destinationName);
? ? jmsContainer.setSessionTransacted(false);
? ? jmsContainer.initialize();
? ? jmsContainer.start();
}
}
adapter實現類
?//MessageListenerAdapter
? ? private MessageListenerAdapter messageListenerAdapter=new MessageListenerAdapter();
? ? //監聽器
? ? private IMessageListener delegate;
? ? //實現方法
? ? private final String methodName="executeMessage";
? ? //監聽回復地址:responseQueue
? ? private final String defaultdestination="response_queue";
? ? /**
? ? ?* 默認無參構造
? ? ?*/
? ? public JmsMessageListenerAdapter(){}
? ? /**
? ? ?* 構造方法
? ? ?* @param delegate
? ? ?* 備注:responseQueue是為默認監聽
? ? ?*/
? ? public JmsMessageListenerAdapter(IMessageListener delegate){
? ? ? ? this.delegate=delegate;
? ? }
? ? /**
? ? ?* 獲取MessageListenerAdapter
? ? ?* @return
? ? ?*/
? ? public MessageListenerAdapter getMessageListenerAdapter(){
? ? ? ? messageListenerAdapter.setDelegate(delegate);
? ? ? ? messageListenerAdapter.setDefaultListenerMethod(methodName);
? ? ? ? messageListenerAdapter.setDefaultResponseQueueName(defaultdestination);
? ? ? ? return messageListenerAdapter;
? ? }
發送消息:
public void sendMsg(String destinationName, Serializable obj,
boolean asyncSend) {
if (asyncSend) {
asyJmsTemplate.convertAndSend(destinationName, obj);
} else {
jmsTemplate.convertAndSend(destinationName, obj);
}
}
總結
以上是生活随笔為你收集整理的activeMQ使用总结的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java 标注过期方法 注解: @De
- 下一篇: 理解JVM运行原理