缓存框架OSCache部分源码分析
? 在并發量比較大的場景,如果采用直接訪問數據庫的方式,將會對數據庫帶來巨大的壓力,嚴重的情況下可能會導致數據庫不可用狀態,并且時間的消耗也是不能容忍的,尤其對于某些獲取起來比較昂貴的數據。在這種情況下,一般采用緩存的方式。將經常訪問的熱點數據提前加載到內存中,這樣能夠大大降低數據庫的壓力。
? OSCache是一個開源的緩存框架,雖然現在已經停止維護了,但是對于OSCache的實現還是值得學習和借鑒的。下面通過OSCache的部分源碼分析OSCache的設計思想。
緩存數據結構
? 通常緩存都是通過<K,V>這種數據結構存儲,但緩存都是應用在多線程的場景下,需要保證線程安全。Java中可以選擇HashTable、ConcurrentHashMap、synchronizedMap等。OSCache本質上使用的HashTable實現的,具體實現代碼在com.opensymphony.oscache.base.algorithm.AbstractConcurrentReadCache中。由于HashTable在保證線程安全上采用的加鎖整個數據塊,因此,適合讀多寫少(mostly-concurrent reading, but exclusive writing)的場景(OSCache對原始的HashTable進行了優化,下面會講到)。如果寫很多的話,可以采用ConcurrentHashMap數據結構。
獲得緩存內容
//從緩存中獲取指定key對應的內容 public?Object?getFromCache(String?key,?int?refreshPeriod,?String?cronExpiry)?throws?NeedsRefreshException?{//首先嘗試獲取內容,如果獲取不到,則新建一個CacheEntry對象CacheEntry?cacheEntry?=?this.getCacheEntry(key,?null,?null);Object?content?=?cacheEntry.getContent();CacheMapAccessEventType?accessEventType?=?CacheMapAccessEventType.HIT;boolean?reload?=?false;//?檢查緩存是否過期,如果過期分為以下幾種情況處理if?(this.isStale(cacheEntry,?refreshPeriod,?cronExpiry))?{//獲取更新狀態,如果沒有,新建一個同時引用計數默認為1EntryUpdateState?updateState?=?getUpdateState(key);try?{synchronized?(updateState)?{if?(updateState.isAwaitingUpdate()?||?updateState.isCancelled())?{//?如果狀態為等待刷新或者已經取消刷新,說明當前沒有其他線程對其進行刷新操作//?因此這里啟動刷新操作(這里會將狀態更新為刷新中同時引用計數+1)updateState.startUpdate();//如果是新建的CacheEntry對象(即之前未緩存該key對應的對象)if?(cacheEntry.isNew())?{//設置命中狀態為未命中accessEventType?=?CacheMapAccessEventType.MISS;}?else?{//否則說明雖然命中了,但是需要刷新accessEventType?=?CacheMapAccessEventType.STALE_HIT;}}?else?if?(updateState.isUpdating())?{//?如果更新狀態為刷新中,說明另有一個線程正對該緩存對象執行刷新操作//?此時如果是新建的CacheEntry對象或者同步模式設置為true,那么該線程將阻塞//通過putInCache或者cancelUpdate可以讓線程繼續運行//?否則獲取到的很有可能是臟數據if?(cacheEntry.isNew()?||?blocking)?{do?{try?{updateState.wait();}?catch?(InterruptedException?e)?{}}?while?(updateState.isUpdating());//如果更新狀態變成了取消,說明另外一個線程取消了刷新緩存操作,那么讓該線程嘗試刷新if?(updateState.isCancelled())?{//更新狀態設置為更新中并將引用計數+1updateState.startUpdate();if?(cacheEntry.isNew())?{accessEventType?=?CacheMapAccessEventType.MISS;}?else?{accessEventType?=?CacheMapAccessEventType.STALE_HIT;}}?else?if?(updateState.isComplete())?{reload?=?true;}?else?{log.error("Invalid?update?state?for?cache?entry?"?+?key);}}}?else?{reload?=?true;}}}?finally?{//將引用計數-1同時檢查如果引用計數=0,將updateState移除releaseUpdateState(updateState,?key);}}//?如果該標志位為true,說明緩存一定刷新了if?(reload)?{cacheEntry?=?(CacheEntry)?cacheMap.get(key);if?(cacheEntry?!=?null)?{content?=?cacheEntry.getContent();}?else?{log.error("Could?not?reload?cache?entry?after?waiting?for?it?to?be?rebuilt");}}dispatchCacheMapAccessEvent(accessEventType,?cacheEntry,?null);//?如果緩存不存在或者緩存過期將拋出需要刷新的異常if?(accessEventType?!=?CacheMapAccessEventType.HIT)?{throw?new?NeedsRefreshException(content);}return?content;}? 從上面可以看到EntryUpdateState很關鍵。EntryUpdateState用來標記某個key對應的緩存的更新狀態以及線程引用計數(可以理解為一個計數器),并且每一個key對應一個EntryUpdateState。如果緩存存在并且沒有過期,EntryUpdateState為空。OSCache使用的HashTable相對于原始的HashTable在get操作中是沒有synchronize關鍵字的,而為了防止并發問題,所以引入了EntryUpdateState這個數據結構。這樣做的目的就是防止過多的使用synchronize,從而對性能不會造成很大的影響。
? 查看定義如下:
//默認public?static?final?int?NOT_YET_UPDATING?=?-1;public?static?final?int?UPDATE_IN_PROGRESS?=?0;public?static?final?int?UPDATE_COMPLETE?=?1;public?static?final?int?UPDATE_CANCELLED?=?2;int?state?=?NOT_YET_UPDATING;//引用計數private?int?nbConcurrentUses?=?1;? 這里的引用計數,代表了當前有多少線程在緩存更新或存入過程中進行訪問。
? 通過上面從緩存獲取指定key的代碼可以發現一個問題:當緩存不存在或者緩存過期的情況下,都會拋出NeedsRefreshException的異常,在這種情況下,如果blocking設置為true(通常設置為true),其他訪問的線程將處于阻塞狀態,直到緩存更新完畢才會繼續運行,倘若這里處理不當,將會導致死鎖的發生。
? 因此在該異常產生時,需要進行緩存的刷新操作,官方給出了兩種方法:
//第一種:with?fail?over String?myKey?=?"myKey"; String?myValue; int?myRefreshPeriod?=?1000; try?{//?Get?from?the?cachemyValue?=?(String)?admin.getFromCache(myKey,?myRefreshPeriod); }?catch?(NeedsRefreshException?nre)?{try?{//?Get?the?value?(probably?by?calling?an?EJB)myValue?=?"This?is?the?content?retrieved.";//?Store?in?the?cacheadmin.putInCache(myKey,?myValue);}?catch?(Exception?ex)?{//?We?have?the?current?content?if?we?want?fail-over.myValue?=?(String)?nre.getCacheContent();//?It?is?essential?that?cancelUpdate?is?called?if?the//?cached?content?is?not?rebuiltadmin.cancelUpdate(myKey);} }//第二種:without?fail?over String?myKey?=?"myKey"; String?myValue; int?myRefreshPeriod?=?1000; try?{//?Get?from?the?cachemyValue?=?(String)?admin.getFromCache(myKey,?myRefreshPeriod); }?catch?(NeedsRefreshException?nre)?{try?{//?Get?the?value?(probably?by?calling?an?EJB)myValue?=?"This?is?the?content?retrieved.";//?Store?in?the?cacheadmin.putInCache(myKey,?myValue);updated?=?true;}?finally?{if?(!updated)?{//?It?is?essential?that?cancelUpdate?is?called?if?the//?cached?content?could?not?be?rebuiltadmin.cancelUpdate(myKey);}} }? 正如之前的代碼,如果這里不調用putInCache或者cancelUpdate,其他訪問該緩存的線程將會由于得不到資源始終處于阻塞狀態,導致死鎖的發生。因此這里是一個非常重要的關注點。只有調用了putInCache或者cancelUpdate方法,阻塞的線程才會開始運行。
? 下面看一下putInCache和cancelUpdate方法具體做了什么:
public?void?putInCache(String?key,?Object?content,?String[]?groups,?EntryRefreshPolicy?policy,?String?origin)?{CacheEntry?cacheEntry?=?this.getCacheEntry(key,?policy,?origin);boolean?isNewEntry?=?cacheEntry.isNew();//?首先判斷緩存中是否已經存在if?(!isNewEntry)?{cacheEntry?=?new?CacheEntry(key,?policy);}cacheEntry.setContent(content);cacheEntry.setGroups(groups);cacheMap.put(key,?cacheEntry);//?更新狀態及引用計數,通知其它阻塞線程可以獲取緩存了completeUpdate(key);//......//......} } protected?void?completeUpdate(String?key)?{EntryUpdateState?state;synchronized?(updateStates)?{state?=?(EntryUpdateState)?updateStates.get(key);if?(state?!=?null)?{synchronized?(state)?{//更新狀態為UPDATE_COMPLETE,引用計數-1int?usageCounter?=?state.completeUpdate();//喚醒其它等待該緩存資源的線程state.notifyAll();checkEntryStateUpdateUsage(key,?state,?usageCounter);}}?else?{//如果putInCache方法直接調用(如不是因NeedRefreshException異常調用)這樣EntryUpdateState將為null,不執行操作?}}}?cancelUpdate的邏輯和putInCache基本相同:
public?void?cancelUpdate(String?key)?{EntryUpdateState?state;if?(key?!=?null)?{synchronized?(updateStates)?{state?=?(EntryUpdateState)?updateStates.get(key);if?(state?!=?null)?{synchronized?(state)?{//更新狀態為UPDATE_CANCELLED,引用計數-1int?usageCounter?=?state.cancelUpdate();state.notify();checkEntryStateUpdateUsage(key,?state,?usageCounter);}}?else?{if?(log.isErrorEnabled())?{log.error("internal?error:?expected?to?get?a?state?from?key?["?+?key?+?"]");}}}}}?從上面的代碼可以看到,當發生需要刷新緩存(NeedsRefreshException)的異常時,需要通過putInCache()方法進行緩存的更新或者cancelUpdate()方法放棄刷新緩存,從而釋放資源,喚醒其它阻塞的線程。
緩存淘汰(替換)策略
? 因為我們的內存不是無限的,緩存不可能無限的擴大,因此在緩存占滿時,我們需要將緩存中一些“不重要”的內容剔除,從而騰出空間緩存新的內容。如何丈量這個“不重要”,就是我們需要考慮的緩存淘汰(替換)策略。
? 一般有以下策略:
Least Frequently Used(LFU):計算每個緩存對象的使用頻率,將頻率最低的剔除;
Least Recently User(LRU):最近最少使用,具體是將最近訪問的內容始終放在最頂端,一直未訪問或者最久未訪問的內容放在最底端,當需要替換的時候,只需將最底端的剔除即可,這樣可以使得最常訪問的內容始終在緩存中,使用比較廣泛,OSCache中默認也是采用該方法。LRU的這種特性,在Java中很容易通過LinkedHashMap實現,具體實現方法可以參考下面的介紹。
First in First out(FIFO):先進先出。實現起來最為簡單,但是不適用。
Random Cache:隨機替換。
? 當然還有很多替換算法,這里就不一一列舉了。僅就最常用的LRU算法進行介紹。
Java中的LinkedHashMap可以保持插入順序或者訪問順序,對于第二個特性,跟LRU的機制很相似,因此,可以很簡單的采用LinkedHashMap來實現LRU算法。
? 查看LinkedHashMap的定義,有下面一個參數:
final?boolean?accessOrder;?再看幾個構造函數的定義:
public?LinkedHashMap(int?initialCapacity,?float?loadFactor)?{super(initialCapacity,?loadFactor);accessOrder?=?false; } public?LinkedHashMap(int?initialCapacity)?{super(initialCapacity);accessOrder?=?false; } public?LinkedHashMap()?{super();accessOrder?=?false; } public?LinkedHashMap(Map<??extends?K,???extends?V>?m)?{super();accessOrder?=?false;putMapEntries(m,?false); } public?LinkedHashMap(int?initialCapacity,float?loadFactor,boolean?accessOrder)?{super(initialCapacity,?loadFactor);this.accessOrder?=?accessOrder; }? 可以看到,除了最后一個構造函數,其余的accessOrder默認為false。當accessOrder為false時,LinkedHashMap保持插入順序,而accessOrder如果為true,將保持訪問順序,因此這正是關鍵點。具體如何保持插入順序或者訪問順序,可以參考LinkedHashMap的實現代碼,并不復雜。
? 僅僅是保持訪問順序還不行,我們還要淘汰最近最少使用的對象。LinkedHashMap重寫了父類HashMap的afterNodeInsertion方法:
void?afterNodeInsertion(boolean?evict)?{?//?possibly?remove?eldestLinkedHashMap.Entry<K,V>?first;if?(evict?&&?(first?=?head)?!=?null?&&?removeEldestEntry(first))?{K?key?=?first.key;removeNode(hash(key),?key,?null,?false,?true);}}/***?Returns?<tt>true</tt>?if?this?map?should?remove?its?eldest?entry.*?This?method?is?invoked?by?<tt>put</tt>?and?<tt>putAll</tt>?after*?inserting?a?new?entry?into?the?map.??It?provides?the?implementor*?with?the?opportunity?to?remove?the?eldest?entry?each?time?a?new?one*?is?added.??This?is?useful?if?the?map?represents?a?cache:?it?allows*?the?map?to?reduce?memory?consumption?by?deleting?stale?entries.**?<p>Sample?use:?this?override?will?allow?the?map?to?grow?up?to?100*?entries?and?then?delete?the?eldest?entry?each?time?a?new?entry?is*?added,?maintaining?a?steady?state?of?100?entries.*?<pre>*?????private?static?final?int?MAX_ENTRIES?=?100;**?????protected?boolean?removeEldestEntry(Map.Entry?eldest)?{*????????return?size()?>?MAX_ENTRIES;*?????}*?</pre>**?<p>This?method?typically?does?not?modify?the?map?in?any?way,*?instead?allowing?the?map?to?modify?itself?as?directed?by?its*?return?value.??It?<i>is</i>?permitted?for?this?method?to?modify*?the?map?directly,?but?if?it?does?so,?it?<i>must</i>?return*?<tt>false</tt>?(indicating?that?the?map?should?not?attempt?any*?further?modification).??The?effects?of?returning?<tt>true</tt>*?after?modifying?the?map?from?within?this?method?are?unspecified.**?<p>This?implementation?merely?returns?<tt>false</tt>?(so?that?this*?map?acts?like?a?normal?map?-?the?eldest?element?is?never?removed).**?@param????eldest?The?least?recently?inserted?entry?in?the?map,?or?if*???????????this?is?an?access-ordered?map,?the?least?recently?accessed*???????????entry.??This?is?the?entry?that?will?be?removed?it?this*???????????method?returns?<tt>true</tt>.??If?the?map?was?empty?prior*???????????to?the?<tt>put</tt>?or?<tt>putAll</tt>?invocation?resulting*???????????in?this?invocation,?this?will?be?the?entry?that?was?just*???????????inserted;?in?other?words,?if?the?map?contains?a?single*???????????entry,?the?eldest?entry?is?also?the?newest.*?@return???<tt>true</tt>?if?the?eldest?entry?should?be?removed*???????????from?the?map;?<tt>false</tt>?if?it?should?be?retained.*/protected?boolean?removeEldestEntry(Map.Entry<K,V>?eldest)?{return?false;}? removeEldestEntry方法默認返回false,即默認不移除。因此我們只要在這里加以判斷:如果緩存已經占滿,返回true,就可以將最近最少使用的對象移除了。因此,通過使用LinkedHashMap,僅需要非常簡單的修改即可實現LRU算法。
? 下面附上LRU的實現代碼:
import?java.util.LinkedHashMap; import?java.util.Collection; import?java.util.Map; import?java.util.ArrayList;/***?An?LRU?cache,?based?on?<code>LinkedHashMap</code>.**?<p>*?This?cache?has?a?fixed?maximum?number?of?elements?(<code>cacheSize</code>).*?If?the?cache?is?full?and?another?entry?is?added,?the?LRU?(least?recently*?used)?entry?is?dropped.**?<p>*?This?class?is?thread-safe.?All?methods?of?this?class?are?synchronized.**?<p>*?Author:?Christian?d'Heureuse,?Inventec?Informatik?AG,?Zurich,?Switzerland<br>*?Multi-licensed:?EPL?/?LGPL?/?GPL?/?AL?/?BSD.*/ public?class?LRUCache<K,?V>?{private?static?final?float?hashTableLoadFactor?=?0.75f;private?LinkedHashMap<K,?V>?map;private?int?cacheSize;/***?Creates?a?new?LRU?cache.?在該方法中,new?LinkedHashMap<K,V>(hashTableCapacity,*?hashTableLoadFactor,?true)中,true代表使用訪問順序*?*?@param?cacheSize*????????????the?maximum?number?of?entries?that?will?be?kept?in?this?cache.*/public?LRUCache(int?cacheSize)?{this.cacheSize?=?cacheSize;int?hashTableCapacity?=?(int)?Math.ceil(cacheSize?/?hashTableLoadFactor)?+?1;map?=?new?LinkedHashMap<K,?V>(hashTableCapacity,?hashTableLoadFactor,true)?{//?(an?anonymous?inner?class)private?static?final?long?serialVersionUID?=?1;@Overrideprotected?boolean?removeEldestEntry(Map.Entry<K,?V>?eldest)?{return?size()?>?LRUCache.this.cacheSize;}};}/***?Retrieves?an?entry?from?the?cache.<br>*?The?retrieved?entry?becomes?the?MRU?(most?recently?used)?entry.*?*?@param?key*????????????the?key?whose?associated?value?is?to?be?returned.*?@return?the?value?associated?to?this?key,?or?null?if?no?value?with?this*?????????key?exists?in?the?cache.*/public?synchronized?V?get(K?key)?{return?map.get(key);}/***?Adds?an?entry?to?this?cache.?The?new?entry?becomes?the?MRU?(most?recently*?used)?entry.?If?an?entry?with?the?specified?key?already?exists?in?the*?cache,?it?is?replaced?by?the?new?entry.?If?the?cache?is?full,?the?LRU*?(least?recently?used)?entry?is?removed?from?the?cache.*?*?@param?key*????????????the?key?with?which?the?specified?value?is?to?be?associated.*?@param?value*????????????a?value?to?be?associated?with?the?specified?key.*/public?synchronized?void?put(K?key,?V?value)?{map.put(key,?value);}/***?Clears?the?cache.*/public?synchronized?void?clear()?{map.clear();}/***?Returns?the?number?of?used?entries?in?the?cache.*?*?@return?the?number?of?entries?currently?in?the?cache.*/public?synchronized?int?usedEntries()?{return?map.size();}/***?Returns?a?<code>Collection</code>?that?contains?a?copy?of?all?cache*?entries.*?*?@return?a?<code>Collection</code>?with?a?copy?of?the?cache?content.*/public?synchronized?Collection<Map.Entry<K,?V>>?getAll()?{return?new?ArrayList<Map.Entry<K,?V>>(map.entrySet());} }?最后,緩存在使用過程中,需要考慮一致性問題。緩存的刷新就是為了保持一致性。具體如何去刷新,需要根據具體的使用場景進行設計。
轉載于:https://blog.51cto.com/jincheng/1826160
總結
以上是生活随笔為你收集整理的缓存框架OSCache部分源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 几何画板有没有计算器
- 下一篇: JAVA回调机制