使用JMS实现请求/应答程序
2019獨角獸企業重金招聘Python工程師標準>>>
JMS是用來為發送者和接收者解耦的;
消息通過一個進程發送給代理,然后代理在另外一個進程異步的接收消息,一種可以利用JMS來實現的系統架構被稱為請求/應答。
概括的說:一個請求/應答場景包括一個發送消息(請求)并期望接收消息返回值(應答)的應用程序。通常,這樣的系統被設計成CS架構,服務端和客戶端通過網絡傳輸協議(TCP,UDP等等)同步的進行通信。這種架構方式在可擴展方面具有明顯的限制,很難獲得長遠發展。消息系統正是為此而生,通過基于消息的請求/應答設計模式能夠設計出易于擴展的系統主要以異步處理方式實現。
請求/應答系統:注意,客戶端包含消息生產者(producer)和消息消費者(consumer),并且工作者(worker)也包含消息生產者(producer)和消息消費者(consumer)。后面將解釋客戶端和工作者(worker)。
首先,消息生產者創建一個以JMS消息格式封裝的請求并在消息中設置一些重要的屬性,包括correlation ID(通過消息的JMSCorrelationID屬性設置)和reply destination(響應發送目的地,通過JMSReplyTo屬性設置)。correlation ID屬性非常重要,因為在請求數量非常多時需要使用這個屬性來關聯請求和應答。屬性指定應答發往的目的地(通常是一個臨時的JMS目的地,因為reply destination比較消耗資源)。接下來,客戶端配置,一個消息消費者監聽響應消息目的地(reply destination)。
其次,一個工作者(woker)接收到請求,并處理請求,然后發送一個響應消息到請求消息的JMSReplyTo屬性指定的目的中。響應消息必須用原始請求消息correlation ID的屬性值來設置JMSCorrelationID屬性,當客戶端收到響應消息后,可以通過correlation ID關聯到初始的請求。
這種結構如何實現高可擴展性,想象一個場景:單一的工作者無法處理大量并發的請求負載時怎么辦?當然沒問題:可以添加工作者來平衡負載。這些工作者甚至分布到自不同的主機,這也是這種可擴展性設計中最重要的部分。因為工作者并不是在爭奪相同主機上的資源,所以唯一的限制是代理中消息的最大吞吐量,它比使用普通的客戶端服務器架構能達到的最大吞吐量要大得多。并且,ActiveMQ可以進行水平和垂直擴展。
下面讓我們看看請求/應答程序的基本實現.
實現服務和工作者(worker)
????首先,需要關注的是系統中使用的消息代理。先要啟動代理,以便兩邊程序都啟動時可以連接到代理。為方便說明本例中使用一個嵌入式代理。其次,需要啟動系統中的工作者(worker)。工作者由消息監聽器組成,用來接收處理消息和發送消息響應。
在請求/響應實例創建中一個代理,消費者以及生產者
public void start() throws Exception {createBroker();setupConsumer(); } private void createBroker() throws Exception {broker = new BrokerService();broker.setPersistent(false);broker.setUseJmx(false);broker.addConnector(brokerUrl);broker.start(); } private void setupConsumer() throws JMSException {ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);Connection connection;connection = connectionFactory.createConnection();connection.start();session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);Destination adminQueue = session.createQueue(requestQueue);producer = session.createProducer(null);producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);consumer = session.createConsumer(adminQueue);consumer.setMessageListener(this); } public void stop() throws Exception {producer.close();consumer.close();session.close();broker.stop(); }從代碼中可以看到,start()方法調用一個方法創建并啟動一個嵌入式代理,另外一個方法用于啟動工作者. createBroker()方法使用BrokerService類來創建一個嵌入式代理.setupConsumer()方法通過創建 JMS所需的所有對象來發送和接收消息,這些JMS對象包括:一個連接,一個session,一個消息目的地,一個消息消費者和一個生產者。 創建消息生產者的時候沒有設置默認的消息目的地,因為該生產者會將消息發送到每個消息的 JMSReplyTo屬性所指定的目的地中。下面再詳細看下請求/響應中的監聽者,看看它是如何處理每個請求的:
public void onMessage(Message message) {try{TextMessage response = this.session.createTextMessage();if (message instanceof TextMessage) {TextMessage txtMsg = (TextMessage) message;String messageText = txtMsg.getText();response.setText(handleRequest(messageText));}response.setJMSCorrelationID(message.getJMSCorrelationID());producer.send(message.getJMSReplyTo(), response);} catch (JMSException e) {e.printStackTrace();} } public String handleRequest(String messageText) {return "Response to '" + messageText + "'"; }消息監聽器創建一個新消息,并設置合適的correlation ID,然后將消息發送到響應消息隊列。很簡單但是很重要,盡管在這個消息監聽器的實現中沒做什么驚天動地的事情,但是它展示了工作者完成器任務的必要的基本步驟。根據需求,可以在監聽器中添加其他任意額外的操作或者數據庫訪問操作。
啟動服務很簡單:創建一個server實例并調用start()方法。main方法容納了server的的所有功能,如下面的代碼清單所示:
public static void main(String[] args) throws Exception {Server server = new Server();server.start();System.out.println();System.out.println("Press any key to stop the server");System.out.println();System.in.read();server.stop(); }一旦server啟動完成,worker就正常運行了,這樣所有準備接收客戶端請求的工作已經就緒。
實現客戶端:客戶端要做到工作是初始化發送到代理的請求。這是整個請求/應答過程的起點,并且通常在一個業務邏輯處理過程中觸發。這個過程可能是接受訂單,履行訂單,整合各類業務系統,財務狀況中的買入賣出等,不管是什么情況,請求/響應過程從發送一個消息開始。發送一個消息到代理需要標準的連接(connection),session,消息目的地(destination)以及消息生產者(producer),它們都是在client的start()方法中創建的。下面的的代碼清單中提供了完整的
示例:
啟動和停止響應/應答系統客戶端的方法
public void start() throws JMSException {ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);connection = connectionFactory.createConnection();connection.start();session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);Destination adminQueue = session.createQueue(requestQueue);producer = session.createProducer(adminQueue);producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);tempDest = session.createTemporaryQueue();consumer = session.createConsumer(tempDest);consumer.setMessageListener(this); }public void stop() throws JMSException {producer.close();consumer.close();session.close();connection.close(); }消息生產者發送消息到請求隊列中,然后消息消費者監聽新創建的臨時隊列。下面的代碼中展示了實現客戶端的真正邏輯:
public void request(String request) throws JMSException {System.out.println("Requesting: " + request);TextMessage txtMessage = session.createTextMessage();txtMessage.setText(request);txtMessage.setJMSReplyTo(tempDest);String correlationId = UUID.randomUUID().toString();txtMessage.setJMSCorrelationID(correlationId);this.producer.send(txtMessage); } public void onMessage(Message message) {try{System.out.println("Received response for: " + ((TextMessage) message).getText());} catch(JMSException e) {e.printStackTrace();} }所示的request()方法使用請求內容創建一個消息并設置JMSReplyTo屬性值,接著發送這個消息到臨時隊列,最后設置correlation ID 屬性值。上述3個步驟很重要.在這個例子中,是使用一個隨機的UUID值來設置correlation ID的,也還可以使用其他任何ID生成器來生成這個ID。
接下就可以發送一個請求了,啟動客戶端也可以像啟動sever一樣,簡單的使用一個main方法即可,下面是代碼清單:
啟動請求/應答系統客戶端
public static void main(String[] args) throws Exception {Client client = new Client();client.start();int i = 0;while (i++ < 10) {client.request("REQUEST-" + i);}Thread.sleep(3000); //wait for repliesclient.stop(); }?
????如前文所述,這個是一個簡單的請求/應答系統的實現。因此,啟動客戶端以后,會發送10個請求到代理。運行這個實例程序需要兩個終端:一個用于運行server,另一個用于client,必須先運行server。sever通過Server類來實現,client通過Client類實現。因為這兩個類都是通過main方法初始化的,所以運行它們很容易。注意:到當client啟動后,送了10個請求用于激活請求/響應進程,然后收到了來自worker的響應。盡管這個例子很簡單,但是日后必將是你在其他業務中實現請求/響應系統的參考。
????使用請求/應答模式,代理將每秒鐘收到的來自無數的客戶端的成千上萬個請求全部分發到不同的主機中處理。?在生產系統中,會使用更多的代理實例用于備份,失效轉移以及負載均衡。這些代理也會被分布于很多的主機上。處理如此多請求的唯一方法是使用多工作者(worker)。因為消息發送者發送消息的速度可能比消息消費者接收并處理消息的速度快的多。所以就需要大量的工作者(worker),這些工作者同樣也分布于大量的主機上。
????使用多工作者的好處是任何的工作者都可以根據需要進行啟用或者停用,而整個系統不會收到影響。消息生產者和工作者會正常處理消息,即使她們當中的一些已經崩潰了,也不會影響系統運行。這正是那些大型系統可以處理海量負載的原因--使用前文介紹過的基于請求/應答模式的異步消息系統.
????JMS的API可以說是繁瑣的,因為它要求開發者書寫大量的初始化代碼用于初始化必要的JMS對象,包括connection, session, producer, consumer等等。使用Spring框架通過提供可靠的API來幫助開發者移除(類似于JMS對象初始化)的那些固定的代碼,以便簡化整個配置過程。這正式使用Spring框架帶來的好處。
轉載于:https://my.oschina.net/mclimber/blog/1510875
總結
以上是生活随笔為你收集整理的使用JMS实现请求/应答程序的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 什么是OOA/OOD
- 下一篇: 求一颗二叉树中两个节点的最低公共父节点