alibaba sentinel限流组件 源码分析
如何使用?
maven引入:
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-core</artifactId>
<version>1.5.1</version>
</dependency>
該組件是保護(hù)資源用,什么資源呢?Conceptually, physical or logical resource,明白了吧。
web應(yīng)用中大部分情況都是用于保護(hù)接口,防止負(fù)載過大,支持限流、降級(jí)處理,規(guī)則可配。
入門級(jí)使用方法如下:
public void foo() {Entry entry = null;try {entry = SphU.entry("abc");// resource that need protection} catch (BlockException blockException) {// when goes there, it is blocked// add blocked handle logic here} catch (Throwable bizException) {// business exception Tracer.trace(bizException);} finally {// ensure finally be executedif (entry != null){entry.exit();}}}或者
public void foo() {
if (SphO.entry("abc")) {
try {
// business logic
} finally {
SphO.exit(); // must exit()
}
} else {
// failed to enter the protected resource.
}
}
入口為com.alibaba.csp.sentinel.SphU和com.alibaba.csp.sentinel.SphO。兩者的區(qū)別從使用就可以看出,限流發(fā)生時(shí),SphU是通過異常的形式反饋出來,SphO是通過entry的返回值反饋出來的。
查看SphO的代碼,如下圖,其實(shí)是內(nèi)部捕獲了異常,然后返回boolean類型。所以,我們就從SphU開始分析吧。
SphU
最終會(huì)調(diào)用到com.alibaba.csp.sentinel.CtSph.entryWithPriority(ResourceWrapper, int, boolean, Object...)這個(gè)方法。
里面的核心為調(diào)用com.alibaba.csp.sentinel.CtSph.lookProcessChain(ResourceWrapper)獲取對(duì)應(yīng)的處理鏈,方法如下:
從上面代碼可知,一個(gè)chain關(guān)聯(lián)一個(gè)資源,這一點(diǎn)很重要,后面分析node節(jié)點(diǎn)結(jié)構(gòu)時(shí)會(huì)用到。
public static ProcessorSlotChain newSlotChain() {if (builder != null) {return builder.build();}// 構(gòu)建chainBuilder,具體說明見下面代碼resolveSlotChainBuilder();if (builder == null) {RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");builder = new DefaultSlotChainBuilder();}return builder.build();}private static void resolveSlotChainBuilder() {List<SlotChainBuilder> list = new ArrayList<SlotChainBuilder>();boolean hasOther = false;// java.util.ServiceLoader 方式擴(kuò)展功能,這是jdk的,簡稱spi擴(kuò)展,sentinel有很多地方用到這種擴(kuò)展方式
// 關(guān)于ServiceLoader擴(kuò)展的詳解不在這里討論,后面有時(shí)間可以單獨(dú)拎出來分析下
for (SlotChainBuilder builder : LOADER) {if (builder.getClass() != DefaultSlotChainBuilder.class) {hasOther = true;list.add(builder);}}if (hasOther) {builder = list.get(0);} else {// No custom builder, using default.builder = new DefaultSlotChainBuilder();}RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: "+ builder.getClass().getCanonicalName());}
通過上面代碼可知,一般情況下采用的默認(rèn)chainBuilder,那我們就來看看這個(gè)build:
通過上面com.alibaba.csp.sentinel.CtSph.entryWithPriority(ResourceWrapper, int, boolean, Object...)方法可知,最終調(diào)用方式如下:
通過entry調(diào)用把chain中的所有ProcessorSlot串起來,挨個(gè)調(diào)用。
主體已經(jīng)清晰了,重點(diǎn)其實(shí)也就是chain中添加的那些ProcessorSlot,下面我們就一個(gè)個(gè)看下這些slot到底干了啥,劃下重點(diǎn),StatisticSlot這個(gè)slot是核心,用于統(tǒng)計(jì)各種數(shù)量。
?NodeSelectorSlot
構(gòu)建調(diào)用資源調(diào)用路徑,最終在內(nèi)存中會(huì)形成一個(gè)樹狀結(jié)構(gòu)。
ContextUtil.enter("entrance1", "appA");Entry nodeA = SphU.entry("nodeA");if (nodeA != null) {nodeA.exit();}ContextUtil.exit();上述代碼對(duì)象結(jié)構(gòu)如下:
machine-root
/
/
EntranceNode1
/
/
DefaultNode(nodeA)- - - - - -> ClusterNode(nodeA);
上述代碼會(huì)形成結(jié)構(gòu)如下:
? ? ? ? ? ? ? ? machine-root
? ? ? ? ? ? ? ? ? /? ? ? ? ? ? \
? ? ? ? ? ? ? ? /? ? ? ? ? ? ? ? \
? ?EntranceNode1? ? ?EntranceNode2
? ? ? ? ? ? ?/? ? ? ? ? ? ? ? ? ? ? ?\
/ ? ? ? ? \
DefaultNode(nodeA) DefaultNode(nodeA)
| ?|
+- - - - - - - - - - +- - - - - - -> ClusterNode(nodeA);
上述代碼會(huì)形成結(jié)構(gòu)如下:
? ? ? ? ? ? ? ? machine-root
? ? ? ? ? ? ? ? ? /? ? ? ??
? ? ? ? ? ? ? ? /? ? ? ? ??
? ?EntranceNode1?
? ? ? ? ? ? ?/? ? ? ? ??
/
DefaultNode(nodeA)?- - - - - -> ClusterNode(nodeA);
/
/
DefaultNode(nodeB)- - - - - -> ClusterNode(nodeB);
說明:
a.一條路徑對(duì)應(yīng)一個(gè)Context;如果是入門的那種使用方法,即沒有顯式使用ContextUtil.enter("entrance1", "appA")方式創(chuàng)建context的話,會(huì)默認(rèn)使用MyContextUtil.myEnter創(chuàng)建一個(gè)名為sentinel_default_context的context;
源碼釋義:
一次資源的訪問都會(huì)走到com.alibaba.csp.sentinel.CtSph.entryWithPriority(ResourceWrapper, int, boolean, Object...),方法開始處通過ContextUtil.getContext()從threadlocal中獲取context,ContextUtil.enter會(huì)檢查threadlocal有沒有context,沒有就創(chuàng)建,有就直接返回。如果限流時(shí)沒有調(diào)用ContextUtil.enter顯式開啟context,就會(huì)走到下面MyContextUtil.myEnter處創(chuàng)建默認(rèn)名為
sentinel_default_context的context。最終都會(huì)調(diào)用到com.alibaba.csp.sentinel.context.ContextUtil.trueEnter(String, String),創(chuàng)建context部分代碼如下,每一個(gè)context會(huì)先創(chuàng)建一個(gè)EntranceNode入口node,然后掛到Constants.ROOT下,結(jié)構(gòu)見上面的樹狀圖。?
b.一次資源的調(diào)用會(huì)根據(jù)threadlocal獲取Context,同一線程下如果要切換context必須調(diào)用ContextUtil.exit();結(jié)束上一個(gè)context,再ContextUtil.enter("entrance2", "appA")開啟新的context;
源碼釋義:
context的獲取見上面一條釋義,這里看下ContextUtil.exit()方法,要想真正的將threadlocal清空,還得將context的CurEntry清空,怎么做?就是按照entry的調(diào)用順序反向依次調(diào)用com.alibaba.csp.sentinel.CtEntry.exit(int, Object...):
c.一個(gè)DefaultNode對(duì)應(yīng)同一個(gè)資源調(diào)用在某一個(gè)Context下的統(tǒng)計(jì)數(shù)據(jù);
源碼釋義:
首先,一次資源的entry調(diào)用會(huì)先去尋找ProcessorSlotChain,查看代碼可知,chain是以ResourceWrapper為key緩存在map中的,由DefaultSlotChainBuilder可以對(duì)應(yīng)到一次entry調(diào)用對(duì)應(yīng)一個(gè)NodeSelectorSlot實(shí)例,而NodeSelectorSlot中緩存了一個(gè)以contextName為key,DefaultNode為value的map,一句話總結(jié)下就是先去com.alibaba.csp.sentinel.context.ContextUtil.trueEnter(String, String)方法中通過contextNameNodeMap獲取以contextName緩存的該name對(duì)應(yīng)的EntranceNode節(jié)點(diǎn)并以該EntranceNode為參數(shù)new一個(gè)context對(duì)象返回,然后在該資源對(duì)應(yīng)的slotChain中的NodeSelectorSlot獲取一contextName為key緩存的DefaultNode。
這里解釋下DefaultNode第一次是怎么掛到context的EntranceNode下的。
代碼如下,重點(diǎn)是紅框中的代碼,獲取context中最末尾的節(jié)點(diǎn),并把當(dāng)前節(jié)點(diǎn)掛在后面。
我們?cè)倏聪耤ontext的getLastNode節(jié)點(diǎn),代碼如下:
由上面代碼自然會(huì)想的curEntry是什么時(shí)候設(shè)置的呢?如下圖所示,是在com.alibaba.csp.sentinel.CtSph.entryWithPriority(ResourceWrapper, int, boolean, Object...)中構(gòu)建CtEntry時(shí)設(shè)置的,entry之間的關(guān)系在context中是以雙向鏈表結(jié)構(gòu)維護(hù)的:
回到上一步,我們看下entry的getLastNode方法,如下圖所示,從parentEntry中獲取curNode,如果是第一次調(diào)用的話parent肯定為null,回到上一步的話就是返回該context的entranceNode。
NodeSelectorSlot中就會(huì)將當(dāng)次entry在該context下首次調(diào)用創(chuàng)建的DefaultNode掛到剛剛獲取到的entranceNode節(jié)點(diǎn)下,也就是該context的入口節(jié)點(diǎn)下,如果該context下之前有過別的資源調(diào)用,就會(huì)順序掛到那次調(diào)用產(chǎn)生的DefaultNode下面,形成上面描述的樹狀結(jié)構(gòu)圖。
DefaultNode處理完后會(huì)將context的CurNode設(shè)置為該DefaultNode,如下圖所示:
實(shí)際上是設(shè)置的context中當(dāng)前entry對(duì)應(yīng)的CurNode,如下圖所示,所以上面getLastNode是從parenEntry中獲取的CurNode。
d.ClusterNode對(duì)應(yīng)同一個(gè)資源在所有context下的統(tǒng)計(jì)數(shù)據(jù);
源碼釋義:
ClusterBuilderSlot中維護(hù)的一個(gè)實(shí)例變量實(shí)際就是對(duì)應(yīng)一個(gè)資源,原因上面已經(jīng)分析了,一個(gè)資源對(duì)應(yīng)一個(gè)slotChain,然后將該clusterNode設(shè)置到本次調(diào)用對(duì)應(yīng)的DefaultNode中,在DefaultNode做加法操作時(shí),會(huì)同時(shí)調(diào)用clusterNode的加法操作,這樣,分布在不同context下的同一個(gè)資源對(duì)應(yīng)的所有DefaultNode都會(huì)去調(diào)用clusterNode去匯總統(tǒng)計(jì)相同資源的統(tǒng)計(jì)數(shù)據(jù)。
e.一個(gè)資源對(duì)應(yīng)一個(gè)ProcessChain,自然就對(duì)應(yīng)一個(gè)NodeSelectorSlot實(shí)例,所以在NodeSelectorSlot里面DefaultNode node = map.get(context.getName());這行代碼的一維是資源,二維才是context,如果搞反了可能看到這行會(huì)蒙圈;
源碼釋義:
見c的分析。
ClusterBuilderSlot
構(gòu)建ClusterNode并設(shè)置到上面slot選中的node中,如果context中有設(shè)置調(diào)用來源Origin就創(chuàng)建一個(gè)StatisticNode針對(duì)具體某個(gè)調(diào)用方統(tǒng)計(jì)數(shù)據(jù)
Note that 'origin' usually is Service Consumer's app name:
StatisticSlot
核心slot,用于統(tǒng)計(jì)各種數(shù)據(jù)的地方,然后被后面的slot應(yīng)用。
先調(diào)用fireEntry執(zhí)行后面的slot,檢查本次請(qǐng)求能否通過,如果通過,就給對(duì)應(yīng)的node做加法操作。
先給自身node做加法,在給ClusterBuilderSlot中創(chuàng)建并傳入的closterNode做加法。
最終調(diào)用ArrayMetric的addPass:
需要申明的是,sentinel統(tǒng)計(jì)采用的是滑動(dòng)窗口的實(shí)現(xiàn)方式,這里的重點(diǎn)是com.alibaba.csp.sentinel.slots.statistic.base.LeapArray.currentWindow(long)方法,我們看下如何獲取當(dāng)前窗口。
// 根據(jù)當(dāng)前時(shí)間戳計(jì)算當(dāng)前窗口的索引private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
// 索引從0開始,整除后得到的結(jié)果就是當(dāng)前時(shí)間戳所在的窗口,實(shí)際上我們的窗口只有array長度的固定幾個(gè),
// 可以想象一下這是在一個(gè)無限延長的虛擬時(shí)間線窗口,再對(duì)array的length取余數(shù)就得到了實(shí)際所在的窗口索引long timeId = timeMillis / windowLengthInMs;// Calculate current index so we can map the timestamp to the leap array.return (int)(timeId % array.length());}protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
// 計(jì)算虛擬窗口對(duì)應(yīng)的開始時(shí)間,當(dāng)前時(shí)間減去超出當(dāng)前窗口的那一段時(shí)間就得到開始時(shí)間return timeMillis - timeMillis % windowLengthInMs;}/*** Get bucket item at provided timestamp.** @param timeMillis a valid timestamp in milliseconds* @return current bucket item at provided timestamp if the time is valid; null if time is invalid*/public WindowWrap<T> currentWindow(long timeMillis) {if (timeMillis < 0) {return null;}// 獲取當(dāng)前窗口索引int idx = calculateTimeIdx(timeMillis);// Calculate current bucket start time.
// 獲取窗口開始時(shí)間
long windowStart = calculateWindowStart(timeMillis);/** Get bucket item at given time from the array.** (1) Bucket is absent, then just create a new bucket and CAS update to circular array.* (2) Bucket is up-to-date, then just return the bucket.* (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.*/while (true) {WindowWrap<T> old = array.get(idx);
// 如果索引對(duì)應(yīng)的窗口還沒有創(chuàng)建就新建窗口對(duì)象if (old == null) {/** B0 B1 B2 NULL B4* ||_______|_______|_______|_______|_______||___* 200 400 600 800 1000 1200 timestamp* ^* time=888* bucket is empty, so create new and update** If the old bucket is absent, then we create a new bucket at {@code windowStart},* then try to update circular array via a CAS operation. Only one thread can* succeed to update, while other threads yield its time slice.*/WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
// 原子操作,如果設(shè)置成功就返回,如果設(shè)置失敗就讓出cpu,等待再次被翻牌if (array.compareAndSet(idx, null, window)) {// Successfully updated, return the created bucket.return window;} else {// Contention failed, the thread will yield its time slice to wait for bucket available. Thread.yield();}
// 如果當(dāng)前窗口時(shí)間一致,說明當(dāng)前窗口還未過期,就返回該窗口對(duì)象} else if (windowStart == old.windowStart()) {/** B0 B1 B2 B3 B4* ||_______|_______|_______|_______|_______||___* 200 400 600 800 1000 1200 timestamp* ^* time=888* startTime of Bucket 3: 800, so it's up-to-date** If current {@code windowStart} is equal to the start timestamp of old bucket,* that means the time is within the bucket, so directly return the bucket.*/return old;
// 如果窗口開始時(shí)間過期了,就重置當(dāng)前窗口的開始時(shí)間為最新的時(shí)間} else if (windowStart > old.windowStart()) {/** (old)* B0 B1 B2 NULL B4* |_______||_______|_______|_______|_______|_______||___* ... 1200 1400 1600 1800 2000 2200 timestamp* ^* time=1676* startTime of Bucket 2: 400, deprecated, should be reset** If the start timestamp of old bucket is behind provided time, that means* the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.* Note that the reset and clean-up operations are hard to be atomic,* so we need a update lock to guarantee the correctness of bucket update.** The update lock is conditional (tiny scope) and will take effect only when* bucket is deprecated, so in most cases it won't lead to performance loss.*/if (updateLock.tryLock()) {try {// Successfully get the update lock, now we reset the bucket.return resetWindowTo(old, windowStart);} finally {updateLock.unlock();}} else {// Contention failed, the thread will yield its time slice to wait for bucket available. Thread.yield();}
// 正常情況下不會(huì)走這里,因?yàn)闀r(shí)間是往前走的} else if (windowStart < old.windowStart()) {// Should not go through here, as the provided time is already behind.return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));}}}
最終是在MetricBucket中用LongAdder做的原子加,LongAdder是jdk8的特性,這里sentinel直接挪了過來,避免要求sentinel用戶必須使用jdk8.為什么是LongAdder而不是AtomicLong,因?yàn)榍罢咴诓l(fā)下表現(xiàn)更優(yōu)異,具體區(qū)別請(qǐng)自行了解。
SystemSlot
通過之前統(tǒng)計(jì)節(jié)點(diǎn)中Constants.ENTRY_NODE這個(gè)node中的數(shù)據(jù)檢查全局qps等是否滿足要求。
AuthoritySlot
黑白名單匹配。通過com.alibaba.csp.sentinel.slots.block.authority.AuthorityRuleManager.loadRules(List<AuthorityRule>)設(shè)置規(guī)則。
FlowSlot
流量控制。通過com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager.loadRules(List<FlowRule>)設(shè)置規(guī)則。
DegradeSlot
降級(jí)處理。通過com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager.loadRules(List<DegradeRule>)設(shè)置規(guī)則。
轉(zhuǎn)載于:https://www.cnblogs.com/restart30/p/10796725.html
總結(jié)
以上是生活随笔為你收集整理的alibaba sentinel限流组件 源码分析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 项目Alpha冲刺--5/10
- 下一篇: 广告小程序后端开发(4.导入地区数据,修