Java多线程(四)之ConcurrentSkipListMap深入分析
一、前言?
concurrentHashMap與ConcurrentSkipListMap性能測試
在4線程1.6萬數據的條件下,ConcurrentHashMap 存取速度是ConcurrentSkipListMap 的4倍左右。
但ConcurrentSkipListMap有幾個ConcurrentHashMap 不能比擬的優點:
1、ConcurrentSkipListMap 的key是有序的。
2、ConcurrentSkipListMap 支持更高的并發。ConcurrentSkipListMap 的存取時間是log(N),和線程數幾乎無關。也就是說在數據量一定的情況下,并發的線程越多,ConcurrentSkipListMap越能體現出他的優勢。?
?
二、使用建議
在非多線程的情況下,應當盡量使用TreeMap。此外對于并發性相對較低的并行程序可以使用Collections.synchronizedSortedMap將TreeMap進行包裝,也可以提供較好的效率。對于高并發程序,應當使用ConcurrentSkipListMap,能夠提供更高的并發度。
 所以在多線程程序中,如果需要對Map的鍵值進行排序時,請盡量使用ConcurrentSkipListMap,可能得到更好的并發度。
 注意,調用ConcurrentSkipListMap的size時,由于多個線程可以同時對映射表進行操作,所以映射表需要遍歷整個鏈表才能返回元素個數,這個操作是個O(log(n))的操作。
?
二、什么是SkipList
?
Skip list(跳表)是一種可以代替平衡樹的數據結構,默認是按照Key值升序的。Skip list讓已排序的數據分布在多層鏈表中,以0-1隨機數決定一個數據的向上攀升與否,通過“空間來換取時間”的一個算法,在每個節點中增加了向前的指針,在插入、刪除、查找時可以忽略一些不可能涉及到的結點,從而提高了效率。
 從概率上保持數據結構的平衡比顯示的保持數據結構平衡要簡單的多。對于大多數應用,用Skip list要比用樹算法相對簡單。由于Skip list比較簡單,實現起來會比較容易,雖然和平衡樹有著相同的時間復雜度(O(logn)),但是skip list的常數項會相對小很多。Skip list在空間上也比較節省。一個節點平均只需要1.333個指針(甚至更少)。
 ? ? ? ? ? ? ? ??
 圖1-1 Skip list結構圖(以7,14,21,32,37,71,85序列為例)
?
Skip list的性質
(1) 由很多層結構組成,level是通過一定的概率隨機產生的。
 (2) 每一層都是一個有序的鏈表,默認是升序,也可以根據創建映射時所提供的Comparator進行排序,具體取決于使用的構造方法。
 (3) 最底層(Level 1)的鏈表包含所有元素。
 (4) 如果一個元素出現在Level i 的鏈表中,則它在Level i 之下的鏈表也都會出現。
 (5) 每個節點包含兩個指針,一個指向同一鏈表中的下一個元素,一個指向下面一層的元素。
?
三、什么是ConcurrentSkipListMap
?
ConcurrentSkipListMap提供了一種線程安全的并發訪問的排序映射表。內部是SkipList(跳表)結構實現,在理論上能夠在O(log(n))時間內完成查找、插入、刪除操作。
 ???? ??注意,調用ConcurrentSkipListMap的size時,由于多個線程可以同時對映射表進行操作,所以映射表需要遍歷整個鏈表才能返回元素個數,這個操作是個O(log(n))的操作。
?
?ConcurrentSkipListMap存儲結構
?
 ConcurrentSkipListMap存儲結構圖
?
跳躍表(SkipList):(如上圖所示)
 1.多條鏈構成,是關鍵字升序排列的數據結構;
 2.包含多個級別,一個head引用指向最高的級別,最低(底部)的級別,包含所有的key;
 3.每一個級別都是其更低級別的子集,并且是有序的;
 4.如果關鍵字 key在 級別level=i中出現,則,level<=i的鏈表中都會包含該關鍵字key;
