Netty 高性能之道 - Recycler 对象池的复用
前言
我們知道,Java 創建一個實例的消耗是不小的,如果沒有使用棧上分配和 TLAB,那么就需要使用 CAS 在堆中創建對象。所以現在很多框架都使用對象池。Netty 也不例外,通過重用對象,能夠避免頻繁創建對象和銷毀對象帶來的損耗。
來看看具體實現。
1. Recycler 抽象類簡介
該類 doc:
Light-weight object pool based on a thread-local stack.
基于線程局部堆棧的輕量級對象池。
該類是個容器,內部主要是一個 Stack 結構。當需要使用一個實例時,就彈出,當使用完畢時,就清空后入棧。
- 該類有 2 個主要方法:
- 該類有 4 個內部接口 / 內部類:
- 實現線程局部緩存的 FastThreadLocal:
- 核心方法 get 操作
- 核心方法 DefaultHandle 的 recycle 操作
2. Netty 中的使用范例
io.netty.channel.ChannelOutboundBuffer.Entry 類
- 示例代碼如下:
從上面的 get 方法,我們知道,最終會從 threadLocal 取出 Stack,從 Stack 中彈出 DefaultHandle 對象(如果沒有就創建一個),然后調用我們重寫的 newObject 方法,將創建的對象和 handle 綁定。最后返回這個對象。
當調用 entry.recycle() 方法的時候,實際會調用 DefaultHandle 的 recycle 方法。我們看看該方法實現:
public void recycle(Object object) {if (object != value) {throw new IllegalArgumentException("object does not belong to handle");}stack.push(this); }這里的 value 就是 get 方法中賦值的。如果不相等,就拋出異常。反之,將 handle 入棧 stack。注意:這里并沒有對 value 做任何處理,只是在 Entry 內部做了清空處理。所以,這個 handle 和 handle 綁定的對象就保存在了 stack 中。
下次再次調用 get 時,就可以直接從該 threadLocal 中取出 handle 和 handle 綁定的 value了。完成了一次完美的對象池的實踐。也就是說,一個 handle 綁定一個實例。而這個 handle 還是比較輕量的。
從這里可以看出,Stack 就是真正的 “池子”。我們就看看這個池子的內部實現。
而這個 stack 對外常用的方法的 pop 和 push。我們就來看看這兩個方法。
3. pop 方法
代碼如下:
DefaultHandle<T> pop() {int size = this.size;if (size == 0) {if (!scavenge()) {return null;}size = this.size;}size --;DefaultHandle ret = elements[size];elements[size] = null;if (ret.lastRecycledId != ret.recycleId) {throw new IllegalStateException("recycled multiple times");}ret.recycleId = 0;ret.lastRecycledId = 0;this.size = size;return ret; }邏輯如下:
這個方法除了 scavenge 之外,還是比較簡單的。
4. push 方法
代碼如下:
void push(DefaultHandle<?> item) {Thread currentThread = Thread.currentThread();if (threadRef.get() == currentThread) { pushNow(item);} else { pushLater(item, currentThread);} }當一個對象使用 pop 方法取出來之后,可能會被別的線程使用,這時候,如果是你,你怎么處理呢?
先看看當前線程的處理:
看看 pushNow 方法:
private void pushNow(DefaultHandle<?> item) {if ((item.recycleId | item.lastRecycledId) != 0) {throw new IllegalStateException("recycled already");}item.recycleId = item.lastRecycledId = OWN_THREAD_ID;int size = this.size;if (size >= maxCapacity || dropHandle(item)) {return;}if (size == elements.length) {elements = Arrays.copyOf(elements, min(size << 1, maxCapacity));}elements[size] = item;this.size = size + 1; }該方法主要邏輯如下:
看看 dropHandle 方法的實現:
boolean dropHandle(DefaultHandle<?> handle) {// 沒有被回收過if (!handle.hasBeenRecycled) {// 第一次是 -1,++ 之后變為0,取余7。其實如果正常情況下,結果應該都是0。// 如果下面的判斷不是0 的話,那么已經歸還。這個對象就沒有必要重復歸還。// 直接丟棄。if ((++handleRecycleCount & ratioMask) != 0) {// Drop the object.return true;}// 改為被回收過,下次就不會進入了handle.hasBeenRecycled = true;}// 刪除失敗return false; }已經寫了注釋,就不再過多解釋。
可以看到,pushNow 方法還是很簡單的。由于在當前線程里,只需要還原到 Stack 的數組中就好了。
關鍵是:如果是其他的線程做回收操作,該怎么辦?
5. pushLater 方法(多線程回收如何操作)
先說說 Netty 的解決辦法和思路:
每個線程都有一個 Stack 對象,每個線程也都有一個軟引用 Map,鍵為 Stack,值是 queue。
線程每次從 local 中獲取 Stack 對象,再從 Stack 中取出實例。如果取不到,嘗試從 queue 取,也就是從queue 中的 Link 中取出,并銷毀 Link。
但回收的時候,可能就不是原來的那個線程了,由于回收時使用的還是原來的 Stack,所以,需要考慮這個實例如何才能正確的回收。
這個時候,就需要 Map 出場了。創建一個 queue 關聯這個 Stack,將數據放到這個 queue 中。等到持有這個 Stack 的線程想拿數據了,就從 Stack 對應的 queue 中取出。
看出來了嗎?只有一個線程持有唯一的 Stack,其余的線程只持有這個 Stack 關聯的 queue。因此,可以說,這個 queue 是兩個線程共享的。除了 Stack 自己的線程外,其余的線程的歸還都是放到 自己的queue 中。
這個 queue 是無界的。內部的 Link 是有界的。每個線程對應一個 queue。
這些線程的 queue 組成了鏈表。
具體如下圖所示:
看完了設計,再看看代碼實現:
pushLater 方法
private void pushLater(DefaultHandle<?> item, Thread thread) {// 每個 Stack 對應一串 queue,找到當前線程的 mapMap<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();// 查看當前線程中是否含有這個 Stack 對應的隊列WeakOrderQueue queue = delayedRecycled.get(this);if (queue == null) {// 如果沒有// 如果 map 長度已經大于最大延遲數了,則向 map 中添加一個假的隊列if (delayedRecycled.size() >= maxDelayedQueues) {// 8delayedRecycled.put(this, WeakOrderQueue.DUMMY);return;}// 如果長度不大于最大延遲數,則嘗試創建一個queue,鏈接到這個 Stack 的 head 節點前(內部創建Link)if ((queue = WeakOrderQueue.allocate(this, thread)) == null) {// drop objectreturn;}delayedRecycled.put(this, queue);} else if (queue == WeakOrderQueue.DUMMY) {// drop objectreturn;}queue.add(item); }該方法步驟如下:
我們主要關注如何 allocate 方法,關鍵方法 newQueue:
@1 static WeakOrderQueue newQueue(Stack<?> stack, Thread thread) {WeakOrderQueue queue = new WeakOrderQueue(stack, thread);stack.setHead(queue);return queue; }@2 private WeakOrderQueue(Stack<?> stack, Thread thread) {head = tail = new Link();owner = new WeakReference<Thread>(thread);availableSharedCapacity = stack.availableSharedCapacity; }@3 private static final class Link extends AtomicInteger {private final DefaultHandle<?>[] elements = new DefaultHandle[LINK_CAPACITY];private int readIndex;private Link next; }@4 synchronized void setHead(WeakOrderQueue queue) {queue.setNext(head);head = queue; }代碼1,2,3,4。
其中,有一個需要注意的地方就是 owner = new WeakReference(thread),使用了弱引用,當這個線程對象被 GC 后,這個 owner 也會變為 null,就可以像 threadLoca 一樣對該引用進行判 null,來檢查這個線程對象是否回收了。
再看看如何添加進 queue 中的:
void add(DefaultHandle<?> handle) {handle.lastRecycledId = id;Link tail = this.tail;int writeIndex;if ((writeIndex = tail.get()) == LINK_CAPACITY) {if (!reserveSpace(availableSharedCapacity, LINK_CAPACITY)) {// Drop it.return;}this.tail = tail = tail.next = new Link();writeIndex = tail.get();}tail.elements[writeIndex] = handle;handle.stack = null;tail.lazySet(writeIndex + 1); }首先,拿到這個 queue 的 tail 節點,如果這個 tiail 節點滿了,查看是否還有共享空間,如果沒了,就丟棄這個實例。
反之,則新建一個 Link,追加到 tail 節點的尾部。然后,將數據插入新 tail 的數組。然后,將這個 handle 的 stack 屬性設置成 null,表示這個 handle 不屬于任何 statck 了,其他 stack 都可以使用。
數據放進去了,怎么取出來呢?
6. scavenge 方法
我們剛剛留了這個方法,現在可以開始講了。代碼如下:
boolean scavenge() {// continue an existing scavenge, if any// 清理成功后,stack 的 size 會變化if (scavengeSome()) {return true;}// reset our scavenge cursorprev = null;cursor = head;return false; }主要調用的是 scavengeSome 方法,返回 true 表示將 queue 中的數據轉移成功。看看該方法。
boolean scavengeSome() {WeakOrderQueue prev;WeakOrderQueue cursor = this.cursor;if (cursor == null) {prev = null;cursor = head;if (cursor == null) {return false;}} else {prev = this.prev;}boolean success = false;do {// 將 head queue 的實例轉移到 this stack 中if (cursor.transfer(this)) {success = true;break;}// 如果上面失敗,找下一個節點WeakOrderQueue next = cursor.next;// 如果當前線程被回收了,if (cursor.owner.get() == null) {// 只要最后一個節點還有數據,就一直轉移if (cursor.hasFinalData()) {for (;;) {if (cursor.transfer(this)) {success = true;} else {break;}}}if (prev != null) {prev.setNext(next);}} else {prev = cursor;}cursor = next;} while (cursor != null && !success);// 轉移成功之后,將 cursor 重置this.prev = prev;this.cursor = cursor;return success; }方法還是挺長的。我們拆解一下:
可以看到,最重要的還是 transfer 方法。然而該方法更長,就不貼代碼了,說說主要邏輯,有興趣可以自己看看,邏輯如下:
其中有一個疑問:為什么在其他線程插入 Link 時將 handle 的 stack 的屬性置為 null?在取出時,又將 handle 的 stack 屬性恢復。
答:因為如果 stack 被用戶手動置為 null,而容器中的 handle 還持有他的引用的話,就無法回收了。同時 Map 也使用了軟引用map,當 stack 沒有了引用被 GC 回收時,對應的 queue 也就被回收了。避免了內存泄漏。實際上,在之前的 Recycler 版本中,確實存在內存泄漏的情況。
該方法的主要目的就是將 queue 所屬的 Link 中的數據轉移到 stack 中。從而完成多線程的最終回收。
總結
Netty 并沒有使用第三方庫實現對象池,而是自己實現了一個相對輕量的對象池。通過使用 threadLocal,避免了多線程下取數據時可能出現的線程安全問題,同時,為了實現多線程回收同一個實例,讓每個線程對應一個隊列,隊列鏈接在 Stack 對象上形成鏈表,這樣,就解決了多線程回收時的安全問題。同時,使用了軟引用的map 和 軟引用的 thradl 也避免了內存泄漏。
在本次的源碼閱讀中,確實收獲很大。再回顧以下 Recycler 的設計圖吧。設計的真的非常好。
轉載于:https://www.cnblogs.com/stateis0/p/9062167.html
總結
以上是生活随笔為你收集整理的Netty 高性能之道 - Recycler 对象池的复用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【学时总结】 ◆学时 · I◆ A*算法
- 下一篇: 渐进式增强