消息中间件和JMS介绍
在一個公司創立初期,他可能只有幾個應用,系統之間的關聯也不是那么大,A系統調用B系統就直接調用B提供的API接口;后來這個公司做大了,他一步步發展有了幾十個系統,這時候A系統要調用B系統的接口,但是B系統前幾天剛改了一下接口A并不知情。所以A發現調不通于是給B系統管理員打電話,小王啊,改了接口咋不告訴我呢。我還以為我們系統出錯了呢。弄得小王一頓尷尬,我這自己改個東西還的通知這個通知那個的。
1 中間件介紹
我們看到上面的故事中的小王他真的是很累啊。自己修改一個接口還的給所有調用接口的系統管理員打電話告知API發生變化。說到這個問題啊,還是的說我們系統之間的耦合。對于一個小公司來說是無所謂,但是對于一個大公司這種情況簡直是致命的。于是最近幾年這些越來越大的互聯網公司在這種挑戰下提出了中間件這個概念:中間件在操作系統軟件,網絡和數據庫之上,應用軟件之下,總的作用是為處于自己上層的軟件提供靈活的開發環境。因而中間件是指一類軟件,是基于分布式處理的軟件,最突出的特點是其網絡通信功能。也可認為中間件是位于平臺和應用之間的通用服務,這些服務具有標準的程序接口和協議。針對不同的操作系統和硬件平臺,可以有符合接口和協議的多種實現。
1.1 中間件分類
中間件可以分為六類:
1) 終端仿真/屏幕轉換
2) 數據訪問中間件(UDA)
3) 遠程過程調用中間件(RPC)
4) 消息中間件(MOM)
5) 交易中間件(TPM)
6) 對象中間件
然而在實際應用中,一般將中間件分為兩大類:
一類是底層中間件,用于支撐單個應用系統或解決一類問題,包括交易中間件、應用服務器、消息中間件、數據訪問中間件等;
另一類是高層中間件,更多的用于系統整合,包括企業應用集成中間件、工作流中間件、門戶中間件等,他們通常會與多個應用系統打交道,在系統中層次較高,并大多基于前一類的底層中間件運行。
終端仿真/屏幕轉換
此類中間件用于實現客戶機圖形用戶接口與已有的字符接口方式的服務器應用程序之間的互操作,應用與早期的大型機系統,現在已很少使用。
數據訪問中間件
此類中間件是為了建立數據應用資源互操作的模式,對異構環境下的數據庫或文件系統實現聯接。
遠程過程調用中間件
此類中間件可以使開發人員在需要時調用位于遠端服務器上的過程,屏蔽了在調用過程中的通信細節。一個應用程序使用RPC來遠程執行一個位于不同地址空間里的過程,在效果上看和執行本地調用相同。
交易中間件
此類中間件是專門針對聯機交易系統而設計的。聯機交易系統需要處理大量并發進程,處理并發涉及到操作系統,文件系統,編程語言,數據通信,數據庫系統,系統管理,應用軟件等。而交易中間件根據分布式交易處理的標準及參考模型,對資源管理,交易管理和應用進行了實現,從而使得基于交易中間件開發應用程序更為簡單。交易中間件基本上只適用于聯機交易系統,是一種較為專用的中間件。
消息中間件
此類中間件是指利用高效可靠的消息傳遞機制進行平臺無關的數據交流,并基于數據通信來進行分布式系統的集成。通過提供消息傳遞和消息排隊模型,它可以在分布式環境下擴展進程間的通信。
消息中間件可以即支持同步方式,又支持異步方式。異步中間件比同步中間件具有更強的容錯性,在系統故障時可以保證消息的正常傳輸。異步中間件技術又分為兩類:廣播方式和發布/訂閱方式。由于發布/訂閱方式可以指定哪種類型的用戶可以接受哪種類型的消息,更加有針對性,事實上已成為異步中間件的非正式標準。目前主流的消息中間件產品有IBM的MQSeries,BEA的MessageQ和Sun的JMS等[1]。
對象中間件
傳統的對象技術通過封裝、繼承及多態提供了良好的代碼重用功能。但這些對象只存在與一個程序中,外界并不知道它們的存在,也無法訪問它們。對象中間件提供了一個標準的構建框架,能使不同廠家的軟件通過不同的地址空間,網絡和操作系統實現交互訪問。對象中間件的目標是為軟件用戶及開發者提供一種應用級的即插即用的互操作性。目前主流的對象中間件有OMG的CORBA,Microsoft 的COM以及IBM的SOM,Sun的RMI等。
中間件的特點
一般來講,中間件具有以下一些特點:滿足大量應用的需求,運行于多種硬件和操作系統平臺,支持分布式計算,支持標準接口和協議。開發人員通過調用中間件提供的大量API,實現異構環境的通信,從而屏蔽異構系統中復雜的操作系統和網絡協議。
由于標準接口對于可移植性和標準協議對于互操作性的重要性,中間件已成為許多標準化工作的主要部分。分布式應用軟件借助中間件可以在不同的技術之間共享資源。
總的來說,中間件屏蔽了底層操作系統的復雜性,使程序開發人員面對一個簡單而統一的開發環境,減少了程序設計的復雜性,將注意力集中與自己的業務上,不必再為程序在不同軟件系統上的移植而重復工作,從而大大減少了技術上的負擔。
2 消息中間件
面向消息的中間件(MOM),提供了以松散耦合的靈活方式集成應用程序的一種機制。它們提供了基于存儲和轉發的應用程序之間的異步數據發送,即應用程序彼此不直接通信,而是與作為中介的MOM通信。MOM提供了有保證的消息發送(至少是在盡可能地做到這一點),應用程序開發人員無需了解遠程過程調用(RPC)和網絡/通信協議的細節。
消息隊列技術是分布式應用間交換信息的一種技術。消息隊列可駐留在內存或磁盤上,隊列存儲消息直到它們被用程序讀走。通過消息隊列,應用程序可獨立地執行–它們不需要知道彼此的位置、或在繼續執行前不需要等待接收程序接收此消息。在分布式計算環境中,為了集成分布式應用,開發者需要對異構網絡環境下的分布式應用提供有效的通信手段。為了管理需要共享的信息,對應用提供公共的信息交換機制是重要的。設計分布式應用的方法主要有:遠程過程調用(RPC)–分布式計算環境(DCE)的基礎標準成分之一;對象事務監控(OTM)–基于CORBA的面向對象工業標準與事務處理(TP)監控技術的組合;消息隊列(MessageQueue)–構造分布式應用的松耦合方法。
MOM將消息路由給應用程B,這樣消息就可以存在于完全不同的計算機上,MOM負責處理網絡通信。如果網絡連接不可用,MOM會存儲消息,直到連接變得可用時,再將消息轉發給應用程序B。
靈活性的另一方面體現在,當應用程序A發送其消息時,應用程序B甚至可以不處于執行狀態。MOM將保留這個消息,直到應用程序B開始執行并試著檢索消息為止。這還防止了應用程序A因為等待應用程序B檢索消息而出現阻塞。這種異步通信要求應用程序的設計與現在大多數應用程序不同,不過,對于時間無關或并行處理,它可能是一個極其有用的方法。
2.1 消息中間件的傳遞模式
消息中間件一般有兩種傳遞模式:點對點模式(P2P)和發布-訂閱模式(Pub/Sub)。
點對點模式
Point-to-Point(P2P)我們很容易理解,即生產者和消費者之間的消息往來。?
每個消息都被發送到特定的消息隊列,接收者從隊列中獲取消息。隊列保留著消息,直到他們被消費或超時。
P2P的特點:
每個消息只有一個消費者(Consumer)(即一旦被消費,消息就不再在消息隊列中);
發送者和接收者之間在時間上沒有依賴性,也就是說當發送者發送了消息之后,不管接收者有沒有正在運行,它不會影響到消息被發送到隊列;
接收者在成功接收消息之后需向隊列應答成功。
發布-訂閱模式(Pub/Sub)
我們可以聯想到賣報紙的過程:印刷廠把當天的報紙印好然后送到郵遞員手里,郵遞員風雨兼程的把報紙送到每一位訂閱者手里。由此我們可以看到發布-訂閱模式的一些特點:
每個消息可以有多個消費者;
發布者和訂閱者之間有時間上的依賴性。針對某個主題(Topic)的訂閱者,它必須創建一個訂閱者之后,才能消費發布者的消息,而且為了消費消息,訂閱者必須保持運行的狀態;
由上介紹我們可以看出這兩種模式各有千秋,如果你需要點對點的發送消息那么使用P2P更專注,如果你是群發消息,顯然pub/sub模式更適合。
3 基于多種協議的消息傳遞機制
目前市場上對于網絡消息傳遞的協議版本很多,不同的協議有不同的規范,我們在使用時要比對實現不同協議的產品。下面我們看一下目前主流的消息傳遞協議:
3.1 AMQP協議
AMQP,即Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標準,為面向消息的中間件設計。AMQP協議是一種二進制協議,提供客戶端應用與消息中間件之間異步、安全、高效地交互。
AMQP是一個應用層的異步消息傳遞協議,為面向消息的中間件而設計。其目的是通過協議使應用模塊之間或應用程序與中間件等進行充分解耦。而在設計初期,AMQP的原始用途只是為金融界提供一個可以彼此協作的消息協議?,F在已經有相當一部分遵循AMQP的服務器和客戶端供使用。其中RabbitMQ是AMQP的一款開源標準實現。
支持所有消息中間件的功能:消息交換、文件傳輸、流傳輸、遠程進程調用等。
AMQP的服務器(Broker)主要由交換器、消息、隊列組成。Broker的主要功能是消息的路由和緩存。對于需要保障可靠性的消息,RabbitMQ可以將消息、隊列和交換器的數據寫入本地硬盤。而對于響應時間敏感的消息,RabbitMQ可以不配置持久化機制。
解決的問題:
1)信息的發送者和接收者如何維持這個連接,如果一方的連接中斷,這期間的數據如何防止丟失?
2)如何降低發送者和接收者的耦合度?
3)如何讓Priority高的接收者先接到數據?
4)如何做到load balance?有效均衡接收者的負載?
5)如何有效的將數據發送到相關的接收者?也就是說將接收者subscribe 不同的數據,如何做有效的filter。
6)如何做到可擴展,甚至將這個通信模塊發到cluster上?
7)如何保證接收者接收到了完整,正確的數據?
AMQP協議解決了以上的問題,而RabbitMQ實現了AMQP。
3.2 STOMP協議
STOMP即Simple (or Streaming) Text Orientated Messaging Protocol,簡單(流)文本定向消息協議。
它提供了一個可互操作的連接格式,允許STOMP客戶端與任意STOMP消息代理(Broker)進行交互。STOMP協議由于設計簡單,易于開發客戶端,因此在多種語言和多種平臺上得到廣泛地應用。
STOMP協議的前身是TTMP協議(一個簡單的基于文本的協議),專為消息中間件設計。
STOMP是一個非常簡單和容易實現的協議,其設計靈感源自于HTTP的簡單性。盡管STOMP協議在服務器端的實現可能有一定的難度,但客戶端的實現卻很容易。例如,可以使用Telnet登錄到任何的STOMP代理,并與STOMP代理進行交互。
STOMP是除AMQP開放消息協議之外地另外一個選擇, 實現了被用在JMS brokers中特定的有線協議,比如OpenWire。它僅僅是實現通用消息操作中的一部分,并非想要覆蓋全面的消息API。
STOMP server就好像是一系列的目的地, 消息會被發送到這里。STOMP協議把目的地當作不透明的字符串,其語法是服務端具體的實現。 此外STOMP沒有定義目的地的交付語義是什么,語義的目的地可以從服務器到服務器,甚至從目的地到目的地。這使得服務器有可創造性的語義,去支持STOMP。
STOMP client的用戶代理可以充當兩個角色(可能同時):
作為生產者,通過SENDframe發送消息到server
作為消費者,發送SUBSCRIBEframe到目的地并且通過MESSAGEframe從server獲取消息。
STOMP協議工作于TCP協議之上,使用了下列命令:
SEND 發送
SUBSCRIBE 訂閱
UNSUBSCRIBE 退訂
BEGIN 開始
COMMIT 提交
ABORT 取消
ACK 確認
DISCONNECT 斷開
目前最流行的STOMP消息代理是Apache ActiveMQ。
3.3 JMS協議
JMS是Java Message Service的縮寫,即Java消息服務。
在大型互聯網中,我們采用消息中間件可以進行應用之間的解耦以及操作的異步,這是消息中間件兩個最基礎的特點,也正是我們所需要的。在此基礎上,我們著重思考的是消息的順序保證、擴展性、可靠性、業務操作與消息發送一致性,以及多集群訂閱者等方面的問題。當然,這些我們要思考的東西,JMS都已經想到了,先看下JMS能幫開發者做什么:
1、定義一組消息公用概念和實用工具
所有Java應用程序都可以使用JMS中定義的API去完成消息的創建、接收與發送,任何實現了JMS標準的MOM都可以作為消息的中介,完成消息的存儲轉發
2、最大化消息應用程序的可移植性
MOM提供了有保證的消息發送,應用程序開發人員無需了解遠程過程調用(RPC)和網絡/通信協議的細節,提供了程序的可移植性
3、最大化降低應用程序與應用程序之間的耦合度
由于MOM的存在,各個應用程序只關心和MOM之間如何進行消息的接收與發送,而無須關注MOM的另一邊,其他程序是如何接收和發送的
JMS定義了一套通用的接口和相關語義,提供了諸如持久、驗證和事務的消息服務,它最主要的目的是允許Java應用程序訪問現有的消息中間件。JMS規范沒有指定在消息節點間所使用的通訊底層協議,來保證應用開發人員不用與其細節打交道,一個特定的JMS實現可能提供基于TCP/IP、HTTP、UDP或者其它的協議。
由于沒有統一的規范和標準,基于消息中間件的應用不可移植,不同的消息中間件也不能互操作,這大大阻礙了消息中間件的發展。 Java Message Service(JMS, Java消息服務)是SUN及其伙伴公司提出的旨在統一各種消息中間件系統接口的規范。
目前許多廠商采用并實現了JMS API,現在,JMS產品能夠為企業提供一套完整的消息傳遞功能,目前我們看到的比較流行的JMS商業軟件和開源產品:WebLogic、SonicMQ、ActiveMQ、OpenJMS都是基于JMS規范的實現。
4 JMS介紹
在 JMS 之前,每一家 MOM 廠商都用專有 API 為應用程序提供對其產品的訪問,通常可用于許多種語言,其中包括 Java 語言。JMS 通過 MOM 產品為 Java 程序提供了一個發送和接收消息的標準的、便利的方法。用 JMS 編寫的程序可以在任何實現 JMS 標準的 MOM 上運行。
JMS 可移植性的關鍵在于:JMS API 是由 Sun 作為一組接口而提供的。提供了 JMS 功能的產品是通過提供一個實現這些接口的提供者來做到這一點的。開發人員可以通過定義一組消息和一組交換這些消息的客戶機應用程序建立 JMS 應用程序。
JMS 支持兩種消息類型P2P 和Pub/Sub,在JMS消息模型中,根據點對點模式和發布/訂閱模式,這些要素由擴展出了各自的內容:
JMS標準?? ?點對點模式?? ?發布/訂閱模式
ConnectionFactory?? ?QueueConnectionFactory?? ?TopicConnectionFactory
Connection?? ?QueueConnection?? ?TopicConnection
Destination?? ?Queue?? ?Topic
Session?? ?QueueSession?? ?TopicSession
MessageProducer?? ?QueueSender?? ?TopicPublisher
MessageConsumer?? ?QueueReceiver?? ?TopicSubscriber
JMS為發開者提供了很多的要素,看一下比較重要的幾個:
要 素?? ?作 用
Destination?? ?表示消息所走通道的目標定義,用來定義消息從發送端發出后要走的通道,而不是接收方。Destination屬于管理類對象
ConnectionFactory?? ?顧名思義,用于創建連接對象,ConnectionFactory屬于管理類的對象
Connection?? ?連接接口,所負責的重要工作時創建Session
Session?? ?會話接口,這是一個非常重要的對象,消息發送者、消息接收者以及消息對象本身,都是通過這個會話對象創建的
MessageConsumer?? ?消息的消費者,也就是訂閱消息并處理消息的對象
MessageProducer?? ?消息的生產者,也就是用來發送消息的對象
XXXMessage?? ?指各種類型的消息對象,包括ByteMesage、ObjectMessage、StreamMessage和TextMessage這5種
JMS消息模型
JMS 消息由以下幾部分組成:消息頭,屬性,消息體。
消息頭(header):JMS消息頭包含了許多字段,它們是消息發送后由JMS提供者或消息發送者產生,用來表示消息、設置優先權和失效時間等等,并且為消息確定路由。
屬性(property):由消息發送者產生,用來添加刪除消息頭以外的附加信息。
消息體(body):由消息發送者產生,JMS中定義了5種消息體:ByteMessage、MapMessage、ObjectMessage、StreamMessage和TextMessage。
JMS編程模型
一般來說我們在開發基于JMS協議的客戶端由一下幾部構成:
1) 用JNDI 得到ConnectionFactory對象;
2) 用JNDI 得到目標隊列或主題對象,即Destination對象;
3) 用ConnectionFactory創建Connection 對象;
4) 用Connection對象創建一個或多個JMS Session;
5) 用Session 和Destination 創建MessageProducer和MessageConsumer;
6) 通知Connection 開始傳遞消息。
因為jms需要使用到J2EE服務器,我們平常用的tomcat屬于J2SE類型的服務器,常見的J2EE服務器包括:Geronimo,JBoss 4, GlassFish,WebLogic 。我們在這里使用glassfish 容器。安裝和使用有很多教程,在此就不貼了。首先我們進去glassfish的控制臺,設置一下我們的發送者和接受者對象:
下面我們用oracle提供的jms接口來寫一個服務端,我們先來寫一個P2P模式的例子:
MySender.java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import javax.naming.*;
import javax.jms.*;
public class MySender {
? ? public static void main(String[] args) {
? ? ? ? try
? ? ? ? { ? //1)創建一個connection
? ? ? ? ? ? InitialContext ctx=new InitialContext();
? ? ? ? ? ? QueueConnectionFactory f=(QueueConnectionFactory)ctx.lookup("myQueueConnectionFactory");
? ? ? ? ? ? QueueConnection con=f.createQueueConnection();
? ? ? ? ? ? con.start();
? ? ? ? ? ? //2) 創建一個會話接口
? ? ? ? ? ? QueueSession ses=con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
? ? ? ? ? ? //3) 獲取會話接口對象
? ? ? ? ? ? Queue t=(Queue)ctx.lookup("myQueue");
? ? ? ? ? ? //4)創建一個發送者對象
? ? ? ? ? ? QueueSender sender=ses.createSender(t);
? ? ? ? ? ? //5) 創建一個消息對象
? ? ? ? ? ? TextMessage msg=ses.createTextMessage();
? ? ? ? ? ? //6) 把我們的消息寫入msg對象中
? ? ? ? ? ? BufferedReader b=new BufferedReader(new InputStreamReader(System.in));
? ? ? ? ? ? while(true)
? ? ? ? ? ? {
? ? ? ? ? ? ? ? System.out.println("Enter Msg, end to terminate:");
? ? ? ? ? ? ? ? String s=b.readLine();
? ? ? ? ? ? ? ? if (s.equals("end"))
? ? ? ? ? ? ? ? ? ? break;
? ? ? ? ? ? ? ? msg.setText(s);
? ? ? ? ? ? ? ? //7) 發送消息
? ? ? ? ? ? ? ? sender.send(msg);
? ? ? ? ? ? ? ? System.out.println("Message successfully sent.");
? ? ? ? ? ? }
? ? ? ? ? ? //8) 關閉連接
? ? ? ? ? ? con.close();
? ? ? ? }catch(Exception e){System.out.println(e);}
? ? }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
MyReceiver.java
import javax.jms.*;
import javax.naming.InitialContext;
public class MyReceiver {
? ? public static void main(String[] args) {
? ? ? ? try{
? ? ? ? ? ? //1) 創建一個connection
? ? ? ? ? ? InitialContext ctx=new InitialContext();
? ? ? ? ? ? QueueConnectionFactory f=(QueueConnectionFactory)ctx.lookup("myQueueConnectionFactory");
? ? ? ? ? ? QueueConnection con=f.createQueueConnection();
? ? ? ? ? ? con.start();
? ? ? ? ? ? //2) 創建一個會話接口
? ? ? ? ? ? QueueSession ses=con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
? ? ? ? ? ? //3) 獲取會話接口對象
? ? ? ? ? ? Queue t=(Queue)ctx.lookup("myQueue");
? ? ? ? ? ? //4)創建一個發送者對象
? ? ? ? ? ? QueueReceiver receiver=ses.createReceiver(t);
? ? ? ? ? ? //5) 創建一個消監聽對象
? ? ? ? ? ? MyListener listener=new MyListener();
? ? ? ? ? ? //6) 將監聽器注冊到receiver,用來監聽receiver
? ? ? ? ? ? receiver.setMessageListener(listener);
? ? ? ? ? ? System.out.println("Receiver1 is ready, waiting for messages...");
? ? ? ? ? ? System.out.println("press Ctrl+c to shutdown...");
? ? ? ? ? ? while(true){
? ? ? ? ? ? ? ? Thread.sleep(1000);
? ? ? ? ? ? }
? ? ? ? }catch(Exception e){System.out.println(e);}
? ? }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
MyListener.java
import javax.jms.*;
public class MyListener implements MessageListener {
? ? public void onMessage(Message m) {
? ? ? ? try{
? ? ? ? ? ? TextMessage msg=(TextMessage)m;
? ? ? ? ? ? System.out.println("following message is received:"+msg.getText());
? ? ? ? }catch(JMSException e){System.out.println(e);}
? ? }
}
1
2
3
4
5
6
7
8
9
10
11
12
Pub/Sub模式:
MySender.java
import javax.jms.*;
import javax.naming.InitialContext;
import java.io.BufferedReader;
import java.io.InputStreamReader;
public class MySender {
? ? public static void main(String[] args) {
? ? ? ? try
? ? ? ? { ? //1)創建一個connection
? ? ? ? ? ? InitialContext ctx=new InitialContext();
? ? ? ? ? ? TopicConnectionFactory f=(TopicConnectionFactory)ctx.lookup("myTopicConnectionFactory");
? ? ? ? ? ? TopicConnection con=f.createTopicConnection();
? ? ? ? ? ? con.start();
? ? ? ? ? ? //2) 創建一個會話接口
? ? ? ? ? ? TopicSession ses=con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
? ? ? ? ? ? //3) 獲取會話接口對象
? ? ? ? ? ? Topic t=(Topic)ctx.lookup("myTopic");
? ? ? ? ? ? //4)創建一個發送者對象
? ? ? ? ? ? TopicPublisher publisher=ses.createPublisher(t);
? ? ? ? ? ? //5) 創建一個消息對象
? ? ? ? ? ? TextMessage msg=ses.createTextMessage();
? ? ? ? ? ? //6) 把我們的消息寫入msg對象中
? ? ? ? ? ? BufferedReader b=new BufferedReader(new InputStreamReader(System.in));
? ? ? ? ? ? while(true)
? ? ? ? ? ? {
? ? ? ? ? ? ? ? System.out.println("Enter Msg, end to terminate:");
? ? ? ? ? ? ? ? String s=b.readLine();
? ? ? ? ? ? ? ? if (s.equals("end"))
? ? ? ? ? ? ? ? ? ? break;
? ? ? ? ? ? ? ? msg.setText(s);
? ? ? ? ? ? ? ? //7) 發送消息
? ? ? ? ? ? ? ? publisher.publish(msg);
? ? ? ? ? ? ? ? System.out.println("Message successfully sent.");
? ? ? ? ? ? }
? ? ? ? ? ? //8) 關閉連接
? ? ? ? ? ? con.close();
? ? ? ? }catch(Exception e){System.out.println(e);}
? ? }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
MyReceiver.java
import javax.jms.*;
import javax.naming.InitialContext;
public class MyReceiver {
? ? public static void main(String[] args) {
? ? ? ? try{
? ? ? ? ? ? //1) 創建一個connection
? ? ? ? ? ? InitialContext ctx=new InitialContext();
? ? ? ? ? ? TopicConnectionFactory f=(TopicConnectionFactory)ctx.lookup("myTopicConnectionFactory");
? ? ? ? ? ? TopicConnection con=f.createTopicConnection();
? ? ? ? ? ? //2) 創建一個會話接口
? ? ? ? ? ? TopicSession ses=con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
? ? ? ? ? ? //3) 獲取會話接口對象
? ? ? ? ? ? Topic t=(Topic)ctx.lookup("myTopic");
? ? ? ? ? ? //4)創建一個發送者對象
? ? ? ? ? ? TopicSubscriber receiver=ses.createSubscriber(t);
? ? ? ? ? ? //5) 創建一個消監聽對象
? ? ? ? ? ? MyListener listener=new MyListener();
? ? ? ? ? ? //6) 將監聽器注冊到receiver,用來監聽receiver
? ? ? ? ? ? receiver.setMessageListener(listener);
? ? ? ? ? ? System.out.println("Receiver1 is ready, waiting for messages...");
? ? ? ? ? ? System.out.println("press Ctrl+c to shutdown...");
? ? ? ? ? ? while(true){
? ? ? ? ? ? ? ? Thread.sleep(1000);
? ? ? ? ? ? }
? ? ? ? }catch(Exception e){System.out.println(e);}
? ? }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
MyListener.java
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class MyListener implements MessageListener {
? ? public void onMessage(Message m) {
? ? ? ? try{
? ? ? ? ? ? TextMessage msg=(TextMessage)m;
? ? ? ? ? ? System.out.println("following message is received:"+msg.getText());
? ? ? ? }catch(JMSException e){System.out.println(e);}
? ? }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
上面兩個案例我們運行可以看到消息成功的發送出去了。熟悉了JMS的語法,使用起來還是很簡單。
上面我們介紹到了JMS,JMS是一個用于提供消息服務的技術規范,它制定了在整個消息服務提供過程中的所有數據結構和交互流程。JMS即Java消息服務(Java Message Service)應用程序接口,是一個Java平臺中關于面向消息中間件(MOM)的API。 Java消息服務是一個與具體平臺無關的API,絕大多數MOM提供商都對JMS提供支持。
下面我們引入另一個概念:MQ(Message Queue)。
應用程序通過寫和檢索出入列隊的針對應用程序的數據(消息)來通信,而無需專用連接來鏈接它們。消息傳遞指的是程序之間通過在消息中發送數據進行通信,而不是通過直接調用彼此來通信,直接調用通常是用于諸如遠程過程調用的技術。排隊指的是應用程序通過隊列來通信。隊列的使用除去了接收和發送應用程序同時執行的要求。
MQ和JMS類似,但不同的是JMS是SUN Java消息中間件服務的一個標準和API定義,而MQ則是遵循了AMQP協議的具體實現和產品。JMS是一個用于提供消息服務的技術規范,它制定了在整個消息服務提供過程中的所有數據結構和交互流程。而MQ則是消息隊列服務,是面向消息中間件(MOM)的最終實現,是真正的服務提供者;MQ的實現可以基于JMS,也可以基于其他規范或標準。MQ 有很多產品:IBM的,rabbitmq, activemq 等,rabbitmq 只支持點對點的方式。所以沒有完全實現JMS的標準,所以說它不是一個JMS產品,而rabitmq 和Jobss JMS 它們實現了JMS的各項標準,是開源的JMS產品。目前完全實現JMS協議的mq是activemq,所以接下來我們先重點看一下activemq。從activemq入手去探索javaEE的世界。
---------------------?
作者:rickiyang?
來源:CSDN?
原文:https://blog.csdn.net/a953713428/article/details/70770087?
版權聲明:本文為博主原創文章,轉載請附上博文鏈接!
總結
以上是生活随笔為你收集整理的消息中间件和JMS介绍的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: JMS(Java消息服务)入门教程
- 下一篇: 从零开始玩转JMX(一)——简介和Sta