通过transmittable-thread-local源码理解线程池线程本地变量传递的原理
前提
最近一兩個月花了很大的功夫做UCloud服務和中間件遷移到阿里云的工作,沒什么空閑時間擼文。想起很早之前寫過ThreadLocal的源碼分析相關文章,里面提到了ThreadLocal存在一個不能向預先創建的線程中進行變量傳遞的局限性,剛好有一位HSBC的技術大牛前同事提到了團隊引入了transmittable-thread-local解決了此問題。
借著這個契機,順便clone了transmittable-thread-local源碼進行分析,這篇文章會把ThreadLocal和InheritableThreadLocal的局限性分析完畢,并且從一些基本原理以及設計模式的運用分析transmittable-thread-local(下文簡稱為TTL)整套框架的實現。
如果對線程池和ThreadLocal不熟悉的話,可以先參看一下前置文章:
《JUC同步器框架AbstractQueuedSynchronizer源碼圖文分析》
《JUC線程池ThreadPoolExecutor源碼分析》
《ThreadLocal源碼分析-黃金分割數的使用》
這篇文章前后花了兩周時間編寫,行文比價干硬,文字比較多(接近5W字),希望帶著耐心閱讀。
父子線程的變量傳遞
在Java中沒有明確給出一個API可以基于子線程實例獲取其父線程實例,有一個相對可行的方案就是在創建子線程Thread實例的時候獲取當前線程的實例,用到的API是Thread#currentThread():
1public?class?Thread?implements?Runnable?{ 2 3????//?省略其他代碼 4 5????@HotSpotIntrinsicCandidate 6????public?static?native?Thread?currentThread(); 7 8????//?省略其他代碼 9}Thread#currentThread()方法是一個靜態本地方法,它是由JVM實現,這是在JDK中唯一可以獲取父線程實例的API。一般而言,如果想在子線程實例中得到它的父線程實例,那么需要像如下這樣操作:
1public?class?InheritableThread?{23????public?static?void?main(String[]?args)?throws?Exception{4????????//?父線程就是main線程5????????Thread?parentThread?=?Thread.currentThread();6????????Thread?childThread?=?new?Thread(()->?{7????????????System.out.println("Parent?thread?is:"?+?parentThread.getName());8????????},"childThread");9????????childThread.start(); 10????????TimeUnit.SECONDS.sleep(Long.MAX_VALUE); 11????} 12} 13//?輸出結果: 14Parent?thread?is:main類似地,如果我們想把一個父子線程共享的變量實例傳遞,也可以這樣做:
1public?class?InheritableVars?{23????public?static?void?main(String[]?args)?throws?Exception?{4????????//?父線程就是main線程5????????Thread?parentThread?=?Thread.currentThread();6????????final?Var?var?=?new?Var();7????????var.setValue1("var1");8????????var.setValue2("var2");9????????Thread?childThread?=?new?Thread(()?->?{ 10????????????System.out.println("Parent?thread?is:"?+?parentThread.getName()); 11????????????methodFrame1(var); 12????????},?"childThread"); 13????????childThread.start(); 14????????TimeUnit.SECONDS.sleep(Long.MAX_VALUE); 15????} 16 17????private?static?void?methodFrame1(Var?var)?{ 18????????methodFrame2(var); 19????} 20 21????private?static?void?methodFrame2(Var?var)?{ 22 23????} 24 25????@Data 26????private?static?class?Var?{ 27 28????????private?Object?value1; 29????????private?Object?value2; 30????} 31}這種做法其實是可行的,子線程調用的方法棧中的所有方法都必須顯示傳入需要從父線程傳遞過來的參數引用Var實例,這樣就會產生硬編碼問題,既不靈活也導致方法不能復用,所以才衍生出線程本地變量Thread Local,具體的實現有ThreadLocal和InheritableThreadLocal。它們兩者的基本原理是類似的,實際上所有的變量實例是緩存在線程實例的變量ThreadLocal.ThreadLocalMap中,線程本地變量實例都只是線程實例獲取ThreadLocal.ThreadLocalMap的一道橋梁:
1public?class?Thread?implements?Runnable?{23????//?省略其他代碼45????//?KEY為ThreadLocal實例,VALUE為具體的值6????ThreadLocal.ThreadLocalMap?threadLocals?=?null;78????//?KEY為InheritableThreadLocal實例,VALUE為具體的值9????ThreadLocal.ThreadLocalMap?inheritableThreadLocals?=?null; 10 11????//?省略其他代碼 12}ThreadLocal和InheritableThreadLocal之間的區別可以結合源碼分析一下(見下一小節)。前面的分析聽起來如果覺得抽象的話,可以自己寫幾個類推敲一下,假如線程其實叫ThrowableThread,而線程本地變量叫ThrowableThreadLocal,那么它們之間的關系如下:
1public?class?Actor?{23????static?ThrowableThreadLocal?THREAD_LOCAL?=?new?ThrowableThreadLocal();45????public?static?void?main(String[]?args)?throws?Exception?{6????????ThrowableThread?throwableThread?=?new?ThrowableThread()?{78????????????@Override9????????????public?void?run()?{ 10????????????????methodFrame1(); 11????????????} 12????????}; 13????????throwableThread.start(); 14????} 15 16????private?static?void?methodFrame1()?{ 17????????THREAD_LOCAL.set("throwable"); 18????????methodFrame2(); 19????} 20 21????private?static?void?methodFrame2()?{ 22????????System.out.println(THREAD_LOCAL.get()); 23????} 24 25????/** 26?????*?這個類暫且認為是java.lang.Thread 27?????*/ 28????private?static?class?ThrowableThread?implements?Runnable?{ 29 30????????ThrowableThreadLocal.ThrowableThreadLocalMap?threadLocalMap; 31 32????????@Override 33????????public?void?run()?{ 34 35????????} 36 37????????//?這里模擬VM的實現,返回ThrowableThread自身,大家先認為不是返回NULL 38????????public?static?ThrowableThread?getCurrentThread()?{ 39//????????????return?new?ThrowableThread(); 40????????????return?null;???//?<---?假設這里在VM的實現里面返回的不是NULL而是當前的ThrowableThread 41????????} 42 43????????public?void?start()?{ 44????????????run(); 45????????} 46????} 47 48????private?static?class?ThrowableThreadLocal?{ 49 50????????public?ThrowableThreadLocal()?{ 51 52????????} 53 54????????public?void?set(Object?value)?{ 55????????????ThrowableThread?currentThread?=?ThrowableThread.getCurrentThread(); 56????????????assert?null?!=?currentThread; 57????????????ThrowableThreadLocalMap?threadLocalMap?=?currentThread.threadLocalMap; 58????????????if?(null?==?threadLocalMap)?{ 59????????????????threadLocalMap?=?currentThread.threadLocalMap?=?new?ThrowableThreadLocalMap(); 60????????????} 61????????????threadLocalMap.put(this,?value); 62????????} 63 64????????public?Object?get()?{ 65????????????ThrowableThread?currentThread?=?ThrowableThread.getCurrentThread(); 66????????????assert?null?!=?currentThread; 67????????????ThrowableThreadLocalMap?threadLocalMap?=?currentThread.threadLocalMap; 68????????????if?(null?==?threadLocalMap)?{ 69????????????????return?null; 70????????????} 71????????????return?threadLocalMap.get(this); 72????????} 73 74????????//?這里其實在ThreadLocal中用的是WeakHashMap 75????????public?static?class?ThrowableThreadLocalMap?extends?HashMap<ThrowableThreadLocal,?Object>?{ 76 77????????} 78????} 79}上面的代碼不能運行,只是通過一個自定義的實現說明一下其中的原理和關系。
ThreadLocal和InheritableThreadLocal的局限性
InheritableThreadLocal是ThreadLocal的子類,它們之間的聯系是:兩者都是線程Thread實例獲取ThreadLocal.ThreadLocalMap的一個中間變量。區別是:兩者控制ThreadLocal.ThreadLocalMap創建的時機和通過Thread實例獲取ThreadLocal.ThreadLocalMap在Thread實例中對應的屬性并不一樣,導致兩者的功能有一點差別。通俗來說兩者的功能聯系和區別是:
ThreadLocal:單個線程生命周期強綁定,只能在某個線程的生命周期內對ThreadLocal進行存取,不能跨線程存取。
InheritableThreadLocal:(1)可以無感知替代ThreadLocal的功能,當成ThreadLocal使用。(2)明確父-子線程關系的前提下,繼承(拷貝)父線程的線程本地變量緩存過的變量,而這個拷貝的時機是子線程Thread實例化時候進行的,也就是子線程實例化完畢后已經完成了InheritableThreadLocal變量的拷貝,這是一個變量傳遞的過程。
上面提到的兩點可以具體參看ThreadLocal、InheritableThreadLocal和Thread三個類的源碼,這里筆者把一些必要的注釋和源碼段貼出:
1//?-->?java.lang.Thread類的源碼片段2public?class?Thread?implements?Runnable?{34????//?省略其他代碼?56????//?這是Thread最基本的構造函數7????private?Thread(ThreadGroup?g,?Runnable?target,?String?name,8???????????????????long?stackSize,?AccessControlContext?acc,9???????????????????boolean?inheritThreadLocals)?{1011????????//?省略其他代碼1213????????Thread?parent?=?currentThread();14????????this.group?=?g;15????????this.daemon?=?parent.isDaemon();16????????this.priority?=?parent.getPriority();17????????if?(security?==?null?||?isCCLOverridden(parent.getClass()))18????????????this.contextClassLoader?=?parent.getContextClassLoader();19????????else20????????????this.contextClassLoader?=?parent.contextClassLoader;21????????this.inheritedAccessControlContext?=22????????????????acc?!=?null???acc?:?AccessController.getContext();23????????this.target?=?target;24????????setPriority(priority);25????????//?inheritThreadLocals一般情況下為true26????????//?當前子線程實例拷貝父線程的inheritableThreadLocals屬性,創建一個新的ThreadLocal.ThreadLocalMap實例賦值到自身的inheritableThreadLocals屬性27????????if?(inheritThreadLocals?&&?parent.inheritableThreadLocals?!=?null)28????????????this.inheritableThreadLocals?=?ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);29????????this.stackSize?=?stackSize;30????????this.tid?=?nextThreadID();31????}3233????//?省略其他代碼34}3536//?-->?java.lang.ThreadLocal源碼片段37public?class?ThreadLocal<T>?{3839????//?省略其他代碼?4041????public?void?set(T?value)?{42????????Thread?t?=?Thread.currentThread();43????????//?通過當前線程獲取線程實例中的threadLocals44????????ThreadLocalMap?map?=?getMap(t);45????????//?線程實例中的threadLocals為NULL,實例則創建一個ThreadLocal.ThreadLocalMap實例添加當前ThreadLocal->VALUE到ThreadLocalMap中,如果已經存在ThreadLocalMap則進行覆蓋對應的Entry46????????if?(map?!=?null)?{47????????????map.set(this,?value);48????????}?else?{49????????????createMap(t,?value);50????????}51????}5253????//?通過線程實例獲取該線程的threadLocals實例,其實是ThreadLocal.ThreadLocalMap類型的屬性54????ThreadLocalMap?getMap(Thread?t)?{55????????return?t.threadLocals;56????}575859????public?T?get()?{60????????Thread?t?=?Thread.currentThread();61????????//?通過當前線程獲取線程實例中的threadLocals,再獲取ThreadLocal.ThreadLocalMap中匹配上KEY為當前ThreadLocal實例的Entry對應的VALUE62????????ThreadLocalMap?map?=?getMap(t);63????????if?(map?!=?null)?{64????????????ThreadLocalMap.Entry?e?=?map.getEntry(this);65????????????if?(e?!=?null)?{66????????????????@SuppressWarnings("unchecked")67????????????????T?result?=?(T)e.value;68????????????????return?result;69????????????}70????????}71????????//?找不到則嘗試初始化ThreadLocal.ThreadLocalMap72????????return?setInitialValue();73????}7475????//?如果不存在ThreadLocal.ThreadLocalMap,則通過初始化initialValue()方法的返回值,構造一個ThreadLocal.ThreadLocalMap76????private?T?setInitialValue()?{77????????T?value?=?initialValue();78????????Thread?t?=?Thread.currentThread();79????????ThreadLocalMap?map?=?getMap(t);80????????if?(map?!=?null)81????????????map.set(this,?value);82????????else83????????????createMap(t,?value);84????????return?value;85????}8687????//?省略其他代碼?88}8990//?-->?java.lang.InheritableThreadLocal源碼?-?太簡單,全量貼出91public?class?InheritableThreadLocal<T>?extends?ThreadLocal<T>?{9293????//?這個方法使用在線程Thread的構造函數里面ThreadLocal.createInheritedMap(),基于父線程InheritableThreadLocal的屬性創建子線程的InheritableThreadLocal屬性,它的返回值決定了拷貝父線程的屬性時候傳入子線程的值94????protected?T?childValue(T?parentValue)?{95????????return?parentValue;96????}9798????//?覆蓋獲取線程實例中的綁定的ThreadLocalMap為Thread#inheritableThreadLocals,這個方法其實是覆蓋了ThreadLocal中對應的方法,應該加@Override注解99????ThreadLocalMap?getMap(Thread?t)?{ 100???????return?t.inheritableThreadLocals; 101????} 102 103????//?覆蓋創建ThreadLocalMap的邏輯,賦值到線程實例中的inheritableThreadLocals,而不是threadLocals,這個方法其實是覆蓋了ThreadLocal中對應的方法,應該加@Override注解 104????void?createMap(Thread?t,?T?firstValue)?{ 105????????t.inheritableThreadLocals?=?new?ThreadLocalMap(this,?firstValue); 106????} 107}一定要注意,這里的setInitialValue()方法很重要,一個新的線程Thread實例在初始化(對于InheritableThreadLocal而言繼承父線程的線程本地變量)或者是首次調用ThreadLocal#set(),會通過此setInitialValue()方法去構造一個全新的ThreadLocal.ThreadLocalMap,會直接使用createMap()方法。
以前面提到的兩個例子,貼一個圖加深理解:
Example-1:
Example-2:
ThreadLocal、InheritableThreadLocal的最大局限性就是:無法為預先創建好(未投入使用)的線程實例傳遞變量(準確來說是首次傳遞某些場景是可行的,而后面由于線程池中的線程是復用的,無法進行更新或者修改變量的傳遞值),泛線程池Executor體系、TimerTask和ForkJoinPool等一般會預先創建(核心)線程,也就它們都是無法在線程池中由預創建的子線程執行的Runnable任務實例中使用。例如下面的方式會導致參數傳遞失敗:
1public?class?InheritableThreadForExecutor?{23????static?final?InheritableThreadLocal<String>?ITL?=?new?InheritableThreadLocal<>();4????static?final?Executor?EXECUTOR?=?Executors.newFixedThreadPool(1);56????public?static?void?main(String[]?args)?throws?Exception?{7????????ITL.set("throwable");8????????EXECUTOR.execute(()?->?{9????????????System.out.println(ITL.get()); 10????????}); 11????????ITL.set("doge"); 12????????EXECUTOR.execute(()?->?{ 13????????????System.out.println(ITL.get()); 14????????}); 15????????TimeUnit.SECONDS.sleep(Long.MAX_VALUE); 16????} 17} 18//?輸出結果: 19throwable 20throwable???#?<---?可見此處參數傳遞出現異常首次變量傳遞成功是因為線程池中的所有子線程都是派生自main線程。
TTL的簡單使用
TTL的使用方式在它的項目README.md或者項目中的單元測試有十分詳細的介紹,先引入依賴com.alibaba:transmittable-thread-local:2.11.4,這里演示一個例子:
1//?父-子線程2public?class?TtlSample1?{34????static?TransmittableThreadLocal<String>?TTL?=?new?TransmittableThreadLocal<>();56????public?static?void?main(String[]?args)?throws?Exception?{7????????new?Thread(()?->?{8????????????//?在父線程中設置變量9????????????TTL.set("throwable"); 10????????????new?Thread(TtlRunnable.get(()?->?{ 11????????????????methodFrame1(); 12????????????}),?"childThread").start(); 13????????},?"parentThread").start(); 14????????TimeUnit.SECONDS.sleep(Long.MAX_VALUE); 15????} 16 17????private?static?void?methodFrame1()?{ 18????????methodFrame2(); 19????} 20 21????private?static?void?methodFrame2()?{ 22????????System.out.println(TTL.get()); 23????} 24} 25//?輸出: 26throwable 27 28//?線程池 29public?class?TtlSample2?{ 30 31????static?TransmittableThreadLocal<String>?TTL?=?new?TransmittableThreadLocal<>(); 32????static?final?Executor?EXECUTOR?=?Executors.newFixedThreadPool(1); 33 34????public?static?void?main(String[]?args)?throws?Exception?{ 35????????TTL.set("throwable"); 36????????EXECUTOR.execute(TtlRunnable.get(()?->?{ 37????????????System.out.println(TTL.get()); 38????????})); 39????????TTL.set("doge"); 40????????EXECUTOR.execute(TtlRunnable.get(()?->?{ 41????????????System.out.println(TTL.get()); 42????????})); 43????????TimeUnit.SECONDS.sleep(Long.MAX_VALUE); 44????} 45} 46//?輸出: 47throwable 48dogeTTL實現的基本原理
TTL設計上使用了大量的委托(Delegate),委托是C#里面的說法,對標Java的設計模式就是代理模式。舉個簡單的例子:
1@Slf4j2public?class?StaticDelegate?{34????public?static?void?main(String[]?args)?throws?Exception?{5????????new?RunnableDelegate(()?->?log.info("Hello?World!")).run();6????}78????@Slf4j9????@RequiredArgsConstructor 10????private?static?final?class?RunnableDelegate?implements?Runnable?{ 11 12????????private?final?Runnable?runnable; 13 14????????@Override 15????????public?void?run()?{ 16????????????try?{ 17????????????????log.info("Before?run..."); 18????????????????runnable.run(); 19????????????????log.info("After?run..."); 20????????????}?finally?{ 21????????????????log.info("Finally?run..."); 22????????????} 23????????} 24????} 25} 26//?輸出結果: 2723:45:27.763?[main]?INFO?club.throwable.juc.StaticDelegate$RunnableDelegate?-?Before?run... 2823:45:27.766?[main]?INFO?club.throwable.juc.StaticDelegate?-?Hello?World! 2923:45:27.766?[main]?INFO?club.throwable.juc.StaticDelegate$RunnableDelegate?-?After?run... 3023:45:27.766?[main]?INFO?club.throwable.juc.StaticDelegate$RunnableDelegate?-?Finally?run...委托如果使用純熟的話,可以做出很多十分有用的功能,例如可以基于Micrometer去統計任務的執行時間,上報到Prometheus,然后用Grafana做監控和展示:
1//?需要引入io.micrometer:micrometer-core:${version}2@Slf4j3public?class?MeterDelegate?{45????public?static?void?main(String[]?args)?throws?Exception?{6????????Executor?executor?=?Executors.newFixedThreadPool(1);7????????Runnable?task?=?()?->?{8????????????try?{9????????????????//?模擬耗時 10????????????????Thread.sleep(1000); 11????????????}?catch?(Exception?ignore)?{ 12 13????????????} 14????????}; 15????????Map<String,?String>?tags?=?new?HashMap<>(8); 16????????tags.put("_class",?"MeterDelegate"); 17????????executor.execute(new?MicrometerDelegate(task,?"test-task",?tags)); 18????????TimeUnit.SECONDS.sleep(Long.MAX_VALUE); 19????} 20 21????@Slf4j 22????@RequiredArgsConstructor 23????private?static?final?class?MicrometerDelegate?implements?Runnable?{ 24 25????????private?final?Runnable?runnable; 26????????private?final?String?taskType; 27????????private?final?Map<String,?String>?tags; 28 29????????@Override 30????????public?void?run()?{ 31????????????long?start?=?System.currentTimeMillis(); 32????????????try?{ 33????????????????runnable.run(); 34????????????}?finally?{ 35????????????????long?end?=?System.currentTimeMillis(); 36????????????????List<Tag>?tagsList?=?Lists.newArrayList(); 37????????????????Optional.ofNullable(tags).ifPresent(x?->?x.forEach((k,?v)?->?{ 38????????????????????tagsList.add(Tag.of(k,?v)); 39????????????????})); 40????????????????Metrics.summary(taskType,?tagsList).record(end?-?start); 41????????????} 42????????} 43????} 44}委托理論上只要不線程棧溢出,可以無限層級地包裝,有點像洋蔥的結構,原始的目標方法會被包裹在最里面并且最后執行:
1????public?static?void?main(String[]?args)?throws?Exception?{2????????Runnable?target?=?()?->?log.info("target");3????????Delegate?level1?=?new?Delegate(target);4????????Delegate?level2?=?new?Delegate(level1);5????????Delegate?level3?=?new?Delegate(level2);6????????//?......7????}89????@RequiredArgsConstructor 10????static?class?Delegate?implements?Runnable{ 11 12????????private?final?Runnable?runnable; 13 14????????@Override 15????????public?void?run()?{ 16????????????runnable.run(); 17????????} 18????}當然,委托的層級越多,代碼結構就會越復雜,不利于理解和維護。多層級委托這個洋蔥結構,再配合Java反射API剝離對具體方法調用的依賴,就是Java中切面編程的普遍原理,spring-aop就是這樣實現的。委托如果再結合Agent和字節碼增強(使用ASM、Javassist等),可以實現類加載時期替換對應的Runnable、Callable或者一般接口的實現,這樣就能無感知完成了增強功能。此外,TTL中還使用了模板方法模式,如:
1@Slf4j2public?class?TemplateMethod?{34????public?static?void?main(String[]?args)?throws?Exception?{5????????Runnable?runnable?=?()?->?log.info("Hello?World!");6????????Template?template?=?new?Template(runnable)?{7????????????@Override8????????????protected?void?beforeExecute()?{9????????????????log.info("BeforeExecute..."); 10????????????} 11 12????????????@Override 13????????????protected?void?afterExecute()?{ 14????????????????log.info("AfterExecute..."); 15????????????} 16????????}; 17????????template.run(); 18????} 19 20????@RequiredArgsConstructor 21????static?abstract?class?Template?implements?Runnable?{ 22 23????????private?final?Runnable?runnable; 24 25????????protected?void?beforeExecute()?{ 26 27????????} 28 29????????@Override 30????????public?void?run()?{ 31????????????beforeExecute(); 32????????????runnable.run(); 33????????????afterExecute(); 34????????} 35 36????????protected?void?afterExecute()?{ 37 38????????} 39????} 40} 41//?輸出結果: 4200:25:32.862?[main]?INFO?club.throwable.juc.TemplateMethod?-?BeforeExecute... 4300:25:32.865?[main]?INFO?club.throwable.juc.TemplateMethod?-?Hello?World! 4400:25:32.865?[main]?INFO?club.throwable.juc.TemplateMethod?-?AfterExecute...分析了兩種設計模式,下面簡單理解一下TTL實現的偽代碼:
1#?TTL?extends?InheritableThreadLocal 2#?Holder?of?TTL?->?InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>,??>>?[??=>?NULL] 3(1)創建一個全局的Holder,用于保存父線程(或者明確了父線程的子線程)的TTL對象,這里注意,是TTL對象,Holder是當作Set使用 4(2)(父)線程A中使用了TTL,則所有設置的變量會被TTL捕獲 5(3)(子)線程B使用了TtlRunnable(Runnable的TTL實現,使用了前面提到的委托,像Callable的實現是TtlCallable),會重放所有存儲在TTL中的,來自于線程A的存儲變量 6(4)線程B重放完畢后,清理線程B獨立產生的ThreadLocal變量,歸還變TTL的變量主要就是這幾步,里面的話術有點抽象,后面一節分析源碼的時候會詳細講解。
TTL的源碼分析
主要分析:
框架的骨架。
核心類TransmittableThreadLocal。
發射器Transmitter。
捕獲、重放和復原。
Agent模塊。
TTL框架骨架
TTL是一個十分精悍的框架,它依賴少量的類實現了比較強大的功能,除了提供給用戶使用的API,還提供了基于Agent和字節碼增強實現了無感知增強泛線程池對應類的功能,這一點是比較驚艷的。這里先分析編程式的API,再簡單分析Agent部分的實現。筆者閱讀TTL框架的時間是2020年五一勞動節前后,當前的最新發行版本為2.11.4。TTL的項目結構很簡單:
1-?transmittable-thread-local 2??-?com.alibaba.ttl 3???-?spi???SPI接口和一些實現 4???-?threadpool???線程池增強,包括ThreadFactory和線程池的Wrapper等 5?????-?agent???線程池的Agent實現相關 6???最外層的包有一些Wrapper的實現和TTL先看spi包:
1-?spi 2??TtlAttachments 3??TtlAttachmentsDelegate 4??TtlEnhanced 5??TtlWrapperTtlEnhanced是TTL的標識接口(空接口),標識具體的組件被TTL增強:
1public?interface?TtlEnhanced?{ 2 3}通過instanceof關鍵字就可以判斷具體的實現是否TTL增強過的組件。TtlWrapper接口繼承自接口TtlEnhanced,用于標記實現類可以解包裝獲得原始實例:
1public?interface?TtlWrapper<T>?extends?TtlEnhanced?{ 2 3????//?返回解包裝實例,實際是就是原始實例 4????@NonNull 5????T?unwrap(); 6}TtlAttachments接口也是繼承自接口TtlEnhanced,用于為TTL添加K-V結構的附件,TtlAttachmentsDelegate是其實現類,K-V的存儲實際上是委托給ConcurrentHashMap:
1public?interface?TtlAttachments?extends?TtlEnhanced?{23????//?添加K-V附件4????void?setTtlAttachment(@NonNull?String?key,?Object?value);56????//?通過KEY獲取值7????<T>?T?getTtlAttachment(@NonNull?String?key);89????//?標識自動包裝的KEY,Agent模式會使用自動包裝,這個時候會傳入一個附件的K-V,其中KEY就是KEY_IS_AUTO_WRAPPER 10????String?KEY_IS_AUTO_WRAPPER?=?"ttl.is.auto.wrapper"; 11} 12 13//?TtlAttachmentsDelegate 14public?class?TtlAttachmentsDelegate?implements?TtlAttachments?{ 15 16????private?final?ConcurrentMap<String,?Object>?attachments?=?new?ConcurrentHashMap<String,?Object>(); 17 18????@Override 19????public?void?setTtlAttachment(@NonNull?String?key,?Object?value)?{ 20????????attachments.put(key,?value); 21????} 22 23????@Override 24????@SuppressWarnings("unchecked") 25????public?<T>?T?getTtlAttachment(@NonNull?String?key)?{ 26????????return?(T)?attachments.get(key); 27????} 28}因為TTL的實現覆蓋了泛線程池Executor、ExecutorService、ScheduledExecutorService、ForkJoinPool和TimerTask(在TTL中組件已經標記為過期,推薦使用ScheduledExecutorService),范圍比較廣,短篇幅無法分析所有的源碼,而且它們的實現思路是基本一致的,筆者下文只會挑選Executor的實現路線進行分析。
核心類TransmittableThreadLocal
TransmittableThreadLocal是TTL的核心類,TTL框架就是用這個類來命名的。先看它的構造函數和關鍵屬性:
1//?函數式接口,TTL拷貝器2@FunctionalInterface3public?interface?TtlCopier<T>?{45????//?拷貝父屬性6????T?copy(T?parentValue);7}89public?class?TransmittableThreadLocal<T>?extends?InheritableThreadLocal<T>?implements?TtlCopier<T>?{ 10 11????//?日志句柄,使用的不是SLF4J的接口,而是java.util.logging的實現 12????private?static?final?Logger?logger?=?Logger.getLogger(TransmittableThreadLocal.class.getName()); 13 14????//?是否禁用忽略NULL值的語義 15????private?final?boolean?disableIgnoreNullValueSemantics; 16 17????//?默認是false,也就是不禁用忽略NULL值的語義,也就是忽略NULL值,也就是默認的話,NULL值傳入不會覆蓋原來已經存在的值 18????public?TransmittableThreadLocal()?{ 19????????this(false); 20????} 21 22????//?可以通過手動設置,去覆蓋IgnoreNullValue的語義,如果設置為true,則是支持NULL值的設置,設置為true的時候,與ThreadLocal的語義一致 23????public?TransmittableThreadLocal(boolean?disableIgnoreNullValueSemantics)?{ 24????????this.disableIgnoreNullValueSemantics?=?disableIgnoreNullValueSemantics; 25????} 26 27????//?先忽略其他代碼 28}disableIgnoreNullValueSemantics屬性相關可以查看Issue157,下文分析方法的時候也會說明具體的場景。TransmittableThreadLocal繼承自InheritableThreadLocal,本質就是ThreadLocal,那它到底怎么樣保證變量可以在線程池中的線程傳遞?接著分析其他所有方法:
1public?class?TransmittableThreadLocal<T>?extends?InheritableThreadLocal<T>?implements?TtlCopier<T>?{23????//?拷貝器的拷貝方法實現4????public?T?copy(T?parentValue)?{5????????return?parentValue;6????}78????//?模板方法,留給子類實現,在TtlRunnable或者TtlCallable執行前回調9????protected?void?beforeExecute()?{10????}1112????//?模板方法,留給子類實現,在TtlRunnable或者TtlCallable執行后回調13????protected?void?afterExecute()?{14????}1516????//?獲取值,直接從InheritableThreadLocal#get()獲取17????@Override18????public?final?T?get()?{19????????T?value?=?super.get();20????????//?如果值不為NULL?或者?禁用了忽略空值的語義(也就是和ThreadLocal語義一致),則重新添加TTL實例自身到存儲器21????????if?(disableIgnoreNullValueSemantics?||?null?!=?value)?addThisToHolder();22????????return?value;23????}2425????@Override26????public?final?void?set(T?value)?{27????????//?如果不禁用忽略空值的語義,也就是需要忽略空值,并且設置的入參值為空,則做一次徹底的移除,包括從存儲器移除TTL自身實例,TTL(ThrealLocalMap)中也移除對應的值28????????if?(!disableIgnoreNullValueSemantics?&&?null?==?value)?{29????????????//?may?set?null?to?remove?value30????????????remove();31????????}?else?{32????????????//?TTL(ThrealLocalMap)中設置對應的值33????????????super.set(value);34????????????//?添加TTL實例自身到存儲器35????????????addThisToHolder();36????????}37????}3839????//?從存儲器移除TTL自身實例,從TTL(ThrealLocalMap)中移除對應的值40????@Override41????public?final?void?remove()?{42????????removeThisFromHolder();43????????super.remove();44????}4546????//?從TTL(ThrealLocalMap)中移除對應的值47????private?void?superRemove()?{48????????super.remove();49????}5051????//?拷貝值,主要是拷貝get()的返回值52????private?T?copyValue()?{53????????return?copy(get());54????}5556????//?存儲器,本身就是一個InheritableThreadLocal(ThreadLocal)57????//?它的存放對象是WeakHashMap<TransmittableThreadLocal<Object>,??>類型,而WeakHashMap的VALUE總是為NULL,這里當做Set容器使用,WeakHashMap支持NULL值58????private?static?InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>,??>>?holder?=59????????????new?InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>,??>>()?{60????????????????@Override61????????????????protected?WeakHashMap<TransmittableThreadLocal<Object>,??>?initialValue()?{62????????????????????return?new?WeakHashMap<TransmittableThreadLocal<Object>,?Object>();63????????????????}6465????????????????@Override66????????????????protected?WeakHashMap<TransmittableThreadLocal<Object>,??>?childValue(WeakHashMap<TransmittableThreadLocal<Object>,??>?parentValue)?{67????????????????????//?注意這里的WeakHashMap總是拷貝父線程的值68????????????????????return?new?WeakHashMap<TransmittableThreadLocal<Object>,?Object>(parentValue);69????????????????}70????????????};7172????//?添加TTL自身實例到存儲器,不存在則添加策略73????@SuppressWarnings("unchecked")74????private?void?addThisToHolder()?{75????????if?(!holder.get().containsKey(this))?{76????????????holder.get().put((TransmittableThreadLocal<Object>)?this,?null);?//?WeakHashMap?supports?null?value.77????????}78????}7980????//?從存儲器移除TTL自身的實例81????private?void?removeThisFromHolder()?{82????????holder.get().remove(this);83????}8485????//?執行目標方法,isBefore決定回調beforeExecute還是afterExecute,注意此回調方法會吞掉所有的異常只打印日志86????private?static?void?doExecuteCallback(boolean?isBefore)?{87????????for?(TransmittableThreadLocal<Object>?threadLocal?:?holder.get().keySet())?{88????????????try?{89????????????????if?(isBefore)?threadLocal.beforeExecute();90????????????????else?threadLocal.afterExecute();91????????????}?catch?(Throwable?t)?{92????????????????if?(logger.isLoggable(Level.WARNING))?{93????????????????????logger.log(Level.WARNING,?"TTL?exception?when?"?+?(isBefore???"beforeExecute"?:?"afterExecute")?+?",?cause:?"?+?t.toString(),?t);94????????????????}95????????????}96????????}97????}9899????//?DEBUG模式下打印TTL里面的所有值 100????static?void?dump(@Nullable?String?title)?{ 101????????if?(title?!=?null?&&?title.length()?>?0)?{ 102????????????System.out.printf("Start?TransmittableThreadLocal[%s]?Dump...%n",?title); 103????????}?else?{ 104????????????System.out.println("Start?TransmittableThreadLocal?Dump..."); 105????????} 106 107????????for?(TransmittableThreadLocal<Object>?threadLocal?:?holder.get().keySet())?{ 108????????????System.out.println(threadLocal.get()); 109????????} 110????????System.out.println("TransmittableThreadLocal?Dump?end!"); 111????} 112 113????//?DEBUG模式下打印TTL里面的所有值 114????static?void?dump()?{ 115????????dump(null); 116????} 117 118????//?省略靜態類Transmitter的實現代碼 119}這里一定要記住holder是全局靜態的,并且它自身也是一個InheritableThreadLocal(get()方法也是線程隔離的),它實際上就是線程管理所有TransmittableThreadLocal的橋梁。這里可以考慮一個單線程的例子來說明TransmittableThreadLocal的存儲架構:
1public?class?TtlSample3?{23????static?TransmittableThreadLocal<String>?TTL1?=?new?TransmittableThreadLocal<>();4????static?TransmittableThreadLocal<String>?TTL2?=?new?TransmittableThreadLocal<>();5????static?TransmittableThreadLocal<String>?TTL3?=?new?TransmittableThreadLocal<>();67????public?static?void?main(String[]?args)?throws?Exception?{8????????TTL1.set("VALUE-1");9????????TTL2.set("VALUE-2"); 10????????TTL3.set("VALUE-3"); 11????} 12}這里簡化了例子,只演示了單線程的場景,圖中的一些對象的哈希碼有可能每次啟動JVM實例都不一樣,這里只是做示例:
注釋里面也提到,holder里面的WeakHashMap是當成Set容器使用,映射的值都是NULL,每次遍歷它的所有KEY就能獲取holder里面的所有的TransmittableThreadLocal實例,它是一個全局的存儲器,但是本身是一個InheritableThreadLocal,多線程共享后的映射關系會相對復雜:
再聊一下disableIgnoreNullValueSemantics的作用,默認情況下disableIgnoreNullValueSemantics=false,TTL如果設置NULL值,會直接從holder移除對應的TTL實例,在TTL#get()方法被調用的時候,如果原來持有的屬性不為NULL,該TTL實例會重新加到holder。如果設置disableIgnoreNullValueSemantics=true,則set(null)的語義和ThreadLocal一致。見下面的例子:
1public?class?TtlSample4?{23????static?TransmittableThreadLocal<Integer>?TL1?=?new?TransmittableThreadLocal<Integer>(false)?{4????????@Override5????????protected?Integer?initialValue()?{6????????????return?5;7????????}89????????@Override 10????????protected?Integer?childValue(Integer?parentValue)?{ 11????????????return?10; 12????????} 13????}; 14 15????static?TransmittableThreadLocal<Integer>?TL2?=?new?TransmittableThreadLocal<Integer>(true)?{ 16????????@Override 17????????protected?Integer?initialValue()?{ 18????????????return?5; 19????????} 20 21????????@Override 22????????protected?Integer?childValue(Integer?parentValue)?{ 23????????????return?10; 24????????} 25????}; 26 27????public?static?void?main(String[]?args)?throws?Exception?{ 28????????TL1.set(null); 29????????TL2.set(null); 30????????Thread?t1?=?new?Thread(TtlRunnable.get(()?->?{ 31????????????System.out.println(String.format("Thread:%s,value:%s",?Thread.currentThread().getName(),?TL1.get())); 32????????}),?"T1"); 33 34????????Thread?t2?=?new?Thread(TtlRunnable.get(()?->?{ 35????????????System.out.println(String.format("Thread:%s,value:%s",?Thread.currentThread().getName(),?TL2.get())); 36????????}),?"T2"); 37????????t1.start(); 38????????t2.start(); 39????????TimeUnit.SECONDS.sleep(Long.MAX_VALUE); 40????} 41} 42//?輸出結果: 43Thread:T2,value:null 44Thread:T1,value:5這是因為框架的設計者不想把NULL作為有狀態的值,如果真的有需要保持和ThreadLocal一致的用法,可以在構造TransmittableThreadLocal實例的時候傳入true。
發射器Transmitter
發射器Transmitter是TransmittableThreadLocal的一個公有靜態類,它的核心功能是傳輸所有的TransmittableThreadLocal實例和提供靜態方法注冊當前線程的變量到其他線程。按照筆者閱讀源碼的習慣,先看構造函數和關鍵屬性:
1//?#?TransmittableThreadLocal#Transmitter2public?static?class?Transmitter?{34????//?保存手動注冊的ThreadLocal->TtlCopier映射,這里是因為部分API提供了TtlCopier給用戶實現5????private?static?volatile?WeakHashMap<ThreadLocal<Object>,?TtlCopier<Object>>?threadLocalHolder?=?new?WeakHashMap<ThreadLocal<Object>,?TtlCopier<Object>>();6????//?threadLocalHolder更變時候的監視器7????private?static?final?Object?threadLocalHolderUpdateLock?=?new?Object();8????//?標記WeakHashMap中的ThreadLocal的對應值為NULL的屬性,便于后面清理9????private?static?final?Object?threadLocalClearMark?=?new?Object(); 10 11????//?默認的拷貝器,影子拷貝,直接返回父值 12????private?static?final?TtlCopier<Object>?shadowCopier?=?new?TtlCopier<Object>()?{ 13????????@Override 14????????public?Object?copy(Object?parentValue)?{ 15????????????return?parentValue; 16????????} 17????}; 18 19????//?私有構造,說明只能通過靜態方法提供外部調用 20????private?Transmitter()?{ 21????????throw?new?InstantiationError("Must?not?instantiate?this?class"); 22????} 23 24????//?私有靜態類,快照,保存從holder中捕獲的所有TransmittableThreadLocal和外部手動注冊保存在threadLocalHolder的ThreadLocal的K-V映射快照 25????private?static?class?Snapshot?{ 26????????final?WeakHashMap<TransmittableThreadLocal<Object>,?Object>?ttl2Value; 27????????final?WeakHashMap<ThreadLocal<Object>,?Object>?threadLocal2Value; 28 29????????private?Snapshot(WeakHashMap<TransmittableThreadLocal<Object>,?Object>?ttl2Value,?WeakHashMap<ThreadLocal<Object>,?Object>?threadLocal2Value)?{ 30????????????this.ttl2Value?=?ttl2Value; 31????????????this.threadLocal2Value?=?threadLocal2Value; 32????????} 33????} 34}Transmitter在設計上是一個典型的工具類,外部只能調用其公有靜態方法。接著看其他靜態方法:
1//?#?TransmittableThreadLocal#Transmitter2public?static?class?Transmitter?{34????//#########################################?捕獲?###########################################################56????//?捕獲當前線程綁定的所有的TransmittableThreadLocal和已經注冊的ThreadLocal的值?-?使用了用時拷貝快照的策略7????//?筆者注:它一般在構造任務實例的時候被調用,因此當前線程相對于子線程或者線程池的任務就是父線程,其實本質是捕獲父線程的所有線程本地變量的值8????@NonNull9????public?static?Object?capture()?{10????????return?new?Snapshot(captureTtlValues(),?captureThreadLocalValues());11????}1213????//?新建一個WeakHashMap,遍歷TransmittableThreadLocal#holder中的所有TransmittableThreadLocal的Entry,獲取K-V,存放到這個新的WeakHashMap返回14????private?static?WeakHashMap<TransmittableThreadLocal<Object>,?Object>?captureTtlValues()?{15????????WeakHashMap<TransmittableThreadLocal<Object>,?Object>?ttl2Value?=?new?WeakHashMap<TransmittableThreadLocal<Object>,?Object>();16????????for?(TransmittableThreadLocal<Object>?threadLocal?:?holder.get().keySet())?{17????????????ttl2Value.put(threadLocal,?threadLocal.copyValue());18????????}19????????return?ttl2Value;20????}2122????//?新建一個WeakHashMap,遍歷threadLocalHolder中的所有ThreadLocal的Entry,獲取K-V,存放到這個新的WeakHashMap返回23????private?static?WeakHashMap<ThreadLocal<Object>,?Object>?captureThreadLocalValues()?{24????????final?WeakHashMap<ThreadLocal<Object>,?Object>?threadLocal2Value?=?new?WeakHashMap<ThreadLocal<Object>,?Object>();25????????for?(Map.Entry<ThreadLocal<Object>,?TtlCopier<Object>>?entry?:?threadLocalHolder.entrySet())?{26????????????final?ThreadLocal<Object>?threadLocal?=?entry.getKey();27????????????final?TtlCopier<Object>?copier?=?entry.getValue();28????????????threadLocal2Value.put(threadLocal,?copier.copy(threadLocal.get()));29????????}30????????return?threadLocal2Value;31????}3233????//#########################################?重放?###########################################################3435????//?重放capture()方法中捕獲的TransmittableThreadLocal和手動注冊的ThreadLocal中的值,本質是重新拷貝holder中的所有變量,生成新的快照36????//?筆者注:重放操作一般會在子線程或者線程池中的線程的任務執行的時候調用,因此此時的holder#get()拿到的是子線程的原來就存在的本地線程變量,重放操作就是把這些子線程原有的本地線程變量備份37????@NonNull38????public?static?Object?replay(@NonNull?Object?captured)?{39????????final?Snapshot?capturedSnapshot?=?(Snapshot)?captured;40????????return?new?Snapshot(replayTtlValues(capturedSnapshot.ttl2Value),?replayThreadLocalValues(capturedSnapshot.threadLocal2Value));41????}4243????//?重放所有的TTL的值44????@NonNull45????private?static?WeakHashMap<TransmittableThreadLocal<Object>,?Object>?replayTtlValues(@NonNull?WeakHashMap<TransmittableThreadLocal<Object>,?Object>?captured)?{46????????//?新建一個新的備份WeakHashMap,其實也是一個快照47????????WeakHashMap<TransmittableThreadLocal<Object>,?Object>?backup?=?new?WeakHashMap<TransmittableThreadLocal<Object>,?Object>();48????????//?這里的循環針對的是子線程,用于獲取的是子線程的所有線程本地變量49????????for?(final?Iterator<TransmittableThreadLocal<Object>>?iterator?=?holder.get().keySet().iterator();?iterator.hasNext();?)?{50????????????TransmittableThreadLocal<Object>?threadLocal?=?iterator.next();5152????????????//?拷貝holder當前線程(子線程)綁定的所有TransmittableThreadLocal的K-V結構到備份中53????????????backup.put(threadLocal,?threadLocal.get());5455????????????//?清理所有的非捕獲快照中的TTL變量,以防有中間過程引入的額外的TTL變量(除了父線程的本地變量)影響了任務執行后的重放操作56????????????//?簡單來說就是:移除所有子線程的不包含在父線程捕獲的線程本地變量集合的中所有子線程本地變量和對應的值57????????????/**58?????????????*?這個問題可以舉個簡單的例子:59?????????????*?static?TransmittableThreadLocal<Integer>?TTL?=?new?TransmittableThreadLocal<>();60?????????????*?61?????????????*?線程池中的子線程C中原來初始化的時候,在線程C中綁定了TTL的值為10087,C線程是核心線程不會主動銷毀。62?????????????*?63?????????????*?父線程P在沒有設置TTL值的前提下,調用了線程C去執行任務,那么在C線程的Runnable包裝類中通過TTL#get()就會獲取到10087,顯然是不符合預期的64?????????????*65?????????????*?所以,在C線程的Runnable包裝類之前之前,要從C線程的線程本地變量,移除掉不包含在父線程P中的所有線程本地變量,確保Runnable包裝類執行期間只能拿到父線程中捕獲到的線程本地變量66?????????????*67?????????????*?下面這個判斷和移除做的就是這個工作68?????????????*/69????????????if?(!captured.containsKey(threadLocal))?{70????????????????iterator.remove();71????????????????threadLocal.superRemove();72????????????}73????????}7475????????//?重新設置TTL的值到捕獲的快照中76????????//?其實真實的意圖是:把從父線程中捕獲的所有線程本地變量重寫設置到TTL中,本質上,子線程holder里面的TTL綁定的值會被刷新77????????setTtlValuesTo(captured);7879????????//?回調模板方法beforeExecute80????????doExecuteCallback(true);8182????????return?backup;83????}8485????//?提取WeakHashMap中的KeySet,遍歷所有的TransmittableThreadLocal,重新設置VALUE86????private?static?void?setTtlValuesTo(@NonNull?WeakHashMap<TransmittableThreadLocal<Object>,?Object>?ttlValues)?{87????????for?(Map.Entry<TransmittableThreadLocal<Object>,?Object>?entry?:?ttlValues.entrySet())?{88????????????TransmittableThreadLocal<Object>?threadLocal?=?entry.getKey();89????????????//?重新設置TTL值,本質上,當前線程(子線程)holder里面的TTL綁定的值會被刷新90????????????threadLocal.set(entry.getValue());91????????}92????}9394????//?重放所有的手動注冊的ThreadLocal的值95????private?static?WeakHashMap<ThreadLocal<Object>,?Object>?replayThreadLocalValues(@NonNull?WeakHashMap<ThreadLocal<Object>,?Object>?captured)?{96????????//?新建備份97????????final?WeakHashMap<ThreadLocal<Object>,?Object>?backup?=?new?WeakHashMap<ThreadLocal<Object>,?Object>();98????????//?注意這里是遍歷捕獲的快照中的ThreadLocal99????????for?(Map.Entry<ThreadLocal<Object>,?Object>?entry?:?captured.entrySet())?{ 100????????????final?ThreadLocal<Object>?threadLocal?=?entry.getKey(); 101????????????//?添加到備份中 102????????????backup.put(threadLocal,?threadLocal.get()); 103????????????final?Object?value?=?entry.getValue(); 104????????????//?如果值為清除標記則綁定在當前線程的變量進行remove,否則設置值覆蓋 105????????????if?(value?==?threadLocalClearMark)?threadLocal.remove(); 106????????????else?threadLocal.set(value); 107????????} 108????????return?backup; 109????} 110 111????//?從relay()或者clear()方法中恢復TransmittableThreadLocal和手工注冊的ThreadLocal的值對應的備份 112????//?筆者注:恢復操作一般會在子線程或者線程池中的線程的任務執行的時候調用 113????public?static?void?restore(@NonNull?Object?backup)?{ 114????????final?Snapshot?backupSnapshot?=?(Snapshot)?backup; 115????????restoreTtlValues(backupSnapshot.ttl2Value); 116????????restoreThreadLocalValues(backupSnapshot.threadLocal2Value); 117????} 118 119????private?static?void?restoreTtlValues(@NonNull?WeakHashMap<TransmittableThreadLocal<Object>,?Object>?backup)?{ 120????????//?回調模板方法afterExecute 121????????doExecuteCallback(false); 122????????//?這里的循環針對的是子線程,用于獲取的是子線程的所有線程本地變量 123????????for?(final?Iterator<TransmittableThreadLocal<Object>>?iterator?=?holder.get().keySet().iterator();?iterator.hasNext();?)?{ 124????????????TransmittableThreadLocal<Object>?threadLocal?=?iterator.next(); 125????????????//?如果子線程原來就綁定的線程本地變量的值,如果不包含某個父線程傳來的對象,那么就刪除 126????????????//?這一步可以結合前面reply操作里面的方法段一起思考,如果不刪除的話,就相當于子線程的原來存在的線程本地變量綁定值被父線程對應的值污染了 127????????????if?(!backup.containsKey(threadLocal))?{ 128????????????????iterator.remove(); 129????????????????threadLocal.superRemove(); 130????????????} 131????????} 132 133????????//?重新設置TTL的值到捕獲的快照中 134????????//?其實真實的意圖是:把子線程的線程本地變量恢復到reply()的備份(前面的循環已經做了父線程捕獲變量的判斷),本質上,等于把holder中綁定于子線程本地變量的部分恢復到reply操作之前的狀態 135????????setTtlValuesTo(backup); 136????} 137 138????//?恢復所有的手動注冊的ThreadLocal的值 139????private?static?void?restoreThreadLocalValues(@NonNull?WeakHashMap<ThreadLocal<Object>,?Object>?backup)?{ 140????????for?(Map.Entry<ThreadLocal<Object>,?Object>?entry?:?backup.entrySet())?{ 141????????????final?ThreadLocal<Object>?threadLocal?=?entry.getKey(); 142????????????threadLocal.set(entry.getValue()); 143????????} 144????} 145}???這里三個核心方法,看起來比較抽象,要結合多線程的場景和一些空間想象進行推敲才能比較容易地理解:
capture():捕獲操作,父線程原來就存在的線程本地變量映射和手動注冊的線程本地變量映射捕獲,得到捕獲的快照值captured。
reply():重放操作,子線程原來就存在的線程本地變量映射和手動注冊的線程本地變量生成備份backup,刷新captured的所有值到子線程在全局存儲器holder中綁定的值。
restore():復原操作,子線程原來就存在的線程本地變量映射和手動注冊的線程本地變量恢復成backup。
setTtlValuesTo()這個方法比較隱蔽,要特別要結合多線程和空間思維去思考,例如當入參是captured,本質是從父線程捕獲到的綁定在父線程的所有線程本地變量,調用的時機在reply()和restore(),這兩個方法只會在子線程中調用,setTtlValuesTo()里面拿到的TransmittableThreadLocal實例調用set()方法相當于把綁定在父線程的所有線程本地變量的值全部刷新到子線程當前綁定的TTL中的線程本地變量的值,更深層次地想,是基于外部的傳入值刷新了子線程綁定在全局存儲器holder里面綁定到該子線程的線程本地變量的值。
Transmitter還有不少靜態工具方法,這里不做展開,可以參考項目里面的測試demo和README.md進行調試。
捕獲、重放和復原
其實上面一節已經介紹了Transmitter提供的捕獲、重放和復原的API,這一節主要結合分析TtlRunnable中的相關邏輯。TtlRunnable的源碼如下:
1public?final?class?TtlRunnable?implements?Runnable,?TtlWrapper<Runnable>,?TtlEnhanced,?TtlAttachments?{23????//?存放從父線程捕獲得到的線程本地變量映射的備份4????private?final?AtomicReference<Object>?capturedRef;5????//?原始的Runable實例6????private?final?Runnable?runnable;7????//?執行之后是否釋放TTL值引用8????private?final?boolean?releaseTtlValueReferenceAfterRun;9 10????private?TtlRunnable(@NonNull?Runnable?runnable,?boolean?releaseTtlValueReferenceAfterRun)?{ 11????????//?這里關鍵點:TtlRunnable實例化的時候就已經進行了線程本地變量的捕獲,所以一定是針對父線程的,因為此時任務還沒提交到線程池 12????????this.capturedRef?=?new?AtomicReference<Object>(capture()); 13????????this.runnable?=?runnable; 14????????this.releaseTtlValueReferenceAfterRun?=?releaseTtlValueReferenceAfterRun; 15????} 16 17????@Override 18????public?void?run()?{ 19????????//?獲取父線程捕獲到的線程本地變量映射的備份,做一些前置判斷 20????????Object?captured?=?capturedRef.get(); 21????????if?(captured?==?null?||?releaseTtlValueReferenceAfterRun?&&?!capturedRef.compareAndSet(captured,?null))?{ 22????????????throw?new?IllegalStateException("TTL?value?reference?is?released?after?run!"); 23????????} 24????????//?重放操作 25????????Object?backup?=?replay(captured); 26????????try?{ 27????????????//?真正的Runnable調用 28????????????runnable.run(); 29????????}?finally?{ 30????????????//?復原操作 31????????????restore(backup); 32????????} 33????} 34 35????@Nullable 36????public?static?TtlRunnable?get(@Nullable?Runnable?runnable)?{ 37????????return?get(runnable,?false,?false); 38????} 39 40????@Nullable 41????public?static?TtlRunnable?get(@Nullable?Runnable?runnable,?boolean?releaseTtlValueReferenceAfterRun,?boolean?idempotent)?{ 42????????if?(null?==?runnable)?return?null; 43????????if?(runnable?instanceof?TtlEnhanced)?{ 44????????????//?avoid?redundant?decoration,?and?ensure?idempotency 45????????????if?(idempotent)?return?(TtlRunnable)?runnable; 46????????????else?throw?new?IllegalStateException("Already?TtlRunnable!"); 47????????} 48????????return?new?TtlRunnable(runnable,?releaseTtlValueReferenceAfterRun); 49????} 50 51????//?省略其他不太重要的方法 52}其實關注點只需要放在構造函數、run()方法,其他都是基于此做修飾或者擴展。構造函數的源碼說明,capture()在TtlRunnable實例化的時候已經被調用,實例化它的一般就是父線程,所以整體的執行流程如下:
Agent模塊
啟用Agent功能,需要在Java的啟動參數添加:-javaagent:path/to/transmittable-thread-local-x.yzx.jar。原理是通過Instrumentation回調激發ClassFileTransformer實現目標類的字節碼增強,使用到javassist,被增強的類主要是泛線程池的類:
Executor體系:主要包括ThreadPoolExecutor和ScheduledThreadPoolExecutor,對應的字節碼增強類實現是TtlExecutorTransformlet。
ForkJoinPool:對應的字節碼增強類實現是TtlForkJoinTransformlet。
TimerTask:對應的字節碼增強類實現是TtlTimerTaskTransformlet。
Agent的入口類是TtlAgent,這里查看對應的源碼:
1public?final?class?TtlAgent?{23????public?static?void?premain(String?agentArgs,?@NonNull?Instrumentation?inst)?{4????????kvs?=?splitCommaColonStringToKV(agentArgs);56????????Logger.setLoggerImplType(getLogImplTypeFromAgentArgs(kvs));7????????final?Logger?logger?=?Logger.getLogger(TtlAgent.class);89????????try?{ 10????????????logger.info("[TtlAgent.premain]?begin,?agentArgs:?"?+?agentArgs?+?",?Instrumentation:?"?+?inst); 11????????????final?boolean?disableInheritableForThreadPool?=?isDisableInheritableForThreadPool(); 12????????????//?裝載所有的JavassistTransformlet 13????????????final?List<JavassistTransformlet>?transformletList?=?new?ArrayList<JavassistTransformlet>(); 14????????????transformletList.add(new?TtlExecutorTransformlet(disableInheritableForThreadPool)); 15????????????transformletList.add(new?TtlForkJoinTransformlet(disableInheritableForThreadPool)); 16????????????if?(isEnableTimerTask())?transformletList.add(new?TtlTimerTaskTransformlet()); 17????????????final?ClassFileTransformer?transformer?=?new?TtlTransformer(transformletList); 18????????????inst.addTransformer(transformer,?true); 19????????????logger.info("[TtlAgent.premain]?addTransformer?"?+?transformer.getClass()?+?"?success"); 20????????????logger.info("[TtlAgent.premain]?end"); 21????????????ttlAgentLoaded?=?true; 22????????}?catch?(Exception?e)?{ 23????????????String?msg?=?"Fail?to?load?TtlAgent?,?cause:?"?+?e.toString(); 24????????????logger.log(Level.SEVERE,?msg,?e); 25????????????throw?new?IllegalStateException(msg,?e); 26????????} 27????} 28}List<JavassistTransformlet>作為參數傳入ClassFileTransformer的實現類TtlTransformer中,其中的轉換方法為:
1public?class?TtlTransformer?implements?ClassFileTransformer?{23????private?final?List<JavassistTransformlet>?transformletList?=?new?ArrayList<JavassistTransformlet>();45????TtlTransformer(List<??extends?JavassistTransformlet>?transformletList)?{6????????for?(JavassistTransformlet?transformlet?:?transformletList)?{7????????????this.transformletList.add(transformlet);8????????????logger.info("[TtlTransformer]?add?Transformlet?"?+?transformlet.getClass()?+?"?success");9????????} 10????} 11 12????@Override 13????public?final?byte[]?transform(@Nullable?final?ClassLoader?loader,?@Nullable?final?String?classFile,?final?Class<?>?classBeingRedefined, 14??????????????????????????????????final?ProtectionDomain?protectionDomain,?@NonNull?final?byte[]?classFileBuffer)?{ 15????????try?{ 16????????????//?Lambda?has?no?class?file,?no?need?to?transform,?just?return. 17????????????if?(classFile?==?null)?return?NO_TRANSFORM; 18????????????final?String?className?=?toClassName(classFile); 19????????????ClassInfo?classInfo?=?new?ClassInfo(className,?classFileBuffer,?loader); 20????????????//?這里做變量,如果字節碼被修改,則跳出循環返回 21????????????for?(JavassistTransformlet?transformlet?:?transformletList)?{ 22????????????????transformlet.doTransform(classInfo); 23????????????????if?(classInfo.isModified())?return?classInfo.getCtClass().toBytecode(); 24????????????} 25????????}?catch?(Throwable?t)?{ 26????????????String?msg?=?"Fail?to?transform?class?"?+?classFile?+?",?cause:?"?+?t.toString(); 27????????????logger.log(Level.SEVERE,?msg,?t); 28????????????throw?new?IllegalStateException(msg,?t); 29????????} 30????????return?NO_TRANSFORM; 31????} 32}這里挑選TtlExecutorTransformlet的部分方法來看:
1????@Override2????public?void?doTransform(@NonNull?final?ClassInfo?classInfo)?throws?IOException,?NotFoundException,?CannotCompileException?{3????????//?如果當前加載的類包含java.util.concurrent.ThreadPoolExecutor或者java.util.concurrent.ScheduledThreadPoolExecutor4????????if?(EXECUTOR_CLASS_NAMES.contains(classInfo.getClassName()))?{5????????????final?CtClass?clazz?=?classInfo.getCtClass();6????????????//?遍歷所有的方法進行增強7????????????for?(CtMethod?method?:?clazz.getDeclaredMethods())?{8????????????????updateSubmitMethodsOfExecutorClass_decorateToTtlWrapperAndSetAutoWrapperAttachment(method);9????????????} 10????????????//?省略其他代碼 11????????}? 12????????//?省略其他代碼 13????} 14 15????private?void?updateSubmitMethodsOfExecutorClass_decorateToTtlWrapperAndSetAutoWrapperAttachment(@NonNull?final?CtMethod?method)?throws?NotFoundException,?CannotCompileException?{ 16????????final?int?modifiers?=?method.getModifiers(); 17????????if?(!Modifier.isPublic(modifiers)?||?Modifier.isStatic(modifiers))?return; 18????????//?這里主要在java.lang.Runnable構造時候調用com.alibaba.ttl.TtlRunnable#get()包裝為com.alibaba.ttl.TtlRunnable 19????????//?在java.util.concurrent.Callable構造時候調用com.alibaba.ttl.TtlCallable#get()包裝為com.alibaba.ttl.TtlCallable 20????????//?并且設置附件K-V為ttl.is.auto.wrapper=true 21????????CtClass[]?parameterTypes?=?method.getParameterTypes(); 22????????StringBuilder?insertCode?=?new?StringBuilder(); 23????????for?(int?i?=?0;?i?<?parameterTypes.length;?i++)?{ 24????????????final?String?paramTypeName?=?parameterTypes[i].getName(); 25????????????if?(PARAM_TYPE_NAME_TO_DECORATE_METHOD_CLASS.containsKey(paramTypeName))?{ 26????????????????String?code?=?String.format( 27????????????????????????//?decorate?to?TTL?wrapper, 28????????????????????????//?and?then?set?AutoWrapper?attachment/Tag 29????????????????????????"$%d?=?%s.get($%d,?false,?true);" 30????????????????????????????????+?"\ncom.alibaba.ttl.threadpool.agent.internal.transformlet.impl.Utils.setAutoWrapperAttachment($%<d);", 31????????????????????????i?+?1,?PARAM_TYPE_NAME_TO_DECORATE_METHOD_CLASS.get(paramTypeName),?i?+?1); 32????????????????logger.info("insert?code?before?method?"?+?signatureOfMethod(method)?+?"?of?class?"?+?method.getDeclaringClass().getName()?+?":?"?+?code); 33????????????????insertCode.append(code); 34????????????} 35????????} 36????????if?(insertCode.length()?>?0)?method.insertBefore(insertCode.toString()); 37????}上面分析的方法的功能,就是讓java.util.concurrent.ThreadPoolExecutor和java.util.concurrent.ScheduledThreadPoolExecutor的字節碼被增強,提交的java.lang.Runnable類型的任務會被包裝為TtlRunnable,提交的java.util.concurrent.Callable類型的任務會被包裝為TtlCallable,實現了無入侵無感知地嵌入TTL的功能。
小結
TTL在使用線程池等會池化復用線程的執行組件情況下,提供ThreadLocal值的傳遞功能,解決異步執行時上下文傳遞的問題。它是一個Java標準庫,為框架/中間件設施開發提供的標配能力,項目代碼精悍,只依賴了javassist做字節碼增強,實現Agent模式下的近乎無入侵提供TTL功能的特性。
TTL能在業務代碼中實現透明/自動完成所有異步執行上下文的可定制、規范化的捕捉/傳遞,如果恰好碰到異步執行時上下文傳遞的問題,建議可以嘗試此庫。
參考資料:
JDK11相關源碼
TTL源碼
有道無術,術可成;有術無道,止于術
歡迎大家關注Java之道公眾號
好文章,我在看??
總結
以上是生活随笔為你收集整理的通过transmittable-thread-local源码理解线程池线程本地变量传递的原理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: hdu 1233 还是畅通工程 最
- 下一篇: hdu 1879 继续畅通工程 最小