?
------------------------
ConcurrentSkipListMap主要用到了Node和Index兩種節點的存儲方式,通過volatile關鍵字實現了并發的操作
??
static final class Node<K,V> {
final K key;
volatile Object value;//value值
volatile Node<K,V> next;//next引用
……
}
static class Index<K,V> {
final Node<K,V> node;
final Index<K,V> down;//downy引用
volatile Index<K,V> right;//右邊引用
……
}
?
?
------------------------
ConcurrentSkipListMap的查找
?
通過SkipList的方式進行查找操作:(下圖以“查找91”進行說明:)
?
紅色虛線,表示查找的路徑,藍色向右箭頭表示right引用;黑色向下箭頭表示down引用;
?
/get方法,通過doGet操作實現
?
public V get(Object key) {
return doGet(key);
}
//doGet的實現
private V doGet(Object okey) {
Comparable<? super K> key = comparable(okey);
Node<K,V> bound = null;
Index<K,V> q = head;//把頭結點作為當前節點的前驅節點
Index<K,V> r = q.right;//前驅節點的右節點作為當前節點
Node<K,V> n;
K k;
int c;
for (;;) {//遍歷
Index<K,V> d;
// 依次遍歷right節點
if (r != null && (n = r.node) != bound && (k = n.key) != null) {
if ((c = key.compareTo(k)) > 0) {//由于key都是升序排列的,所有當前關鍵字大于所要查找的key時繼續向右遍歷
q = r;
r = r.right;
continue;
} else if (c == 0) {
//如果找到了相等的key節點,則返回該Node的value如果value為空可能是其他并發delete導致的,于是通過另一種
//遍歷findNode的方式再查找
Object v = n.value;
return (v != null)? (V)v : getUsingFindNode(key);
} else
bound = n;
}
//如果一個鏈表中right沒能找到key對應的value,則調整到其down的引用處繼續查找
if ((d = q.down) != null) {
q = d;
r = d.right;
} else
break;
}
// 如果通過上面的遍歷方式,還沒能找到key對應的value,再通過Node.next的方式進行查找
for (n = q.node.next; n != null; n = n.next) {
if ((k = n.key) != null) {
if ((c = key.compareTo(k)) == 0) {
Object v = n.value;
return (v != null)? (V)v : getUsingFindNode(key);
} else if (c < 0)
break;
}
}
return null;
}
?
?
?
------------------------------------------------
ConcurrentSkipListMap的刪除
?
通過SkipList的方式進行刪除操作:(下圖以“刪除23”進行說明:)
?
紅色虛線,表示查找的路徑,藍色向右箭頭表示right引用;黑色向下箭頭表示down引用;
?
?
//remove操作,通過doRemove實現,把所有level中出現關鍵字key的地方都delete掉
public V remove(Object key) {
return doRemove(key, null);
}
final V doRemove(Object okey, Object value) {
Comparable<? super K> key = comparable(okey);
for (;;) {
Node<K,V> b = findPredecessor(key);//得到key的前驅(就是比key小的最大節點)
Node<K,V> n = b.next;//前驅節點的next引用
for (;;) {//遍歷
if (n == null)//如果next引用為空,直接返回
return null;
Node<K,V> f = n.next;
if (n != b.next) // 如果兩次獲得的b.next不是相同的Node,就跳轉到第一層循環重新獲得b和n
break;
Object v = n.value;
if (v == null) { // 當n被其他線程delete的時候,其value==null,此時做輔助處理,并重新獲取b和n
n.helpDelete(b, f);
break;
}
if (v == n || b.value == null) // 當其前驅被delet的時候直接跳出,重新獲取b和n
break;
int c = key.compareTo(n.key);
if (c < 0)
return null;
if (c > 0) {//當key較大時就繼續遍歷
b = n;
n = f;
continue;
}
if (value != null && !value.equals(v))
return null;
if (!n.casValue(v, null))
break;
if (!n.appendMarker(f) || !b.casNext(n, f))//casNext方法就是通過比較和設置b(前驅)的next節點的方式來實現刪除操作
findNode(key); // 通過嘗試findNode的方式繼續find
else {
findPredecessor(key); // Clean index
if (head.right == null) //如果head的right引用為空,則表示不存在該level
tryReduceLevel();
}
return (V)v;
}
}
}
 ?
