Java消息中间件
1.概述
中間件
非底層操作系統軟件,非業務應用軟件,不是直接給最終用戶使用的,不能直接給客戶帶來價值的軟件統稱為中間件。
消息中間件
管制關注于數據的發送和接收,利用高效可靠的異步消息傳遞機制集成分布式系統。
優點
① 解耦 ② 異步 ③ 橫向擴展 ④ 安全可靠 ⑤ 順序保證(比如kafka)
jms
java消息服務(Java Message Service)即JMS,是一個Java平臺中關于面向消息中間件的api,用于在兩個應用程序之間,或分布式系統中發送消息,進行異步通信
什么是AMQP
AMQP(advanced message queuing protocol)是一個提供統一消息服務的應用層標準協議,基于此協議的客戶端與消息中間件可傳遞消息,并不受客戶端/中間件不同產品,不同開發語言等條件的限制
常見消息中間件
activeMQ(支持多語言,實現jms1.1,j2ee1.4規范),RabbitMQ(支持更多語言,基于AMQP規范),kafka(高吞吐量,分布式,分區,O(1)磁盤順序提供消息持久化)
JMS消息模式
隊列模式
客戶端包括生產者和消費者
隊列中的消息只能被一個消費者消費
消費者可以隨時消費隊列中的消息
主題模式
客戶端包括發布者和訂閱者
主題中的消息被所有訂閱者消費
消費者不能消費訂閱之前就發送到主題中的消息
JMS編碼接口
ConnectionFactory 用于創建連接到消息中間件的連接工廠
Connection 代表了應用程序和消息服務器之間的通信鏈路
Destination 指消息發布和接收的地點,包括隊列或主題
Session 表示一個單線程的上下文,用于發送和接收消息
MessageConsumer 由會話創建,用于接收發送到目標的消息
MessageProducer 由會話創建,用于發送消息到目標
Message 是在消費者和生產者之間傳送的對象,消息頭,一組消息屬性,一個消息體
流程
2.安裝activeMQ
官網 : http://activemq.apache.org/
第一種啟動
進入bin ,activemq.bat 啟動
進入瀏覽器 http://127.0.0.1:8161
用戶名密碼默認為admin
第二種以服務啟動
InstallService.bat以管理員身份運行
服務中有ActiveMQ
linux
解壓壓縮包
進入bin 輸入 activemq start,啟動完成
3.jms 演示
創建maven項目
添加依賴
創建隊列模式的生產者AppProducer
package com.jms.queue;import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;public class AppProducer {public static final String url = "tcp://localhost:61616";public static final String queueName = "queue-test ";public static void main(String[] args) throws JMSException {//1.創建連接工廠ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);//2.創建ConnectionConnection connection = connectionFactory.createConnection();//3.啟動連接connection.start();//4.創建會話 第一個參數:是否在事務中去處理, 第二個參數.應答模式Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//5.創建一個目標Destination destination = session.createQueue(queueName);//6創建一個生產者MessageProducer producer = session.createProducer(destination);for (int i = 0; i < 100; i++) {//7.創基建消息TextMessage textMessage = session.createTextMessage("test:"+i);producer.send(textMessage);System.out.println("發送消息:"+textMessage.getText());}//9關閉連接connection.close();} }運行項目,打開activeMQ管理工具
100個消息,沒有消費者消費,連接已關閉
消費者 AppConsumer
當啟動2個消費者,再啟動生產者,結果是2個消費者平均消費
創建主題模式發布者AppProducer
package com.jms.topic;import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;public class AppProducer {public static final String url = "tcp://localhost:61616";public static final String topicName = "topic-test ";public static void main(String[] args) throws JMSException {//1.創建連接工廠ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);//2.創建ConnectionConnection connection = connectionFactory.createConnection();//3.啟動連接connection.start();//4.創建會話 第一個參數:是否在事務中去處理, 第二個參數.應答模式Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//5.創建一個目標Destination destination = session.createTopic(topicName);//6創建一個生產者MessageProducer producer = session.createProducer(destination);for (int i = 0; i < 100; i++) {//7.創基建消息TextMessage textMessage = session.createTextMessage("test:"+i);producer.send(textMessage);System.out.println("發送消息:"+textMessage.getText());}//9關閉連接connection.close();} }
這時直接運行訂閱者接收不到消息,因為發布者先運行了
訂閱者AppConsumer
package com.jms.topic;import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;public class AppConsumer {public static final String url = "tcp://localhost:61616";public static final String topicName = "topic-test ";public static void main(String[] args) throws JMSException {//1.創建連接工廠ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);//2.創建ConnectionConnection connection = connectionFactory.createConnection();//3.啟動連接connection.start();//4.創建會話 第一個參數:是否在事務中去處理, 第二個參數.應答模式Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//5.創建一個目標Destination destination = session.createTopic(topicName);//6創建一個消費者MessageConsumer consumber = session.createConsumer(destination);//7創建一個監聽器consumber.setMessageListener(new MessageListener() {public void onMessage(Message message) {TextMessage textMessage = (TextMessage) message;try {System.out.println("接收消息:"+textMessage.getText());} catch (JMSException e) {e.printStackTrace();}}});//8關閉連接 消息是異步的 ,在程序退出是關閉//connection.close();} }啟動兩個訂閱者,再啟動發布者,兩個訂閱者均可收到發布者的消息
4.使用spring集成jms鏈接activeMQ
ConnectionFactory 用于管理連接的工廠
JmsTemplate 用于發送和接收消息的模板類
MessageListerner 消息監聽器
ConnectionFactory 是spring 為我們提供的連接池
兩種連接池SingleConnectionFactory 和 CachingConnectionFactory
SingleConnectionFactory 是對于jms建立請求,只會返回一個連接
CachingConnectionFactory 實現了SingleConnectionFactory 的所有功能,還提供了緩存
JmsTemplate spring提供,線程安全,可以使用JmsTemplate 操作jms
MessageListerner 實現onMessage方法,接收Message參數
添加依賴
<dependency><groupId>org.springframework</groupId><artifactId>spring-jms</artifactId><version>${spring.version}</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-core</artifactId><version>${spring.version}</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-context</artifactId><version>${spring.version}</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-web</artifactId><version>${spring.version}</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-test</artifactId><version>${spring.version}</version></dependency><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-core</artifactId><version>5.7.0</version><exclusions><exclusion><artifactId>spring-context</artifactId><groupId>org.springframework</groupId></exclusion></exclusions></dependency>生產者 創建接口ProducerService
package com.spring.producer;public interface ProducerService {void sendMessage(String message);}實現類
package com.spring.producer.impl;import javax.annotation.Resource; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import javax.jms.TextMessage;import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Service;import com.spring.producer.ProducerService; @Service public class ProducerServiceImpl implements ProducerService {@Autowiredprivate JmsTemplate jmsTemplate;@Resource(name= "queueDestination")//因為可能配置多個目的地,所以使用resource name進行區分Destination destination;public void sendMessage(final String message) {//使用JmsTemplate發送消息jmsTemplate.send(destination,new MessageCreator() {//創建消息public Message createMessage(Session session) throws JMSException {TextMessage textMessage = session.createTextMessage(message);return textMessage;}});System.out.println("發送消息:"+message);}}配置
<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans"xmlns:context="http://www.springframework.org/schema/context"xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsdhttp://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.3.xsdhttp://www.springframework.org/schema/mvchttp://www.springframework.org/schema/mvc/spring-mvc-4.3.xsd"><context:annotation-config></context:annotation-config><context:component-scan base-package="com.spring.*" /><!-- ActiveMQ提供 ConnectionFactory--><bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="tcp://localhost:61616"></property></bean><!-- spring-jms提供的連接池 --><bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"><property name="targetConnectionFactory" ref="targetConnectionFactory"></property></bean> <!-- 一個隊列目的地,點對點 --><bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"><!-- 指定隊列名字 --><constructor-arg value="springQueue"></constructor-arg></bean> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"><property name="connectionFactory" ref="connectionFactory"></property></bean><!-- <bean id="ProducerServiceImpl" class="com.spring.producer.impl.ProducerServiceImpl"></bean> --> </beans>啟動類
package com.spring.producer;import org.springframework.context.ApplicationContext; import org.springframework.context.support.AbstractApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext;public class AppProducer {public static void main(String[] args) {ApplicationContext context = new ClassPathXmlApplicationContext("producer.xml");ProducerService service = context.getBean(ProducerService.class);for(int i=0;i<100;i++){service.sendMessage("test"+i);}//會自動清理資源((AbstractApplicationContext) context).close();} }可以把公共地方提取出來
<import resource="common.xml"/>創建消費者
配置
監聽類
package com.spring.consumer;import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage;import org.springframework.stereotype.Service; @Service public class ConsumberMessageListener implements MessageListener {public void onMessage(Message message) {TextMessage textMessage = (TextMessage)message;try {System.out.println("接收消息"+textMessage.getText());} catch (JMSException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}啟動類
package com.spring.consumer;import org.springframework.context.ApplicationContext; import org.springframework.context.support.AbstractApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext;public class AppConsumber {public static void main(String[] args) {ApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml");} }主題模式
在common配置添加
改ProducerServiceImpl Resource為
name= "topicDestination"改consumer.xml jmsContainer
<property name="destination" ref="queueDestination"></property>總結
- 上一篇: python转换为c代码_bash 转换
- 下一篇: Activemq源码、编译、导入idea