Netty ObjectPool对象池技术原理分析
一、ObjectPool使用示例
1.對需要使用對象池的對象,定義一個ObjectPool的靜態全局變量RECYCLE,用于對象的分配和回收。并在對象內定義一個ObjectPool.Handle成員變量,并且將此變量作為構造函數參數傳入,并將構造函數作為私有。然后添加一個回收的方法Recycle,在不需要此對象時調用handle.recycle()
獲取對象則調用ObjectPool.get
@Slf4j public class ObjectRecycleTest {private static ExecutorService executor = new ThreadPoolExecutor(3, 6, 10,TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100), new NamedThreadFactory("local", false));public static final class People {String name;int age;boolean sex;private ObjectPool.Handle<People> handle;private static final ObjectPool<People> RECYCLE = ObjectPool.newPool((handle) -> {return new People(handle);});private People(ObjectPool.Handle<People> handle) {this.handle = handle;}public static People newInstance(String name, int age, boolean sex) {People people = RECYCLE.get();people.age = age;people.name = name;people.sex = sex;return people;}public void recycle() {name = "";age = 0;handle.recycle(this);}}public void testRecycle() {Set<People> peopleList = new HashSet<People>();for (int i = 0; i < 5; i++) {People p1 = People.newInstance("jack", 28, i % 2 == 0); // log.debug(" p1:{}",p1);peopleList.add(p1);}peopleList.stream().forEach(t -> {if (t.sex) {t.recycle();}});try {executor.submit(() -> {peopleList.stream().forEach(t -> {if (!t.sex) {t.recycle();}});}).get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}for (int i = 0; i < 2; i++) {People p2 = People.newInstance("jack", 28, i % 2 == 0);if (peopleList.contains(p2)) {log.debug(" in exist p2:{}", p2);} else { // log.debug(" new instance p2:{}",p2);}}executor.submit(() -> {People p3 = People.newInstance("qiuye", 26, true);log.debug(" p3:{}", p3);});}public static void main(String[] args) {ObjectRecycleTest objectRecycleTest = new ObjectRecycleTest();objectRecycleTest.testRecycle();} }二、實例對象創建流程
1.調用ObjectPool.get來獲取實例,這是對Recycler的一個封裝。
? ?
public abstract class ObjectPool<T> {ObjectPool() { }public static <T> ObjectPool<T> newPool(final ObjectCreator<T> creator) {return new RecyclerObjectPool<T>(ObjectUtil.checkNotNull(creator, "creator"));}private static final class RecyclerObjectPool<T> extends ObjectPool<T> {private final Recycler<T> recycler;RecyclerObjectPool(final ObjectCreator<T> creator) {recycler = new Recycler<T>() {@Overrideprotected T newObject(Handle<T> handle) {return creator.newObject(handle);}};}@Overridepublic T get() {return recycler.get();}} }2.Recycler的類中有一個threadLocal的stack類型的棧對象(保證每個線程數據私有和安全),這里面保存有所有曾經回收到準備重利用的對象。
public abstract class Recycler<T> {private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {@Overrideprotected Stack<T> initialValue() {return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor,interval, maxDelayedQueuesPerThread, delayedQueueInterval);}@Overrideprotected void onRemoval(Stack<T> value) {// Let us remove the WeakOrderQueue from the WeakHashMap directly if its safe to remove some overheadif (value.threadRef.get() == Thread.currentThread()) {if (DELAYED_RECYCLED.isSet()) {DELAYED_RECYCLED.get().remove(value);}}}};3.Recycler.get方法,就是從stack中獲取對象,獲取為空,則新建一個對象和Handle,并將Handle與Stack關聯。
public final T get() {if (maxCapacityPerThread == 0) {return newObject((Handle<T>) NOOP_HANDLE);}Stack<T> stack = threadLocal.get();DefaultHandle<T> handle = stack.pop();if (handle == null) {handle = stack.newHandle();handle.value = newObject(handle);}return (T) handle.value;}4.stack.pop方法,則是獲取歷史的回收的對象。
private static final class Stack<T> { DefaultHandle<?>[] elements; private volatile WeakOrderQueue head; DefaultHandle<T> pop() {int size = this.size;if (size == 0) {if (!scavenge()) {return null;}size = this.size;if (size <= 0) {// double check, avoid racesreturn null;}}size --;DefaultHandle ret = elements[size];elements[size] = null;this.size = size;if (ret.lastRecycledId != ret.recycleId) {throw new IllegalStateException("recycled multiple times");}ret.recycleId = 0;ret.lastRecycledId = 0;return ret;}這里有兩種對象來源。
? ? ?(1).elements數組是保存新建對象和回收對象在同一個線程的回收對象。
? ? (2).類型WeakOrderQueue的head對象則是一個弱引用鏈接隊列,這個隊列用來保存所有創建和回收線程不一致的回收對象。
? ? ?(3).首先從elements獲取回收對象,如果這里不為空,則返回第一個,總數減1.
? ? (4).如果elements為空,則將head隊列中的回收對象復制到elements,再次獲取。
5.我們首先看一下WeakOrderQueue的數據結構
這里面有兩個next屬性
?(1).本身的next成員變量,是指向下一個線程回收的WeakOrderQueue,WeakOrderQueue這個在一個stack中也是多個,是一個單身鏈接。
(2).本身有個head,tail節點,用來指向對象內部的Link對象隊列。插入數據從tail進行,彈出數據從head進行。
private static final class WeakOrderQueue extends WeakReference<Thread> {static final WeakOrderQueue DUMMY = new WeakOrderQueue();@SuppressWarnings("serial")static final class Link extends AtomicInteger {final DefaultHandle<?>[] elements = new DefaultHandle[LINK_CAPACITY];int readIndex;Link next;}private static final class Head {private final AtomicInteger availableSharedCapacity;Link link;Head(AtomicInteger availableSharedCapacity) {this.availableSharedCapacity = availableSharedCapacity;}private void reclaimSpace(int space) {availableSharedCapacity.addAndGet(space);}void relink(Link link) {reclaimSpace(LINK_CAPACITY);this.link = link;}}// chain of data itemsprivate final Head head;private Link tail;// pointer to another queue of delayed items for the same stackprivate WeakOrderQueue next;private final int id = ID_GENERATOR.getAndIncrement();private final int interval;private int handleRecycleCount;5.從head隊列復制回收對象列表到elements的流程, 這里先找一個可用的不為空的隊列,并且將回收對象(這里的回收對象都為Handle類型)列表 復制到elements數組。
private boolean scavengeSome() {WeakOrderQueue prev;WeakOrderQueue cursor = this.cursor;if (cursor == null) {prev = null;cursor = head;if (cursor == null) {return false;}} else {prev = this.prev;}boolean success = false;do {if (cursor.transfer(this)) {success = true;break;}WeakOrderQueue next = cursor.getNext();if (cursor.get() == null) {if (cursor.hasFinalData()) {for (;;) {if (cursor.transfer(this)) {success = true;} else {break;}}}if (prev != null) {// Ensure we reclaim all space before dropping the WeakOrderQueue to be GC'ed.cursor.reclaimAllSpaceAndUnlink();prev.setNext(next);}} else {prev = cursor;}cursor = next;} while (cursor != null && !success);this.prev = prev;this.cursor = cursor;return success;}6.具體的從WeakOrderQueue復制數據到stack,是由WeakOrderQueue的transfer完成。
boolean transfer(Stack<?> dst) {Link head = this.head.link;final int srcStart = head.readIndex;int srcEnd = head.get();final int srcSize = srcEnd - srcStart;if (srcSize == 0) {return false;}final int dstSize = dst.size;final int expectedCapacity = dstSize + srcSize;if (expectedCapacity > dst.elements.length) {final int actualCapacity = dst.increaseCapacity(expectedCapacity);srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);}if (srcStart != srcEnd) {final DefaultHandle[] srcElems = head.elements;final DefaultHandle[] dstElems = dst.elements;int newDstSize = dstSize;for (int i = srcStart; i < srcEnd; i++) {DefaultHandle<?> element = srcElems[i];if (element.recycleId == 0) {element.recycleId = element.lastRecycledId;} else if (element.recycleId != element.lastRecycledId) {throw new IllegalStateException("recycled already");}srcElems[i] = null;if (dst.dropHandle(element)) {// Drop the object.continue;}element.stack = dst;dstElems[newDstSize ++] = element;}head.readIndex = srcEnd;dst.size = newDstSize;return true;} else {// The destination stack is full already.return false;}}這里面的代碼比較長,也就是從head.link節點開始,復制handle對象數據到elements中。這里特殊要注意一下,這種跨線程的對象,也會采取丟失策略。即丟7取1.即stack.drophandle,到時在回收對象時講述。
7.這里再回到第4步,由于stack.elements有數據了 ,所以跟之前的流程一樣。從這里可以看出,同線程回收的對象,都可以重復使用,跨線程回收的對象,只能丟7取1.
三、實例對象回收過程。
1.從handle.recycle開始,handle保存在People實例對象的屬性中。
private static final class DefaultHandle<T> implements Recycler.Handle<T> {int lastRecycledId;int recycleId;boolean hasBeenRecycled;Recycler.Stack<?> stack;Object value;DefaultHandle(Recycler.Stack<?> stack) {this.stack = stack;}public void recycle(Object object) {if (object != this.value) {throw new IllegalArgumentException("object does not belong to handle");} else {Recycler.Stack<?> stack = this.stack;if (this.lastRecycledId == this.recycleId && stack != null) {stack.push(this);} }}}2.這里會調用stack.push(this),stack即為創建時當前線程所綁定的threadlocal對象。
Stack void push(Recycler.DefaultHandle<?> item) {Thread currentThread = Thread.currentThread();if (this.threadRef.get() == currentThread) {this.pushNow(item);} else {this.pushLater(item, currentThread);}}3.判斷如果是當前線程,則pushNow,否則pushLater,pushNow就是放到elements數組,pushLater就是放到WeakOrderQueue類型的head隊列。
我們先看pushNow
private void pushNow(Recycler.DefaultHandle<?> item) {if ((item.recycleId | item.lastRecycledId) != 0) {throw new IllegalStateException("recycled already");} else {item.recycleId = item.lastRecycledId = Recycler.OWN_THREAD_ID;int size = this.size;if (size < this.maxCapacity && !this.dropHandle(item)) {if (size == this.elements.length) {this.elements = (Recycler.DefaultHandle[])Arrays.copyOf(this.elements, Math.min(size << 1, this.maxCapacity));}this.elements[size] = item;this.size = size + 1;}}}這個比較簡單,就是放到elements數組,并且 SIZE自增,但是要注意的是會進行一次dropHandle過濾,8個對象只有一個會存下來,其它的都會被GC回收,不能重復使用。
boolean dropHandle(Recycler.DefaultHandle<?> handle) {if (!handle.hasBeenRecycled) {if (this.handleRecycleCount < this.interval) {++this.handleRecycleCount;return true;}this.handleRecycleCount = 0;handle.hasBeenRecycled = true;}return false;}4.接下來我們看pushLater, 這里面會判斷當前stack對象是否在當前回收線程的
private static final FastThreadLocal<Map<Recycler.Stack<?>, Recycler.WeakOrderQueue>> DELAYED_RECYCLED這個LOCAL線程數據的Map中是否存在(一個線程可以有多個stack,多個對象回收器)。如果不存在,則創建一個newWeakOrderQueue,然后將當前handle保存到WeakOrderQueue中。 private void pushLater(Recycler.DefaultHandle<?> item, Thread thread) {if (this.maxDelayedQueues != 0) {Map<Recycler.Stack<?>, Recycler.WeakOrderQueue> delayedRecycled = (Map)Recycler.DELAYED_RECYCLED.get();Recycler.WeakOrderQueue queue = (Recycler.WeakOrderQueue)delayedRecycled.get(this);if (queue == null) {if (delayedRecycled.size() >= this.maxDelayedQueues) {delayedRecycled.put(this, Recycler.WeakOrderQueue.DUMMY);return;}if ((queue = this.newWeakOrderQueue(thread)) == null) {return;}delayedRecycled.put(this, queue);} else if (queue == Recycler.WeakOrderQueue.DUMMY) {return;}queue.add(item);}}5.新建隊列,這里可以看到新建一個head,tail,并且head.link指向第一個tail,
static Recycler.WeakOrderQueue newQueue(Recycler.Stack<?> stack, Thread thread) {if (!Recycler.WeakOrderQueue.Head.reserveSpaceForLink(stack.availableSharedCapacity)) {return null;} else {Recycler.WeakOrderQueue queue = new Recycler.WeakOrderQueue(stack, thread);stack.setHead(queue);return queue;}}private WeakOrderQueue(Recycler.Stack<?> stack, Thread thread) {super(thread);this.id = Recycler.ID_GENERATOR.getAndIncrement();this.tail = new Recycler.WeakOrderQueue.Link();this.head = new Recycler.WeakOrderQueue.Head(stack.availableSharedCapacity);this.head.link = this.tail;this.interval = stack.delayedQueueInterval;this.handleRecycleCount = this.interval;}6.入隊過程,就是放到tail.elements數組中,因為tail為Link類型,這個類又繼承于
AtomicInteger,所以巧妙的運行AtomicInteger.value來標志為寫位置。這個有個要注意,如果tail中的elements已滿,則新建一個Link作為新的tail,老的tail的NEXT指向新tail節點,而head.link則一直指向第一個tail,構成一個單向鏈接。 void add(Recycler.DefaultHandle<?> handle) {handle.lastRecycledId = this.id;if (this.handleRecycleCount < this.interval) {++this.handleRecycleCount;} else {this.handleRecycleCount = 0;Recycler.WeakOrderQueue.Link tail = this.tail;int writeIndex;if ((writeIndex = tail.get()) == Recycler.LINK_CAPACITY) {Recycler.WeakOrderQueue.Link link = this.head.newLink();if (link == null) {return;}this.tail = tail = tail.next = link;writeIndex = tail.get();}tail.elements[writeIndex] = handle;handle.stack = null;tail.lazySet(writeIndex + 1);}}7.查看堆棧快照數據,先他截6個對象,然后當前線程回收3個,其它線程回收3個。則是在stack的elements中有一個和WeakOrderQueue的head中有一個(丟7取1策略)
8.至此,分析完成,可以看到,對象池不是完全重復使用,而是丟7存1,防止內存溢出。?
與50位技術專家面對面20年技術見證,附贈技術全景圖總結
以上是生活随笔為你收集整理的Netty ObjectPool对象池技术原理分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: ThreadLocal的原理和FastT
- 下一篇: ScheduledThreadPoolE