Active Object 并发模式在 Java 中的应用--转载
原文地址:http://www.ibm.com/developerworks/cn/java/j-lo-activeobject/
本文主要從以下兩個方面進行闡述:
- 使用 C++ 語言,來描述 Active Object 設計模式。
Java 類庫對于這樣一個典型的模式做了很好的類庫層面的封裝,因此對于 Java 的開發者來說,很多關于該設計模式本身的東西被屏蔽掉了。本文試圖使用 Native C++ 語言,幫助讀者從本質上對 Active Object 設計模式有一個更全面的認識。
- 結合 C++ 版本的 Active Object 設計模式,引領讀者對于 Active Object 設計模式在 Java 類庫中的支持,有一個更深刻的認識,幫助讀者正確并且有效地使用這些類庫。
預備知識
并發對象 (Concurrent Object)
在這里,我們先定義一下,什么是并發對象。不同于一般的對象,并發對象指的是該對象方法的調用與方法的執行不在同一個線程內,也即:該對象方法被異步執行。這其實是在多線程編程環境下典型的計算特征,線程引入了異步。從另一個角度來看,并發對象其實是面向對象的概念應用于多線程計算環境下的產物。
回頁首
Active Object 設計模式 C++ 描述
我們將從以下幾個方面來討論 Active Object 模式。
問題描述
我們都知道,基于多線程機制的并發編程模式可以極大提高應用的 QoS(Quality of Service)。典型的例子如,我們在開發服務器端的應用時,通行的做法就是通過多線程機制并發地服務客戶端提交上來的請求,以期提高服務器對客戶端的反應度 (Responsiveness)。同時,相比于單線程的應用,并發的多線程應用相對要復雜得多。在多線程的計算環境里,并發對象被所有的調用者線程所共享。一般來說,并發對象的設計實現需要考慮下面的幾個重要因素:
- 并發對象的任何一次的方法執行,不允許無限地或者長時間阻止其它方法的調用執行,從而影響應用的 QoS。
- 由于并發對象被調用者線程所共享,其內部狀態必須保證是線程安全的,必須受限于某些線程同步約束,并且這些約束對調用者來說是透明的,不可見的。從調用者的角度來看,并發對象與普通對象沒有多大區別。
- 并發對象的設計實現,應該能夠透明地利用底層平臺所提供的并發機制。這樣做的好處是,當并發對象運行在諸如多核處理器這類底層硬件平臺上時,我們的應用能夠充分挖掘底層平臺帶來的并發優勢,以獲得足夠好的應用性能。
我們使用 Active Object 設計模式來解決這些問題。
Active Object 設計模式的本質是解耦合方法的調用 (Method invocation) 與方法的執行 (Method execution),方法調用發生在調用者線程上下文中,而方法的執行發生在獨立于調用者線程的 Active Object 線程上下文中。并且重要的一點是,該方法與其它普通的對象成員方法對于調用者來說,沒有什么特別的不同。從運行時的角度來看,這里涉及到兩類線程,一個是調用者線程,另外一個是 Active Object 線程,我們會在下面更詳細地談到。
結構
在 Active Object 模式中,主要有以下幾種類型的參與者:
- 代理 (Proxy) :代理是 Active Object 所定義的對于調用者的公共接口。運行時,代理運行在調用者線程的上下文中,負責把調用者的方法調用轉換成相應的方法請求 (Method Request),并將其插入相應的 Activation List,最后返回給調用者 Future 對象。
- 方法請求:方法請求定義了方法執行所需要的上下文信息,諸如調用參數等。
- Activation List:負責存儲所有由代理創建的,等待執行的方法請求。從運行時來看,Activation List 會被包括調用者線程及其 Active Object 線程并發存取訪問,所以,Activation List 實現應當是線程安全的。
- 調度者 (Scheduler):調度者運行在 Active Object 線程中,調度者來決定下一個執行的方法請求,而調度策略可以基于很多種標準,比如根據方法請求被插入的順序 FIFO 或者 LIFO,比如根據方法請求的優先級等等。
- Servant: Servant 定義了 Active Object 的行為和狀態,它是 Proxy 所定義的接口的事實實現。
- Future: 調用者調用 Proxy 所定義的方法,獲得 Future 對象。調用者可以從該 Future 對象獲得方法執行的最終結果。在真實的實現里,Future 對象會保留一個私有的空間,用來存放 Servant 方法執行的結果。
執行序列圖
在 Active Object 設計模式中,在參與者之間將發生如下的協作過程:
圖 1. Active Object Sequence Diagram.
從圖 1 我們可以看到,步驟 1 到步驟 6 運行在調用者線程中,而步驟 7 到步驟 12 運行在 Active Object 的線程中。
實現
在本節中,我們給出 Active Object 的 C++ 示例實現。
調用者調用 Proxy 的 get() 方法,從 Active Object 獲得 Message。我們可以假定,在真實的應用中, get() 方法的實現受制于某些慢速的 IO 操作,比如需要通過 TCP Socket 從遠端的機器獲得 Message, 然后返回給調用者。所以我們使用 Active Object 來實現該應用,通過線程的并發達到提高應用的 QoS。
清單 1. MQ_Servant
class MQ_Servant { public:// Constructor and destructor.MQ_Servant (size_t mq_size);virtual ~MQ_Servant ();// Message queue implementation operations.void put (const Message &msg);Message get ();// Predicates.bool empty () const;bool full () const; private:// Internal queue representation, e.g., a circular// array or a linked list, that does not use any// internal synchronization mechanism. };MQ_Servant 是真正的服務提供者,實現了 Proxy 中定義的方法。put() 和 get() 方法用來操作底層的隊列。另外,Servant 的實現是純粹的應用邏輯實現,或者稱為商業邏輯實現,沒有混合任何的線程同步機制 , 這有利于我們進行應用邏輯的重用,而不需要考慮不同的線程同步機制。
清單 2. MQ_Proxy
class MQ_Proxy { public:// Bound the message queue size.enum { MQ_MAX_SIZE = 100 };MQ_Proxy (size_t size = MQ_MAX_SIZE):scheduler_ (size), servant_ (size) { }// Schedule <put> to execute on the active object.void put (const Message &msg) {Method_Request *mr = new Put(servant_,msg);scheduler_.insert (mr);}// Return a <Message_Future> as the "future" result of// an asynchronous <get> method on the active object.Message_Future get () {Message_Future result;Method_Request *mr = new Get (&servant_,result);scheduler_.insert (mr);return result;}// empty() and full() predicate implementations ... private:// The servant that implements the active object// methods and a scheduler for the message queue.MQ_Servant servant_;MQ_Scheduler scheduler_; };同一個進程中的多個調用者線程可以共享同一個 Proxy。
清單 3. Method_Request
class Method_Request { public:// Evaluate the synchronization constraint.virtual bool can_run () const = 0// Execute the method. virtual void call () = 0; }; // Inherites from Method_Request class Get : public Method_Request { public:Get (MQ_Servant *rep, const Message_Future &f):servant_ (rep), result_ (f) { }virtual bool can_run () const {// Synchronization constraint: cannot call a // <get> method until queue is not empty.return !servant_->empty ();}virtual void call () {// Bind dequeued message to the future result.result_ = servant_->get ();} private:MQ_Servant *servant_;Message_Future result_; };清單 4. Activation_List
class Activation_List { public:// Block for an "infinite" amount of time waiting// for <insert> and <remove> methods to complete.enum { INFINITE = -1 };// Define a "trait".typedef Activation_List_Iterator iterator;Activation_List ();// Insert <method_request> into the list, waiting up// to <timeout> amount of time for space to become// available in the queue. Throws the <System_Ex>// exception if <timeout> expires.void insert (Method_Request *method_request,Time_Value *timeout = 0);// Remove <method_request> from the list, waiting up// to <timeout> amount of time for a <method_request>// to be inserted into the list. Throws the// <System_Ex> exception if <timeout> expires.void remove (Method_Request *&method_request, Time_Value *timeout = 0);private:// Synchronization mechanisms, e.g., condition// variables and mutexes, and the queue implementation, // e.g., an array or a linked list, go here. };Activation List 的實際上就是一個線程同步機制保護下的 Method Request 隊列,對該隊列的所有操作 (insert/remove) 都應該是線程安全的。從本質上講,Activation List 所基于的就是典型的生產者 / 消費者并發編程模型,調用者線程作為生產者把 Method Request 放入該隊列,Active Object 線程作為消費者從該隊列拿出 Method Request, 并執行。
清單 5. MQ_Scheduler
class MQ_Scheduler { public:// Initialize the <Activation_List> and make <MQ_Scheduler>// run in its own thread of control.// we call this thread as Active Object thread.MQ_Scheduler (): act_list_() {// Spawn separate thread to dispatch method requests.// The following call is leveraging the parallelism available on native OS// transparentlyThread_Manager::instance ()->spawn (&svc_run,this);}// ... Other constructors/destructors, etc.// Put <Method_Request> into <Activation_List>. This// method runs in the thread of its client,i.e.// in the proxy's thread.void insert (Method_Request *mr) {act_list_.insert (mr);}// Dispatch the method requests on their servant// in its scheduler's thread of control.virtual void dispatch () {// Iterate continuously in a separate thread(Active Object thread).for (;;) {Activation_List::iterator request;// The iterator's <begin> method blocks// when the <Activation_List> is empty.for(request = act_list_.begin (); request != act_list_.end ();++request){// Select a method request whose// guard evaluates to true.if ((*request).can_run ()) {// Take <request> off the list.act_list_.remove (*request);(*request).call () ;delete *request;}// Other scheduling activities can go here,// e.g., to handle when no <Method_Request>s// in the <Activation_List> have <can_run>// methods that evaluate to true.}}}private:// List of pending Method_Requests.Activation_List act_list_;// Entry point into the new thread.static void *svc_run (void *arg) {MQ_Scheduler *this_obj = static_cast<MQ_Scheduler *> (args);this_obj->dispatch ();} };清單 6. Message_Future
class Message_Future { public:// Initializes <Message_Future> to// point to <message> immediately.Message_Future (const Message &message);//Other implementatio……// Block upto <timeout> time waiting to obtain result// of an asynchronous method invocation. Throws// <System_Ex> exception if <timeout> expires.Message result (Time_Value *timeout = 0) const; private://members definition here…… };事實上,對于調用者來說,可以通過以下的方式從 Future 對象獲得真實的執行結果 Message:
- 同步等待。調用者調用 Future 對象的 result() 方法同步等待,直到后端的 Servant 相應方法執行結束,并把結果存儲到了 Future 對象中來,result 返回,調用者獲得 Message。
- 同步超時等待。調用者調用 Future 對象的 result(timeout) 方法。如果過了 timeout 時間之后,后端的 Servant 相應方法執行仍未結束,則調用失敗,否則,調用者線程被喚醒,result 方法返回,調用者獲得 Message。
- 異步查詢。調用者可以通過調用 Future 對象定義的查詢方法 ( 清單 6 沒有提供相應的定義 ),查看真實的結果是否準備好了,如果準備好了,調用 result 方法,直接獲得 Message。
清單 7 是使用該 Active Object 的示例。
清單 7. Active Object 使用
MQ_Proxy message_queue;//Optioin 1. Obtain future and block thread until message arrives. Message_Future future = message_queue.get(); Message msg = future.result(); //Handle received message here handle(msg);//2. Obtain a future (does not block the client). Message_Future future = message_queue.get ();//The current thread is not blocked, do something else here... //Evaluate future and block if result is not available. Message msg = future.result (); //Handle received message here handle(msg);從清單 7 可以看到,MQ_Proxy 對于調用者而言,和一個普通的 C++ 定義的對象并沒有區別,并發的實現細節已經被隱藏。
回頁首
Java 對 Active Object 支持
Java JDK 1.3 引入了 java.util.Timer 和 java.util.TimerTask,提供了對 timer-based 并發任務支持,Timer 和 TimerTask 可以看作是 Active Object 設計模式在 Java 中的實現。不過,在這里我們不打算過多討論 Timer 及其 TimerTask。由于 Timer 和 TimerTask 的缺陷性,例如 Timer 使用單線程執行 TimerTask 導致的 Task 調度時間的不精確性等問題。從 Java1.5 開始,Java 建議使用 ScheduledThreadPoolExecutor 作為 Timer 的替代。
在這里,我們討論一下自 Java1.5 引入的 Executor Framework。Java1.5 的 Executor Framework 可以看作是 Active Object 設計模式在 Java 中的體現。不過 Java 的 Executor Framework 極大地簡化了我們前面所討論的 Active Object 所定義的模式。
Java 的 Executor Framework 是一套靈活強大的異步任務執行框架,它提供了標準的方式解耦合任務的提交與任務的執行。Java Executor 框架中的任務指的是實現了 Runnable 或者 Callable 接口的對象。Executor 的示例用法如清單 8 所示:
清單 8. Java Executor 示例代碼
public class TaskExecutionTcpServer {private static final int NTHREADS = 100;private static final Executor exec = Executors.newFixedThreadPool(NTHREADS);public static void main(String[] args) throws IOException {ServerSocket socket = new ServerSocket(80);while (true) {final Socket connection = socket.accept();Runnable task = new Runnable() {public void run() {handleRequest(connection);}public void handleRequest(Socket connection) {// Handle the incoming socket connection from //individual client.}};exec.execute(task);}} }在示例 8 中,我們創建了一個基于線程池的 Java Executor, 每當新的 TCP 連接進來的時候,我們就分配一個獨立的實現了 Runnable 任務來處理該連接,所有這些任務運行在我們創建的有 100 個線程的線程池上。
我們可以從 Active Object 設計模式的角度來審視一下 Java Executor 框架。Java Executor 框架以任務 (Task) 為中心,簡化了 Active Object 中的角色分工。可以看到,實現 Runnable 或者 Callable 接口的 Java Executor 任務整合了 Method Request 和 Servant 的角色 , 通過實現 run() 或者 call() 方法實現應用邏輯。Java Executor 框架并沒有顯式地定義 Proxy 接口,而是直接調用 Executor 提交任務,這里的 Executor 相當于 Active Object 中調度者角色。從調用者的角度來看,這看起來并不像是在調用一個普通對象方法,而是向 Executor 提交了一個任務。所以,在這個層面上說,并發的底層細節已經暴露給了調用者。對于 Java 的開發者來說,如果你不擔心這樣的底層并發細節直接暴露給調用者,或者說你的應用并不需要像對待普通對象一樣對待并發對象,Java 的 Executor 框架是一個很好的選擇。相反,如果你希望隱藏這樣的并發細節,希望像操縱普通對象一樣操縱并發對象,那你就需要如本文上節所描述的那樣,遵循 Active Object 設計原則 , 清晰地定義各個角色,實現自己的 Active Object 模式。
總而言之,Java Executor 框架簡化了 Active Object 所定義的模式,模糊了 Active Object 中角色的分工,其基于生產者 / 消費者模式,生產者和消費者基于任務相互協作。
回頁首
總結
最后,我們討論一下 Active Object 設計模式的優缺點。
Active Object 給我們的應用帶來的好處:
- 極大提高了應用的并發性以及簡化了線程同步帶來的復雜性。并發性的提高得益于調用者線程與 Active Object 線程的并發執行。簡化的線程同步復雜性主要表現在所有線程同步細節封裝在調度者內 ( 也就是 Java 的 Executor 對象 ),Active Object 調用者并不需要關心。
- 在 Active Object 中,方法的執行順序可以不同于方法的調用順序。用 Java 的話說,也就是任務執行的順序可以不同于任務提交的順序。在一定情況下,這可以幫助優化我們應用的性能,提高應用的 QoS 及其 Responsiveness。在 Java Executor 框架下,你可以根據當前的計算資源,確定優化的執行策略 (Execution Policy),該執行策略的內容包括:任務將分配在多少線程上執行,以什么順序執行,多少任務可以同時執行等等。
當然,Active Object 也有缺點:
- 額外的性能開銷。這涉及到從調用者線程到 Active Object 線程的上下文切換,線程同步,額外的內存拷貝等。
- 難于調試。Active Object 引入了方法的異步執行,從調試者的角度看,調試這樣的方法調用不像普通方法那樣直截了當,并且這其中涉及到了線程的調度,同步等。
轉載于:https://www.cnblogs.com/davidwang456/p/4485228.html
《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀總結
以上是生活随笔為你收集整理的Active Object 并发模式在 Java 中的应用--转载的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 分布式计算的模式语言读后感--基线架构
- 下一篇: graylog2+syslog-ng+m