通用并发对象池
但是,有些對象在創(chuàng)建時肯定會付出高昂的代價。 諸如線程,數(shù)據(jù)庫連接對象等對象不是輕量級對象,并且創(chuàng)建起來會稍微貴一些。 在任何應用程序中,我們都需要使用上述多種對象。 因此,如果有一種非常容易的方法可以輕松創(chuàng)建和維護該類型的對象池,那么可以動態(tài)使用和重用對象,而不必擔心客戶端代碼對對象的生存周期的影響,那就太好了。
在實際編寫對象池的代碼之前,讓我們首先確定任何對象池必須回答的主要要求。
- 池必須允許客戶端使用對象(如果有)。
- 一旦客戶端將對象返回到池中,它必須重新使用對象。
- 如果需要,它必須能夠創(chuàng)建更多對象以滿足客戶不斷增長的需求。
- 它必須提供適當?shù)年P閉機制,以便在關閉時不會發(fā)生內(nèi)存泄漏。
毋庸置疑,以上幾點將構成我們向客戶公開的界面的基礎。
因此,我們的接口聲明如下:
特意使上述接口非常簡單通用,以支持任何類型的對象。 它提供了從池中獲取對象或?qū)ο蠓祷爻刂械姆椒ā?它還提供了一種關閉機制來處理對象。
現(xiàn)在,我們嘗試創(chuàng)建上述接口的實現(xiàn)。 但在此之前,必須注意,理想的release()方法將首先嘗試檢查客戶端返回的對象是否仍然可重用,這一點很重要。 如果是,則它將返回到池中,否則必須丟棄該對象。 我們希望Pool接口的每個實現(xiàn)都遵循此規(guī)則。 因此,在創(chuàng)建具體的實現(xiàn)之前,我們創(chuàng)建一個抽象的實現(xiàn)帽子,將這種限制強加給后續(xù)的實現(xiàn)。 我們的抽象實現(xiàn)將被稱為Surprise,AbstractPool,其定義如下:
package com.test.pool;/*** Represents an abstract pool, that defines the procedure* of returning an object to the pool.* * @author Swaranga** @param < T > the type of pooled objects.*/ abstract class AbstractPool < T > implements Pool < T > {/*** Returns the object to the pool. * The method first validates the object if it is* re-usable and then puts returns it to the pool.* * If the object validation fails, * some implementations* will try to create a new one * and put it into the pool; however * this behaviour is subject to change * from implementation to implementation* */@Overridepublic final void release(T t){if(isValid(t)){returnToPool(t);}else{handleInvalidReturn(t);}}protected abstract void handleInvalidReturn(T t);protected abstract void returnToPool(T t);protected abstract boolean isValid(T t); }在上面的類中,我們強制對象池必須先驗證對象,然后再將其返回到池中。 為了自定義其池的行為,實現(xiàn)可以自由選擇它們實現(xiàn)三種抽象方法的方式。 他們將決定使用自己的邏輯,如何檢查對象是否對重用有效[validate()方法,如果客戶端返回的對象無效[該方法,handleInvalidReturn()方法]以及實際邏輯,該怎么辦。將有效對象返回到池中[returnToPool()方法]。
現(xiàn)在有了上面的類集,我們幾乎可以進行具體的實現(xiàn)了。 但是要注意的是,由于上述類旨在支持通用對象池,因此上述類的通用實現(xiàn)將不知道如何驗證對象[因為對象將是通用的:-)。 因此,我們需要其他可以幫助我們的東西。
我們實際上需要的是一種驗證對象的常用方法,這樣,具體的Pool實現(xiàn)就不必擔心正在驗證的對象的類型。 因此,我們引入了一個新的接口Validator,該接口定義了驗證對象的方法。 我們對Validator接口的定義如下:
package com.test.pool;/*** Represents the functionality to * validate an object of the pool* and to subsequently perform cleanup activities.* * @author Swaranga** @param < T > the type of objects to validate and cleanup.*/public static interface Validator < T >{/*** Checks whether the object is valid.* * @param t the object to check.* * @return true * if the object is valid else false .*/public boolean isValid(T t);/*** Performs any cleanup activities * before discarding the object.* For example before discarding * database connection objects,* the pool will want to close the connections. * This is done via the * invalidate() method.* * @param t the object to cleanup*/public void invalidate(T t);}上面的接口定義了檢查對象是否有效的方法,以及使對象和對象無效的方法。 當我們要丟棄對象并清除該實例使用的任何內(nèi)存時,應使用invalidate方法。 請注意,此接口本身沒有什么意義,僅在對象池的上下文中使用時才有意義。 因此,我們在頂級Pool接口中定義此接口。 這類似于Java Collections庫中的Map和Map.Entry接口。 因此,我們的Pool接口如下所示:
package com.test.pool;/*** Represents a cached pool of objects.* * @author Swaranga** @param < T > the type of object to pool.*/ public interface Pool< T > {/*** Returns an instance from the pool. * The call may be a blocking one or a non-blocking one * and that is determined by the internal implementation.* * If the call is a blocking call, * the call returns immediately with a valid object * if available, else the thread is made to wait * until an object becomes available.* In case of a blocking call, * it is advised that clients react * to {@link InterruptedException} which might be thrown* when the thread waits for an object to become available.* * If the call is a non-blocking one, * the call returns immediately irrespective of * whether an object is available or not.* If any object is available the call returns it * else the call returns < code >null< /code >.* * The validity of the objects are determined using the* {@link Validator} interface, such that * an object < code >o< /code > is valid if * < code > Validator.isValid(o) == true < /code >.* * @return T one of the pooled objects.*/T get();/*** Releases the object and puts it back to the pool.* * The mechanism of putting the object back to the pool is* generally asynchronous, * however future implementations might differ.* * @param t the object to return to the pool*/void release(T t);/*** Shuts down the pool. In essence this call will not * accept any more requests * and will release all resources.* Releasing resources are done * via the < code >invalidate()< /code >* method of the {@link Validator} interface.*/void shutdown();/*** Represents the functionality to * validate an object of the pool* and to subsequently perform cleanup activities.* * @author Swaranga** @param < T > the type of objects to validate and cleanup.*/public static interface Validator < T >{/*** Checks whether the object is valid.* * @param t the object to check.* * @return true * if the object is valid else false .*/public boolean isValid(T t);/*** Performs any cleanup activities * before discarding the object.* For example before discarding * database connection objects,* the pool will want to close the connections. * This is done via the * invalidate() method.* * @param t the object to cleanup*/public void invalidate(T t);} }我們幾乎準備好具體實施了。 但是在此之前,我們需要一種最終的武器,它實際上是對象池中最重要的武器。 這被稱為“創(chuàng)建新對象的能力”。c我們的對象池將是通用的,它們必須具有如何創(chuàng)建新對象以填充其池的知識。 此功能也必須不依賴于對象池的類型,并且必須是創(chuàng)建新對象的常用方法。 完成此操作的方法將是一個稱為ObjectFactory的接口,該接口僅定義一種方法,即“如何創(chuàng)建新對象”。 我們的ObjectFactory接口如下:
package com.test.pool;/*** Represents the mechanism to create * new objects to be used in an object pool.* * @author Swaranga** @param < T > the type of object to create. */ public interface ObjectFactory < T > {/*** Returns a new instance of an object of type T.* * @return T an new instance of the object of type T*/public abstract T createNew(); }最后,我們完成了我們的幫助程序類,現(xiàn)在我們將創(chuàng)建Pool接口的具體實現(xiàn)。 因為我們想要一個可以在并發(fā)應用程序中使用的池,所以我們將創(chuàng)建一個阻塞池,如果池中沒有可用的對象,它將阻塞客戶端。 阻塞機制將無限期阻塞,直到對象可用為止。 這種實現(xiàn)方式催生了另一個方法,該方法將僅在給定的超時時間段內(nèi)阻塞,如果在返回該對象的超時之前有任何對象可用,否則在超時之后而不是永遠等待,則返回一個空對象。 此實現(xiàn)類似于Java Concurrency API的LinkedBlockingQueue實現(xiàn),因此在實現(xiàn)實際的類之前,我們公開另一個實現(xiàn)BlockingPool,該實現(xiàn)類似于Java Concurrency API的BlockingQueue接口。
因此,Blockingpool接口聲明如下:
package com.test.pool;import java.util.concurrent.TimeUnit;/*** Represents a pool of objects that makes the * requesting threads wait if no object is available.* * @author Swaranga** @param < T > the type of objects to pool.*/ public interface BlockingPool < T > extends Pool < T > {/*** Returns an instance of type T from the pool.* * The call is a blocking call, * and client threads are made to wait* indefinitely until an object is available. * The call implements a fairness algorithm * that ensures that a FCFS service is implemented.* * Clients are advised to react to InterruptedException. * If the thread is interrupted while waiting * for an object to become available,* the current implementations * sets the interrupted state of the thread * to true and returns null. * However this is subject to change * from implementation to implementation.* * @return T an instance of the Object * of type T from the pool.*/T get();/*** Returns an instance of type T from the pool, * waiting up to the* specified wait time if necessary * for an object to become available..* * The call is a blocking call, * and client threads are made to wait* for time until an object is available * or until the timeout occurs. * The call implements a fairness algorithm * that ensures that a FCFS service is implemented.* * Clients are advised to react to InterruptedException. * If the thread is interrupted while waiting * for an object to become available,* the current implementations * set the interrupted state of the thread * to true and returns null. * However this is subject to change * from implementation to implementation.* * * @param time amount of time to wait before giving up, * in units of unit* @param unit a TimeUnit determining * how to interpret the* timeout parameter* * @return T an instance of the Object * of type T from the pool.* * @throws InterruptedException * if interrupted while waiting*/T get(long time, TimeUnit unit) throws InterruptedException; }我們的BoundedBlockingPool實現(xiàn)將如下所示:
package com.test.pool;import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit;public final class BoundedBlockingPool < T > extends AbstractPool < T >implements BlockingPool < T > {private int size;private BlockingQueue < T > objects;private Validator < T > validator;private ObjectFactory < T > objectFactory;private ExecutorService executor = Executors.newCachedThreadPool();private volatile boolean shutdownCalled;public BoundedBlockingPool(int size, Validator < T > validator, ObjectFactory < T > objectFactory){super();this.objectFactory = objectFactory;this.size = size;this.validator = validator;objects = new LinkedBlockingQueue < T >(size);initializeObjects();shutdownCalled = false;}public T get(long timeOut, TimeUnit unit){if(!shutdownCalled){T t = null;try{t = objects.poll(timeOut, unit);return t;}catch(InterruptedException ie){Thread.currentThread().interrupt();}return t;}throw new IllegalStateException('Object pool is already shutdown');}public T get(){if(!shutdownCalled){T t = null;try{t = objects.take();}catch(InterruptedException ie){Thread.currentThread().interrupt();}return t;}throw new IllegalStateException('Object pool is already shutdown');}public void shutdown(){shutdownCalled = true;executor.shutdownNow();clearResources();}private void clearResources(){for(T t : objects){validator.invalidate(t);}}@Overrideprotected void returnToPool(T t){if(validator.isValid(t)){executor.submit(new ObjectReturner(objects, t));}}@Overrideprotected void handleInvalidReturn(T t){}@Overrideprotected boolean isValid(T t){return validator.isValid(t);}private void initializeObjects(){for(int i = 0; i < size; i++){objects.add(objectFactory.createNew());}}private class ObjectReturner < E > implements Callable < Void >{private BlockingQueue < E > queue;private E e;public ObjectReturner(BlockingQueue < E > queue, E e){this.queue = queue;this.e = e;}public Void call(){while(true){try{queue.put(e);break;}catch(InterruptedException ie){Thread.currentThread().interrupt();}}return null;}} }上面是一個非常基本的對象池,在內(nèi)部由LinkedBlockingQueue支持。 唯一感興趣的方法是returnToPool()方法。 由于內(nèi)部存儲是一個阻塞池,因此,如果我們嘗試將返回的元素直接放入LinkedBlockingPool中,則如果隊列已滿,它可能會阻塞客戶端。 但是我們不希望對象池的客戶端僅為了執(zhí)行普通任務(例如將對象返回到池)而阻塞。 因此,我們完成了將對象作為異步任務插入LinkedBlockingQueue的實際任務,并將其提交給Executor實例,以便客戶端線程可以立即返回。
現(xiàn)在,我們將上述對象池用于代碼中。 我們將使用對象池來池一些數(shù)據(jù)庫連接對象。 因此,我們將需要一個驗證器來驗證我們的數(shù)據(jù)庫連接對象。
我們的JDBCConnectionValidator將如下所示:
package com.test;import java.sql.Connection; import java.sql.SQLException;import com.test.pool.Pool.Validator;public final class JDBCConnectionValidator implements Validator < Connection > {public boolean isValid(Connection con){ if(con == null){return false;}try{return !con.isClosed();}catch(SQLException se){return false;}}public void invalidate(Connection con){try{con.close();}catch(SQLException se){}} }我們的JDBCObjectFactory將使對象池能夠創(chuàng)建新對象,如下所示:
package com.test;import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException;import com.test.pool.ObjectFactory;public class JDBCConnectionFactory implements ObjectFactory < Connection > {private String connectionURL;private String userName;private String password;public JDBCConnectionFactory(String driver, String connectionURL, String userName, String password){super();try{Class.forName(driver);}catch(ClassNotFoundException ce){throw new IllegalArgumentException('Unable to find driver in classpath', ce);}this.connectionURL = connectionURL;this.userName = userName;this.password = password;}public Connection createNew(){ try{return DriverManager.getConnection(connectionURL, userName, password);}catch(SQLException se){throw new IllegalArgumentException('Unable to create new connection', se);}} }現(xiàn)在,我們使用上面的Validator和ObjectFactory創(chuàng)建一個JDBC對象池:
package com.test; import java.sql.Connection;import com.test.pool.Pool; import com.test.pool.PoolFactory;public class Main {public static void main(String[] args){Pool < Connection > pool = new BoundedBlockingPool < Connection > (10, new JDBCConnectionValidator(),new JDBCConnectionFactory('', '', '', ''));//do whatever you like} }作為閱讀整個帖子的獎勵。 我將提供Pool接口的另一種實現(xiàn),它實際上是一個非阻塞對象池。 此實現(xiàn)與上一個實現(xiàn)的唯一區(qū)別在于,如果某個元素不可用,則此實現(xiàn)不會阻塞客戶端,而是返回null。 它去了:
package com.test.pool;import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.Semaphore;public class BoundedPool < T > extends AbstractPool < T > {private int size;private Queue < T > objects;private Validator < T > validator;private ObjectFactory < T > objectFactory;private Semaphore permits;private volatile boolean shutdownCalled;public BoundedPool(int size, Validator < T > validator, ObjectFactory < T > objectFactory){super();this.objectFactory = objectFactory;this.size = size;this.validator = validator;objects = new LinkedList < T >();initializeObjects();shutdownCalled = false;}@Overridepublic T get(){T t = null;if(!shutdownCalled){if(permits.tryAcquire()){t = objects.poll();}}else{throw new IllegalStateException('Object pool already shutdown');}return t;}@Overridepublic void shutdown(){shutdownCalled = true;clearResources();}private void clearResources(){for(T t : objects){validator.invalidate(t);}}@Overrideprotected void returnToPool(T t){boolean added = objects.add(t);if(added){permits.release();}}@Overrideprotected void handleInvalidReturn(T t){}@Overrideprotected boolean isValid(T t){return validator.isValid(t);}private void initializeObjects(){for(int i = 0; i < size; i++){objects.add(objectFactory.createNew());}} }考慮到我們現(xiàn)在有兩個強大的實現(xiàn),最好讓用戶通過帶有有意義名稱的工廠創(chuàng)建我們的池。 這是工廠:
package com.test.pool;import com.test.pool.Pool.Validator;/*** Factory and utility methods for * {@link Pool} and {@link BlockingPool} classes * defined in this package. * This class supports the following kinds of methods:**- *
- 創(chuàng)建并返回{@link Pool}接口的默認非阻塞*實現(xiàn)的方法。 *
- * *
- 創(chuàng)建并返回{@link BlockingPool}接口的默認實現(xiàn)的方法。 *
- *
因此,我們的客戶現(xiàn)在可以以更易讀的方式創(chuàng)建對象池:
package com.test; import java.sql.Connection;import com.test.pool.Pool; import com.test.pool.PoolFactory;public class Main {public static void main(String[] args){Pool < Connection > pool = PoolFactory.newBoundedBlockingPool(10, new JDBCConnectionFactory('', '', '', ''), new JDBCConnectionValidator());//do whatever you like} }這樣就結束了我們的長篇文章。 這個早就該了。 隨時使用,更改,添加更多實現(xiàn)。
祝您編程愉快,別忘了分享!
參考: The Java HotSpot博客上的JCG合作伙伴 Sarma Swaranga 提供的通用并發(fā)對象池 。
翻譯自: https://www.javacodegeeks.com/2012/09/a-generic-and-concurrent-object-pool.html
總結
- 上一篇: 花鱼怎么做才好吃 花鱼的烹饪方法
- 下一篇: 先打雷还是先闪电 到底是先先打雷还是先闪