?
-------------------------------------
?
ConcurrentSkipListMap的插入
?
?
通過SkipList的方式進行插入操作:(下圖以“添加55”的兩種情況,進行說明:)
在level=2(該level存在)的情況下添加55的圖示:只需在level<=2的合適位置插入55即可
--------
在level=4(該level不存在,圖示level4是新建的)的情況下添加55的情況:首先新建level4,然后在level<=4的合適位置插入55
-----------
?
//put操作,通過doPut實現
public V put(K key, V value) {
if (value == null)
throw new NullPointerException();
return doPut(key, value, false);
}
private V doPut(K kkey, V value, boolean onlyIfAbsent) {
Comparable<? super K> key = comparable(kkey);
for (;;) {
Node<K,V> b = findPredecessor(key);//前驅
Node<K,V> n = b.next;
//定位的過程就是和get操作相似
for (;;) {
if (n != null) {
Node<K,V> f = n.next;
if (n != b.next) // 前后值不一致的情況下,跳轉到第一層循環重新獲得b和n
break;;
Object v = n.value;
if (v == null) { // n被delete的情況下
n.helpDelete(b, f);
break;
}
if (v == n || b.value == null) // b 被delete的情況,重新獲取b和n
break;
int c = key.compareTo(n.key);
if (c > 0) {
b = n;
n = f;
continue;
}
if (c == 0) {
if (onlyIfAbsent || n.casValue(v, value))
return (V)v;
else
break; // restart if lost race to replace value
}
// else c < 0; fall through
}
Node<K,V> z = new Node<K,V>(kkey, value, n);
if (!b.casNext(n, z))
break; // restart if lost race to append to b
int level = randomLevel();//得到一個隨機的level作為該key-value插入的最高level
if (level > 0)
insertIndex(z, level);//進行插入操作
return null;
}
}
}
/**
* 獲得一個隨機的level值
*/
private int randomLevel() {
int x = randomSeed;
x ^= x << 13;
x ^= x >>> 17;
randomSeed = x ^= x << 5;
if ((x & 0x8001) != 0) // test highest and lowest bits
return 0;
int level = 1;
while (((x >>>= 1) & 1) != 0) ++level;
return level;
}
//執行插入操作:如上圖所示,有兩種可能的情況:
//1.當level存在時,對level<=n都執行insert操作
//2.當level不存在(大于目前的最大level)時,首先添加新的level,然后在執行操作1
private void insertIndex(Node<K,V> z, int level) {
HeadIndex<K,V> h = head;
int max = h.level;
if (level <= max) {//情況1
Index<K,V> idx = null;
for (int i = 1; i <= level; ++i)//首先得到一個包含1~level個級別的down關系的鏈表,最后的inx為最高level
idx = new Index<K,V>(z, idx, null);
addIndex(idx, h, level);//把最高level的idx傳給addIndex方法
} else { // 情況2 增加一個新的級別
level = max + 1;
Index<K,V>[] idxs = (Index<K,V>[])new Index[level+1];
Index<K,V> idx = null;
for (int i = 1; i <= level; ++i)//該步驟和情況1類似
idxs[i] = idx = new Index<K,V>(z, idx, null);
HeadIndex<K,V> oldh;
int k;
for (;;) {
oldh = head;
int oldLevel = oldh.level;
if (level <= oldLevel) { // lost race to add level
k = level;
break;
}
HeadIndex<K,V> newh = oldh;
Node<K,V> oldbase = oldh.node;
for (int j = oldLevel+1; j <= level; ++j)
newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);//創建新的
if (casHead(oldh, newh)) {
k = oldLevel;
break;
}
}
addIndex(idxs[k], oldh, k);
}
}
/**
*在1~indexlevel層中插入數據
*/
private void addIndex(Index<K,V> idx, HeadIndex<K,V> h, int indexLevel) {
// insertionLevel 代表要插入的level,該值會在indexLevel~1間遍歷一遍
int insertionLevel = indexLevel;
Comparable<? super K> key = comparable(idx.node.key);
if (key == null) throw new NullPointerException();
// 和get操作類似,不同的就是查找的同時在各個level上加入了對應的key
for (;;) {
int j = h.level;
Index<K,V> q = h;
Index<K,V> r = q.right;
Index<K,V> t = idx;
for (;;) {
if (r != null) {
Node<K,V> n = r.node;
// compare before deletion check avoids needing recheck
int c = key.compareTo(n.key);
if (n.value == null) {
if (!q.unlink(r))
break;
r = q.right;
continue;
}
if (c > 0) {
q = r;
r = r.right;
continue;
}
}
if (j == insertionLevel) {//在該層level中執行插入操作
// Don't insert index if node already deleted
if (t.indexesDeletedNode()) {
findNode(key); // cleans up
return;
}
if (!q.link(r, t))//執行link操作,其實就是inset的實現部分
break; // restart
if (--insertionLevel == 0) {
// need final deletion check before return
if (t.indexesDeletedNode())
findNode(key);
return;
}
}
if (--j >= insertionLevel && j < indexLevel)//key移動到下一層level
t = t.down;
q = q.down;
r = q.right;
}
}
}
 參考:
?
集合框架 Map篇(5)----ConcurrentSkipListMap?http://hi.baidu.com/yao1111yao/item/0f3008163c4b82c938cb306d
 Java里多個Map的性能比較(TreeMap、HashMap、ConcurrentSkipListMap)?http://blog.hongtium.com/java-map-skiplist/
 跳表SkipList的原理和實現 http://imtinx.iteye.com/blog/1291165?
總結
以上是生活随笔為你收集整理的Java多线程(四)之ConcurrentSkipListMap深入分析的全部內容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: Java多线程(六)之Deque与Lin
- 下一篇: Java多线程(三)之Concurren
