并发设计模式
1、Immutability模式:如何利用不變性解決并發(fā)問(wèn)題?
“多個(gè)線程同時(shí)讀寫(xiě)同一共享變量存在并發(fā)問(wèn)題”,這里的必要條件之一是讀寫(xiě),如果只有讀,而沒(méi)有寫(xiě),是沒(méi)有并發(fā)問(wèn)題的。
解決并發(fā)問(wèn)題,其實(shí)最簡(jiǎn)單的辦法就是讓共享變量只有讀操作,而沒(méi)有寫(xiě)操作。這個(gè)辦法如此重要,以至于被上升到了一種解決并發(fā)問(wèn)題的設(shè)計(jì)模式:不變性(Immutability)模式。所謂不變性,簡(jiǎn)單來(lái)講,就是對(duì)象一旦被創(chuàng)建之后,狀態(tài)就不再發(fā)生變化。換句話(huà)說(shuō),就是變量一旦被賦值,就不允許修改了(沒(méi)有寫(xiě)操作);沒(méi)有修改操作,也就是保持了不變性。
(1)快速實(shí)現(xiàn)具備不可變性的類(lèi)
實(shí)現(xiàn)一個(gè)具備不可變性的類(lèi),還是挺簡(jiǎn)單的。將一個(gè)類(lèi)所有的屬性都設(shè)置成 final 的,并且只允許存在只讀方法,那么這個(gè)類(lèi)基本上就具備不可變性了。更嚴(yán)格的做法是這個(gè)類(lèi)本身也是 final 的,也就是不允許繼承。因?yàn)樽宇?lèi)可以覆蓋父類(lèi)的方法,有可能改變不可變性,所以推薦你在實(shí)際工作中,使用這種更嚴(yán)格的做法。
Java SDK 里很多類(lèi)都具備不可變性,只是由于它們的使用太簡(jiǎn)單,最后反而被忽略了。例如經(jīng)常用到的 String 和 Long、Integer、Double 等基礎(chǔ)類(lèi)型的包裝類(lèi)都具備不可變性,這些對(duì)象的線程安全性都是靠不可變性來(lái)保證的。如果你仔細(xì)翻看這些類(lèi)的聲明、屬性和方法,你會(huì)發(fā)現(xiàn)它們都嚴(yán)格遵守不可變類(lèi)的三點(diǎn)要求:類(lèi)和屬性都是 final 的,所有方法均是只讀的。
看到這里你可能會(huì)疑惑,Java 的 String 方法也有類(lèi)似字符替換操作,怎么能說(shuō)所有方法都是只讀的呢?我們結(jié)合 String 的源代碼來(lái)解釋一下這個(gè)問(wèn)題,下面的示例代碼源自 Java 1.8 SDK,我略做了修改,僅保留了關(guān)鍵屬性 value[]和 replace() 方法,你會(huì)發(fā)現(xiàn):String 這個(gè)類(lèi)以及它的屬性 value[]都是 final 的;而 replace() 方法的實(shí)現(xiàn),就的確沒(méi)有修改 value[],而是將替換后的字符串作為返回值返回了。
public final class String {private final char value[];// 字符替換String replace(char oldChar, char newChar) {//無(wú)需替換,直接返回this if (oldChar == newChar){return this;}int len = value.length;int i = -1;/* avoid getfield opcode */char[] val = value; //定位到需要替換的字符位置while (++i < len) {if (val[i] == oldChar) {break;}}//未找到oldChar,無(wú)需替換if (i >= len) {return this;} //創(chuàng)建一個(gè)buf[],這是關(guān)鍵//用來(lái)保存替換后的字符串char buf[] = new char[len];for (int j = 0; j < i; j++) {buf[j] = val[j];}while (i < len) {char c = val[i];buf[i] = (c == oldChar) ? newChar : c;i++;}//創(chuàng)建一個(gè)新的字符串返回//原字符串不會(huì)發(fā)生任何變化return new String(buf, true);} }通過(guò)分析 String 的實(shí)現(xiàn),你可能已經(jīng)發(fā)現(xiàn)了,如果具備不可變性的類(lèi),需要提供類(lèi)似修改的功能,具體該怎么操作呢?做法很簡(jiǎn)單,那就是創(chuàng)建一個(gè)新的不可變對(duì)象,這是與可變對(duì)象的一個(gè)重要區(qū)別,可變對(duì)象往往是修改自己的屬性。
所有的修改操作都創(chuàng)建一個(gè)新的不可變對(duì)象,你可能會(huì)有這種擔(dān)心:是不是創(chuàng)建的對(duì)象太多了,有點(diǎn)太浪費(fèi)內(nèi)存呢?是的,這樣做的確有些浪費(fèi),那如何解決呢?
(2)利用享元模式避免創(chuàng)建重復(fù)對(duì)象
如果你熟悉面向?qū)ο笙嚓P(guān)的設(shè)計(jì)模式,相信你一定能想到享元模式(Flyweight Pattern)。利用享元模式可以減少創(chuàng)建對(duì)象的數(shù)量,從而減少內(nèi)存占用。Java 語(yǔ)言里面 Long、Integer、Short、Byte 等這些基本數(shù)據(jù)類(lèi)型的包裝類(lèi)都用到了享元模式。
如果你熟悉面向?qū)ο笙嚓P(guān)的設(shè)計(jì)模式,相信你一定能想到享元模式(Flyweight Pattern)。利用享元模式可以減少創(chuàng)建對(duì)象的數(shù)量,從而減少內(nèi)存占用。Java 語(yǔ)言里面 Long、Integer、Short、Byte 等這些基本數(shù)據(jù)類(lèi)型的包裝類(lèi)都用到了享元模式。
下面我們就以 Long 這個(gè)類(lèi)作為例子,看看它是如何利用享元模式來(lái)優(yōu)化對(duì)象的創(chuàng)建的。
享元模式本質(zhì)上其實(shí)就是一個(gè)對(duì)象池,利用享元模式創(chuàng)建對(duì)象的邏輯也很簡(jiǎn)單:創(chuàng)建之前,首先去對(duì)象池里看看是不是存在;如果已經(jīng)存在,就利用對(duì)象池里的對(duì)象;如果不存在,就會(huì)新創(chuàng)建一個(gè)對(duì)象,并且把這個(gè)新創(chuàng)建出來(lái)的對(duì)象放進(jìn)對(duì)象池里。
Long 這個(gè)類(lèi)并沒(méi)有照搬享元模式,Long 內(nèi)部維護(hù)了一個(gè)靜態(tài)的對(duì)象池,僅緩存了[-128,127]之間的數(shù)字,這個(gè)對(duì)象池在 JVM 啟動(dòng)的時(shí)候就創(chuàng)建好了,而且這個(gè)對(duì)象池一直都不會(huì)變化,也就是說(shuō)它是靜態(tài)的。之所以采用這樣的設(shè)計(jì),是因?yàn)?Long 這個(gè)對(duì)象的狀態(tài)共有 264 種,實(shí)在太多,不宜全部緩存,而[-128,127]之間的數(shù)字利用率最高。下面的示例代碼出自 Java 1.8,valueOf() 方法就用到了 LongCache 這個(gè)緩存,你可以結(jié)合著來(lái)加深理解。
Long valueOf(long l) {final int offset = 128;// [-128,127]直接的數(shù)字做了緩存if (l >= -128 && l <= 127) { return LongCache.cache[(int)l + offset];}return new Long(l); } //緩存,等價(jià)于對(duì)象池 //僅緩存[-128,127]直接的數(shù)字 static class LongCache {static final Long cache[] = new Long[-(-128) + 127 + 1];static {for(int i=0; i<cache.length; i++)cache[i] = new Long(i-128);} }“Integer 和 String 類(lèi)型的對(duì)象不適合做鎖”,其實(shí)基本上所有的基礎(chǔ)類(lèi)型的包裝類(lèi)都不適合做鎖,因?yàn)樗鼈儍?nèi)部用到了享元模式,這會(huì)導(dǎo)致看上去私有的鎖,其實(shí)是共有的。例如在下面代碼中,本意是 A 用鎖 al,B 用鎖 bl,各自管理各自的,互不影響。但實(shí)際上 al 和 bl 是一個(gè)對(duì)象,結(jié)果 A 和 B 共用的是一把鎖。
class A {Long al=Long.valueOf(1);public void setAX(){synchronized (al) {//省略代碼無(wú)數(shù)}} } class B {Long bl=Long.valueOf(1);public void setBY(){synchronized (bl) {//省略代碼無(wú)數(shù)}} }(2)使用 Immutability 模式的注意事項(xiàng)
在使用 Immutability 模式的時(shí)候,需要注意以下兩點(diǎn):
① 對(duì)象的所有屬性都是 final 的,并不能保證不可變性;
② 不可變對(duì)象也需要正確發(fā)布。
在 Java 語(yǔ)言中,final 修飾的屬性一旦被賦值,就不可以再修改,但是如果屬性的類(lèi)型是普通對(duì)象,那么這個(gè)普通對(duì)象的屬性是可以被修改的。例如下面的代碼中,Bar 的屬性 foo 雖然是 final 的,依然可以通過(guò) setAge() 方法來(lái)設(shè)置 foo 的屬性 age。所以,在使用 Immutability 模式的時(shí)候一定要確認(rèn)保持不變性的邊界在哪里,是否要求屬性對(duì)象也具備不可變性。
class Foo{int age=0;int name="abc"; } final class Bar {final Foo foo;void setAge(int a){foo.age=a;} }下面我們?cè)倏纯慈绾握_地發(fā)布不可變對(duì)象。不可變對(duì)象雖然是線程安全的,但是并不意味著引用這些不可變對(duì)象的對(duì)象就是線程安全的。例如在下面的代碼中,Foo 具備不可變性,線程安全,但是類(lèi) Bar 并不是線程安全的,類(lèi) Bar 中持有對(duì) Foo 的引用 foo,對(duì) foo 這個(gè)引用的修改在多線程中并不能保證可見(jiàn)性和原子性。
//Foo線程安全 final class Foo{final int age=0;final int name="abc"; } //Bar線程不安全 class Bar {Foo foo;void setFoo(Foo f){this.foo=f;} }如果你的程序僅僅需要 foo 保持可見(jiàn)性,無(wú)需保證原子性,那么可以將 foo 聲明為 volatile 變量,這樣就能保證可見(jiàn)性。如果你的程序需要保證原子性,那么可以通過(guò)原子類(lèi)來(lái)實(shí)現(xiàn)。下面的示例代碼是合理庫(kù)存的原子化實(shí)現(xiàn),你應(yīng)該很熟悉了,其中就是用原子類(lèi)解決了不可變對(duì)象引用的原子性問(wèn)題。
public class SafeWM {class WMRange{final int upper;final int lower;WMRange(int upper,int lower){//省略構(gòu)造函數(shù)實(shí)現(xiàn)}}final AtomicReference<WMRange>rf = new AtomicReference<>(new WMRange(0,0));// 設(shè)置庫(kù)存上限void setUpper(int v){while(true){WMRange or = rf.get();// 檢查參數(shù)合法性if(v < or.lower){throw new IllegalArgumentException();}WMRange nr = newWMRange(v, or.lower);if(rf.compareAndSet(or, nr)){return;}}} }利用 Immutability 模式解決并發(fā)問(wèn)題,也許你覺(jué)得有點(diǎn)陌生,其實(shí)你天天都在享受它的戰(zhàn)果。Java 語(yǔ)言里面的 String 和 Long、Integer、Double 等基礎(chǔ)類(lèi)型的包裝類(lèi)都具備不可變性,這些對(duì)象的線程安全性都是靠不可變性來(lái)保證的。Immutability 模式是最簡(jiǎn)單的解決并發(fā)問(wèn)題的方法,建議當(dāng)你試圖解決一個(gè)并發(fā)問(wèn)題時(shí),可以首先嘗試一下 Immutability 模式,看是否能夠快速解決。
具備不變性的對(duì)象,只有一種狀態(tài),這個(gè)狀態(tài)由對(duì)象內(nèi)部所有的不變屬性共同決定。其實(shí)還有一種更簡(jiǎn)單的不變性對(duì)象,那就是無(wú)狀態(tài)。無(wú)狀態(tài)對(duì)象內(nèi)部沒(méi)有屬性,只有方法。除了無(wú)狀態(tài)的對(duì)象,你可能還聽(tīng)說(shuō)過(guò)無(wú)狀態(tài)的服務(wù)、無(wú)狀態(tài)的協(xié)議等等。無(wú)狀態(tài)有很多好處,最核心的一點(diǎn)就是性能。在多線程領(lǐng)域,無(wú)狀態(tài)對(duì)象沒(méi)有線程安全問(wèn)題,無(wú)需同步處理,自然性能很好;在分布式領(lǐng)域,無(wú)狀態(tài)意味著可以無(wú)限地水平擴(kuò)展,所以分布式領(lǐng)域里面性能的瓶頸一定不是出在無(wú)狀態(tài)的服務(wù)節(jié)點(diǎn)上。
2、Copy-on-Write模式:不是延時(shí)策略的COW
Java 里 String 這個(gè)類(lèi)在實(shí)現(xiàn) replace() 方法的時(shí)候,并沒(méi)有更改原字符串里面 value[]數(shù)組的內(nèi)容,而是創(chuàng)建了一個(gè)新字符串,這種方法在解決不可變對(duì)象的修改問(wèn)題時(shí)經(jīng)常用到。如果你深入地思考這個(gè)方法,你會(huì)發(fā)現(xiàn)它本質(zhì)上是一種 Copy-on-Write 方法。所謂 Copy-on-Write,經(jīng)常被縮寫(xiě)為 COW 或者 CoW,顧名思義就是寫(xiě)時(shí)復(fù)制。
不可變對(duì)象的寫(xiě)操作往往都是使用 Copy-on-Write 方法解決的,當(dāng)然 Copy-on-Write 的應(yīng)用領(lǐng)域并不局限于 Immutability 模式。下面我們先簡(jiǎn)單介紹一下 Copy-on-Write 的應(yīng)用領(lǐng)域,讓你對(duì)它有個(gè)更全面的認(rèn)識(shí)。
(1)Copy-on-Write 模式的應(yīng)用領(lǐng)域
CopyOnWriteArrayList 和 CopyOnWriteArraySet 這兩個(gè) Copy-on-Write 容器,它們背后的設(shè)計(jì)思想就是 Copy-on-Write;通過(guò) Copy-on-Write 這兩個(gè)容器實(shí)現(xiàn)的讀操作是無(wú)鎖的,由于無(wú)鎖,所以將讀操作的性能發(fā)揮到了極致。
除了 Java 這個(gè)領(lǐng)域,Copy-on-Write 在操作系統(tǒng)領(lǐng)域也有廣泛的應(yīng)用。
我第一次接觸 Copy-on-Write 其實(shí)就是在操作系統(tǒng)領(lǐng)域。類(lèi) Unix 的操作系統(tǒng)中創(chuàng)建進(jìn)程的 API 是 fork(),傳統(tǒng)的 fork() 函數(shù)會(huì)創(chuàng)建父進(jìn)程的一個(gè)完整副本,例如父進(jìn)程的地址空間現(xiàn)在用到了 1G 的內(nèi)存,那么 fork() 子進(jìn)程的時(shí)候要復(fù)制父進(jìn)程整個(gè)進(jìn)程的地址空間(占有 1G 內(nèi)存)給子進(jìn)程,這個(gè)過(guò)程是很耗時(shí)的。而 Linux 中的 fork() 函數(shù)就聰明得多了,fork() 子進(jìn)程的時(shí)候,并不復(fù)制整個(gè)進(jìn)程的地址空間,而是讓父子進(jìn)程共享同一個(gè)地址空間;只用在父進(jìn)程或者子進(jìn)程需要寫(xiě)入的時(shí)候才會(huì)復(fù)制地址空間,從而使父子進(jìn)程擁有各自的地址空間。
本質(zhì)上來(lái)講,父子進(jìn)程的地址空間以及數(shù)據(jù)都是要隔離的,使用 Copy-on-Write 更多地體現(xiàn)的是一種延時(shí)策略,只有在真正需要復(fù)制的時(shí)候才復(fù)制,而不是提前復(fù)制好,同時(shí) Copy-on-Write 還支持按需復(fù)制,所以 Copy-on-Write 在操作系統(tǒng)領(lǐng)域是能夠提升性能的。相比較而言,Java 提供的 Copy-on-Write 容器,由于在修改的同時(shí)會(huì)復(fù)制整個(gè)容器,所以在提升讀操作性能的同時(shí),是以?xún)?nèi)存復(fù)制為代價(jià)的。這里你會(huì)發(fā)現(xiàn),同樣是應(yīng)用 Copy-on-Write,不同的場(chǎng)景,對(duì)性能的影響是不同的。
在操作系統(tǒng)領(lǐng)域,除了創(chuàng)建進(jìn)程用到了 Copy-on-Write,很多文件系統(tǒng)也同樣用到了,例如 Btrfs (B-Tree File System)、aufs(advanced multi-layered unification filesystem)等。
除了上面我們說(shuō)的 Java 領(lǐng)域、操作系統(tǒng)領(lǐng)域,很多其他領(lǐng)域也都能看到 Copy-on-Write 的身影:Docker 容器鏡像的設(shè)計(jì)是 Copy-on-Write,甚至分布式源碼管理系統(tǒng) Git 背后的設(shè)計(jì)思想都有 Copy-on-Write……
不過(guò),Copy-on-Write 最大的應(yīng)用領(lǐng)域還是在函數(shù)式編程領(lǐng)域。函數(shù)式編程的基礎(chǔ)是不可變性(Immutability),所以函數(shù)式編程里面所有的修改操作都需要 Copy-on-Write 來(lái)解決。你或許會(huì)有疑問(wèn),“所有數(shù)據(jù)的修改都需要復(fù)制一份,性能是不是會(huì)成為瓶頸呢?”你的擔(dān)憂(yōu)是有道理的,之所以函數(shù)式編程早年間沒(méi)有興起,性能絕對(duì)拖了后腿。但是隨著硬件性能的提升,性能問(wèn)題已經(jīng)慢慢變得可以接受了。而且,Copy-on-Write 也遠(yuǎn)不像 Java 里的 CopyOnWriteArrayList 那樣笨:整個(gè)數(shù)組都復(fù)制一遍。Copy-on-Write 也是可以按需復(fù)制的,如果你感興趣可以參考Purely Functional Data Structures這本書(shū),里面描述了各種具備不變性的數(shù)據(jù)結(jié)構(gòu)的實(shí)現(xiàn)。
CopyOnWriteArrayList 和 CopyOnWriteArraySet 這兩個(gè) Copy-on-Write 容器在修改的時(shí)候會(huì)復(fù)制整個(gè)數(shù)組,所以如果容器經(jīng)常被修改或者這個(gè)數(shù)組本身就非常大的時(shí)候,是不建議使用的。反之,如果是修改非常少、數(shù)組數(shù)量也不大,并且對(duì)讀性能要求苛刻的場(chǎng)景,使用 Copy-on-Write 容器效果就非常好了。下面我們結(jié)合一個(gè)真實(shí)的案例來(lái)講解一下。
(2)案例
一個(gè) RPC 框架,有點(diǎn)類(lèi)似 Dubbo,服務(wù)提供方是多實(shí)例分布式部署的,所以服務(wù)的客戶(hù)端在調(diào)用 RPC 的時(shí)候,會(huì)選定一個(gè)服務(wù)實(shí)例來(lái)調(diào)用,這個(gè)選定的過(guò)程本質(zhì)上就是在做負(fù)載均衡,而做負(fù)載均衡的前提是客戶(hù)端要有全部的路由信息。例如在下圖中,A 服務(wù)的提供方有 3 個(gè)實(shí)例,分別是 192.168.1.1、192.168.1.2 和 192.168.1.3,客戶(hù)端在調(diào)用目標(biāo)服務(wù) A 前,首先需要做的是負(fù)載均衡,也就是從這 3 個(gè)實(shí)例中選出 1 個(gè)來(lái),然后再通過(guò) RPC 把請(qǐng)求發(fā)送選中的目標(biāo)實(shí)例。
RPC 框架的一個(gè)核心任務(wù)就是維護(hù)服務(wù)的路由關(guān)系,我們可以把服務(wù)的路由關(guān)系簡(jiǎn)化成下圖所示的路由表。當(dāng)服務(wù)提供方上線或者下線的時(shí)候,就需要更新客戶(hù)端的這張路由表。
我們首先來(lái)分析一下如何用程序來(lái)實(shí)現(xiàn)。每次 RPC 調(diào)用都需要通過(guò)負(fù)載均衡器來(lái)計(jì)算目標(biāo)服務(wù)的 IP 和端口號(hào),而負(fù)載均衡器需要通過(guò)路由表獲取接口的所有路由信息,也就是說(shuō),每次 RPC 調(diào)用都需要訪問(wèn)路由表,所以訪問(wèn)路由表這個(gè)操作的性能要求是很高的。不過(guò)路由表對(duì)數(shù)據(jù)的一致性要求并不高,一個(gè)服務(wù)提供方從上線到反饋到客戶(hù)端的路由表里,即便有 5 秒鐘,很多時(shí)候也都是能接受的(5 秒鐘,對(duì)于以納秒作為時(shí)鐘周期的 CPU 來(lái)說(shuō),那何止是一萬(wàn)年,所以路由表對(duì)一致性的要求并不高)。而且路由表是典型的讀多寫(xiě)少類(lèi)問(wèn)題,寫(xiě)操作的量相比于讀操作,可謂是滄海一粟,少得可憐。
通過(guò)以上分析,你會(huì)發(fā)現(xiàn)一些關(guān)鍵詞:對(duì)讀的性能要求很高,讀多寫(xiě)少,弱一致性。它們綜合在一起,你會(huì)想到什么呢?CopyOnWriteArrayList 和 CopyOnWriteArraySet 天生就適用這種場(chǎng)景啊。所以下面的示例代碼中,RouteTable 這個(gè)類(lèi)內(nèi)部我們通過(guò)ConcurrentHashMap<string, copyonwritearrayset>這個(gè)數(shù)據(jù)結(jié)構(gòu)來(lái)描述路由表,ConcurrentHashMap 的 Key 是接口名,Value 是路由集合,這個(gè)路由集合我們用是 CopyOnWriteArraySet。
下面我們?cè)賮?lái)思考 Router 該如何設(shè)計(jì),服務(wù)提供方的每一次上線、下線都會(huì)更新路由信息,這時(shí)候你有兩種選擇。一種是通過(guò)更新 Router 的一個(gè)狀態(tài)位來(lái)標(biāo)識(shí),如果這樣做,那么所有訪問(wèn)該狀態(tài)位的地方都需要同步訪問(wèn),這樣很影響性能。另外一種就是采用 Immutability 模式,每次上線、下線都創(chuàng)建新的 Router 對(duì)象或者刪除對(duì)應(yīng)的 Router 對(duì)象。由于上線、下線的頻率很低,所以后者是最好的選擇。
Router 的實(shí)現(xiàn)代碼如下所示,是一種典型 Immutability 模式的實(shí)現(xiàn),需要你注意的是我們重寫(xiě)了 equals 方法,這樣 CopyOnWriteArraySet 的 add() 和 remove() 方法才能正常工作。
//路由信息 public final class Router{private final String ip;private final Integer port;private final String iface;//構(gòu)造函數(shù)public Router(String ip, Integer port, String iface){this.ip = ip;this.port = port;this.iface = iface;}//重寫(xiě)equals方法public boolean equals(Object obj){if (obj instanceof Router) {Router r = (Router)obj;return iface.equals(r.iface) &&ip.equals(r.ip) &&port.equals(r.port);}return false;}public int hashCode() {//省略hashCode相關(guān)代碼} } //路由表信息 public class RouterTable {//Key:接口名//Value:路由集合ConcurrentHashMap<String, CopyOnWriteArraySet<Router>> rt = new ConcurrentHashMap<>();//根據(jù)接口名獲取路由表public Set<Router> get(String iface){return rt.get(iface);}//刪除路由public void remove(Router router) {Set<Router> set=rt.get(router.iface);if (set != null) {set.remove(router);}}//增加路由public void add(Router router) {Set<Router> set = rt.computeIfAbsent(route.iface, r -> new CopyOnWriteArraySet<>());set.add(router);} }目前 Copy-on-Write 在 Java 并發(fā)編程領(lǐng)域知名度不是很高,很多人都在無(wú)意中把它忽視了,但其實(shí) Copy-on-Write 才是最簡(jiǎn)單的并發(fā)解決方案。它是如此簡(jiǎn)單,以至于 Java 中的基本數(shù)據(jù)類(lèi)型 String、Integer、Long 等都是基于 Copy-on-Write 方案實(shí)現(xiàn)的。
Copy-on-Write 是一項(xiàng)非常通用的技術(shù)方案,在很多領(lǐng)域都有著廣泛的應(yīng)用。不過(guò),它也有缺點(diǎn)的,那就是消耗內(nèi)存,每次修改都需要復(fù)制一個(gè)新的對(duì)象出來(lái),好在隨著自動(dòng)垃圾回收(GC)算法的成熟以及硬件的發(fā)展,這種內(nèi)存消耗已經(jīng)漸漸可以接受了。所以在實(shí)際工作中,如果寫(xiě)操作非常少,那你就可以嘗試用一下 Copy-on-Write,效果還是不錯(cuò)的。
3、線程本地存儲(chǔ)模式:沒(méi)有共享,就沒(méi)有傷害
解決并發(fā)問(wèn)題的一個(gè)重要方法:避免共享。
我們?cè)?jīng)一遍一遍又一遍地重復(fù),多個(gè)線程同時(shí)讀寫(xiě)同一共享變量存在并發(fā)問(wèn)題。前面兩篇文章我們突破的是寫(xiě),沒(méi)有寫(xiě)操作自然沒(méi)有并發(fā)問(wèn)題了。其實(shí)還可以突破共享變量,沒(méi)有共享變量也不會(huì)有并發(fā)問(wèn)題,正所謂是沒(méi)有共享,就沒(méi)有傷害。
那如何避免共享呢?思路其實(shí)很簡(jiǎn)單,多個(gè)人爭(zhēng)一個(gè)球總?cè)菀壮雒?#xff0c;那就每個(gè)人發(fā)一個(gè)球。對(duì)應(yīng)到并發(fā)編程領(lǐng)域,就是每個(gè)線程都擁有自己的變量,彼此之間不共享,也就沒(méi)有并發(fā)問(wèn)題了。
線程封閉,其本質(zhì)上就是避免共享。你已經(jīng)知道通過(guò)局部變量可以做到避免共享,那還有沒(méi)有其他方法可以做到呢?有的,Java 語(yǔ)言提供的線程本地存儲(chǔ)(ThreadLocal)就能夠做到。下面我們先看看 ThreadLocal 到底該如何使用。
(1)ThreadLocal 的使用方法
下面這個(gè)靜態(tài)類(lèi) ThreadId 會(huì)為每個(gè)線程分配一個(gè)唯一的線程 Id,如果一個(gè)線程前后兩次調(diào)用 ThreadId 的 get() 方法,兩次 get() 方法的返回值是相同的。但如果是兩個(gè)線程分別調(diào)用 ThreadId 的 get() 方法,那么兩個(gè)線程看到的 get() 方法的返回值是不同的。若你是初次接觸 ThreadLocal,可能會(huì)覺(jué)得奇怪,為什么相同線程調(diào)用 get() 方法結(jié)果就相同,而不同線程調(diào)用 get() 方法結(jié)果就不同呢?
static class ThreadId {static final AtomicLong nextId=new AtomicLong(0);//定義ThreadLocal變量static final ThreadLocal<Long> tl=ThreadLocal.withInitial(()->nextId.getAndIncrement());//此方法會(huì)為每個(gè)線程分配一個(gè)唯一的Idstatic long get(){return tl.get();} }能有這個(gè)奇怪的結(jié)果,都是 ThreadLocal 的杰作,不過(guò)在詳細(xì)解釋 ThreadLocal 的工作原理之前,我們?cè)倏匆粋€(gè)實(shí)際工作中可能遇到的例子來(lái)加深一下對(duì) ThreadLocal 的理解。你可能知道 SimpleDateFormat 不是線程安全的,那如果需要在并發(fā)場(chǎng)景下使用它,你該怎么辦呢?
其實(shí)有一個(gè)辦法就是用 ThreadLocal 來(lái)解決,下面的示例代碼就是 ThreadLocal 解決方案的具體實(shí)現(xiàn),這段代碼與前面 ThreadId 的代碼高度相似,同樣地,不同線程調(diào)用 SafeDateFormat 的 get() 方法將返回不同的 SimpleDateFormat 對(duì)象實(shí)例,由于不同線程并不共享 SimpleDateFormat,所以就像局部變量一樣,是線程安全的。
static class SafeDateFormat {//定義ThreadLocal變量static final ThreadLocal<DateFormat>tl=ThreadLocal.withInitial(()-> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));static DateFormat get(){return tl.get();} } //不同線程執(zhí)行下面代碼 //返回的df是不同的 DateFormat df =SafeDateFormat.get();通過(guò)上面兩個(gè)例子,相信你對(duì) ThreadLocal 的用法以及應(yīng)用場(chǎng)景都了解了,下面我們就來(lái)詳細(xì)解釋 ThreadLocal 的工作原理。
(2)ThreadLocal 的工作原理
在解釋 ThreadLocal 的工作原理之前, 你先自己想想:如果讓你來(lái)實(shí)現(xiàn) ThreadLocal 的功能,你會(huì)怎么設(shè)計(jì)呢?ThreadLocal 的目標(biāo)是讓不同的線程有不同的變量 V,那最直接的方法就是創(chuàng)建一個(gè) Map,它的 Key 是線程,Value 是每個(gè)線程擁有的變量 V,ThreadLocal 內(nèi)部持有這樣的一個(gè) Map 就可以了。你可以參考下面的示意圖和示例代碼來(lái)理解。
class MyThreadLocal<T> {Map<Thread, T> locals = new ConcurrentHashMap<>();//獲取線程變量 T get() {return locals.get(Thread.currentThread());}//設(shè)置線程變量void set(T t) {locals.put(Thread.currentThread(), t);} }那 Java 的 ThreadLocal 是這么實(shí)現(xiàn)的嗎?這一次我們的設(shè)計(jì)思路和 Java 的實(shí)現(xiàn)差異很大。Java 的實(shí)現(xiàn)里面也有一個(gè) Map,叫做 ThreadLocalMap,不過(guò)持有 ThreadLocalMap 的不是 ThreadLocal,而是 Thread。Thread 這個(gè)類(lèi)內(nèi)部有一個(gè)私有屬性 threadLocals,其類(lèi)型就是 ThreadLocalMap,ThreadLocalMap 的 Key 是 ThreadLocal。你可以結(jié)合下面的示意圖和精簡(jiǎn)之后的 Java 實(shí)現(xiàn)代碼來(lái)理解。
class Thread {//內(nèi)部持有ThreadLocalMapThreadLocal.ThreadLocalMap threadLocals; } class ThreadLocal<T>{public T get() {//首先獲取線程持有的//ThreadLocalMapThreadLocalMap map =Thread.currentThread().threadLocals;//在ThreadLocalMap中//查找變量Entry e = map.getEntry(this);return e.value; }static class ThreadLocalMap{//內(nèi)部是數(shù)組而不是MapEntry[] table;//根據(jù)ThreadLocal查找EntryEntry getEntry(ThreadLocal key){//省略查找邏輯}//Entry定義static class Entry extendsWeakReference<ThreadLocal>{Object value;}} }初看上去,我們的設(shè)計(jì)方案和 Java 的實(shí)現(xiàn)僅僅是 Map 的持有方不同而已,我們的設(shè)計(jì)里面 Map 屬于 ThreadLocal,而 Java 的實(shí)現(xiàn)里面 ThreadLocalMap 則是屬于 Thread。這兩種方式哪種更合理呢?很顯然 Java 的實(shí)現(xiàn)更合理一些。在 Java 的實(shí)現(xiàn)方案里面,ThreadLocal 僅僅是一個(gè)代理工具類(lèi),內(nèi)部并不持有任何與線程相關(guān)的數(shù)據(jù),所有和線程相關(guān)的數(shù)據(jù)都存儲(chǔ)在 Thread 里面,這樣的設(shè)計(jì)容易理解。而從數(shù)據(jù)的親緣性上來(lái)講,ThreadLocalMap 屬于 Thread 也更加合理。
當(dāng)然還有一個(gè)更加深層次的原因,那就是不容易產(chǎn)生內(nèi)存泄露。在我們的設(shè)計(jì)方案中,ThreadLocal 持有的 Map 會(huì)持有 Thread 對(duì)象的引用,這就意味著,只要 ThreadLocal 對(duì)象存在,那么 Map 中的 Thread 對(duì)象就永遠(yuǎn)不會(huì)被回收。ThreadLocal 的生命周期往往都比線程要長(zhǎng),所以這種設(shè)計(jì)方案很容易導(dǎo)致內(nèi)存泄露。而 Java 的實(shí)現(xiàn)中 Thread 持有 ThreadLocalMap,而且 ThreadLocalMap 里對(duì) ThreadLocal 的引用還是弱引用(WeakReference),所以只要 Thread 對(duì)象可以被回收,那么 ThreadLocalMap 就能被回收。Java 的這種實(shí)現(xiàn)方案雖然看上去復(fù)雜一些,但是更加安全。
Java 的 ThreadLocal 實(shí)現(xiàn)應(yīng)該稱(chēng)得上深思熟慮了,不過(guò)即便如此深思熟慮,還是不能百分百地讓程序員避免內(nèi)存泄露,例如在線程池中使用 ThreadLocal,如果不謹(jǐn)慎就可能導(dǎo)致內(nèi)存泄露。
(3)ThreadLocal 與內(nèi)存泄露
在線程池中使用 ThreadLocal 為什么可能導(dǎo)致內(nèi)存泄露呢?原因就出在線程池中線程的存活時(shí)間太長(zhǎng),往往都是和程序同生共死的,這就意味著 Thread 持有的 ThreadLocalMap 一直都不會(huì)被回收,再加上 ThreadLocalMap 中的 Entry 對(duì) ThreadLocal 是弱引用(WeakReference),所以只要 ThreadLocal 結(jié)束了自己的生命周期是可以被回收掉的。但是 Entry 中的 Value 卻是被 Entry 強(qiáng)引用的,所以即便 Value 的生命周期結(jié)束了,Value 也是無(wú)法被回收的,從而導(dǎo)致內(nèi)存泄露。
那在線程池中,我們?cè)撊绾握_使用 ThreadLocal 呢?其實(shí)很簡(jiǎn)單,既然 JVM 不能做到自動(dòng)釋放對(duì) Value 的強(qiáng)引用,那我們手動(dòng)釋放就可以了。如何能做到手動(dòng)釋放呢?估計(jì)你馬上想到 try{}finally{}方案了,這個(gè)簡(jiǎn)直就是手動(dòng)釋放資源的利器。示例的代碼如下,你可以參考學(xué)習(xí)。
ExecutorService es; ThreadLocal tl; es.execute(()->{//ThreadLocal增加變量tl.set(obj);try {// 省略業(yè)務(wù)邏輯代碼}finally {//手動(dòng)清理ThreadLocal tl.remove();} });(4)InheritableThreadLocal 與繼承性
通過(guò) ThreadLocal 創(chuàng)建的線程變量,其子線程是無(wú)法繼承的。也就是說(shuō)你在線程中通過(guò) ThreadLocal 創(chuàng)建了線程變量 V,而后該線程創(chuàng)建了子線程,你在子線程中是無(wú)法通過(guò) ThreadLocal 來(lái)訪問(wèn)父線程的線程變量 V 的。
如果你需要子線程繼承父線程的線程變量,那該怎么辦呢?其實(shí)很簡(jiǎn)單,Java 提供了 InheritableThreadLocal 來(lái)支持這種特性,InheritableThreadLocal 是 ThreadLocal 子類(lèi),所以用法和 ThreadLocal 相同,這里就不多介紹了。
不過(guò),我完全不建議你在線程池中使用 InheritableThreadLocal,不僅僅是因?yàn)樗哂?ThreadLocal 相同的缺點(diǎn)——可能導(dǎo)致內(nèi)存泄露,更重要的原因是:線程池中線程的創(chuàng)建是動(dòng)態(tài)的,很容易導(dǎo)致繼承關(guān)系錯(cuò)亂,如果你的業(yè)務(wù)邏輯依賴(lài) InheritableThreadLocal,那么很可能導(dǎo)致業(yè)務(wù)邏輯計(jì)算錯(cuò)誤,而這個(gè)錯(cuò)誤往往比內(nèi)存泄露更要命。
線程本地存儲(chǔ)模式本質(zhì)上是一種避免共享的方案,由于沒(méi)有共享,所以自然也就沒(méi)有并發(fā)問(wèn)題。如果你需要在并發(fā)場(chǎng)景中使用一個(gè)線程不安全的工具類(lèi),最簡(jiǎn)單的方案就是避免共享。避免共享有兩種方案,一種方案是將這個(gè)工具類(lèi)作為局部變量使用,另外一種方案就是線程本地存儲(chǔ)模式。這兩種方案,局部變量方案的缺點(diǎn)是在高并發(fā)場(chǎng)景下會(huì)頻繁創(chuàng)建對(duì)象,而線程本地存儲(chǔ)方案,每個(gè)線程只需要?jiǎng)?chuàng)建一個(gè)工具類(lèi)的實(shí)例,所以不存在頻繁創(chuàng)建對(duì)象的問(wèn)題。
線程本地存儲(chǔ)模式是解決并發(fā)問(wèn)題的常用方案,所以 Java SDK 也提供了相應(yīng)的實(shí)現(xiàn):ThreadLocal。通過(guò)上面我們的分析,你應(yīng)該能體會(huì)到 Java SDK 的實(shí)現(xiàn)已經(jīng)是深思熟慮了,不過(guò)即便如此,仍不能盡善盡美,例如在線程池中使用 ThreadLocal 仍可能導(dǎo)致內(nèi)存泄漏,所以使用 ThreadLocal 還是需要你打起精神,足夠謹(jǐn)慎。
4、Guarded Suspension模式:等待喚醒機(jī)制的規(guī)范實(shí)現(xiàn)
前不久,同事小灰工作中遇到一個(gè)問(wèn)題,他開(kāi)發(fā)了一個(gè) Web 項(xiàng)目:Web 版的文件瀏覽器,通過(guò)它用戶(hù)可以在瀏覽器里查看服務(wù)器上的目錄和文件。這個(gè)項(xiàng)目依賴(lài)運(yùn)維部門(mén)提供的文件瀏覽服務(wù),而這個(gè)文件瀏覽服務(wù)只支持消息隊(duì)列(MQ)方式接入。消息隊(duì)列在互聯(lián)網(wǎng)大廠中用的非常多,主要用作流量削峰和系統(tǒng)解耦。在這種接入方式中,發(fā)送消息和消費(fèi)結(jié)果這兩個(gè)操作之間是異步的,你可以參考下面的示意圖來(lái)理解。
在小灰的這個(gè) Web 項(xiàng)目中,用戶(hù)通過(guò)瀏覽器發(fā)過(guò)來(lái)一個(gè)請(qǐng)求,會(huì)被轉(zhuǎn)換成一個(gè)異步消息發(fā)送給 MQ,等 MQ 返回結(jié)果后,再將這個(gè)結(jié)果返回至瀏覽器。小灰同學(xué)的問(wèn)題是:給 MQ 發(fā)送消息的線程是處理 Web 請(qǐng)求的線程 T1,但消費(fèi) MQ 結(jié)果的線程并不是線程 T1,那線程 T1 如何等待 MQ 的返回結(jié)果呢?為了便于你理解這個(gè)場(chǎng)景,我將其代碼化了,示例代碼如下。
class Message{String id;String content; } //該方法可以發(fā)送消息 void send(Message msg){//省略相關(guān)代碼 } //MQ消息返回后會(huì)調(diào)用該方法 //該方法的執(zhí)行線程不同于 //發(fā)送消息的線程 void onMessage(Message msg){//省略相關(guān)代碼 } //處理瀏覽器發(fā)來(lái)的請(qǐng)求 Respond handleWebReq(){//創(chuàng)建一消息Message msg1 = new Message("1","{...}");//發(fā)送消息send(msg1);//如何等待MQ返回的消息呢?String result = ...; }異步轉(zhuǎn)同步問(wèn)題嗎?仔細(xì)分析,的確是這樣,不過(guò)在那一篇文章中我們只是介紹了最終方案,讓你知其然,但是并沒(méi)有介紹這個(gè)方案是如何設(shè)計(jì)出來(lái)的,今天咱們?cè)僮屑?xì)聊聊這個(gè)問(wèn)題,讓你知其所以然,遇到類(lèi)似問(wèn)題也能自己設(shè)計(jì)出方案來(lái)。
(1)Guarded Suspension 模式
上面小灰遇到的問(wèn)題,在現(xiàn)實(shí)世界里比比皆是,只是我們一不小心就忽略了。比如,項(xiàng)目組團(tuán)建要外出聚餐,我們提前預(yù)訂了一個(gè)包間,然后興沖沖地奔過(guò)去,到那兒后大堂經(jīng)理看了一眼包間,發(fā)現(xiàn)服務(wù)員正在收拾,就會(huì)告訴我們:“您預(yù)訂的包間服務(wù)員正在收拾,請(qǐng)您稍等片刻。”過(guò)了一會(huì),大堂經(jīng)理發(fā)現(xiàn)包間已經(jīng)收拾完了,于是馬上帶我們?nèi)グg就餐。
我們等待包間收拾完的這個(gè)過(guò)程和小灰遇到的等待 MQ 返回消息本質(zhì)上是一樣的,都是等待一個(gè)條件滿(mǎn)足:就餐需要等待包間收拾完,小灰的程序里要等待 MQ 返回消息。
那我們來(lái)看看現(xiàn)實(shí)世界里是如何解決這類(lèi)問(wèn)題的呢?現(xiàn)實(shí)世界里大堂經(jīng)理這個(gè)角色很重要,我們是否等待,完全是由他來(lái)協(xié)調(diào)的。通過(guò)類(lèi)比,相信你也一定有思路了:我們的程序里,也需要這樣一個(gè)大堂經(jīng)理。的確是這樣,那程序世界里的大堂經(jīng)理該如何設(shè)計(jì)呢?其實(shí)設(shè)計(jì)方案前人早就搞定了,而且還將其總結(jié)成了一個(gè)設(shè)計(jì)模式:Guarded Suspension。所謂 Guarded Suspension,直譯過(guò)來(lái)就是“保護(hù)性地暫停”。那下面我們就來(lái)看看,Guarded Suspension 模式是如何模擬大堂經(jīng)理進(jìn)行保護(hù)性地暫停的。
下圖就是 Guarded Suspension 模式的結(jié)構(gòu)圖,非常簡(jiǎn)單,一個(gè)對(duì)象 GuardedObject,內(nèi)部有一個(gè)成員變量——受保護(hù)的對(duì)象,以及兩個(gè)成員方法——get(Predicate p)和onChanged(T obj)方法。其中,對(duì)象 GuardedObject 就是我們前面提到的大堂經(jīng)理,受保護(hù)對(duì)象就是餐廳里面的包間;受保護(hù)對(duì)象的 get() 方法對(duì)應(yīng)的是我們的就餐,就餐的前提條件是包間已經(jīng)收拾好了,參數(shù) p 就是用來(lái)描述這個(gè)前提條件的;受保護(hù)對(duì)象的 onChanged() 方法對(duì)應(yīng)的是服務(wù)員把包間收拾好了,通過(guò) onChanged() 方法可以 fire 一個(gè)事件,而這個(gè)事件往往能改變前提條件 p 的計(jì)算結(jié)果。下圖中,左側(cè)的綠色線程就是需要就餐的顧客,而右側(cè)的藍(lán)色線程就是收拾包間的服務(wù)員。
GuardedObject 的內(nèi)部實(shí)現(xiàn)非常簡(jiǎn)單,是管程的一個(gè)經(jīng)典用法,你可以參考下面的示例代碼,核心是:get() 方法通過(guò)條件變量的 await() 方法實(shí)現(xiàn)等待,onChanged() 方法通過(guò)條件變量的 signalAll() 方法實(shí)現(xiàn)喚醒功能。邏輯還是很簡(jiǎn)單的,所以這里就不再詳細(xì)介紹了。
class GuardedObject<T>{//受保護(hù)的對(duì)象T obj;final Lock lock = new ReentrantLock();final Condition done =lock.newCondition();final int timeout=1;//獲取受保護(hù)對(duì)象 T get(Predicate<T> p) {lock.lock();try {//MESA管程推薦寫(xiě)法while(!p.test(obj)){done.await(timeout, TimeUnit.SECONDS);}}catch(InterruptedException e){throw new RuntimeException(e);}finally{lock.unlock();}//返回非空的受保護(hù)對(duì)象return obj;}//事件通知方法void onChanged(T obj) {lock.lock();try {this.obj = obj;done.signalAll();} finally {lock.unlock();}} }(2)擴(kuò)展 Guarded Suspension 模式
上面我們介紹了 Guarded Suspension 模式及其實(shí)現(xiàn),這個(gè)模式能夠模擬現(xiàn)實(shí)世界里大堂經(jīng)理的角色,那現(xiàn)在我們?cè)賮?lái)看看這個(gè)“大堂經(jīng)理”能否解決小灰同學(xué)遇到的問(wèn)題。
Guarded Suspension 模式里 GuardedObject 有兩個(gè)核心方法,一個(gè)是 get() 方法,一個(gè)是 onChanged() 方法。很顯然,在處理 Web 請(qǐng)求的方法 handleWebReq() 中,可以調(diào)用 GuardedObject 的 get() 方法來(lái)實(shí)現(xiàn)等待;在 MQ 消息的消費(fèi)方法 onMessage() 中,可以調(diào)用 GuardedObject 的 onChanged() 方法來(lái)實(shí)現(xiàn)喚醒。
//處理瀏覽器發(fā)來(lái)的請(qǐng)求 Respond handleWebReq(){//創(chuàng)建一消息Message msg1 = new Message("1","{...}");//發(fā)送消息send(msg1);//利用GuardedObject實(shí)現(xiàn)等待GuardedObject<Message> go=new GuardObjec<>();Message r = go.get(t->t != null); } void onMessage(Message msg){//如何找到匹配的go?GuardedObject<Message> go=???go.onChanged(msg); }但是在實(shí)現(xiàn)的時(shí)候會(huì)遇到一個(gè)問(wèn)題,handleWebReq() 里面創(chuàng)建了 GuardedObject 對(duì)象的實(shí)例 go,并調(diào)用其 get() 方等待結(jié)果,那在 onMessage() 方法中,如何才能夠找到匹配的 GuardedObject 對(duì)象呢?這個(gè)過(guò)程類(lèi)似服務(wù)員告訴大堂經(jīng)理某某包間已經(jīng)收拾好了,大堂經(jīng)理如何根據(jù)包間找到就餐的人。現(xiàn)實(shí)世界里,大堂經(jīng)理的頭腦中,有包間和就餐人之間的關(guān)系圖,所以服務(wù)員說(shuō)完之后大堂經(jīng)理立刻就能把就餐人找出來(lái)。
我們可以參考大堂經(jīng)理識(shí)別就餐人的辦法,來(lái)擴(kuò)展一下 Guarded Suspension 模式,從而使它能夠很方便地解決小灰同學(xué)的問(wèn)題。在小灰的程序中,每個(gè)發(fā)送到 MQ 的消息,都有一個(gè)唯一性的屬性 id,所以我們可以維護(hù)一個(gè) MQ 消息 id 和 GuardedObject 對(duì)象實(shí)例的關(guān)系,這個(gè)關(guān)系可以類(lèi)比大堂經(jīng)理大腦里維護(hù)的包間和就餐人的關(guān)系。
有了這個(gè)關(guān)系,我們來(lái)看看具體如何實(shí)現(xiàn)。下面的示例代碼是擴(kuò)展 Guarded Suspension 模式的實(shí)現(xiàn),擴(kuò)展后的 GuardedObject 內(nèi)部維護(hù)了一個(gè) Map,其 Key 是 MQ 消息 id,而 Value 是 GuardedObject 對(duì)象實(shí)例,同時(shí)增加了靜態(tài)方法 create() 和 fireEvent();create() 方法用來(lái)創(chuàng)建一個(gè) GuardedObject 對(duì)象實(shí)例,并根據(jù) key 值將其加入到 Map 中,而 fireEvent() 方法則是模擬的大堂經(jīng)理根據(jù)包間找就餐人的邏輯。
class GuardedObject<T>{//受保護(hù)的對(duì)象T obj;final Lock lock = new ReentrantLock();final Condition done =lock.newCondition();final int timeout=2;//保存所有GuardedObjectfinal static Map<Object, GuardedObject> gos=new ConcurrentHashMap<>();//靜態(tài)方法創(chuàng)建GuardedObjectstatic <K> GuardedObject create(K key){GuardedObject go=new GuardedObject();gos.put(key, go);return go;}static <K, T> void fireEvent(K key, T obj){GuardedObject go=gos.remove(key);if (go != null){go.onChanged(obj);}}//獲取受保護(hù)對(duì)象 T get(Predicate<T> p) {lock.lock();try {//MESA管程推薦寫(xiě)法while(!p.test(obj)){done.await(timeout, TimeUnit.SECONDS);}}catch(InterruptedException e){throw new RuntimeException(e);}finally{lock.unlock();}//返回非空的受保護(hù)對(duì)象return obj;}//事件通知方法void onChanged(T obj) {lock.lock();try {this.obj = obj;done.signalAll();} finally {lock.unlock();}} }這樣利用擴(kuò)展后的 GuardedObject 來(lái)解決小灰同學(xué)的問(wèn)題就很簡(jiǎn)單了,具體代碼如下所示。
//處理瀏覽器發(fā)來(lái)的請(qǐng)求 Respond handleWebReq(){int id=序號(hào)生成器.get();//創(chuàng)建一消息Message msg1 = new Message(id,"{...}");//創(chuàng)建GuardedObject實(shí)例GuardedObject<Message> go=GuardedObject.create(id); //發(fā)送消息send(msg1);//等待MQ消息Message r = go.get(t->t != null); } void onMessage(Message msg){//喚醒等待的線程GuardedObject.fireEvent(msg.id, msg); }Guarded Suspension 模式本質(zhì)上是一種等待喚醒機(jī)制的實(shí)現(xiàn),只不過(guò) Guarded Suspension 模式將其規(guī)范化了。規(guī)范化的好處是你無(wú)需重頭思考如何實(shí)現(xiàn),也無(wú)需擔(dān)心實(shí)現(xiàn)程序的可理解性問(wèn)題,同時(shí)也能避免一不小心寫(xiě)出個(gè) Bug 來(lái)。但 Guarded Suspension 模式在解決實(shí)際問(wèn)題的時(shí)候,往往還是需要擴(kuò)展的,擴(kuò)展的方式有很多,本篇文章就直接對(duì) GuardedObject 的功能進(jìn)行了增強(qiáng),Dubbo 中 DefaultFuture 這個(gè)類(lèi)也是采用的這種方式,你可以對(duì)比著來(lái)看,相信對(duì) DefaultFuture 的實(shí)現(xiàn)原理會(huì)理解得更透徹。當(dāng)然,你也可以創(chuàng)建新的類(lèi)來(lái)實(shí)現(xiàn)對(duì) Guarded Suspension 模式的擴(kuò)展。
Guarded Suspension 模式也常被稱(chēng)作 Guarded Wait 模式、Spin Lock 模式(因?yàn)槭褂昧?while 循環(huán)去等待),這些名字都很形象,不過(guò)它還有一個(gè)更形象的非官方名字:多線程版本的 if。單線程場(chǎng)景中,if 語(yǔ)句是不需要等待的,因?yàn)樵谥挥幸粋€(gè)線程的條件下,如果這個(gè)線程被阻塞,那就沒(méi)有其他活動(dòng)線程了,這意味著 if 判斷條件的結(jié)果也不會(huì)發(fā)生變化了。但是多線程場(chǎng)景中,等待就變得有意義了,這種場(chǎng)景下,if 判斷條件的結(jié)果是可能發(fā)生變化的。所以,用“多線程版本的 if”來(lái)理解這個(gè)模式會(huì)更簡(jiǎn)單。
5、Balking模式:再談線程安全的單例模式
“多線程版本的 if”來(lái)理解 Guarded Suspension 模式,不同于單線程中的 if,這個(gè)“多線程版本的 if”是需要等待的,而且還很執(zhí)著,必須要等到條件為真。但很顯然這個(gè)世界,不是所有場(chǎng)景都需要這么執(zhí)著,有時(shí)候我們還需要快速放棄。
需要快速放棄的一個(gè)最常見(jiàn)的例子是各種編輯器提供的自動(dòng)保存功能。自動(dòng)保存功能的實(shí)現(xiàn)邏輯一般都是隔一定時(shí)間自動(dòng)執(zhí)行存盤(pán)操作,存盤(pán)操作的前提是文件做過(guò)修改,如果文件沒(méi)有執(zhí)行過(guò)修改操作,就需要快速放棄存盤(pán)操作。下面的示例代碼將自動(dòng)保存功能代碼化了,很顯然 AutoSaveEditor 這個(gè)類(lèi)不是線程安全的,因?yàn)閷?duì)共享變量 changed 的讀寫(xiě)沒(méi)有使用同步,那如何保證 AutoSaveEditor 的線程安全性呢?
class AutoSaveEditor{//文件是否被修改過(guò)boolean changed=false;//定時(shí)任務(wù)線程池ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();//定時(shí)執(zhí)行自動(dòng)保存void startAutoSave(){ses.scheduleWithFixedDelay(()->{autoSave();}, 5, 5, TimeUnit.SECONDS); }//自動(dòng)存盤(pán)操作void autoSave(){if (!changed) {return;}changed = false;//執(zhí)行存盤(pán)操作//省略且實(shí)現(xiàn)this.execSave();}//編輯操作void edit(){//省略編輯邏輯......changed = true;} }解決這個(gè)問(wèn)題相信你一定手到擒來(lái)了:讀寫(xiě)共享變量 changed 的方法 autoSave() 和 edit() 都加互斥鎖就可以了。這樣做雖然簡(jiǎn)單,但是性能很差,原因是鎖的范圍太大了。那我們可以將鎖的范圍縮小,只在讀寫(xiě)共享變量 changed 的地方加鎖,實(shí)現(xiàn)代碼如下所示。
//自動(dòng)存盤(pán)操作 void autoSave(){synchronized(this){if (!changed) {return;}changed = false;}//執(zhí)行存盤(pán)操作//省略且實(shí)現(xiàn)this.execSave(); } //編輯操作 void edit(){//省略編輯邏輯......synchronized(this){changed = true;} }如果你深入地分析一下這個(gè)示例程序,你會(huì)發(fā)現(xiàn),示例中的共享變量是一個(gè)狀態(tài)變量,業(yè)務(wù)邏輯依賴(lài)于這個(gè)狀態(tài)變量的狀態(tài):當(dāng)狀態(tài)滿(mǎn)足某個(gè)條件時(shí),執(zhí)行某個(gè)業(yè)務(wù)邏輯,其本質(zhì)其實(shí)不過(guò)就是一個(gè) if 而已,放到多線程場(chǎng)景里,就是一種“多線程版本的 if”。這種“多線程版本的 if”的應(yīng)用場(chǎng)景還是很多的,所以也有人把它總結(jié)成了一種設(shè)計(jì)模式,叫做 Balking 模式。
(1)Balking 模式的經(jīng)典實(shí)現(xiàn)
Balking 模式本質(zhì)上是一種規(guī)范化地解決“多線程版本的 if”的方案,對(duì)于上面自動(dòng)保存的例子,使用 Balking 模式規(guī)范化之后的寫(xiě)法如下所示,你會(huì)發(fā)現(xiàn)僅僅是將 edit() 方法中對(duì)共享變量 changed 的賦值操作抽取到了 change() 中,這樣的好處是將并發(fā)處理邏輯和業(yè)務(wù)邏輯分開(kāi)。
boolean changed=false; //自動(dòng)存盤(pán)操作 void autoSave(){synchronized(this){if (!changed) {return;}changed = false;}//執(zhí)行存盤(pán)操作//省略且實(shí)現(xiàn)this.execSave(); } //編輯操作 void edit(){//省略編輯邏輯......change(); } //改變狀態(tài) void change(){synchronized(this){changed = true;} }(2)用 volatile 實(shí)現(xiàn) Balking 模式
前面我們用 synchronized 實(shí)現(xiàn)了 Balking 模式,這種實(shí)現(xiàn)方式最為穩(wěn)妥,建議你實(shí)際工作中也使用這個(gè)方案。不過(guò)在某些特定場(chǎng)景下,也可以使用 volatile 來(lái)實(shí)現(xiàn),但使用 volatile 的前提是對(duì)原子性沒(méi)有要求。
有一個(gè) RPC 框架路由表的案例,在 RPC 框架中,本地路由表是要和注冊(cè)中心進(jìn)行信息同步的,應(yīng)用啟動(dòng)的時(shí)候,會(huì)將應(yīng)用依賴(lài)服務(wù)的路由表從注冊(cè)中心同步到本地路由表中,如果應(yīng)用重啟的時(shí)候注冊(cè)中心宕機(jī),那么會(huì)導(dǎo)致該應(yīng)用依賴(lài)的服務(wù)均不可用,因?yàn)檎也坏揭蕾?lài)服務(wù)的路由表。為了防止這種極端情況出現(xiàn),RPC 框架可以將本地路由表自動(dòng)保存到本地文件中,如果重啟的時(shí)候注冊(cè)中心宕機(jī),那么就從本地文件中恢復(fù)重啟前的路由表。這其實(shí)也是一種降級(jí)的方案。
自動(dòng)保存路由表和前面介紹的編輯器自動(dòng)保存原理是一樣的,也可以用 Balking 模式實(shí)現(xiàn),不過(guò)我們這里采用 volatile 來(lái)實(shí)現(xiàn),實(shí)現(xiàn)的代碼如下所示。之所以可以采用 volatile 來(lái)實(shí)現(xiàn),是因?yàn)閷?duì)共享變量 changed 和 rt 的寫(xiě)操作不存在原子性的要求,而且采用 scheduleWithFixedDelay() 這種調(diào)度方式能保證同一時(shí)刻只有一個(gè)線程執(zhí)行 autoSave() 方法。
//路由表信息 public class RouterTable {//Key:接口名//Value:路由集合ConcurrentHashMap<String, CopyOnWriteArraySet<Router>> rt = new ConcurrentHashMap<>(); //路由表是否發(fā)生變化volatile boolean changed;//將路由表寫(xiě)入本地文件的線程池ScheduledExecutorService ses=Executors.newSingleThreadScheduledExecutor();//啟動(dòng)定時(shí)任務(wù)//將變更后的路由表寫(xiě)入本地文件public void startLocalSaver(){ses.scheduleWithFixedDelay(()->{autoSave();}, 1, 1, MINUTES);}//保存路由表到本地文件void autoSave() {if (!changed) {return;}changed = false;//將路由表寫(xiě)入本地文件//省略其方法實(shí)現(xiàn)this.save2Local();}//刪除路由public void remove(Router router) {Set<Router> set=rt.get(router.iface);if (set != null) {set.remove(router);//路由表已發(fā)生變化changed = true;}}//增加路由public void add(Router router) {Set<Router> set = rt.computeIfAbsent(route.iface, r -> new CopyOnWriteArraySet<>());set.add(router);//路由表已發(fā)生變化changed = true;} }Balking 模式有一個(gè)非常典型的應(yīng)用場(chǎng)景就是單次初始化,下面的示例代碼是它的實(shí)現(xiàn)。這個(gè)實(shí)現(xiàn)方案中,我們將 init() 聲明為一個(gè)同步方法,這樣同一個(gè)時(shí)刻就只有一個(gè)線程能夠執(zhí)行 init() 方法;init() 方法在第一次執(zhí)行完時(shí)會(huì)將 inited 設(shè)置為 true,這樣后續(xù)執(zhí)行 init() 方法的線程就不會(huì)再執(zhí)行 doInit() 了。
class InitTest{boolean inited = false;synchronized void init(){if(inited){return;}//省略doInit的實(shí)現(xiàn)doInit();inited=true;} }線程安全的單例模式本質(zhì)上其實(shí)也是單次初始化,所以可以用 Balking 模式來(lái)實(shí)現(xiàn)線程安全的單例模式,下面的示例代碼是其實(shí)現(xiàn)。這個(gè)實(shí)現(xiàn)雖然功能上沒(méi)有問(wèn)題,但是性能卻很差,因?yàn)榛コ怄i synchronized 將 getInstance() 方法串行化了,那有沒(méi)有辦法可以?xún)?yōu)化一下它的性能呢?
class Singleton{private staticSingleton singleton;//構(gòu)造方法私有化 private Singleton(){}//獲取實(shí)例(單例)public synchronized static Singleton getInstance(){if(singleton == null){singleton=new Singleton();}return singleton;} }辦法當(dāng)然是有的,那就是經(jīng)典的雙重檢查(Double Check)方案,下面的示例代碼是其詳細(xì)實(shí)現(xiàn)。在雙重檢查方案中,一旦 Singleton 對(duì)象被成功創(chuàng)建之后,就不會(huì)執(zhí)行 synchronized(Singleton.class){}相關(guān)的代碼,也就是說(shuō),此時(shí) getInstance() 方法的執(zhí)行路徑是無(wú)鎖的,從而解決了性能問(wèn)題。不過(guò)需要你注意的是,這個(gè)方案中使用了 volatile 來(lái)禁止編譯優(yōu)化,其原因你可以參考《01 | 可見(jiàn)性、原子性和有序性問(wèn)題:并發(fā)編程 Bug 的源頭》中相關(guān)的內(nèi)容。至于獲取鎖后的二次檢查,則是出于對(duì)安全性負(fù)責(zé)。
class Singleton{private static volatile Singleton singleton;//構(gòu)造方法私有化 private Singleton() {}//獲取實(shí)例(單例)public static Singleton getInstance() {//第一次檢查if(singleton==null){synchronize(Singleton.class){//獲取鎖后二次檢查if(singleton==null){singleton=new Singleton();}}}return singleton;} }Balking 模式和 Guarded Suspension 模式從實(shí)現(xiàn)上看似乎沒(méi)有多大的關(guān)系,Balking 模式只需要用互斥鎖就能解決,而 Guarded Suspension 模式則要用到管程這種高級(jí)的并發(fā)原語(yǔ);但是從應(yīng)用的角度來(lái)看,它們解決的都是“線程安全的 if”語(yǔ)義,不同之處在于,Guarded Suspension 模式會(huì)等待 if 條件為真,而 Balking 模式不會(huì)等待。
Balking 模式的經(jīng)典實(shí)現(xiàn)是使用互斥鎖,你可以使用 Java 語(yǔ)言?xún)?nèi)置 synchronized,也可以使用 SDK 提供 Lock;如果你對(duì)互斥鎖的性能不滿(mǎn)意,可以嘗試采用 volatile 方案,不過(guò)使用 volatile 方案需要你更加謹(jǐn)慎。
當(dāng)然你也可以嘗試使用雙重檢查方案來(lái)優(yōu)化性能,雙重檢查中的第一次檢查,完全是出于對(duì)性能的考量:避免執(zhí)行加鎖操作,因?yàn)榧渔i操作很耗時(shí)。而加鎖之后的二次檢查,則是出于對(duì)安全性負(fù)責(zé)。雙重檢查方案在優(yōu)化加鎖性能方面經(jīng)常用到,例如實(shí)現(xiàn)緩存按需加載功能時(shí),也用到了雙重檢查方案。
6、Thread-Per-Message模式:最簡(jiǎn)單實(shí)用的分工方法
我們?cè)?jīng)把并發(fā)編程領(lǐng)域的問(wèn)題總結(jié)為三個(gè)核心問(wèn)題:分工、同步和互斥。其中,同步和互斥相關(guān)問(wèn)題更多地源自微觀,而分工問(wèn)題則是源自宏觀。我們解決問(wèn)題,往往都是從宏觀入手,在編程領(lǐng)域,軟件的設(shè)計(jì)過(guò)程也是先從概要設(shè)計(jì)開(kāi)始,而后才進(jìn)行詳細(xì)設(shè)計(jì)。同樣,解決并發(fā)編程問(wèn)題,首要問(wèn)題也是解決宏觀的分工問(wèn)題。
并發(fā)編程領(lǐng)域里,解決分工問(wèn)題也有一系列的設(shè)計(jì)模式,比較常用的主要有 Thread-Per-Message 模式、Worker Thread 模式、生產(chǎn)者 - 消費(fèi)者模式等等。今天我們重點(diǎn)介紹 Thread-Per-Message 模式。
(1)如何理解 Thread-Per-Message 模式
現(xiàn)實(shí)世界里,很多事情我們都需要委托他人辦理,一方面受限于我們的能力,總有很多搞不定的事,比如教育小朋友,搞不定怎么辦呢?只能委托學(xué)校老師了;另一方面受限于我們的時(shí)間,比如忙著寫(xiě) Bug,哪有時(shí)間買(mǎi)別墅呢?只能委托房產(chǎn)中介了。委托他人代辦有一個(gè)非常大的好處,那就是可以專(zhuān)心做自己的事了。
在編程領(lǐng)域也有很多類(lèi)似的需求,比如寫(xiě)一個(gè) HTTP Server,很顯然只能在主線程中接收請(qǐng)求,而不能處理 HTTP 請(qǐng)求,因?yàn)槿绻谥骶€程中處理 HTTP 請(qǐng)求的話(huà),那同一時(shí)間只能處理一個(gè)請(qǐng)求,太慢了!怎么辦呢?可以利用代辦的思路,創(chuàng)建一個(gè)子線程,委托子線程去處理 HTTP 請(qǐng)求。
這種委托他人辦理的方式,在并發(fā)編程領(lǐng)域被總結(jié)為一種設(shè)計(jì)模式,叫做 Thread-Per-Message 模式,簡(jiǎn)言之就是為每個(gè)任務(wù)分配一個(gè)獨(dú)立的線程。這是一種最簡(jiǎn)單的分工方法,實(shí)現(xiàn)起來(lái)也非常簡(jiǎn)單。
(2)用 Thread 實(shí)現(xiàn) Thread-Per-Message 模式
Thread-Per-Message 模式的一個(gè)最經(jīng)典的應(yīng)用場(chǎng)景是網(wǎng)絡(luò)編程里服務(wù)端的實(shí)現(xiàn),服務(wù)端為每個(gè)客戶(hù)端請(qǐng)求創(chuàng)建一個(gè)獨(dú)立的線程,當(dāng)線程處理完請(qǐng)求后,自動(dòng)銷(xiāo)毀,這是一種最簡(jiǎn)單的并發(fā)處理網(wǎng)絡(luò)請(qǐng)求的方法。
網(wǎng)絡(luò)編程里最簡(jiǎn)單的程序當(dāng)數(shù) echo 程序了,echo 程序的服務(wù)端會(huì)原封不動(dòng)地將客戶(hù)端的請(qǐng)求發(fā)送回客戶(hù)端。例如,客戶(hù)端發(fā)送 TCP 請(qǐng)求"Hello World",那么服務(wù)端也會(huì)返回"Hello World"。
下面我們就以 echo 程序的服務(wù)端為例,介紹如何實(shí)現(xiàn) Thread-Per-Message 模式。
在 Java 語(yǔ)言中,實(shí)現(xiàn) echo 程序的服務(wù)端還是很簡(jiǎn)單的。只需要 30 行代碼就能夠?qū)崿F(xiàn),示例代碼如下,我們?yōu)槊總€(gè)請(qǐng)求都創(chuàng)建了一個(gè) Java 線程,核心代碼是:new Thread(()->{...}).start()。
final ServerSocketChannel = ServerSocketChannel.open().bind(new InetSocketAddress(8080)); //處理請(qǐng)求 try {while (true) {// 接收請(qǐng)求SocketChannel sc = ssc.accept();// 每個(gè)請(qǐng)求都創(chuàng)建一個(gè)線程new Thread(()->{try {// 讀SocketByteBuffer rb = ByteBuffer.allocateDirect(1024);sc.read(rb);//模擬處理請(qǐng)求Thread.sleep(2000);// 寫(xiě)SocketByteBuffer wb = (ByteBuffer)rb.flip();sc.write(wb);// 關(guān)閉Socketsc.close();}catch(Exception e){throw new UncheckedIOException(e);}}).start();} } finally {ssc.close(); }如果你熟悉網(wǎng)絡(luò)編程,相信你一定會(huì)提出一個(gè)很尖銳的問(wèn)題:上面這個(gè) echo 服務(wù)的實(shí)現(xiàn)方案是不具備可行性的。原因在于 Java 中的線程是一個(gè)重量級(jí)的對(duì)象,創(chuàng)建成本很高,一方面創(chuàng)建線程比較耗時(shí),另一方面線程占用的內(nèi)存也比較大。所以,為每個(gè)請(qǐng)求創(chuàng)建一個(gè)新的線程并不適合高并發(fā)場(chǎng)景。
于是,你開(kāi)始質(zhì)疑 Thread-Per-Message 模式,而且開(kāi)始重新思索解決方案,這時(shí)候很可能你會(huì)想到 Java 提供的線程池。你的這個(gè)思路沒(méi)有問(wèn)題,但是引入線程池難免會(huì)增加復(fù)雜度。其實(shí)你完全可以換一個(gè)角度來(lái)思考這個(gè)問(wèn)題,語(yǔ)言、工具、框架本身應(yīng)該是幫助我們更敏捷地實(shí)現(xiàn)方案的,而不是用來(lái)否定方案的,Thread-Per-Message 模式作為一種最簡(jiǎn)單的分工方案,Java 語(yǔ)言支持不了,顯然是 Java 語(yǔ)言本身的問(wèn)題。
Java 語(yǔ)言里,Java 線程是和操作系統(tǒng)線程一一對(duì)應(yīng)的,這種做法本質(zhì)上是將 Java 線程的調(diào)度權(quán)完全委托給操作系統(tǒng),而操作系統(tǒng)在這方面非常成熟,所以這種做法的好處是穩(wěn)定、可靠,但是也繼承了操作系統(tǒng)線程的缺點(diǎn):創(chuàng)建成本高。為了解決這個(gè)缺點(diǎn),Java 并發(fā)包里提供了線程池等工具類(lèi)。這個(gè)思路在很長(zhǎng)一段時(shí)間里都是很穩(wěn)妥的方案,但是這個(gè)方案并不是唯一的方案。
業(yè)界還有另外一種方案,叫做輕量級(jí)線程。這個(gè)方案在 Java 領(lǐng)域知名度并不高,但是在其他編程語(yǔ)言里卻叫得很響,例如 Go 語(yǔ)言、Lua 語(yǔ)言里的協(xié)程,本質(zhì)上就是一種輕量級(jí)的線程。輕量級(jí)的線程,創(chuàng)建的成本很低,基本上和創(chuàng)建一個(gè)普通對(duì)象的成本相似;并且創(chuàng)建的速度和內(nèi)存占用相比操作系統(tǒng)線程至少有一個(gè)數(shù)量級(jí)的提升,所以基于輕量級(jí)線程實(shí)現(xiàn) Thread-Per-Message 模式就完全沒(méi)有問(wèn)題了。
Java 語(yǔ)言目前也已經(jīng)意識(shí)到輕量級(jí)線程的重要性了,OpenJDK 有個(gè) Loom 項(xiàng)目,就是要解決 Java 語(yǔ)言的輕量級(jí)線程問(wèn)題,在這個(gè)項(xiàng)目中,輕量級(jí)線程被叫做 Fiber。下面我們就來(lái)看看基于 Fiber 如何實(shí)現(xiàn) Thread-Per-Message 模式。
(3)用 Fiber 實(shí)現(xiàn) Thread-Per-Message 模式
Loom 項(xiàng)目在設(shè)計(jì)輕量級(jí)線程時(shí),充分考量了當(dāng)前 Java 線程的使用方式,采取的是盡量兼容的態(tài)度,所以使用上還是挺簡(jiǎn)單的。用 Fiber 實(shí)現(xiàn) echo 服務(wù)的示例代碼如下所示,對(duì)比 Thread 的實(shí)現(xiàn),你會(huì)發(fā)現(xiàn)改動(dòng)量非常小,只需要把 new Thread(()->{...}).start() 換成 Fiber.schedule(()->{}) 就可以了。
final ServerSocketChannel ssc = ServerSocketChannel.open().bind(new InetSocketAddress(8080)); //處理請(qǐng)求 try{while (true) {// 接收請(qǐng)求final SocketChannel sc = ssc.accept();Fiber.schedule(()->{try {// 讀SocketByteBuffer rb = ByteBuffer.allocateDirect(1024);sc.read(rb);//模擬處理請(qǐng)求LockSupport.parkNanos(2000*1000000);// 寫(xiě)SocketByteBuffer wb = (ByteBuffer)rb.flip()sc.write(wb);// 關(guān)閉Socketsc.close();} catch(Exception e){throw new UncheckedIOException(e);}});}//while }finally{ssc.close(); }那使用 Fiber 實(shí)現(xiàn)的 echo 服務(wù)是否能夠達(dá)到預(yù)期的效果呢?我們可以在 Linux 環(huán)境下做一個(gè)簡(jiǎn)單的實(shí)驗(yàn),步驟如下:
1、首先通過(guò) ulimit -u 512 將用戶(hù)能創(chuàng)建的最大進(jìn)程數(shù)(包括線程)設(shè)置為 512;
2、啟動(dòng) Fiber 實(shí)現(xiàn)的 echo 程序;
3、利用壓測(cè)工具 ab 進(jìn)行壓測(cè):ab -r -c 20000 -n 200000 http:// 測(cè)試機(jī) IP 地址:8080/
壓測(cè)執(zhí)行結(jié)果如下:
Concurrency Level: 20000 Time taken for tests: 67.718 seconds Complete requests: 200000 Failed requests: 0 Write errors: 0 Non-2xx responses: 200000 Total transferred: 16400000 bytes HTML transferred: 0 bytes Requests per second: 2953.41 [#/sec] (mean) Time per request: 6771.844 [ms] (mean) Time per request: 0.339 [ms] (mean, across all concurrent requests) Transfer rate: 236.50 [Kbytes/sec] receivedConnection Times (ms)min mean[+/-sd] median max Connect: 0 557 3541.6 1 63127 Processing: 2000 2010 31.8 2003 2615 Waiting: 1986 2008 30.9 2002 2615 Total: 2000 2567 3543.9 2004 65293你會(huì)發(fā)現(xiàn)即便在 20000 并發(fā)下,該程序依然能夠良好運(yùn)行。同等條件下,Thread 實(shí)現(xiàn)的 echo 程序 512 并發(fā)都抗不過(guò)去,直接就 OOM 了。
如果你通過(guò) Linux 命令 top -Hp pid 查看 Fiber 實(shí)現(xiàn)的 echo 程序的進(jìn)程信息,你可以看到該進(jìn)程僅僅創(chuàng)建了 16(不同 CPU 核數(shù)結(jié)果會(huì)不同)個(gè)操作系統(tǒng)線程。
并發(fā)編程領(lǐng)域的分工問(wèn)題,指的是如何高效地拆解任務(wù)并分配給線程。前面我們?cè)诓l(fā)工具類(lèi)模塊中已經(jīng)介紹了不少解決分工問(wèn)題的工具類(lèi),例如 Future、CompletableFuture 、CompletionService、Fork/Join 計(jì)算框架等,這些工具類(lèi)都能很好地解決特定應(yīng)用場(chǎng)景的問(wèn)題,所以,這些工具類(lèi)曾經(jīng)是 Java 語(yǔ)言引以為傲的。不過(guò)這些工具類(lèi)都繼承了 Java 語(yǔ)言的老毛病:太復(fù)雜。
如果你一直從事 Java 開(kāi)發(fā),估計(jì)你已經(jīng)習(xí)以為常了,習(xí)慣性地認(rèn)為這個(gè)復(fù)雜度是正常的。不過(guò)這個(gè)世界時(shí)刻都在變化,曾經(jīng)正常的復(fù)雜度,現(xiàn)在看來(lái)也許就已經(jīng)沒(méi)有必要了,例如 Thread-Per-Message 模式如果使用線程池方案就會(huì)增加復(fù)雜度。
Thread-Per-Message 模式在 Java 領(lǐng)域并不是那么知名,根本原因在于 Java 語(yǔ)言里的線程是一個(gè)重量級(jí)的對(duì)象,為每一個(gè)任務(wù)創(chuàng)建一個(gè)線程成本太高,尤其是在高并發(fā)領(lǐng)域,基本就不具備可行性。不過(guò)這個(gè)背景條件目前正在發(fā)生巨變,Java 語(yǔ)言未來(lái)一定會(huì)提供輕量級(jí)線程,這樣基于輕量級(jí)線程實(shí)現(xiàn) Thread-Per-Message 模式就是一個(gè)非常靠譜的選擇。
當(dāng)然,對(duì)于一些并發(fā)度沒(méi)那么高的異步場(chǎng)景,例如定時(shí)任務(wù),采用 Thread-Per-Message 模式是完全沒(méi)有問(wèn)題的。實(shí)際工作中,我就見(jiàn)過(guò)完全基于 Thread-Per-Message 模式實(shí)現(xiàn)的分布式調(diào)度框架,這個(gè)框架為每個(gè)定時(shí)任務(wù)都分配了一個(gè)獨(dú)立的線程。
7、Worker Thread模式:如何避免重復(fù)創(chuàng)建線程?
我們介紹了一種最簡(jiǎn)單的分工模式——Thread-Per-Message 模式,對(duì)應(yīng)到現(xiàn)實(shí)世界,其實(shí)就是委托代辦。這種分工模式如果用 Java Thread 實(shí)現(xiàn),頻繁地創(chuàng)建、銷(xiāo)毀線程非常影響性能,同時(shí)無(wú)限制地創(chuàng)建線程還可能導(dǎo)致 OOM,所以在 Java 領(lǐng)域使用場(chǎng)景就受限了。
要想有效避免線程的頻繁創(chuàng)建、銷(xiāo)毀以及 OOM 問(wèn)題,就不得不提今天我們要細(xì)聊的,也是 Java 領(lǐng)域使用最多的 Worker Thread 模式。
(1)Worker Thread 模式及其實(shí)現(xiàn)
Worker Thread 模式可以類(lèi)比現(xiàn)實(shí)世界里車(chē)間的工作模式:車(chē)間里的工人,有活兒了,大家一起干,沒(méi)活兒了就聊聊天等著。你可以參考下面的示意圖來(lái)理解,Worker Thread 模式中 Worker Thread 對(duì)應(yīng)到現(xiàn)實(shí)世界里,其實(shí)指的就是車(chē)間里的工人。不過(guò)這里需要注意的是,車(chē)間里的工人數(shù)量往往是確定的。
那在編程領(lǐng)域該如何模擬車(chē)間的這種工作模式呢?或者說(shuō)如何去實(shí)現(xiàn) Worker Thread 模式呢?通過(guò)上面的圖,你很容易就能想到用阻塞隊(duì)列做任務(wù)池,然后創(chuàng)建固定數(shù)量的線程消費(fèi)阻塞隊(duì)列中的任務(wù)。其實(shí)你仔細(xì)想會(huì)發(fā)現(xiàn),這個(gè)方案就是 Java 語(yǔ)言提供的線程池。
線程池有很多優(yōu)點(diǎn),例如能夠避免重復(fù)創(chuàng)建、銷(xiāo)毀線程,同時(shí)能夠限制創(chuàng)建線程的上限等等。學(xué)習(xí)完上一篇文章后你已經(jīng)知道,用 Java 的 Thread 實(shí)現(xiàn) Thread-Per-Message 模式難以應(yīng)對(duì)高并發(fā)場(chǎng)景,原因就在于頻繁創(chuàng)建、銷(xiāo)毀 Java 線程的成本有點(diǎn)高,而且無(wú)限制地創(chuàng)建線程還可能導(dǎo)致應(yīng)用 OOM。線程池,則恰好能解決這些問(wèn)題。
那我們還是以 echo 程序?yàn)槔?#xff0c;看看如何用線程池來(lái)實(shí)現(xiàn)。
下面的示例代碼是用線程池實(shí)現(xiàn)的 echo 服務(wù)端,相比于 Thread-Per-Message 模式的實(shí)現(xiàn),改動(dòng)非常少,僅僅是創(chuàng)建了一個(gè)最多線程數(shù)為 500 的線程池 es,然后通過(guò) es.execute() 方法將請(qǐng)求處理的任務(wù)提交給線程池處理。
ExecutorService es = Executors.newFixedThreadPool(500); final ServerSocketChannel ssc = ServerSocketChannel.open().bind(new InetSocketAddress(8080)); //處理請(qǐng)求 try {while (true) {// 接收請(qǐng)求SocketChannel sc = ssc.accept();// 將請(qǐng)求處理任務(wù)提交給線程池es.execute(()->{try {// 讀SocketByteBuffer rb = ByteBuffer.allocateDirect(1024);sc.read(rb);//模擬處理請(qǐng)求Thread.sleep(2000);// 寫(xiě)SocketByteBuffer wb = (ByteBuffer)rb.flip();sc.write(wb);// 關(guān)閉Socketsc.close();}catch(Exception e){throw new UncheckedIOException(e);}});} } finally {ssc.close();es.shutdown(); }(1)正確地創(chuàng)建線程池
① Java 的線程池既能夠避免無(wú)限制地創(chuàng)建線程導(dǎo)致 OOM,也能避免無(wú)限制地接收任務(wù)導(dǎo)致 OOM。只不過(guò)后者經(jīng)常容易被我們忽略,例如在上面的實(shí)現(xiàn)中,就被我們忽略了。所以強(qiáng)烈建議你用創(chuàng)建有界的隊(duì)列來(lái)接收任務(wù)。
② 當(dāng)請(qǐng)求量大于有界隊(duì)列的容量時(shí),就需要合理地拒絕請(qǐng)求。如何合理地拒絕呢?這需要你結(jié)合具體的業(yè)務(wù)場(chǎng)景來(lái)制定,即便線程池默認(rèn)的拒絕策略能夠滿(mǎn)足你的需求,也同樣建議你在創(chuàng)建線程池時(shí),清晰地指明拒絕策略。
③ 同時(shí),為了便于調(diào)試和診斷問(wèn)題,我也強(qiáng)烈建議你在實(shí)際工作中給線程賦予一個(gè)業(yè)務(wù)相關(guān)的名字。
綜合以上這三點(diǎn)建議,echo 程序中創(chuàng)建線程可以使用下面的示例代碼。
ExecutorService es = new ThreadPoolExecutor(50, 500,60L, TimeUnit.SECONDS,//注意要?jiǎng)?chuàng)建有界隊(duì)列new LinkedBlockingQueue<Runnable>(2000),//建議根據(jù)業(yè)務(wù)需求實(shí)現(xiàn)ThreadFactoryr->{return new Thread(r, "echo-"+ r.hashCode());},//建議根據(jù)業(yè)務(wù)需求實(shí)現(xiàn)RejectedExecutionHandlernew ThreadPoolExecutor.CallerRunsPolicy());(2)避免線程死鎖
使用線程池過(guò)程中,還要注意一種線程死鎖的場(chǎng)景。如果提交到相同線程池的任務(wù)不是相互獨(dú)立的,而是有依賴(lài)關(guān)系的,那么就有可能導(dǎo)致線程死鎖。實(shí)際工作中,我就親歷過(guò)這種線程死鎖的場(chǎng)景。具體現(xiàn)象是應(yīng)用每運(yùn)行一段時(shí)間偶爾就會(huì)處于無(wú)響應(yīng)的狀態(tài),監(jiān)控?cái)?shù)據(jù)看上去一切都正常,但是實(shí)際上已經(jīng)不能正常工作了。
這個(gè)出問(wèn)題的應(yīng)用,相關(guān)的邏輯精簡(jiǎn)之后,如下圖所示,該應(yīng)用將一個(gè)大型的計(jì)算任務(wù)分成兩個(gè)階段,第一個(gè)階段的任務(wù)會(huì)等待第二階段的子任務(wù)完成。在這個(gè)應(yīng)用里,每一個(gè)階段都使用了線程池,而且兩個(gè)階段使用的還是同一個(gè)線程池。
我們可以用下面的示例代碼來(lái)模擬該應(yīng)用,如果你執(zhí)行下面的這段代碼,會(huì)發(fā)現(xiàn)它永遠(yuǎn)執(zhí)行不到最后一行。執(zhí)行過(guò)程中沒(méi)有任何異常,但是應(yīng)用已經(jīng)停止響應(yīng)了。
//L1、L2階段共用的線程池 ExecutorService es = Executors.newFixedThreadPool(2); //L1階段的閉鎖 CountDownLatch l1=new CountDownLatch(2); for (int i=0; i<2; i++){System.out.println("L1");//執(zhí)行L1階段任務(wù)es.execute(()->{//L2階段的閉鎖 CountDownLatch l2=new CountDownLatch(2);//執(zhí)行L2階段子任務(wù)for (int j=0; j<2; j++){es.execute(()->{System.out.println("L2");l2.countDown();});}//等待L2階段任務(wù)執(zhí)行完l2.await();l1.countDown();}); } //等著L1階段任務(wù)執(zhí)行完 l1.await(); System.out.println("end");當(dāng)應(yīng)用出現(xiàn)類(lèi)似問(wèn)題時(shí),首選的診斷方法是查看線程棧。下圖是上面示例代碼停止響應(yīng)后的線程棧,你會(huì)發(fā)現(xiàn)線程池中的兩個(gè)線程全部都阻塞在 l2.await(); 這行代碼上了,也就是說(shuō),線程池里所有的線程都在等待 L2 階段的任務(wù)執(zhí)行完,那 L2 階段的子任務(wù)什么時(shí)候能夠執(zhí)行完呢?永遠(yuǎn)都沒(méi)那一天了,為什么呢?因?yàn)榫€程池里的線程都阻塞了,沒(méi)有空閑的線程執(zhí)行 L2 階段的任務(wù)了。
原因找到了,那如何解決就簡(jiǎn)單了,最簡(jiǎn)單粗暴的辦法就是將線程池的最大線程數(shù)調(diào)大,如果能夠確定任務(wù)的數(shù)量不是非常多的話(huà),這個(gè)辦法也是可行的,否則這個(gè)辦法就行不通了。其實(shí)這種問(wèn)題通用的解決方案是為不同的任務(wù)創(chuàng)建不同的線程池。對(duì)于上面的這個(gè)應(yīng)用,L1 階段的任務(wù)和 L2 階段的任務(wù)如果各自都有自己的線程池,就不會(huì)出現(xiàn)這種問(wèn)題了。
最后再次強(qiáng)調(diào)一下:提交到相同線程池中的任務(wù)一定是相互獨(dú)立的,否則就一定要慎重。
我們?cè)?jīng)說(shuō)過(guò),解決并發(fā)編程里的分工問(wèn)題,最好的辦法是和現(xiàn)實(shí)世界做對(duì)比。對(duì)比現(xiàn)實(shí)世界構(gòu)建編程領(lǐng)域的模型,能夠讓模型更容易理解。上一篇我們介紹的 Thread-Per-Message 模式,類(lèi)似于現(xiàn)實(shí)世界里的委托他人辦理,而今天介紹的 Worker Thread 模式則類(lèi)似于車(chē)間里工人的工作模式。如果你在設(shè)計(jì)階段,發(fā)現(xiàn)對(duì)業(yè)務(wù)模型建模之后,模型非常類(lèi)似于車(chē)間的工作模式,那基本上就能確定可以在實(shí)現(xiàn)階段采用 Worker Thread 模式來(lái)實(shí)現(xiàn)。
Worker Thread 模式和 Thread-Per-Message 模式的區(qū)別有哪些呢?從現(xiàn)實(shí)世界的角度看,你委托代辦人做事,往往是和代辦人直接溝通的;對(duì)應(yīng)到編程領(lǐng)域,其實(shí)現(xiàn)也是主線程直接創(chuàng)建了一個(gè)子線程,主子線程之間是可以直接通信的。而車(chē)間工人的工作方式則是完全圍繞任務(wù)展開(kāi)的,一個(gè)具體的任務(wù)被哪個(gè)工人執(zhí)行,預(yù)先是無(wú)法知道的;對(duì)應(yīng)到編程領(lǐng)域,則是主線程提交任務(wù)到線程池,但主線程并不關(guān)心任務(wù)被哪個(gè)線程執(zhí)行。
Worker Thread 模式能避免線程頻繁創(chuàng)建、銷(xiāo)毀的問(wèn)題,而且能夠限制線程的最大數(shù)量。Java 語(yǔ)言里可以直接使用線程池來(lái)實(shí)現(xiàn) Worker Thread 模式,線程池是一個(gè)非常基礎(chǔ)和優(yōu)秀的工具類(lèi),甚至有些大廠的編碼規(guī)范都不允許用 new Thread() 來(lái)創(chuàng)建線程的,必須使用線程池。
不過(guò)使用線程池還是需要格外謹(jǐn)慎的,除了今天重點(diǎn)講到的如何正確創(chuàng)建線程池、如何避免線程死鎖問(wèn)題,還需要注意前面我們?cè)?jīng)提到的 ThreadLocal 內(nèi)存泄露問(wèn)題。同時(shí)對(duì)于提交到線程池的任務(wù),還要做好異常處理,避免異常的任務(wù)從眼前溜走,從業(yè)務(wù)的角度看,有時(shí)沒(méi)有發(fā)現(xiàn)異常的任務(wù)后果往往都很?chē)?yán)重。
8、兩階段終止模式:如何優(yōu)雅地終止線程?
從純技術(shù)的角度看,都是啟動(dòng)多線程去執(zhí)行一個(gè)異步任務(wù)。既啟動(dòng),那又該如何終止呢?今天咱們就從技術(shù)的角度聊聊如何優(yōu)雅地終止線程,正所謂有始有終。
線程執(zhí)行完或者出現(xiàn)異常就會(huì)進(jìn)入終止?fàn)顟B(tài)。這樣看,終止一個(gè)線程看上去很簡(jiǎn)單啊!一個(gè)線程執(zhí)行完自己的任務(wù),自己進(jìn)入終止?fàn)顟B(tài),這的確很簡(jiǎn)單。不過(guò)我們今天談到的“優(yōu)雅地終止線程”,不是自己終止自己,而是在一個(gè)線程 T1 中,終止線程 T2;這里所謂的“優(yōu)雅”,指的是給 T2 一個(gè)機(jī)會(huì)料理后事,而不是被一劍封喉。
Java 語(yǔ)言的 Thread 類(lèi)中曾經(jīng)提供了一個(gè) stop() 方法,用來(lái)終止線程,可是早已不建議使用了,原因是這個(gè)方法用的就是一劍封喉的做法,被終止的線程沒(méi)有機(jī)會(huì)料理后事。
既然不建議使用 stop() 方法,那在 Java 領(lǐng)域,我們又該如何優(yōu)雅地終止線程呢?
(1)如何理解兩階段終止模式
前輩們經(jīng)過(guò)認(rèn)真對(duì)比分析,已經(jīng)總結(jié)出了一套成熟的方案,叫做兩階段終止模式。顧名思義,就是將終止過(guò)程分成兩個(gè)階段,其中第一個(gè)階段主要是線程 T1 向線程 T2發(fā)送終止指令,而第二階段則是線程 T2響應(yīng)終止指令。
那在 Java 語(yǔ)言里,終止指令是什么呢?這個(gè)要從 Java 線程的狀態(tài)轉(zhuǎn)換過(guò)程說(shuō)起。Java 線程的狀態(tài)轉(zhuǎn)換圖,如下圖所示。
從這個(gè)圖里你會(huì)發(fā)現(xiàn),Java 線程進(jìn)入終止?fàn)顟B(tài)的前提是線程進(jìn)入 RUNNABLE 狀態(tài),而實(shí)際上線程也可能處在休眠狀態(tài),也就是說(shuō),我們要想終止一個(gè)線程,首先要把線程的狀態(tài)從休眠狀態(tài)轉(zhuǎn)換到 RUNNABLE 狀態(tài)。如何做到呢?這個(gè)要靠 Java Thread 類(lèi)提供的 interrupt() 方法,它可以將休眠狀態(tài)的線程轉(zhuǎn)換到 RUNNABLE 狀態(tài)。
線程轉(zhuǎn)換到 RUNNABLE 狀態(tài)之后,我們?nèi)绾卧賹⑵浣K止呢?RUNNABLE 狀態(tài)轉(zhuǎn)換到終止?fàn)顟B(tài),優(yōu)雅的方式是讓 Java 線程自己執(zhí)行完 run() 方法,所以一般我們采用的方法是設(shè)置一個(gè)標(biāo)志位,然后線程會(huì)在合適的時(shí)機(jī)檢查這個(gè)標(biāo)志位,如果發(fā)現(xiàn)符合終止條件,則自動(dòng)退出 run() 方法。這個(gè)過(guò)程其實(shí)就是我們前面提到的第二階段:響應(yīng)終止指令。
綜合上面這兩點(diǎn),我們能總結(jié)出終止指令,其實(shí)包括兩方面內(nèi)容:interrupt() 方法和線程終止的標(biāo)志位。
理解了兩階段終止模式之后,下面我們看一個(gè)實(shí)際工作中的案例。
(2)用兩階段終止模式終止監(jiān)控操作
實(shí)際工作中,有些監(jiān)控系統(tǒng)需要?jiǎng)討B(tài)地采集一些數(shù)據(jù),一般都是監(jiān)控系統(tǒng)發(fā)送采集指令給被監(jiān)控系統(tǒng)的監(jiān)控代理,監(jiān)控代理接收到指令之后,從監(jiān)控目標(biāo)收集數(shù)據(jù),然后回傳給監(jiān)控系統(tǒng),詳細(xì)過(guò)程如下圖所示。出于對(duì)性能的考慮(有些監(jiān)控項(xiàng)對(duì)系統(tǒng)性能影響很大,所以不能一直持續(xù)監(jiān)控),動(dòng)態(tài)采集功能一般都會(huì)有終止操作。
下面的示例代碼是監(jiān)控代理簡(jiǎn)化之后的實(shí)現(xiàn),start() 方法會(huì)啟動(dòng)一個(gè)新的線程 rptThread 來(lái)執(zhí)行監(jiān)控?cái)?shù)據(jù)采集和回傳的功能,stop() 方法需要優(yōu)雅地終止線程 rptThread,那 stop() 相關(guān)功能該如何實(shí)現(xiàn)呢?
class Proxy {boolean started = false;//采集線程Thread rptThread;//啟動(dòng)采集功能synchronized void start(){//不允許同時(shí)啟動(dòng)多個(gè)采集線程if (started) {return;}started = true;rptThread = new Thread(()->{while (true) {//省略采集、回傳實(shí)現(xiàn)report();//每隔兩秒鐘采集、回傳一次數(shù)據(jù)try {Thread.sleep(2000);} catch (InterruptedException e) { }}//執(zhí)行到此處說(shuō)明線程馬上終止started = false;});rptThread.start();}//終止采集功能synchronized void stop(){//如何實(shí)現(xiàn)?} }按照兩階段終止模式,我們首先需要做的就是將線程 rptThread 狀態(tài)轉(zhuǎn)換到 RUNNABLE,做法很簡(jiǎn)單,只需要在調(diào)用 rptThread.interrupt() 就可以了。線程 rptThread 的狀態(tài)轉(zhuǎn)換到 RUNNABLE 之后,如何優(yōu)雅地終止呢?下面的示例代碼中,我們選擇的標(biāo)志位是線程的中斷狀態(tài):Thread.currentThread().isInterrupted() ,需要注意的是,我們?cè)诓东@ Thread.sleep() 的中斷異常之后,通過(guò) Thread.currentThread().interrupt() 重新設(shè)置了線程的中斷狀態(tài),因?yàn)?JVM 的異常處理會(huì)清除線程的中斷狀態(tài)。
class Proxy {boolean started = false;//采集線程Thread rptThread;//啟動(dòng)采集功能synchronized void start(){//不允許同時(shí)啟動(dòng)多個(gè)采集線程if (started) {return;}started = true;rptThread = new Thread(()->{while (!Thread.currentThread().isInterrupted()){//省略采集、回傳實(shí)現(xiàn)report();//每隔兩秒鐘采集、回傳一次數(shù)據(jù)try {Thread.sleep(2000);} catch (InterruptedException e){//重新設(shè)置線程中斷狀態(tài)Thread.currentThread().interrupt();}}//執(zhí)行到此處說(shuō)明線程馬上終止started = false;});rptThread.start();}//終止采集功能synchronized void stop(){rptThread.interrupt();} }上面的示例代碼的確能夠解決當(dāng)前的問(wèn)題,但是建議你在實(shí)際工作中謹(jǐn)慎使用。原因在于我們很可能在線程的 run() 方法中調(diào)用第三方類(lèi)庫(kù)提供的方法,而我們沒(méi)有辦法保證第三方類(lèi)庫(kù)正確處理了線程的中斷異常,例如第三方類(lèi)庫(kù)在捕獲到 Thread.sleep() 方法拋出的中斷異常后,沒(méi)有重新設(shè)置線程的中斷狀態(tài),那么就會(huì)導(dǎo)致線程不能夠正常終止。所以強(qiáng)烈建議你設(shè)置自己的線程終止標(biāo)志位,例如在下面的代碼中,使用 isTerminated 作為線程終止標(biāo)志位,此時(shí)無(wú)論是否正確處理了線程的中斷異常,都不會(huì)影響線程優(yōu)雅地終止。
class Proxy {//線程終止標(biāo)志位volatile boolean terminated = false;boolean started = false;//采集線程Thread rptThread;//啟動(dòng)采集功能synchronized void start(){//不允許同時(shí)啟動(dòng)多個(gè)采集線程if (started) {return;}started = true;terminated = false;rptThread = new Thread(()->{while (!terminated){//省略采集、回傳實(shí)現(xiàn)report();//每隔兩秒鐘采集、回傳一次數(shù)據(jù)try {Thread.sleep(2000);} catch (InterruptedException e){//重新設(shè)置線程中斷狀態(tài)Thread.currentThread().interrupt();}}//執(zhí)行到此處說(shuō)明線程馬上終止started = false;});rptThread.start();}//終止采集功能synchronized void stop(){//設(shè)置中斷標(biāo)志位terminated = true;//中斷線程rptThreadrptThread.interrupt();} }(3)如何優(yōu)雅地終止線程池
Java 領(lǐng)域用的最多的還是線程池,而不是手動(dòng)地創(chuàng)建線程。那我們?cè)撊绾蝺?yōu)雅地終止線程池呢?
線程池提供了兩個(gè)方法:shutdown()和shutdownNow()。這兩個(gè)方法有什么區(qū)別呢?要了解它們的區(qū)別,就先需要了解線程池的實(shí)現(xiàn)原理。
我們?cè)?jīng)講過(guò),Java 線程池是生產(chǎn)者 - 消費(fèi)者模式的一種實(shí)現(xiàn),提交給線程池的任務(wù),首先是進(jìn)入一個(gè)阻塞隊(duì)列中,之后線程池中的線程從阻塞隊(duì)列中取出任務(wù)執(zhí)行。
shutdown() 方法是一種很保守的關(guān)閉線程池的方法。線程池執(zhí)行 shutdown() 后,就會(huì)拒絕接收新的任務(wù),但是會(huì)等待線程池中正在執(zhí)行的任務(wù)和已經(jīng)進(jìn)入阻塞隊(duì)列的任務(wù)都執(zhí)行完之后才最終關(guān)閉線程池。
而 shutdownNow() 方法,相對(duì)就激進(jìn)一些了,線程池執(zhí)行 shutdownNow() 后,會(huì)拒絕接收新的任務(wù),同時(shí)還會(huì)中斷線程池中正在執(zhí)行的任務(wù),已經(jīng)進(jìn)入阻塞隊(duì)列的任務(wù)也被剝奪了執(zhí)行的機(jī)會(huì),不過(guò)這些被剝奪執(zhí)行機(jī)會(huì)的任務(wù)會(huì)作為 shutdownNow() 方法的返回值返回。因?yàn)?shutdownNow() 方法會(huì)中斷正在執(zhí)行的線程,所以提交到線程池的任務(wù),如果需要優(yōu)雅地結(jié)束,就需要正確地處理線程中斷。
如果提交到線程池的任務(wù)不允許取消,那就不能使用 shutdownNow() 方法終止線程池。不過(guò),如果提交到線程池的任務(wù)允許后續(xù)以補(bǔ)償?shù)姆绞街匦聢?zhí)行,也是可以使用 shutdownNow() 方法終止線程池的。《Java 并發(fā)編程實(shí)戰(zhàn)》這本書(shū)第 7 章《取消與關(guān)閉》的“shutdownNow 的局限性”一節(jié)中,提到一種將已提交但尚未開(kāi)始執(zhí)行的任務(wù)以及已經(jīng)取消的正在執(zhí)行的任務(wù)保存起來(lái),以便后續(xù)重新執(zhí)行的方案,你可以參考一下,方案很簡(jiǎn)單,這里就不詳細(xì)介紹了。
其實(shí)分析完 shutdown() 和 shutdownNow() 方法你會(huì)發(fā)現(xiàn),它們實(shí)質(zhì)上使用的也是兩階段終止模式,只是終止指令的范圍不同而已,前者只影響阻塞隊(duì)列接收任務(wù),后者范圍擴(kuò)大到線程池中所有的任務(wù)。
兩階段終止模式是一種應(yīng)用很廣泛的并發(fā)設(shè)計(jì)模式,在 Java 語(yǔ)言中使用兩階段終止模式來(lái)優(yōu)雅地終止線程,需要注意兩個(gè)關(guān)鍵點(diǎn):一個(gè)是僅檢查終止標(biāo)志位是不夠的,因?yàn)榫€程的狀態(tài)可能處于休眠態(tài);另一個(gè)是僅檢查線程的中斷狀態(tài)也是不夠的,因?yàn)槲覀円蕾?lài)的第三方類(lèi)庫(kù)很可能沒(méi)有正確處理中斷異常。
當(dāng)你使用 Java 的線程池來(lái)管理線程的時(shí)候,需要依賴(lài)線程池提供的 shutdown() 和 shutdownNow() 方法來(lái)終止線程池。不過(guò)在使用時(shí)需要注意它們的應(yīng)用場(chǎng)景,尤其是在使用 shutdownNow() 的時(shí)候,一定要謹(jǐn)慎。
9、生產(chǎn)者-消費(fèi)者模式:用流水線思想提高效率
Worker Thread 模式類(lèi)比的是工廠里車(chē)間工人的工作模式。但其實(shí)在現(xiàn)實(shí)世界,工廠里還有一種流水線的工作模式,類(lèi)比到編程領(lǐng)域,就是生產(chǎn)者 - 消費(fèi)者模式。
生產(chǎn)者 - 消費(fèi)者模式在編程領(lǐng)域的應(yīng)用也非常廣泛,前面我們?cè)?jīng)提到,Java 線程池本質(zhì)上就是用生產(chǎn)者 - 消費(fèi)者模式實(shí)現(xiàn)的,所以每當(dāng)使用線程池的時(shí)候,其實(shí)就是在應(yīng)用生產(chǎn)者 - 消費(fèi)者模式。
當(dāng)然,除了在線程池中的應(yīng)用,為了提升性能,并發(fā)編程領(lǐng)域很多地方也都用到了生產(chǎn)者 - 消費(fèi)者模式,例如 Log4j2 中異步 Appender 內(nèi)部也用到了生產(chǎn)者 - 消費(fèi)者模式。所以今天我們就來(lái)深入地聊聊生產(chǎn)者 - 消費(fèi)者模式,看看它具體有哪些優(yōu)點(diǎn),以及如何提升系統(tǒng)的性能。
(1)生產(chǎn)者 - 消費(fèi)者模式的優(yōu)點(diǎn)
生產(chǎn)者 - 消費(fèi)者模式的核心是一個(gè)任務(wù)隊(duì)列,生產(chǎn)者線程生產(chǎn)任務(wù),并將任務(wù)添加到任務(wù)隊(duì)列中,而消費(fèi)者線程從任務(wù)隊(duì)列中獲取任務(wù)并執(zhí)行。下面是生產(chǎn)者 - 消費(fèi)者模式的一個(gè)示意圖,你可以結(jié)合它來(lái)理解。
從架構(gòu)設(shè)計(jì)的角度來(lái)看,生產(chǎn)者 - 消費(fèi)者模式有一個(gè)很重要的優(yōu)點(diǎn),就是解耦。解耦對(duì)于大型系統(tǒng)的設(shè)計(jì)非常重要,而解耦的一個(gè)關(guān)鍵就是組件之間的依賴(lài)關(guān)系和通信方式必須受限。在生產(chǎn)者 - 消費(fèi)者模式中,生產(chǎn)者和消費(fèi)者沒(méi)有任何依賴(lài)關(guān)系,它們彼此之間的通信只能通過(guò)任務(wù)隊(duì)列,所以生產(chǎn)者 - 消費(fèi)者模式是一個(gè)不錯(cuò)的解耦方案。
除了架構(gòu)設(shè)計(jì)上的優(yōu)點(diǎn)之外,生產(chǎn)者 - 消費(fèi)者模式還有一個(gè)重要的優(yōu)點(diǎn)就是支持異步,并且能夠平衡生產(chǎn)者和消費(fèi)者的速度差異。在生產(chǎn)者 - 消費(fèi)者模式中,生產(chǎn)者線程只需要將任務(wù)添加到任務(wù)隊(duì)列而無(wú)需等待任務(wù)被消費(fèi)者線程執(zhí)行完,也就是說(shuō)任務(wù)的生產(chǎn)和消費(fèi)是異步的,這是與傳統(tǒng)的方法之間調(diào)用的本質(zhì)區(qū)別,傳統(tǒng)的方法之間調(diào)用是同步的。
你或許會(huì)有這樣的疑問(wèn),異步化處理最簡(jiǎn)單的方式就是創(chuàng)建一個(gè)新的線程去處理,那中間增加一個(gè)“任務(wù)隊(duì)列”究竟有什么用呢?我覺(jué)得主要還是用于平衡生產(chǎn)者和消費(fèi)者的速度差異。我們假設(shè)生產(chǎn)者的速率很慢,而消費(fèi)者的速率很高,比如是 1:3,如果生產(chǎn)者有 3 個(gè)線程,采用創(chuàng)建新的線程的方式,那么會(huì)創(chuàng)建 3 個(gè)子線程,而采用生產(chǎn)者 - 消費(fèi)者模式,消費(fèi)線程只需要 1 個(gè)就可以了。Java 語(yǔ)言里,Java 線程和操作系統(tǒng)線程是一一對(duì)應(yīng)的,線程創(chuàng)建得太多,會(huì)增加上下文切換的成本,所以 Java 線程不是越多越好,適量即可。而生產(chǎn)者 - 消費(fèi)者模式恰好能支持你用適量的線程。
(2)支持批量執(zhí)行以提升性能
輕量級(jí)的線程,如果使用輕量級(jí)線程,就沒(méi)有必要平衡生產(chǎn)者和消費(fèi)者的速度差異了,因?yàn)檩p量級(jí)線程本身就是廉價(jià)的,那是否意味著生產(chǎn)者 - 消費(fèi)者模式在性能優(yōu)化方面就無(wú)用武之地了呢?當(dāng)然不是,有一類(lèi)并發(fā)場(chǎng)景應(yīng)用生產(chǎn)者 - 消費(fèi)者模式就有奇效,那就是批量執(zhí)行任務(wù)。
例如,我們要在數(shù)據(jù)庫(kù)里 INSERT 1000 條數(shù)據(jù),有兩種方案:第一種方案是用 1000 個(gè)線程并發(fā)執(zhí)行,每個(gè)線程 INSERT 一條數(shù)據(jù);第二種方案是用 1 個(gè)線程,執(zhí)行一個(gè)批量的 SQL,一次性把 1000 條數(shù)據(jù) INSERT 進(jìn)去。這兩種方案,顯然是第二種方案效率更高,其實(shí)這樣的應(yīng)用場(chǎng)景就是我們上面提到的批量執(zhí)行場(chǎng)景。
一個(gè)監(jiān)控系統(tǒng)動(dòng)態(tài)采集的案例,其實(shí)最終回傳的監(jiān)控?cái)?shù)據(jù)還是要存入數(shù)據(jù)庫(kù)的(如下圖)。但被監(jiān)控系統(tǒng)往往有很多,如果每一條回傳數(shù)據(jù)都直接 INSERT 到數(shù)據(jù)庫(kù),那么這個(gè)方案就是上面提到的第一種方案:每個(gè)線程 INSERT 一條數(shù)據(jù)。很顯然,更好的方案是批量執(zhí)行 SQL,那如何實(shí)現(xiàn)呢?這就要用到生產(chǎn)者 - 消費(fèi)者模式了。
利用生產(chǎn)者 - 消費(fèi)者模式實(shí)現(xiàn)批量執(zhí)行 SQL 非常簡(jiǎn)單:將原來(lái)直接 INSERT 數(shù)據(jù)到數(shù)據(jù)庫(kù)的線程作為生產(chǎn)者線程,生產(chǎn)者線程只需將數(shù)據(jù)添加到任務(wù)隊(duì)列,然后消費(fèi)者線程負(fù)責(zé)將任務(wù)從任務(wù)隊(duì)列中批量取出并批量執(zhí)行。
在下面的示例代碼中,我們創(chuàng)建了 5 個(gè)消費(fèi)者線程負(fù)責(zé)批量執(zhí)行 SQL,這 5 個(gè)消費(fèi)者線程以 while(true){} 循環(huán)方式批量地獲取任務(wù)并批量地執(zhí)行。需要注意的是,從任務(wù)隊(duì)列中獲取批量任務(wù)的方法 pollTasks() 中,首先是以阻塞方式獲取任務(wù)隊(duì)列中的一條任務(wù),而后則是以非阻塞的方式獲取任務(wù);之所以首先采用阻塞方式,是因?yàn)槿绻蝿?wù)隊(duì)列中沒(méi)有任務(wù),這樣的方式能夠避免無(wú)謂的循環(huán)。
//任務(wù)隊(duì)列 BlockingQueue<Task> bq=newLinkedBlockingQueue<>(2000); //啟動(dòng)5個(gè)消費(fèi)者線程 //執(zhí)行批量任務(wù) void start() {ExecutorService es=executors.newFixedThreadPool(5);for (int i=0; i<5; i++) {es.execute(()->{try {while (true) {//獲取批量任務(wù)List<Task> ts=pollTasks();//執(zhí)行批量任務(wù)execTasks(ts);}} catch (Exception e) {e.printStackTrace();}});} } //從任務(wù)隊(duì)列中獲取批量任務(wù) List<Task> pollTasks() throws InterruptedException{List<Task> ts=new LinkedList<>();//阻塞式獲取一條任務(wù)Task t = bq.take();while (t != null) {ts.add(t);//非阻塞式獲取一條任務(wù)t = bq.poll();}return ts; } //批量執(zhí)行任務(wù) execTasks(List<Task> ts) {//省略具體代碼無(wú)數(shù) }(3)支持分階段提交以提升性能
利用生產(chǎn)者 - 消費(fèi)者模式還可以輕松地支持一種分階段提交的應(yīng)用場(chǎng)景。我們知道寫(xiě)文件如果同步刷盤(pán)性能會(huì)很慢,所以對(duì)于不是很重要的數(shù)據(jù),我們往往采用異步刷盤(pán)的方式。我曾經(jīng)參與過(guò)一個(gè)項(xiàng)目,其中的日志組件是自己實(shí)現(xiàn)的,采用的就是異步刷盤(pán)方式,刷盤(pán)的時(shí)機(jī)是:
① ERROR 級(jí)別的日志需要立即刷盤(pán);
② 數(shù)據(jù)積累到 500 條需要立即刷盤(pán);
③ 存在未刷盤(pán)數(shù)據(jù),且 5 秒鐘內(nèi)未曾刷盤(pán),需要立即刷盤(pán)。
這個(gè)日志組件的異步刷盤(pán)操作本質(zhì)上其實(shí)就是一種分階段提交。下面我們具體看看用生產(chǎn)者 - 消費(fèi)者模式如何實(shí)現(xiàn)。在下面的示例代碼中,可以通過(guò)調(diào)用 info()和error() 方法寫(xiě)入日志,這兩個(gè)方法都是創(chuàng)建了一個(gè)日志任務(wù) LogMsg,并添加到阻塞隊(duì)列中,調(diào)用 info()和error() 方法的線程是生產(chǎn)者;而真正將日志寫(xiě)入文件的是消費(fèi)者線程,在 Logger 這個(gè)類(lèi)中,我們只創(chuàng)建了 1 個(gè)消費(fèi)者線程,在這個(gè)消費(fèi)者線程中,會(huì)根據(jù)刷盤(pán)規(guī)則執(zhí)行刷盤(pán)操作,邏輯很簡(jiǎn)單,這里就不贅述了。
class Logger {//任務(wù)隊(duì)列 final BlockingQueue<LogMsg> bq= new BlockingQueue<>();//flush批量 static final int batchSize=500;//只需要一個(gè)線程寫(xiě)日志ExecutorService es = Executors.newFixedThreadPool(1);//啟動(dòng)寫(xiě)日志線程void start(){File file=File.createTempFile("foo", ".log");final FileWriter writer=new FileWriter(file);this.es.execute(()->{try {//未刷盤(pán)日志數(shù)量int curIdx = 0;long preFT=System.currentTimeMillis();while (true) {LogMsg log = bq.poll(5, TimeUnit.SECONDS);//寫(xiě)日志if (log != null) {writer.write(log.toString());++curIdx;}//如果不存在未刷盤(pán)數(shù)據(jù),則無(wú)需刷盤(pán)if (curIdx <= 0) {continue;}//根據(jù)規(guī)則刷盤(pán)if (log!=null && log.level==LEVEL.ERROR ||curIdx == batchSize ||System.currentTimeMillis()-preFT>5000){writer.flush();curIdx = 0;preFT=System.currentTimeMillis();}}}catch(Exception e){e.printStackTrace();} finally {try {writer.flush();writer.close();}catch(IOException e){e.printStackTrace();}}}); }//寫(xiě)INFO級(jí)別日志void info(String msg) {bq.put(new LogMsg(LEVEL.INFO, msg));}//寫(xiě)ERROR級(jí)別日志void error(String msg) {bq.put(new LogMsg(LEVEL.ERROR, msg));} } //日志級(jí)別 enum LEVEL {INFO, ERROR } class LogMsg {LEVEL level;String msg;//省略構(gòu)造函數(shù)實(shí)現(xiàn)LogMsg(LEVEL lvl, String msg){}//省略toString()實(shí)現(xiàn)String toString(){} }Java 語(yǔ)言提供的線程池本身就是一種生產(chǎn)者 - 消費(fèi)者模式的實(shí)現(xiàn),但是線程池中的線程每次只能從任務(wù)隊(duì)列中消費(fèi)一個(gè)任務(wù)來(lái)執(zhí)行,對(duì)于大部分并發(fā)場(chǎng)景這種策略都沒(méi)有問(wèn)題。但是有些場(chǎng)景還是需要自己來(lái)實(shí)現(xiàn),例如需要批量執(zhí)行以及分階段提交的場(chǎng)景。
生產(chǎn)者 - 消費(fèi)者模式在分布式計(jì)算中的應(yīng)用也非常廣泛。在分布式場(chǎng)景下,你可以借助分布式消息隊(duì)列(MQ)來(lái)實(shí)現(xiàn)生產(chǎn)者 - 消費(fèi)者模式。MQ 一般都會(huì)支持兩種消息模型,一種是點(diǎn)對(duì)點(diǎn)模型,一種是發(fā)布訂閱模型。這兩種模型的區(qū)別在于,點(diǎn)對(duì)點(diǎn)模型里一個(gè)消息只會(huì)被一個(gè)消費(fèi)者消費(fèi),和 Java 的線程池非常類(lèi)似(Java 線程池的任務(wù)也只會(huì)被一個(gè)線程執(zhí)行);而發(fā)布訂閱模型里一個(gè)消息會(huì)被多個(gè)消費(fèi)者消費(fèi),本質(zhì)上是一種消息的廣播,在多線程編程領(lǐng)域,你可以結(jié)合觀察者模式實(shí)現(xiàn)廣播功能。
10、避免共享的設(shè)計(jì)模式
Immutability 模式、Copy-on-Write 模式和線程本地存儲(chǔ)模式本質(zhì)上都是為了避免共享,只是實(shí)現(xiàn)手段不同而已。這 3 種設(shè)計(jì)模式的實(shí)現(xiàn)都很簡(jiǎn)單,但是實(shí)現(xiàn)過(guò)程中有些細(xì)節(jié)還是需要格外注意的。例如,使用 Immutability 模式需要注意對(duì)象屬性的不可變性,使用 Copy-on-Write 模式需要注意性能問(wèn)題,使用線程本地存儲(chǔ)模式需要注意異步執(zhí)行問(wèn)題。所以,每篇文章最后我設(shè)置的課后思考題的目的就是提醒你注意這些細(xì)節(jié)。
Account 這個(gè)類(lèi)是不是具備不可變性。這個(gè)類(lèi)初看上去屬于不可變對(duì)象的中規(guī)中矩實(shí)現(xiàn),而實(shí)質(zhì)上這個(gè)實(shí)現(xiàn)是有問(wèn)題的,原因在于 StringBuffer 不同于 String,StringBuffer 不具備不可變性,通過(guò) getUser() 方法獲取 user 之后,是可以修改 user 的。一個(gè)簡(jiǎn)單的解決方案是讓 getUser() 方法返回 String 對(duì)象。
public final class Account{private final StringBuffer user;public Account(String user){this.user = new StringBuffer(user);}//返回的StringBuffer并不具備不可變性public StringBuffer getUser(){return this.user;}public String toString(){return "user"+user;} }Java SDK 中為什么沒(méi)有提供 CopyOnWriteLinkedList。這是一個(gè)開(kāi)放性的問(wèn)題,沒(méi)有標(biāo)準(zhǔn)答案,但是性能問(wèn)題一定是其中一個(gè)很重要的原因,畢竟完整地復(fù)制 LinkedList 性能開(kāi)銷(xiāo)太大了。
在異步場(chǎng)景中,是否可以使用 Spring 的事務(wù)管理器。答案顯然是不能的,Spring 使用 ThreadLocal 來(lái)傳遞事務(wù)信息,因此這個(gè)事務(wù)信息是不能跨線程共享的。實(shí)際工作中有很多類(lèi)庫(kù)都是用 ThreadLocal 傳遞上下文信息的,這種場(chǎng)景下如果有異步操作,一定要注意上下文信息是不能跨線程共享的。
11、多線程版本 IF 的設(shè)計(jì)模式
Guarded Suspension 模式和 Balking 模式都可以簡(jiǎn)單地理解為“多線程版本的 if”,但它們的區(qū)別在于前者會(huì)等待 if 條件變?yōu)檎?#xff0c;而后者則不需要等待。
Guarded Suspension 模式的經(jīng)典實(shí)現(xiàn)是使用管程,很多初學(xué)者會(huì)簡(jiǎn)單地用線程 sleep 的方式實(shí)現(xiàn),比如用線程 sleep 方式實(shí)現(xiàn)的。但不推薦你使用這種方式,最重要的原因是性能,如果 sleep 的時(shí)間太長(zhǎng),會(huì)影響響應(yīng)時(shí)間;sleep 的時(shí)間太短,會(huì)導(dǎo)致線程頻繁地被喚醒,消耗系統(tǒng)資源。
同時(shí),示例代碼的實(shí)現(xiàn)也有問(wèn)題:由于 obj 不是 volatile 變量,所以即便 obj 被設(shè)置了正確的值,執(zhí)行 while(!p.test(obj)) 的線程也有可能看不到,從而導(dǎo)致更長(zhǎng)時(shí)間的 sleep。
//獲取受保護(hù)對(duì)象 T get(Predicate<T> p) {try {//obj的可見(jiàn)性無(wú)法保證while(!p.test(obj)){TimeUnit.SECONDS.sleep(timeout);}}catch(InterruptedException e){throw new RuntimeException(e);}//返回非空的受保護(hù)對(duì)象return obj; } //事件通知方法 void onChanged(T obj) {this.obj = obj; }實(shí)現(xiàn) Balking 模式最容易忽視的就是競(jìng)態(tài)條件問(wèn)題。比如,存在競(jìng)態(tài)條件問(wèn)題。因此,在多線程場(chǎng)景中使用 if 語(yǔ)句時(shí),一定要多問(wèn)自己一遍:是否存在競(jìng)態(tài)條件。
class Test{volatile boolean inited = false;int count = 0;void init(){//存在競(jìng)態(tài)條件if(inited){return;}//有可能多個(gè)線程執(zhí)行到這里inited = true;//計(jì)算count的值count = calc();} }12、三種最簡(jiǎn)單的分工模式
Thread-Per-Message 模式、Worker Thread 模式和生產(chǎn)者 - 消費(fèi)者模式是三種最簡(jiǎn)單實(shí)用的多線程分工方法。雖說(shuō)簡(jiǎn)單,但也還是有許多細(xì)節(jié)需要你多加小心和注意。
Thread-Per-Message 模式在實(shí)現(xiàn)的時(shí)候需要注意是否存在線程的頻繁創(chuàng)建、銷(xiāo)毀以及是否可能導(dǎo)致 OOM。關(guān)于如何快速解決 OOM 問(wèn)題的。在高并發(fā)場(chǎng)景中,最簡(jiǎn)單的辦法其實(shí)是限流。當(dāng)然,限流方案也并不局限于解決 Thread-Per-Message 模式中的 OOM 問(wèn)題。
Worker Thread 模式的實(shí)現(xiàn),需要注意潛在的線程死鎖問(wèn)題。示例代碼就存在線程死鎖。描述得很貼切和形象:“工廠里只有一個(gè)工人,他的工作就是同步地等待工廠里其他人給他提供東西,然而并沒(méi)有其他人,他將等到天荒地老,海枯石爛!”因此,共享線程池雖然能夠提供線程池的使用效率,但一定要保證一個(gè)前提,那就是:任務(wù)之間沒(méi)有依賴(lài)關(guān)系。
ExecutorService pool = Executors.newSingleThreadExecutor(); //提交主任務(wù) pool.submit(() -> {try {//提交子任務(wù)并等待其完成,//會(huì)導(dǎo)致線程死鎖String qq=pool.submit(()->"QQ").get();System.out.println(qq);} catch (Exception e) {} });Java 線程池本身就是一種生產(chǎn)者 - 消費(fèi)者模式的實(shí)現(xiàn),所以大部分場(chǎng)景你都不需要自己實(shí)現(xiàn),直接使用 Java 的線程池就可以了。但若能自己靈活地實(shí)現(xiàn)生產(chǎn)者 - 消費(fèi)者模式會(huì)更好,比如可以實(shí)現(xiàn)批量執(zhí)行和分階段提交,不過(guò)這過(guò)程中還需要注意如何優(yōu)雅地終止線程。
如何優(yōu)雅地終止線程?兩階段終止模式是一種通用的解決方案。但其實(shí)終止生產(chǎn)者 - 消費(fèi)者服務(wù)還有一種更簡(jiǎn)單的方案,叫做“毒丸”對(duì)象。“毒丸”對(duì)象有過(guò)詳細(xì)的介紹。簡(jiǎn)單來(lái)講,“毒丸”對(duì)象是生產(chǎn)者生產(chǎn)的一條特殊任務(wù),然后當(dāng)消費(fèi)者線程讀到“毒丸”對(duì)象時(shí),會(huì)立即終止自身的執(zhí)行。
下面是用“毒丸”對(duì)象終止寫(xiě)日志線程的具體實(shí)現(xiàn),整體的實(shí)現(xiàn)過(guò)程還是很簡(jiǎn)單的:類(lèi) Logger 中聲明了一個(gè)“毒丸”對(duì)象 poisonPill ,當(dāng)消費(fèi)者線程從阻塞隊(duì)列 bq 中取出一條 LogMsg 后,先判斷是否是“毒丸”對(duì)象,如果是,則 break while 循環(huán),從而終止自己的執(zhí)行。
class Logger {//用于終止日志執(zhí)行的“毒丸”final LogMsg poisonPill = new LogMsg(LEVEL.ERROR, "");//任務(wù)隊(duì)列 final BlockingQueue<LogMsg> bq= new BlockingQueue<>();//只需要一個(gè)線程寫(xiě)日志ExecutorService es = Executors.newFixedThreadPool(1);//啟動(dòng)寫(xiě)日志線程void start(){File file=File.createTempFile("foo", ".log");final FileWriter writer=new FileWriter(file);this.es.execute(()->{try {while (true) {LogMsg log = bq.poll(5, TimeUnit.SECONDS);//如果是“毒丸”,終止執(zhí)行 if(poisonPill.equals(logMsg)){break;} //省略執(zhí)行邏輯}} catch(Exception e){} finally {try {writer.flush();writer.close();}catch(IOException e){}}}); }//終止寫(xiě)日志線程public void stop() {//將“毒丸”對(duì)象加入阻塞隊(duì)列bq.add(poisonPill);es.shutdown();} }13、高性能限流器Guava RateLimiter
首先我們來(lái)看看 Guava RateLimiter 是如何解決高并發(fā)場(chǎng)景下的限流問(wèn)題的。Guava 是 Google 開(kāi)源的 Java 類(lèi)庫(kù),提供了一個(gè)工具類(lèi) RateLimiter。我們先來(lái)看看 RateLimiter 的使用,讓你對(duì)限流有個(gè)感官的印象。假設(shè)我們有一個(gè)線程池,它每秒只能處理兩個(gè)任務(wù),如果提交的任務(wù)過(guò)快,可能導(dǎo)致系統(tǒng)不穩(wěn)定,這個(gè)時(shí)候就需要用到限流。
在下面的示例代碼中,我們創(chuàng)建了一個(gè)流速為 2 個(gè)請(qǐng)求 / 秒的限流器,這里的流速該怎么理解呢?直觀地看,2 個(gè)請(qǐng)求 / 秒指的是每秒最多允許 2 個(gè)請(qǐng)求通過(guò)限流器,其實(shí)在 Guava 中,流速還有更深一層的意思:是一種勻速的概念,2 個(gè)請(qǐng)求 / 秒等價(jià)于 1 個(gè)請(qǐng)求 /500 毫秒。
在向線程池提交任務(wù)之前,調(diào)用 acquire() 方法就能起到限流的作用。通過(guò)示例代碼的執(zhí)行結(jié)果,任務(wù)提交到線程池的時(shí)間間隔基本上穩(wěn)定在 500 毫秒。
//限流器流速:2個(gè)請(qǐng)求/秒 RateLimiter limiter = RateLimiter.create(2.0); //執(zhí)行任務(wù)的線程池 ExecutorService es = Executors.newFixedThreadPool(1); //記錄上一次執(zhí)行時(shí)間 prev = System.nanoTime(); //測(cè)試執(zhí)行20次 for (int i=0; i<20; i++){//限流器限流limiter.acquire();//提交任務(wù)異步執(zhí)行es.execute(()->{long cur=System.nanoTime();//打印時(shí)間間隔:毫秒System.out.println((cur-prev)/1000_000);prev = cur;}); }輸出結(jié)果: ... 500 499 499 500 499(1)經(jīng)典限流算法:令牌桶算法
Guava 的限流器使用上還是很簡(jiǎn)單的,那它是如何實(shí)現(xiàn)的呢?Guava 采用的是令牌桶算法,其核心是要想通過(guò)限流器,必須拿到令牌。也就是說(shuō),只要我們能夠限制發(fā)放令牌的速率,那么就能控制流速了。令牌桶算法的詳細(xì)描述如下:
① 令牌以固定的速率添加到令牌桶中,假設(shè)限流的速率是 r/ 秒,則令牌每 1/r 秒會(huì)添加一個(gè);
② 假設(shè)令牌桶的容量是 b ,如果令牌桶已滿(mǎn),則新的令牌會(huì)被丟棄;
③ 請(qǐng)求能夠通過(guò)限流器的前提是令牌桶中有令牌。
這個(gè)算法中,限流的速率 r 還是比較容易理解的,但令牌桶的容量 b 該怎么理解呢?b 其實(shí)是 burst 的簡(jiǎn)寫(xiě),意義是限流器允許的最大突發(fā)流量。比如 b=10,而且令牌桶中的令牌已滿(mǎn),此時(shí)限流器允許 10 個(gè)請(qǐng)求同時(shí)通過(guò)限流器,當(dāng)然只是突發(fā)流量而已,這 10 個(gè)請(qǐng)求會(huì)帶走 10 個(gè)令牌,所以后續(xù)的流量只能按照速率 r 通過(guò)限流器。
令牌桶這個(gè)算法,如何用 Java 實(shí)現(xiàn)呢?很可能你的直覺(jué)會(huì)告訴你生產(chǎn)者 - 消費(fèi)者模式:一個(gè)生產(chǎn)者線程定時(shí)向阻塞隊(duì)列中添加令牌,而試圖通過(guò)限流器的線程則作為消費(fèi)者線程,只有從阻塞隊(duì)列中獲取到令牌,才允許通過(guò)限流器。
這個(gè)算法看上去非常完美,而且實(shí)現(xiàn)起來(lái)非常簡(jiǎn)單,如果并發(fā)量不大,這個(gè)實(shí)現(xiàn)并沒(méi)有什么問(wèn)題。可實(shí)際情況卻是使用限流的場(chǎng)景大部分都是高并發(fā)場(chǎng)景,而且系統(tǒng)壓力已經(jīng)臨近極限了,此時(shí)這個(gè)實(shí)現(xiàn)就有問(wèn)題了。問(wèn)題就出在定時(shí)器上,在高并發(fā)場(chǎng)景下,當(dāng)系統(tǒng)壓力已經(jīng)臨近極限的時(shí)候,定時(shí)器的精度誤差會(huì)非常大,同時(shí)定時(shí)器本身會(huì)創(chuàng)建調(diào)度線程,也會(huì)對(duì)系統(tǒng)的性能產(chǎn)生影響。
那還有什么好的實(shí)現(xiàn)方式呢?當(dāng)然有,Guava 的實(shí)現(xiàn)就沒(méi)有使用定時(shí)器,下面我們就來(lái)看看它是如何實(shí)現(xiàn)的。
(2)Guava 如何實(shí)現(xiàn)令牌桶算法
Guava 實(shí)現(xiàn)令牌桶算法,用了一個(gè)很簡(jiǎn)單的辦法,其關(guān)鍵是記錄并動(dòng)態(tài)計(jì)算下一令牌發(fā)放的時(shí)間。下面我們以一個(gè)最簡(jiǎn)單的場(chǎng)景來(lái)介紹該算法的執(zhí)行過(guò)程。假設(shè)令牌桶的容量為 b=1,限流速率 r = 1 個(gè)請(qǐng)求 / 秒,如下圖所示,如果當(dāng)前令牌桶中沒(méi)有令牌,下一個(gè)令牌的發(fā)放時(shí)間是在第 3 秒,而在第 2 秒的時(shí)候有一個(gè)線程 T1 請(qǐng)求令牌,此時(shí)該如何處理呢?
對(duì)于這個(gè)請(qǐng)求令牌的線程而言,很顯然需要等待 1 秒,因?yàn)?1 秒以后(第 3 秒)它就能拿到令牌了。此時(shí)需要注意的是,下一個(gè)令牌發(fā)放的時(shí)間也要增加 1 秒,為什么呢?因?yàn)榈?3 秒發(fā)放的令牌已經(jīng)被線程 T1 預(yù)占了。處理之后如下圖所示。
假設(shè) T1 在預(yù)占了第 3 秒的令牌之后,馬上又有一個(gè)線程 T2 請(qǐng)求令牌,如下圖所示。
很顯然,由于下一個(gè)令牌產(chǎn)生的時(shí)間是第 4 秒,所以線程 T2 要等待兩秒的時(shí)間,才能獲取到令牌,同時(shí)由于 T2 預(yù)占了第 4 秒的令牌,所以下一令牌產(chǎn)生時(shí)間還要增加 1 秒,完全處理之后,如下圖所示。
上面線程 T1、T2 都是在下一令牌產(chǎn)生時(shí)間之前請(qǐng)求令牌,如果線程在下一令牌產(chǎn)生時(shí)間之后請(qǐng)求令牌會(huì)如何呢?假設(shè)在線程 T1 請(qǐng)求令牌之后的 5 秒,也就是第 7 秒,線程 T3 請(qǐng)求令牌,如下圖所示。
由于在第 5 秒已經(jīng)產(chǎn)生了一個(gè)令牌,所以此時(shí)線程 T3 可以直接拿到令牌,而無(wú)需等待。在第 7 秒,實(shí)際上限流器能夠產(chǎn)生 3 個(gè)令牌,第 5、6、7 秒各產(chǎn)生一個(gè)令牌。由于我們假設(shè)令牌桶的容量是 1,所以第 6、7 秒產(chǎn)生的令牌就丟棄了,其實(shí)等價(jià)地你也可以認(rèn)為是保留的第 7 秒的令牌,丟棄的第 5、6 秒的令牌,也就是說(shuō)第 7 秒的令牌被線程 T3 占有了,于是下一令牌的的產(chǎn)生時(shí)間應(yīng)該是第 8 秒,如下圖所示。
通過(guò)上面簡(jiǎn)要地分析,你會(huì)發(fā)現(xiàn),我們只需要記錄一個(gè)下一令牌產(chǎn)生的時(shí)間,并動(dòng)態(tài)更新它,就能夠輕松完成限流功能。我們可以將上面的這個(gè)算法代碼化,示例代碼如下所示,依然假設(shè)令牌桶的容量是 1。關(guān)鍵是 reserve() 方法,這個(gè)方法會(huì)為請(qǐng)求令牌的線程預(yù)分配令牌,同時(shí)返回該線程能夠獲取令牌的時(shí)間。其實(shí)現(xiàn)邏輯就是上面提到的:如果線程請(qǐng)求令牌的時(shí)間在下一令牌產(chǎn)生時(shí)間之后,那么該線程立刻就能夠獲取令牌;反之,如果請(qǐng)求時(shí)間在下一令牌產(chǎn)生時(shí)間之前,那么該線程是在下一令牌產(chǎn)生的時(shí)間獲取令牌。由于此時(shí)下一令牌已經(jīng)被該線程預(yù)占,所以下一令牌產(chǎn)生的時(shí)間需要加上 1 秒。
class SimpleLimiter {//下一令牌產(chǎn)生時(shí)間long next = System.nanoTime();//發(fā)放令牌間隔:納秒long interval = 1000_000_000;//預(yù)占令牌,返回能夠獲取令牌的時(shí)間synchronized long reserve(long now){//請(qǐng)求時(shí)間在下一令牌產(chǎn)生時(shí)間之后//重新計(jì)算下一令牌產(chǎn)生時(shí)間if (now > next){//將下一令牌產(chǎn)生時(shí)間重置為當(dāng)前時(shí)間next = now;}//能夠獲取令牌的時(shí)間long at=next;//設(shè)置下一令牌產(chǎn)生時(shí)間next += interval;//返回線程需要等待的時(shí)間return Math.max(at, 0L);}//申請(qǐng)令牌void acquire() {//申請(qǐng)令牌時(shí)的時(shí)間long now = System.nanoTime();//預(yù)占令牌long at=reserve(now);long waitTime=max(at-now, 0);//按照條件等待if(waitTime > 0) {try {TimeUnit.NANOSECONDS.sleep(waitTime);}catch(InterruptedException e){e.printStackTrace();}}} }如果令牌桶的容量大于 1,又該如何處理呢?按照令牌桶算法,令牌要首先從令牌桶中出,所以我們需要按需計(jì)算令牌桶中的數(shù)量,當(dāng)有線程請(qǐng)求令牌時(shí),先從令牌桶中出。具體的代碼實(shí)現(xiàn)如下所示。我們?cè)黾恿艘粋€(gè) resync() 方法,在這個(gè)方法中,如果線程請(qǐng)求令牌的時(shí)間在下一令牌產(chǎn)生時(shí)間之后,會(huì)重新計(jì)算令牌桶中的令牌數(shù),新產(chǎn)生的令牌的計(jì)算公式是:(now-next)/interval,你可對(duì)照上面的示意圖來(lái)理解。reserve() 方法中,則增加了先從令牌桶中出令牌的邏輯,不過(guò)需要注意的是,如果令牌是從令牌桶中出的,那么 next 就無(wú)需增加一個(gè) interval 了。
class SimpleLimiter {//當(dāng)前令牌桶中的令牌數(shù)量long storedPermits = 0;//令牌桶的容量long maxPermits = 3;//下一令牌產(chǎn)生時(shí)間long next = System.nanoTime();//發(fā)放令牌間隔:納秒long interval = 1000_000_000;//請(qǐng)求時(shí)間在下一令牌產(chǎn)生時(shí)間之后,則// 1.重新計(jì)算令牌桶中的令牌數(shù)// 2.將下一個(gè)令牌發(fā)放時(shí)間重置為當(dāng)前時(shí)間void resync(long now) {if (now > next) {//新產(chǎn)生的令牌數(shù)long newPermits=(now-next)/interval;//新令牌增加到令牌桶storedPermits=min(maxPermits, storedPermits + newPermits);//將下一個(gè)令牌發(fā)放時(shí)間重置為當(dāng)前時(shí)間next = now;}}//預(yù)占令牌,返回能夠獲取令牌的時(shí)間synchronized long reserve(long now){resync(now);//能夠獲取令牌的時(shí)間long at = next;//令牌桶中能提供的令牌long fb=min(1, storedPermits);//令牌凈需求:首先減掉令牌桶中的令牌long nr = 1 - fb;//重新計(jì)算下一令牌產(chǎn)生時(shí)間next = next + nr*interval;//重新計(jì)算令牌桶中的令牌this.storedPermits -= fb;return at;}//申請(qǐng)令牌void acquire() {//申請(qǐng)令牌時(shí)的時(shí)間long now = System.nanoTime();//預(yù)占令牌long at=reserve(now);long waitTime=max(at-now, 0);//按照條件等待if(waitTime > 0) {try {TimeUnit.NANOSECONDS.sleep(waitTime);}catch(InterruptedException e){e.printStackTrace();}}} }經(jīng)典的限流算法有兩個(gè),一個(gè)是令牌桶算法(Token Bucket),另一個(gè)是漏桶算法(Leaky Bucket)。令牌桶算法是定時(shí)向令牌桶發(fā)送令牌,請(qǐng)求能夠從令牌桶中拿到令牌,然后才能通過(guò)限流器;而漏桶算法里,請(qǐng)求就像水一樣注入漏桶,漏桶會(huì)按照一定的速率自動(dòng)將水漏掉,只有漏桶里還能注入水的時(shí)候,請(qǐng)求才能通過(guò)限流器。令牌桶算法和漏桶算法很像一個(gè)硬幣的正反面,所以你可以參考令牌桶算法的實(shí)現(xiàn)來(lái)實(shí)現(xiàn)漏桶算法。
上面我們介紹了 Guava 是如何實(shí)現(xiàn)令牌桶算法的,我們的示例代碼是對(duì) Guava RateLimiter 的簡(jiǎn)化,Guava RateLimiter 擴(kuò)展了標(biāo)準(zhǔn)的令牌桶算法,比如還能支持預(yù)熱功能。對(duì)于按需加載的緩存來(lái)說(shuō),預(yù)熱后緩存能支持 5 萬(wàn) TPS 的并發(fā),但是在預(yù)熱前 5 萬(wàn) TPS 的并發(fā)直接就把緩存擊垮了,所以如果需要給該緩存限流,限流器也需要支持預(yù)熱功能,在初始階段,限制的流速 r 很小,但是動(dòng)態(tài)增長(zhǎng)的。預(yù)熱功能的實(shí)現(xiàn)非常復(fù)雜,Guava 構(gòu)建了一個(gè)積分函數(shù)來(lái)解決這個(gè)問(wèn)題,如果你感興趣,可以繼續(xù)深入研究。
14、高性能網(wǎng)絡(luò)應(yīng)用框架Netty
Netty 是一個(gè)高性能網(wǎng)絡(luò)應(yīng)用框架,應(yīng)用非常普遍,目前在 Java 領(lǐng)域里,Netty 基本上成為網(wǎng)絡(luò)程序的標(biāo)配了。Netty 框架功能豐富,也非常復(fù)雜,今天我們主要分析 Netty 框架中的線程模型,而線程模型直接影響著網(wǎng)絡(luò)程序的性能。
在介紹 Netty 的線程模型之前,我們首先需要把問(wèn)題搞清楚,了解網(wǎng)絡(luò)編程性能的瓶頸在哪里,然后再看 Netty 的線程模型是如何解決這個(gè)問(wèn)題的。
(1)網(wǎng)絡(luò)編程性能的瓶頸
一個(gè)簡(jiǎn)單的網(wǎng)絡(luò)程序 echo,采用的是阻塞式 I/O(BIO)。BIO 模型里,所有 read() 操作和 write() 操作都會(huì)阻塞當(dāng)前線程的,如果客戶(hù)端已經(jīng)和服務(wù)端建立了一個(gè)連接,而遲遲不發(fā)送數(shù)據(jù),那么服務(wù)端的 read() 操作會(huì)一直阻塞,所以使用 BIO 模型,一般都會(huì)為每個(gè) socket 分配一個(gè)獨(dú)立的線程,這樣就不會(huì)因?yàn)榫€程阻塞在一個(gè) socket 上而影響對(duì)其他 socket 的讀寫(xiě)。BIO 的線程模型如下圖所示,每一個(gè) socket 都對(duì)應(yīng)一個(gè)獨(dú)立的線程;為了避免頻繁創(chuàng)建、消耗線程,可以采用線程池,但是 socket 和線程之間的對(duì)應(yīng)關(guān)系并不會(huì)變化。
BIO 這種線程模型適用于 socket 連接不是很多的場(chǎng)景;但是現(xiàn)在的互聯(lián)網(wǎng)場(chǎng)景,往往需要服務(wù)器能夠支撐十萬(wàn)甚至百萬(wàn)連接,而創(chuàng)建十萬(wàn)甚至上百萬(wàn)個(gè)線程顯然并不現(xiàn)實(shí),所以 BIO 線程模型無(wú)法解決百萬(wàn)連接的問(wèn)題。如果仔細(xì)觀察,你會(huì)發(fā)現(xiàn)互聯(lián)網(wǎng)場(chǎng)景中,雖然連接多,但是每個(gè)連接上的請(qǐng)求并不頻繁,所以線程大部分時(shí)間都在等待 I/O 就緒。也就是說(shuō)線程大部分時(shí)間都阻塞在那里,這完全是浪費(fèi),如果我們能夠解決這個(gè)問(wèn)題,那就不需要這么多線程了。
順著這個(gè)思路,我們可以將線程模型優(yōu)化為下圖這個(gè)樣子,可以用一個(gè)線程來(lái)處理多個(gè)連接,這樣線程的利用率就上來(lái)了,同時(shí)所需的線程數(shù)量也跟著降下來(lái)了。這個(gè)思路很好,可是使用 BIO 相關(guān)的 API 是無(wú)法實(shí)現(xiàn)的,這是為什么呢?因?yàn)?BIO 相關(guān)的 socket 讀寫(xiě)操作都是阻塞式的,而一旦調(diào)用了阻塞式 API,在 I/O 就緒前,調(diào)用線程會(huì)一直阻塞,也就無(wú)法處理其他的 socket 連接了。
好在 Java 里還提供了非阻塞式(NIO)API,利用非阻塞式 API 就能夠?qū)崿F(xiàn)一個(gè)線程處理多個(gè)連接了。那具體如何實(shí)現(xiàn)呢?現(xiàn)在普遍都是采用 Reactor 模式,包括 Netty 的實(shí)現(xiàn)。所以,要想理解 Netty 的實(shí)現(xiàn),接下來(lái)我們就需要先了解一下 Reactor 模式。
(2)Reactor 模式
下面是 Reactor 模式的類(lèi)結(jié)構(gòu)圖,其中 Handle 指的是 I/O 句柄,在 Java 網(wǎng)絡(luò)編程里,它本質(zhì)上就是一個(gè)網(wǎng)絡(luò)連接。Event Handler 很容易理解,就是一個(gè)事件處理器,其中 handle_event() 方法處理 I/O 事件,也就是每個(gè) Event Handler 處理一個(gè) I/O Handle;get_handle() 方法可以返回這個(gè) I/O 的 Handle。Synchronous Event Demultiplexer 可以理解為操作系統(tǒng)提供的 I/O 多路復(fù)用 API,例如 POSIX 標(biāo)準(zhǔn)里的 select() 以及 Linux 里面的 epoll()。
Reactor 模式的核心自然是 Reactor 這個(gè)類(lèi),其中 register_handler() 和 remove_handler() 這兩個(gè)方法可以注冊(cè)和刪除一個(gè)事件處理器;handle_events() 方式是核心,也是 Reactor 模式的發(fā)動(dòng)機(jī),這個(gè)方法的核心邏輯如下:首先通過(guò)同步事件多路選擇器提供的 select() 方法監(jiān)聽(tīng)網(wǎng)絡(luò)事件,當(dāng)有網(wǎng)絡(luò)事件就緒后,就遍歷事件處理器來(lái)處理該網(wǎng)絡(luò)事件。由于網(wǎng)絡(luò)事件是源源不斷的,所以在主程序中啟動(dòng) Reactor 模式,需要以 while(true){} 的方式調(diào)用 handle_events() 方法。
void Reactor::handle_events(){//通過(guò)同步事件多路選擇器提供的//select()方法監(jiān)聽(tīng)網(wǎng)絡(luò)事件select(handlers);//處理網(wǎng)絡(luò)事件for(h in handlers){h.handle_event();} } // 在主程序中啟動(dòng)事件循環(huán) while (true) {handle_events();(3)Netty 中的線程模型
Netty 的實(shí)現(xiàn)雖然參考了 Reactor 模式,但是并沒(méi)有完全照搬,Netty 中最核心的概念是事件循環(huán)(EventLoop),其實(shí)也就是 Reactor 模式中的 Reactor,負(fù)責(zé)監(jiān)聽(tīng)網(wǎng)絡(luò)事件并調(diào)用事件處理器進(jìn)行處理。在 4.x 版本的 Netty 中,網(wǎng)絡(luò)連接和 EventLoop 是穩(wěn)定的多對(duì) 1 關(guān)系,而 EventLoop 和 Java 線程是 1 對(duì) 1 關(guān)系,這里的穩(wěn)定指的是關(guān)系一旦確定就不再發(fā)生變化。也就是說(shuō)一個(gè)網(wǎng)絡(luò)連接只會(huì)對(duì)應(yīng)唯一的一個(gè) EventLoop,而一個(gè) EventLoop 也只會(huì)對(duì)應(yīng)到一個(gè) Java 線程,所以一個(gè)網(wǎng)絡(luò)連接只會(huì)對(duì)應(yīng)到一個(gè) Java 線程。
一個(gè)網(wǎng)絡(luò)連接對(duì)應(yīng)到一個(gè) Java 線程上,有什么好處呢?最大的好處就是對(duì)于一個(gè)網(wǎng)絡(luò)連接的事件處理是單線程的,這樣就避免了各種并發(fā)問(wèn)題。
Netty 中的線程模型可以參考下圖,這個(gè)圖和前面我們提到的理想的線程模型圖非常相似,核心目標(biāo)都是用一個(gè)線程處理多個(gè)網(wǎng)絡(luò)連接。
Netty 中還有一個(gè)核心概念是 EventLoopGroup,顧名思義,一個(gè) EventLoopGroup 由一組 EventLoop 組成。實(shí)際使用中,一般都會(huì)創(chuàng)建兩個(gè) EventLoopGroup,一個(gè)稱(chēng)為 bossGroup,一個(gè)稱(chēng)為 workerGroup。為什么會(huì)有兩個(gè) EventLoopGroup 呢?
這個(gè)和 socket 處理網(wǎng)絡(luò)請(qǐng)求的機(jī)制有關(guān),socket 處理 TCP 網(wǎng)絡(luò)連接請(qǐng)求,是在一個(gè)獨(dú)立的 socket 中,每當(dāng)有一個(gè) TCP 連接成功建立,都會(huì)創(chuàng)建一個(gè)新的 socket,之后對(duì) TCP 連接的讀寫(xiě)都是由新創(chuàng)建處理的 socket 完成的。也就是說(shuō)處理 TCP 連接請(qǐng)求和讀寫(xiě)請(qǐng)求是通過(guò)兩個(gè)不同的 socket 完成的。上面我們?cè)谟懻摼W(wǎng)絡(luò)請(qǐng)求的時(shí)候,為了簡(jiǎn)化模型,只是討論了讀寫(xiě)請(qǐng)求,而沒(méi)有討論連接請(qǐng)求。
在 Netty 中,bossGroup 就用來(lái)處理連接請(qǐng)求的,而 workerGroup 是用來(lái)處理讀寫(xiě)請(qǐng)求的。bossGroup 處理完連接請(qǐng)求后,會(huì)將這個(gè)連接提交給 workerGroup 來(lái)處理, workerGroup 里面有多個(gè) EventLoop,那新的連接會(huì)交給哪個(gè) EventLoop 來(lái)處理呢?這就需要一個(gè)負(fù)載均衡算法,Netty 中目前使用的是輪詢(xún)算法。
下面我們用 Netty 重新實(shí)現(xiàn)以下 echo 程序的服務(wù)端,近距離感受一下 Netty。
(4)用 Netty 實(shí)現(xiàn) Echo 程序服務(wù)端
下面的示例代碼基于 Netty 實(shí)現(xiàn)了 echo 程序服務(wù)端:首先創(chuàng)建了一個(gè)事件處理器(等同于 Reactor 模式中的事件處理器),然后創(chuàng)建了 bossGroup 和 workerGroup,再之后創(chuàng)建并初始化了 ServerBootstrap,代碼還是很簡(jiǎn)單的,不過(guò)有兩個(gè)地方需要注意一下。
第一個(gè),如果 NettybossGroup 只監(jiān)聽(tīng)一個(gè)端口,那 bossGroup 只需要 1 個(gè) EventLoop 就可以了,多了純屬浪費(fèi)。
第二個(gè),默認(rèn)情況下,Netty 會(huì)創(chuàng)建“2*CPU 核數(shù)”個(gè) EventLoop,由于網(wǎng)絡(luò)連接與 EventLoop 有穩(wěn)定的關(guān)系,所以事件處理器在處理網(wǎng)絡(luò)事件的時(shí)候是不能有阻塞操作的,否則很容易導(dǎo)致請(qǐng)求大面積超時(shí)。如果實(shí)在無(wú)法避免使用阻塞操作,那可以通過(guò)線程池來(lái)異步處理。
//事件處理器 final EchoServerHandler serverHandler = new EchoServerHandler(); //boss線程組 EventLoopGroup bossGroup = new NioEventLoopGroup(1); //worker線程組 EventLoopGroup workerGroup = new NioEventLoopGroup(); try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch){ch.pipeline().addLast(serverHandler);}});//bind服務(wù)端端口 ChannelFuture f = b.bind(9090).sync();f.channel().closeFuture().sync(); } finally {//終止工作線程組workerGroup.shutdownGracefully();//終止boss線程組bossGroup.shutdownGracefully(); }//socket連接處理器 class EchoServerHandler extends ChannelInboundHandlerAdapter {//處理讀事件 @Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg){ctx.write(msg);}//處理讀完成事件@Overridepublic void channelReadComplete(ChannelHandlerContext ctx){ctx.flush();}//處理異常事件@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();} }Netty 是一個(gè)款優(yōu)秀的網(wǎng)絡(luò)編程框架,性能非常好,為了實(shí)現(xiàn)高性能的目標(biāo),Netty 做了很多優(yōu)化,例如優(yōu)化了 ByteBuffer、支持零拷貝等等,和并發(fā)編程相關(guān)的就是它的線程模型了。Netty 的線程模型設(shè)計(jì)得很精巧,每個(gè)網(wǎng)絡(luò)連接都關(guān)聯(lián)到了一個(gè)線程上,這樣做的好處是:對(duì)于一個(gè)網(wǎng)絡(luò)連接,讀寫(xiě)操作都是單線程執(zhí)行的,從而避免了并發(fā)程序的各種問(wèn)題。
15、高性能隊(duì)列Disruptor
Java SDK 提供了 2 個(gè)有界隊(duì)列:ArrayBlockingQueue 和 LinkedBlockingQueue,它們都是基于 ReentrantLock 實(shí)現(xiàn)的,在高并發(fā)場(chǎng)景下,鎖的效率并不高,那有沒(méi)有更好的替代品呢?有,今天我們就介紹一種性能更高的有界隊(duì)列:Disruptor。
Disruptor 是一款高性能的有界內(nèi)存隊(duì)列,目前應(yīng)用非常廣泛,Log4j2、Spring Messaging、HBase、Storm 都用到了 Disruptor,那 Disruptor 的性能為什么這么高呢?Disruptor 項(xiàng)目團(tuán)隊(duì)曾經(jīng)寫(xiě)過(guò)一篇論文,詳細(xì)解釋了其原因,可以總結(jié)為如下:
① 內(nèi)存分配更加合理,使用 RingBuffer 數(shù)據(jù)結(jié)構(gòu),數(shù)組元素在初始化時(shí)一次性全部創(chuàng)建,提升緩存命中率;對(duì)象循環(huán)利用,避免頻繁 GC。
② 能夠避免偽共享,提升緩存利用率。
③ 采用無(wú)鎖算法,避免頻繁加鎖、解鎖的性能消耗。
④ 支持批量消費(fèi),消費(fèi)者可以無(wú)鎖方式消費(fèi)多個(gè)消息。
其中,前三點(diǎn)涉及到的知識(shí)比較多,所以今天咱們重點(diǎn)講解前三點(diǎn),不過(guò)在詳細(xì)介紹這些知識(shí)之前,我們先來(lái)聊聊 Disruptor 如何使用,好讓你先對(duì) Disruptor 有個(gè)感官的認(rèn)識(shí)。
下面的代碼出自官方示例,我略做了一些修改,相較而言,Disruptor 的使用比 Java SDK 提供 BlockingQueue 要復(fù)雜一些,但是總體思路還是一致的,其大致情況如下:
在 Disruptor 中,生產(chǎn)者生產(chǎn)的對(duì)象(也就是消費(fèi)者消費(fèi)的對(duì)象)稱(chēng)為 Event,使用 Disruptor 必須自定義 Event,例如示例代碼的自定義 Event 是 LongEvent;
構(gòu)建 Disruptor 對(duì)象除了要指定隊(duì)列大小外,還需要傳入一個(gè) EventFactory,示例代碼中傳入的是LongEvent::new;
消費(fèi) Disruptor 中的 Event 需要通過(guò) handleEventsWith() 方法注冊(cè)一個(gè)事件處理器,發(fā)布 Event 則需要通過(guò) publishEvent() 方法。
//自定義Event class LongEvent {private long value;public void set(long value) {this.value = value;} } //指定RingBuffer大小, //必須是2的N次方 int bufferSize = 1024;//構(gòu)建Disruptor Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new,bufferSize,DaemonThreadFactory.INSTANCE);//注冊(cè)事件處理器 disruptor.handleEventsWith((event, sequence, endOfBatch) ->System.out.println("E: "+event));//啟動(dòng)Disruptor disruptor.start();//獲取RingBuffer RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); //生產(chǎn)Event ByteBuffer bb = ByteBuffer.allocate(8); for (long l = 0; true; l++){bb.putLong(0, l);//生產(chǎn)者生產(chǎn)消息ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);Thread.sleep(1000); }(1)RingBuffer 如何提升性能
Java SDK 中 ArrayBlockingQueue 使用數(shù)組作為底層的數(shù)據(jù)存儲(chǔ),而 Disruptor 是使用 RingBuffer 作為數(shù)據(jù)存儲(chǔ)。RingBuffer 本質(zhì)上也是數(shù)組,所以?xún)H僅將數(shù)據(jù)存儲(chǔ)從數(shù)組換成 RingBuffer 并不能提升性能,但是 Disruptor 在 RingBuffer 的基礎(chǔ)上還做了很多優(yōu)化,其中一項(xiàng)優(yōu)化就是和內(nèi)存分配有關(guān)的。
在介紹這項(xiàng)優(yōu)化之前,你需要先了解一下程序的局部性原理。簡(jiǎn)單來(lái)講,程序的局部性原理指的是在一段時(shí)間內(nèi)程序的執(zhí)行會(huì)限定在一個(gè)局部范圍內(nèi)。這里的“局部性”可以從兩個(gè)方面來(lái)理解,一個(gè)是時(shí)間局部性,另一個(gè)是空間局部性。時(shí)間局部性指的是程序中的某條指令一旦被執(zhí)行,不久之后這條指令很可能再次被執(zhí)行;如果某條數(shù)據(jù)被訪問(wèn),不久之后這條數(shù)據(jù)很可能再次被訪問(wèn)。而空間局部性是指某塊內(nèi)存一旦被訪問(wèn),不久之后這塊內(nèi)存附近的內(nèi)存也很可能被訪問(wèn)。
CPU 的緩存就利用了程序的局部性原理:CPU 從內(nèi)存中加載數(shù)據(jù) X 時(shí),會(huì)將數(shù)據(jù) X 緩存在高速緩存 Cache 中,實(shí)際上 CPU 緩存 X 的同時(shí),還緩存了 X 周?chē)臄?shù)據(jù),因?yàn)楦鶕?jù)程序具備局部性原理,X 周?chē)臄?shù)據(jù)也很有可能被訪問(wèn)。從另外一個(gè)角度來(lái)看,如果程序能夠很好地體現(xiàn)出局部性原理,也就能更好地利用 CPU 的緩存,從而提升程序的性能。Disruptor 在設(shè)計(jì) RingBuffer 的時(shí)候就充分考慮了這個(gè)問(wèn)題,下面我們就對(duì)比著 ArrayBlockingQueue 來(lái)分析一下。
首先是 ArrayBlockingQueue。生產(chǎn)者線程向 ArrayBlockingQueue 增加一個(gè)元素,每次增加元素 E 之前,都需要?jiǎng)?chuàng)建一個(gè)對(duì)象 E,如下圖所示,ArrayBlockingQueue 內(nèi)部有 6 個(gè)元素,這 6 個(gè)元素都是由生產(chǎn)者線程創(chuàng)建的,由于創(chuàng)建這些元素的時(shí)間基本上是離散的,所以這些元素的內(nèi)存地址大概率也不是連續(xù)的。
下面我們?cè)倏纯?Disruptor 是如何處理的。Disruptor 內(nèi)部的 RingBuffer 也是用數(shù)組實(shí)現(xiàn)的,但是這個(gè)數(shù)組中的所有元素在初始化時(shí)是一次性全部創(chuàng)建的,所以這些元素的內(nèi)存地址大概率是連續(xù)的,相關(guān)的代碼如下所示。
for (int i=0; i<bufferSize; i++){//entries[]就是RingBuffer內(nèi)部的數(shù)組//eventFactory就是前面示例代碼中傳入的LongEvent::newentries[BUFFER_PAD + i] = eventFactory.newInstance(); }Disruptor 內(nèi)部 RingBuffer 的結(jié)構(gòu)可以簡(jiǎn)化成下圖,那問(wèn)題來(lái)了,數(shù)組中所有元素內(nèi)存地址連續(xù)能提升性能嗎?能!為什么呢?因?yàn)橄M(fèi)者線程在消費(fèi)的時(shí)候,是遵循空間局部性原理的,消費(fèi)完第 1 個(gè)元素,很快就會(huì)消費(fèi)第 2 個(gè)元素;當(dāng)消費(fèi)第 1 個(gè)元素 E1 的時(shí)候,CPU 會(huì)把內(nèi)存中 E1 后面的數(shù)據(jù)也加載進(jìn) Cache,如果 E1 和 E2 在內(nèi)存中的地址是連續(xù)的,那么 E2 也就會(huì)被加載進(jìn) Cache 中,然后當(dāng)消費(fèi)第 2 個(gè)元素的時(shí)候,由于 E2 已經(jīng)在 Cache 中了,所以就不需要從內(nèi)存中加載了,這樣就能大大提升性能。
除此之外,在 Disruptor 中,生產(chǎn)者線程通過(guò) publishEvent() 發(fā)布 Event 的時(shí)候,并不是創(chuàng)建一個(gè)新的 Event,而是通過(guò) event.set() 方法修改 Event, 也就是說(shuō) RingBuffer 創(chuàng)建的 Event 是可以循環(huán)利用的,這樣還能避免頻繁創(chuàng)建、刪除 Event 導(dǎo)致的頻繁 GC 問(wèn)題。
(2)如何避免“偽共享”
高效利用 Cache,能夠大大提升性能,所以要努力構(gòu)建能夠高效利用 Cache 的內(nèi)存結(jié)構(gòu)。而從另外一個(gè)角度看,努力避免不能高效利用 Cache 的內(nèi)存結(jié)構(gòu)也同樣重要。
有一種叫做“偽共享(False sharing)”的內(nèi)存布局就會(huì)使 Cache 失效,那什么是“偽共享”呢?
偽共享和 CPU 內(nèi)部的 Cache 有關(guān),Cache 內(nèi)部是按照緩存行(Cache Line)管理的,緩存行的大小通常是 64 個(gè)字節(jié);CPU 從內(nèi)存中加載數(shù)據(jù) X,會(huì)同時(shí)加載 X 后面(64-size(X))個(gè)字節(jié)的數(shù)據(jù)。下面的示例代碼出自 Java SDK 的 ArrayBlockingQueue,其內(nèi)部維護(hù)了 4 個(gè)成員變量,分別是隊(duì)列數(shù)組 items、出隊(duì)索引 takeIndex、入隊(duì)索引 putIndex 以及隊(duì)列中的元素總數(shù) count。
/** 隊(duì)列數(shù)組 */ final Object[] items; /** 出隊(duì)索引 */ int takeIndex; /** 入隊(duì)索引 */ int putIndex; /** 隊(duì)列中元素總數(shù) */ int count;當(dāng) CPU 從內(nèi)存中加載 takeIndex 的時(shí)候,會(huì)同時(shí)將 putIndex 以及 count 都加載進(jìn) Cache。下圖是某個(gè)時(shí)刻 CPU 中 Cache 的狀況,為了簡(jiǎn)化,緩存行中我們僅列出了 takeIndex 和 putIndex。
假設(shè)線程 A 運(yùn)行在 CPU-1 上,執(zhí)行入隊(duì)操作,入隊(duì)操作會(huì)修改 putIndex,而修改 putIndex 會(huì)導(dǎo)致其所在的所有核上的緩存行均失效;此時(shí)假設(shè)運(yùn)行在 CPU-2 上的線程執(zhí)行出隊(duì)操作,出隊(duì)操作需要讀取 takeIndex,由于 takeIndex 所在的緩存行已經(jīng)失效,所以 CPU-2 必須從內(nèi)存中重新讀取。入隊(duì)操作本不會(huì)修改 takeIndex,但是由于 takeIndex 和 putIndex 共享的是一個(gè)緩存行,就導(dǎo)致出隊(duì)操作不能很好地利用 Cache,這其實(shí)就是偽共享。簡(jiǎn)單來(lái)講,偽共享指的是由于共享緩存行導(dǎo)致緩存無(wú)效的場(chǎng)景。
ArrayBlockingQueue 的入隊(duì)和出隊(duì)操作是用鎖來(lái)保證互斥的,所以入隊(duì)和出隊(duì)不會(huì)同時(shí)發(fā)生。如果允許入隊(duì)和出隊(duì)同時(shí)發(fā)生,那就會(huì)導(dǎo)致線程 A 和線程 B 爭(zhēng)用同一個(gè)緩存行,這樣也會(huì)導(dǎo)致性能問(wèn)題。所以為了更好地利用緩存,我們必須避免偽共享,那如何避免呢?
方案很簡(jiǎn)單,每個(gè)變量獨(dú)占一個(gè)緩存行、不共享緩存行就可以了,具體技術(shù)是緩存行填充。比如想讓 takeIndex 獨(dú)占一個(gè)緩存行,可以在 takeIndex 的前后各填充 56 個(gè)字節(jié),這樣就一定能保證 takeIndex 獨(dú)占一個(gè)緩存行。下面的示例代碼出自 Disruptor,Sequence 對(duì)象中的 value 屬性就能避免偽共享,因?yàn)檫@個(gè)屬性前后都填充了 56 個(gè)字節(jié)。Disruptor 中很多對(duì)象,例如 RingBuffer、RingBuffer 內(nèi)部的數(shù)組都用到了這種填充技術(shù)來(lái)避免偽共享。
//前:填充56字節(jié) class LhsPadding{long p1, p2, p3, p4, p5, p6, p7; } class Value extends LhsPadding{volatile long value; } //后:填充56字節(jié) class RhsPadding extends Value{long p9, p10, p11, p12, p13, p14, p15; } class Sequence extends RhsPadding{//省略實(shí)現(xiàn) }(3)Disruptor 中的無(wú)鎖算法
ArrayBlockingQueue 是利用管程實(shí)現(xiàn)的,中規(guī)中矩,生產(chǎn)、消費(fèi)操作都需要加鎖,實(shí)現(xiàn)起來(lái)簡(jiǎn)單,但是性能并不十分理想。Disruptor 采用的是無(wú)鎖算法,很復(fù)雜,但是核心無(wú)非是生產(chǎn)和消費(fèi)兩個(gè)操作。Disruptor 中最復(fù)雜的是入隊(duì)操作,所以我們重點(diǎn)來(lái)看看入隊(duì)操作是如何實(shí)現(xiàn)的。
對(duì)于入隊(duì)操作,最關(guān)鍵的要求是不能覆蓋沒(méi)有消費(fèi)的元素;對(duì)于出隊(duì)操作,最關(guān)鍵的要求是不能讀取沒(méi)有寫(xiě)入的元素,所以 Disruptor 中也一定會(huì)維護(hù)類(lèi)似出隊(duì)索引和入隊(duì)索引這樣兩個(gè)關(guān)鍵變量。Disruptor 中的 RingBuffer 維護(hù)了入隊(duì)索引,但是并沒(méi)有維護(hù)出隊(duì)索引,這是因?yàn)樵?Disruptor 中多個(gè)消費(fèi)者可以同時(shí)消費(fèi),每個(gè)消費(fèi)者都會(huì)有一個(gè)出隊(duì)索引,所以 RingBuffer 的出隊(duì)索引是所有消費(fèi)者里面最小的那一個(gè)。
下面是 Disruptor 生產(chǎn)者入隊(duì)操作的核心代碼,看上去很復(fù)雜,其實(shí)邏輯很簡(jiǎn)單:如果沒(méi)有足夠的空余位置,就出讓 CPU 使用權(quán),然后重新計(jì)算;反之則用 CAS 設(shè)置入隊(duì)索引。
//生產(chǎn)者獲取n個(gè)寫(xiě)入位置 do {//cursor類(lèi)似于入隊(duì)索引,指的是上次生產(chǎn)到這里current = cursor.get();//目標(biāo)是在生產(chǎn)n個(gè)next = current + n;//減掉一個(gè)循環(huán)long wrapPoint = next - bufferSize;//獲取上一次的最小消費(fèi)位置long cachedGatingSequence = gatingSequenceCache.get();//沒(méi)有足夠的空余位置if (wrapPoint>cachedGatingSequence || cachedGatingSequence>current){//重新計(jì)算所有消費(fèi)者里面的最小值位置long gatingSequence = Util.getMinimumSequence(gatingSequences, current);//仍然沒(méi)有足夠的空余位置,出讓CPU使用權(quán),重新執(zhí)行下一循環(huán)if (wrapPoint > gatingSequence){LockSupport.parkNanos(1);continue;}//從新設(shè)置上一次的最小消費(fèi)位置gatingSequenceCache.set(gatingSequence);} else if (cursor.compareAndSet(current, next)){//獲取寫(xiě)入位置成功,跳出循環(huán)break;} } while (true);Disruptor 在優(yōu)化并發(fā)性能方面可謂是做到了極致,優(yōu)化的思路大體是兩個(gè)方面,一個(gè)是利用無(wú)鎖算法避免鎖的爭(zhēng)用,另外一個(gè)則是將硬件(CPU)的性能發(fā)揮到極致。尤其是后者,在 Java 領(lǐng)域基本上屬于經(jīng)典之作了。
發(fā)揮硬件的能力一般是 C 這種面向硬件的語(yǔ)言常干的事兒,C 語(yǔ)言領(lǐng)域經(jīng)常通過(guò)調(diào)整內(nèi)存布局優(yōu)化內(nèi)存占用,而 Java 領(lǐng)域則用的很少,原因在于 Java 可以智能地優(yōu)化內(nèi)存布局,內(nèi)存布局對(duì) Java 程序員的透明的。這種智能的優(yōu)化大部分場(chǎng)景是很友好的,但是如果你想通過(guò)填充方式避免偽共享就必須繞過(guò)這種優(yōu)化,關(guān)于這方面 Disruptor 提供了經(jīng)典的實(shí)現(xiàn),你可以參考。
由于偽共享問(wèn)題如此重要,所以 Java 也開(kāi)始重視它了,比如 Java 8 中,提供了避免偽共享的注解:@sun.misc.Contended,通過(guò)這個(gè)注解就能輕松避免偽共享(需要設(shè)置 JVM 參數(shù) -XX:-RestrictContended)。不過(guò)避免偽共享是以犧牲內(nèi)存為代價(jià)的,所以具體使用的時(shí)候還是需要仔細(xì)斟酌。
16、高性能數(shù)據(jù)庫(kù)連接池HiKariCP
只要和數(shù)據(jù)庫(kù)打交道,就免不了使用數(shù)據(jù)庫(kù)連接池。業(yè)界知名的數(shù)據(jù)庫(kù)連接池有不少,例如 c3p0、DBCP、Tomcat JDBC Connection Pool、Druid 等,不過(guò)最近最火的是 HiKariCP。
HiKariCP 號(hào)稱(chēng)是業(yè)界跑得最快的數(shù)據(jù)庫(kù)連接池,這兩年發(fā)展得順風(fēng)順?biāo)?#xff0c;尤其是 Springboot 2.0 將其作為默認(rèn)數(shù)據(jù)庫(kù)連接池后,江湖一哥的地位已是毋庸置疑了。那它為什么那么快呢?今天咱們就重點(diǎn)聊聊這個(gè)話(huà)題。
(1)什么是數(shù)據(jù)庫(kù)連接池
在詳細(xì)分析 HiKariCP 高性能之前,我們有必要先簡(jiǎn)單介紹一下什么是數(shù)據(jù)庫(kù)連接池。本質(zhì)上,數(shù)據(jù)庫(kù)連接池和線程池一樣,都屬于池化資源,作用都是避免重量級(jí)資源的頻繁創(chuàng)建和銷(xiāo)毀,對(duì)于數(shù)據(jù)庫(kù)連接池來(lái)說(shuō),也就是避免數(shù)據(jù)庫(kù)連接頻繁創(chuàng)建和銷(xiāo)毀。如下圖所示,服務(wù)端會(huì)在運(yùn)行期持有一定數(shù)量的數(shù)據(jù)庫(kù)連接,當(dāng)需要執(zhí)行 SQL 時(shí),并不是直接創(chuàng)建一個(gè)數(shù)據(jù)庫(kù)連接,而是從連接池中獲取一個(gè);當(dāng) SQL 執(zhí)行完,也并不是將數(shù)據(jù)庫(kù)連接真的關(guān)掉,而是將其歸還到連接池中。
在實(shí)際工作中,我們都是使用各種持久化框架來(lái)完成數(shù)據(jù)庫(kù)的增刪改查,基本上不會(huì)直接和數(shù)據(jù)庫(kù)連接池打交道,為了能讓你更好地理解數(shù)據(jù)庫(kù)連接池的工作原理,下面的示例代碼并沒(méi)有使用任何框架,而是原生地使用 HiKariCP。執(zhí)行數(shù)據(jù)庫(kù)操作基本上是一系列規(guī)范化的步驟:
① 通過(guò)數(shù)據(jù)源獲取一個(gè)數(shù)據(jù)庫(kù)連接;
② 創(chuàng)建 Statement;
③ 執(zhí)行 SQL;
④ 通過(guò) ResultSet 獲取 SQL 執(zhí)行結(jié)果;
⑤ 釋放 ResultSet;
⑥ 釋放 Statement;
⑦ 釋放數(shù)據(jù)庫(kù)連接。
下面的示例代碼,通過(guò) ds.getConnection() 獲取一個(gè)數(shù)據(jù)庫(kù)連接時(shí),其實(shí)是向數(shù)據(jù)庫(kù)連接池申請(qǐng)一個(gè)數(shù)據(jù)庫(kù)連接,而不是創(chuàng)建一個(gè)新的數(shù)據(jù)庫(kù)連接。同樣,通過(guò) conn.close() 釋放一個(gè)數(shù)據(jù)庫(kù)連接時(shí),也不是直接將連接關(guān)閉,而是將連接歸還給數(shù)據(jù)庫(kù)連接池。
//數(shù)據(jù)庫(kù)連接池配置 HikariConfig config = new HikariConfig(); config.setMinimumIdle(1); config.setMaximumPoolSize(2); config.setConnectionTestQuery("SELECT 1"); config.setDataSourceClassName("org.h2.jdbcx.JdbcDataSource"); config.addDataSourceProperty("url", "jdbc:h2:mem:test"); // 創(chuàng)建數(shù)據(jù)源 DataSource ds = new HikariDataSource(config); Connection conn = null; Statement stmt = null; ResultSet rs = null; try {// 獲取數(shù)據(jù)庫(kù)連接conn = ds.getConnection();// 創(chuàng)建Statement stmt = conn.createStatement();// 執(zhí)行SQLrs = stmt.executeQuery("select * from abc");// 獲取結(jié)果while (rs.next()) {int id = rs.getInt(1);......} } catch(Exception e) {e.printStackTrace(); } finally {//關(guān)閉ResultSetclose(rs);//關(guān)閉Statement close(stmt);//關(guān)閉Connectionclose(conn); } //關(guān)閉資源 void close(AutoCloseable rs) {if (rs != null) {try {rs.close();} catch (SQLException e) {e.printStackTrace();}} }HiKariCP 官方網(wǎng)站解釋了其性能之所以如此之高的秘密。微觀上 HiKariCP 程序編譯出的字節(jié)碼執(zhí)行效率更高,站在字節(jié)碼的角度去優(yōu)化 Java 代碼,HiKariCP 的作者對(duì)性能的執(zhí)著可見(jiàn)一斑,不過(guò)遺憾的是他并沒(méi)有詳細(xì)解釋都做了哪些優(yōu)化。而宏觀上主要是和兩個(gè)數(shù)據(jù)結(jié)構(gòu)有關(guān),一個(gè)是 FastList,另一個(gè)是 ConcurrentBag。下面我們來(lái)看看它們是如何提升 HiKariCP 的性能的。
(2)FastList 解決了哪些性能問(wèn)題
按照規(guī)范步驟,執(zhí)行完數(shù)據(jù)庫(kù)操作之后,需要依次關(guān)閉 ResultSet、Statement、Connection,但是總有粗心的同學(xué)只是關(guān)閉了 Connection,而忘了關(guān)閉 ResultSet 和 Statement。為了解決這種問(wèn)題,最好的辦法是當(dāng)關(guān)閉 Connection 時(shí),能夠自動(dòng)關(guān)閉 Statement。為了達(dá)到這個(gè)目標(biāo),Connection 就需要跟蹤創(chuàng)建的 Statement,最簡(jiǎn)單的辦法就是將創(chuàng)建的 Statement 保存在數(shù)組 ArrayList 里,這樣當(dāng)關(guān)閉 Connection 的時(shí)候,就可以依次將數(shù)組中的所有 Statement 關(guān)閉。
HiKariCP 覺(jué)得用 ArrayList 還是太慢,當(dāng)通過(guò) conn.createStatement() 創(chuàng)建一個(gè) Statement 時(shí),需要調(diào)用 ArrayList 的 add() 方法加入到 ArrayList 中,這個(gè)是沒(méi)有問(wèn)題的;但是當(dāng)通過(guò) stmt.close() 關(guān)閉 Statement 的時(shí)候,需要調(diào)用 ArrayList 的 remove() 方法來(lái)將其從 ArrayList 中刪除,這里是有優(yōu)化余地的。
假設(shè)一個(gè) Connection 依次創(chuàng)建 6 個(gè) Statement,分別是 S1、S2、S3、S4、S5、S6,按照正常的編碼習(xí)慣,關(guān)閉 Statement 的順序一般是逆序的,關(guān)閉的順序是:S6、S5、S4、S3、S2、S1,而 ArrayList 的 remove(Object o) 方法是順序遍歷查找,逆序刪除而順序查找,這樣的查找效率就太慢了。如何優(yōu)化呢?很簡(jiǎn)單,優(yōu)化成逆序查找就可以了。
HiKariCP 中的 FastList 相對(duì)于 ArrayList 的一個(gè)優(yōu)化點(diǎn)就是將 remove(Object element) 方法的查找順序變成了逆序查找。除此之外,FastList 還有另一個(gè)優(yōu)化點(diǎn),是 get(int index) 方法沒(méi)有對(duì) index 參數(shù)進(jìn)行越界檢查,HiKariCP 能保證不會(huì)越界,所以不用每次都進(jìn)行越界檢查。
整體來(lái)看,FastList 的優(yōu)化點(diǎn)還是很簡(jiǎn)單的。下面我們?cè)賮?lái)聊聊 HiKariCP 中的另外一個(gè)數(shù)據(jù)結(jié)構(gòu) ConcurrentBag,看看它又是如何提升性能的。
(3)ConcurrentBag 解決了哪些性能問(wèn)題
如果讓我們自己來(lái)實(shí)現(xiàn)一個(gè)數(shù)據(jù)庫(kù)連接池,最簡(jiǎn)單的辦法就是用兩個(gè)阻塞隊(duì)列來(lái)實(shí)現(xiàn),一個(gè)用于保存空閑數(shù)據(jù)庫(kù)連接的隊(duì)列 idle,另一個(gè)用于保存忙碌數(shù)據(jù)庫(kù)連接的隊(duì)列 busy;獲取連接時(shí)將空閑的數(shù)據(jù)庫(kù)連接從 idle 隊(duì)列移動(dòng)到 busy 隊(duì)列,而關(guān)閉連接時(shí)將數(shù)據(jù)庫(kù)連接從 busy 移動(dòng)到 idle。這種方案將并發(fā)問(wèn)題委托給了阻塞隊(duì)列,實(shí)現(xiàn)簡(jiǎn)單,但是性能并不是很理想。因?yàn)?Java SDK 中的阻塞隊(duì)列是用鎖實(shí)現(xiàn)的,而高并發(fā)場(chǎng)景下鎖的爭(zhēng)用對(duì)性能影響很大。
//忙碌隊(duì)列 BlockingQueue<Connection> busy; //空閑隊(duì)列 BlockingQueue<Connection> idle;HiKariCP 并沒(méi)有使用 Java SDK 中的阻塞隊(duì)列,而是自己實(shí)現(xiàn)了一個(gè)叫做 ConcurrentBag 的并發(fā)容器。ConcurrentBag 的設(shè)計(jì)最初源自 C#,它的一個(gè)核心設(shè)計(jì)是使用 ThreadLocal 避免部分并發(fā)問(wèn)題,不過(guò) HiKariCP 中的 ConcurrentBag 并沒(méi)有完全參考 C# 的實(shí)現(xiàn),下面我們來(lái)看看它是如何實(shí)現(xiàn)的。
ConcurrentBag 中最關(guān)鍵的屬性有 4 個(gè),分別是:用于存儲(chǔ)所有的數(shù)據(jù)庫(kù)連接的共享隊(duì)列 sharedList、線程本地存儲(chǔ) threadList、等待數(shù)據(jù)庫(kù)連接的線程數(shù) waiters 以及分配數(shù)據(jù)庫(kù)連接的工具 handoffQueue。其中,handoffQueue 用的是 Java SDK 提供的 SynchronousQueue,SynchronousQueue 主要用于線程之間傳遞數(shù)據(jù)。
//用于存儲(chǔ)所有的數(shù)據(jù)庫(kù)連接 CopyOnWriteArrayList<T> sharedList; //線程本地存儲(chǔ)中的數(shù)據(jù)庫(kù)連接 ThreadLocal<List<Object>> threadList; //等待數(shù)據(jù)庫(kù)連接的線程數(shù) AtomicInteger waiters; //分配數(shù)據(jù)庫(kù)連接的工具 SynchronousQueue<T> handoffQueue;當(dāng)線程池創(chuàng)建了一個(gè)數(shù)據(jù)庫(kù)連接時(shí),通過(guò)調(diào)用 ConcurrentBag 的 add() 方法加入到 ConcurrentBag 中,下面是 add() 方法的具體實(shí)現(xiàn),邏輯很簡(jiǎn)單,就是將這個(gè)連接加入到共享隊(duì)列 sharedList 中,如果此時(shí)有線程在等待數(shù)據(jù)庫(kù)連接,那么就通過(guò) handoffQueue 將這個(gè)連接分配給等待的線程。
//將空閑連接添加到隊(duì)列 void add(final T bagEntry){//加入共享隊(duì)列sharedList.add(bagEntry);//如果有等待連接的線程,//則通過(guò)handoffQueue直接分配給等待的線程while (waiters.get() > 0 && bagEntry.getState() == STATE_NOT_IN_USE && !handoffQueue.offer(bagEntry)) {yield();} }通過(guò) ConcurrentBag 提供的 borrow() 方法,可以獲取一個(gè)空閑的數(shù)據(jù)庫(kù)連接,borrow() 的主要邏輯是:
① 首先查看線程本地存儲(chǔ)是否有空閑連接,如果有,則返回一個(gè)空閑的連接;
② 如果線程本地存儲(chǔ)中無(wú)空閑連接,則從共享隊(duì)列中獲取。
③ 如果共享隊(duì)列中也沒(méi)有空閑的連接,則請(qǐng)求線程需要等待。
需要注意的是,線程本地存儲(chǔ)中的連接是可以被其他線程竊取的,所以需要用 CAS 方法防止重復(fù)分配。在共享隊(duì)列中獲取空閑連接,也采用了 CAS 方法防止重復(fù)分配。
T borrow(long timeout, final TimeUnit timeUnit){// 先查看線程本地存儲(chǔ)是否有空閑連接final List<Object> list = threadList.get();for (int i = list.size() - 1; i >= 0; i--) {final Object entry = list.remove(i);final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;//線程本地存儲(chǔ)中的連接也可以被竊取,//所以需要用CAS方法防止重復(fù)分配if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {return bagEntry;}}// 線程本地存儲(chǔ)中無(wú)空閑連接,則從共享隊(duì)列中獲取final int waiting = waiters.incrementAndGet();try {for (T bagEntry : sharedList) {//如果共享隊(duì)列中有空閑連接,則返回if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {return bagEntry;}}//共享隊(duì)列中沒(méi)有連接,則需要等待timeout = timeUnit.toNanos(timeout);do {final long start = currentTime();final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {return bagEntry;}//重新計(jì)算等待時(shí)間timeout -= elapsedNanos(start);} while (timeout > 10_000);//超時(shí)沒(méi)有獲取到連接,返回nullreturn null;} finally {waiters.decrementAndGet();} }釋放連接需要調(diào)用 ConcurrentBag 提供的 requite() 方法,該方法的邏輯很簡(jiǎn)單,首先將數(shù)據(jù)庫(kù)連接狀態(tài)更改為 STATE_NOT_IN_USE,之后查看是否存在等待線程,如果有,則分配給等待線程;如果沒(méi)有,則將該數(shù)據(jù)庫(kù)連接保存到線程本地存儲(chǔ)里。
//釋放連接 void requite(final T bagEntry){//更新連接狀態(tài)bagEntry.setState(STATE_NOT_IN_USE);//如果有等待的線程,則直接分配給線程,無(wú)需進(jìn)入任何隊(duì)列for (int i = 0; waiters.get() > 0; i++) {if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {return;} else if ((i & 0xff) == 0xff) {parkNanos(MICROSECONDS.toNanos(10));} else {yield();}}//如果沒(méi)有等待的線程,則進(jìn)入線程本地存儲(chǔ)final List<Object> threadLocalList = threadList.get();if (threadLocalList.size() < 50) {threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);} }HiKariCP 中的 FastList 和 ConcurrentBag 這兩個(gè)數(shù)據(jù)結(jié)構(gòu)使用得非常巧妙,雖然實(shí)現(xiàn)起來(lái)并不復(fù)雜,但是對(duì)于性能的提升非常明顯,根本原因在于這兩個(gè)數(shù)據(jù)結(jié)構(gòu)適用于數(shù)據(jù)庫(kù)連接池這個(gè)特定的場(chǎng)景。FastList 適用于逆序刪除場(chǎng)景;而 ConcurrentBag 通過(guò) ThreadLocal 做一次預(yù)分配,避免直接競(jìng)爭(zhēng)共享資源,非常適合池化資源的分配。
在實(shí)際工作中,我們遇到的并發(fā)問(wèn)題千差萬(wàn)別,這時(shí)選擇合適的并發(fā)數(shù)據(jù)結(jié)構(gòu)就非常重要了。當(dāng)然能選對(duì)的前提是對(duì)特定場(chǎng)景的并發(fā)特性有深入的了解,只有了解到無(wú)謂的性能消耗在哪里,才能對(duì)癥下藥。
17、Actor模型:面向?qū)ο笤牟l(fā)模型
上學(xué)的時(shí)候,有門(mén)計(jì)算機(jī)專(zhuān)業(yè)課叫做面向?qū)ο缶幊?#xff0c;學(xué)這門(mén)課的時(shí)候有個(gè)問(wèn)題困擾了我很久,按照面向?qū)ο缶幊痰睦碚?#xff0c;對(duì)象之間通信需要依靠消息,而實(shí)際上,像 C++、Java 這些面向?qū)ο蟮恼Z(yǔ)言,對(duì)象之間通信,依靠的是對(duì)象方法。對(duì)象方法和過(guò)程語(yǔ)言里的函數(shù)本質(zhì)上沒(méi)有區(qū)別,有入?yún)ⅰ⒂谐鰠?#xff0c;思維方式很相似,使用起來(lái)都很簡(jiǎn)單。那面向?qū)ο罄碚摾锏南⑹欠窬偷葍r(jià)于面向?qū)ο笳Z(yǔ)言里的對(duì)象方法呢?很長(zhǎng)一段時(shí)間里,我都以為對(duì)象方法是面向?qū)ο罄碚撝邢⒌囊环N實(shí)現(xiàn),直到接觸到 Actor 模型,才明白消息壓根不是這個(gè)實(shí)現(xiàn)法。
(1)Hello Actor 模型
Actor 模型本質(zhì)上是一種計(jì)算模型,基本的計(jì)算單元稱(chēng)為 Actor,換言之,在 Actor 模型中,所有的計(jì)算都是在 Actor 中執(zhí)行的。在面向?qū)ο缶幊汤锩?#xff0c;一切都是對(duì)象;在 Actor 模型里,一切都是 Actor,并且 Actor 之間是完全隔離的,不會(huì)共享任何變量。
當(dāng)看到“不共享任何變量”的時(shí)候,相信你一定會(huì)眼前一亮,并發(fā)問(wèn)題的根源就在于共享變量,而 Actor 模型中 Actor 之間不共享變量,那用 Actor 模型解決并發(fā)問(wèn)題,一定是相當(dāng)順手。的確是這樣,所以很多人就把 Actor 模型定義為一種并發(fā)計(jì)算模型。其實(shí) Actor 模型早在 1973 年就被提出來(lái)了,只是直到最近幾年才被廣泛關(guān)注,一個(gè)主要原因就在于它是解決并發(fā)問(wèn)題的利器,而最近幾年隨著多核處理器的發(fā)展,并發(fā)問(wèn)題被推到了風(fēng)口浪尖上。
但是 Java 語(yǔ)言本身并不支持 Actor 模型,所以如果你想在 Java 語(yǔ)言里使用 Actor 模型,就需要借助第三方類(lèi)庫(kù),目前能完備地支持 Actor 模型而且比較成熟的類(lèi)庫(kù)就是 Akka 了。在詳細(xì)介紹 Actor 模型之前,我們就先基于 Akka 寫(xiě)一個(gè) Hello World 程序,讓你對(duì) Actor 模型先有個(gè)感官的印象。
在下面的示例代碼中,我們首先創(chuàng)建了一個(gè) ActorSystem(Actor 不能脫離 ActorSystem 存在);之后創(chuàng)建了一個(gè) HelloActor,Akka 中創(chuàng)建 Actor 并不是 new 一個(gè)對(duì)象出來(lái),而是通過(guò)調(diào)用 system.actorOf() 方法創(chuàng)建的,該方法返回的是 ActorRef,而不是 HelloActor;最后通過(guò)調(diào)用 ActorRef 的 tell() 方法給 HelloActor 發(fā)送了一條消息 “Actor” 。
//該Actor當(dāng)收到消息message后, //會(huì)打印Hello message static class HelloActor extends UntypedActor {@Overridepublic void onReceive(Object message) {System.out.println("Hello " + message);} }public static void main(String[] args) {//創(chuàng)建Actor系統(tǒng)ActorSystem system = ActorSystem.create("HelloSystem");//創(chuàng)建HelloActorActorRef helloActor = system.actorOf(Props.create(HelloActor.class));//發(fā)送消息給HelloActorhelloActor.tell("Actor", ActorRef.noSender()); }通過(guò)這個(gè)例子,你會(huì)發(fā)現(xiàn) Actor 模型和面向?qū)ο缶幊唐鹾隙确浅8?#xff0c;完全可以用 Actor 類(lèi)比面向?qū)ο缶幊汤锩娴膶?duì)象,而且 Actor 之間的通信方式完美地遵守了消息機(jī)制,而不是通過(guò)對(duì)象方法來(lái)實(shí)現(xiàn)對(duì)象之間的通信。那 Actor 中的消息機(jī)制和面向?qū)ο笳Z(yǔ)言里的對(duì)象方法有什么區(qū)別呢?
(2)消息和對(duì)象方法的區(qū)別
在沒(méi)有計(jì)算機(jī)的時(shí)代,異地的朋友往往是通過(guò)寫(xiě)信來(lái)交流感情的,但信件發(fā)出去之后,也許會(huì)在寄送過(guò)程中弄丟了,也有可能寄到后,對(duì)方一直沒(méi)有時(shí)間寫(xiě)回信……這個(gè)時(shí)候都可以讓郵局“背個(gè)鍋”,不過(guò)無(wú)論如何,也不過(guò)是重寫(xiě)一封,生活繼續(xù)。
Actor 中的消息機(jī)制,就可以類(lèi)比這現(xiàn)實(shí)世界里的寫(xiě)信。Actor 內(nèi)部有一個(gè)郵箱(Mailbox),接收到的消息都是先放到郵箱里,如果郵箱里有積壓的消息,那么新收到的消息就不會(huì)馬上得到處理,也正是因?yàn)?Actor 使用單線程處理消息,所以不會(huì)出現(xiàn)并發(fā)問(wèn)題。你可以把 Actor 內(nèi)部的工作模式想象成只有一個(gè)消費(fèi)者線程的生產(chǎn)者 - 消費(fèi)者模式。
所以,在 Actor 模型里,發(fā)送消息僅僅是把消息發(fā)出去而已,接收消息的 Actor 在接收到消息后,也不一定會(huì)立即處理,也就是說(shuō) Actor 中的消息機(jī)制完全是異步的。而調(diào)用對(duì)象方法,實(shí)際上是同步的,對(duì)象方法 return 之前,調(diào)用方會(huì)一直等待。
除此之外,調(diào)用對(duì)象方法,需要持有對(duì)象的引用,所有的對(duì)象必須在同一個(gè)進(jìn)程中。而在 Actor 中發(fā)送消息,類(lèi)似于現(xiàn)實(shí)中的寫(xiě)信,只需要知道對(duì)方的地址就可以,發(fā)送消息和接收消息的 Actor 可以不在一個(gè)進(jìn)程中,也可以不在同一臺(tái)機(jī)器上。因此,Actor 模型不但適用于并發(fā)計(jì)算,還適用于分布式計(jì)算。
(3)Actor 的規(guī)范化定義
通過(guò)上面的介紹,相信你應(yīng)該已經(jīng)對(duì) Actor 有一個(gè)感官印象了,下面我們?cè)賮?lái)看看 Actor 規(guī)范化的定義是什么樣的。Actor 是一種基礎(chǔ)的計(jì)算單元,具體來(lái)講包括三部分能力,分別是:
① 處理能力,處理接收到的消息。
② 存儲(chǔ)能力,Actor 可以存儲(chǔ)自己的內(nèi)部狀態(tài),并且內(nèi)部狀態(tài)在不同 Actor 之間是絕對(duì)隔離的。
③ 通信能力,Actor 可以和其他 Actor 之間通信。
當(dāng)一個(gè) Actor 接收的一條消息之后,這個(gè) Actor 可以做以下三件事:
① 創(chuàng)建更多的 Actor;
② 發(fā)消息給其他 Actor;
③ 確定如何處理下一條消息。
其中前兩條還是很好理解的,就是最后一條,該如何去理解呢?前面我們說(shuō)過(guò) Actor 具備存儲(chǔ)能力,它有自己的內(nèi)部狀態(tài),所以你也可以把 Actor 看作一個(gè)狀態(tài)機(jī),把 Actor 處理消息看作是觸發(fā)狀態(tài)機(jī)的狀態(tài)變化;而狀態(tài)機(jī)的變化往往要基于上一個(gè)狀態(tài),觸發(fā)狀態(tài)機(jī)發(fā)生變化的時(shí)刻,上一個(gè)狀態(tài)必須是確定的,所以確定如何處理下一條消息,本質(zhì)上不過(guò)是改變內(nèi)部狀態(tài)。
在多線程里面,由于可能存在競(jìng)態(tài)條件,所以根據(jù)當(dāng)前狀態(tài)確定如何處理下一條消息還是有難度的,需要使用各種同步工具,但在 Actor 模型里,由于是單線程處理,所以就不存在競(jìng)態(tài)條件問(wèn)題了。
(4)用 Actor 實(shí)現(xiàn)累加器
支持并發(fā)的累加器可能是最簡(jiǎn)單并且有代表性的并發(fā)問(wèn)題了,可以基于互斥鎖方案實(shí)現(xiàn),也可以基于原子類(lèi)實(shí)現(xiàn),但今天我們要嘗試用 Actor 來(lái)實(shí)現(xiàn)。
在下面的示例代碼中,CounterActor 內(nèi)部持有累計(jì)值 counter,當(dāng) CounterActor 接收到一個(gè)數(shù)值型的消息 message 時(shí),就將累計(jì)值 counter += message;但如果是其他類(lèi)型的消息,則打印當(dāng)前累計(jì)值 counter。在 main() 方法中,我們啟動(dòng)了 4 個(gè)線程來(lái)執(zhí)行累加操作。整個(gè)程序沒(méi)有鎖,也沒(méi)有 CAS,但是程序是線程安全的。
//累加器 static class CounterActor extends UntypedActor {private int counter = 0;@Overridepublic void onReceive(Object message){//如果接收到的消息是數(shù)字類(lèi)型,執(zhí)行累加操作,//否則打印counter的值if (message instanceof Number) {counter += ((Number) message).intValue();} else {System.out.println(counter);}} } public static void main(String[] args) throws InterruptedException {//創(chuàng)建Actor系統(tǒng)ActorSystem system = ActorSystem.create("HelloSystem");//4個(gè)線程生產(chǎn)消息ExecutorService es = Executors.newFixedThreadPool(4);//創(chuàng)建CounterActor ActorRef counterActor = system.actorOf(Props.create(CounterActor.class));//生產(chǎn)4*100000個(gè)消息 for (int i=0; i<4; i++) {es.execute(()->{for (int j=0; j<100000; j++) {counterActor.tell(1, ActorRef.noSender());}});}//關(guān)閉線程池es.shutdown();//等待CounterActor處理完所有消息Thread.sleep(1000);//打印結(jié)果counterActor.tell("", ActorRef.noSender());//關(guān)閉Actor系統(tǒng)system.shutdown(); }Actor 模型是一種非常簡(jiǎn)單的計(jì)算模型,其中 Actor 是最基本的計(jì)算單元,Actor 之間是通過(guò)消息進(jìn)行通信。Actor 與面向?qū)ο缶幊?#xff08;OOP)中的對(duì)象匹配度非常高,在面向?qū)ο缶幊汤?#xff0c;系統(tǒng)由類(lèi)似于生物細(xì)胞那樣的對(duì)象構(gòu)成,對(duì)象之間也是通過(guò)消息進(jìn)行通信,所以在面向?qū)ο笳Z(yǔ)言里使用 Actor 模型基本上不會(huì)有違和感。
在 Java 領(lǐng)域,除了可以使用 Akka 來(lái)支持 Actor 模型外,還可以使用 Vert.x,不過(guò)相對(duì)來(lái)說(shuō) Vert.x 更像是 Actor 模型的隱式實(shí)現(xiàn),對(duì)應(yīng)關(guān)系不像 Akka 那樣明顯,不過(guò)本質(zhì)上也是一種 Actor 模型。
Actor 可以創(chuàng)建新的 Actor,這些 Actor 最終會(huì)呈現(xiàn)出一個(gè)樹(shù)狀結(jié)構(gòu),非常像現(xiàn)實(shí)世界里的組織結(jié)構(gòu),所以利用 Actor 模型來(lái)對(duì)程序進(jìn)行建模,和現(xiàn)實(shí)世界的匹配度非常高。Actor 模型和現(xiàn)實(shí)世界一樣都是異步模型,理論上不保證消息百分百送達(dá),也不保證消息送達(dá)的順序和發(fā)送的順序是一致的,甚至無(wú)法保證消息會(huì)被百分百處理。雖然實(shí)現(xiàn) Actor 模型的廠商都在試圖解決這些問(wèn)題,但遺憾的是解決得并不完美,所以使用 Actor 模型也是有成本的。
18、軟件事務(wù)內(nèi)存:借鑒數(shù)據(jù)庫(kù)的并發(fā)經(jīng)驗(yàn)
很多同學(xué)反饋說(shuō),工作了挺長(zhǎng)時(shí)間但是沒(méi)有機(jī)會(huì)接觸并發(fā)編程,實(shí)際上我們天天都在寫(xiě)并發(fā)程序,只不過(guò)并發(fā)相關(guān)的問(wèn)題都被類(lèi)似 Tomcat 這樣的 Web 服務(wù)器以及 MySQL 這樣的數(shù)據(jù)庫(kù)解決了。尤其是數(shù)據(jù)庫(kù),在解決并發(fā)問(wèn)題方面,可謂成績(jī)斐然,它的事務(wù)機(jī)制非常簡(jiǎn)單易用,能甩 Java 里面的鎖、原子類(lèi)十條街。技術(shù)無(wú)邊界,很顯然要借鑒一下。
其實(shí)很多編程語(yǔ)言都有從數(shù)據(jù)庫(kù)的事務(wù)管理中獲得靈感,并且總結(jié)出了一個(gè)新的并發(fā)解決方案:軟件事務(wù)內(nèi)存(Software Transactional Memory,簡(jiǎn)稱(chēng) STM)。傳統(tǒng)的數(shù)據(jù)庫(kù)事務(wù),支持 4 個(gè)特性:原子性(Atomicity)、一致性(Consistency)、隔離性(Isolation)和持久性(Durability),也就是大家常說(shuō)的 ACID,STM 由于不涉及到持久化,所以只支持 ACI。
STM 的使用很簡(jiǎn)單,下面我們以經(jīng)典的轉(zhuǎn)賬操作為例,看看用 STM 該如何實(shí)現(xiàn)。
(1)用 STM 實(shí)現(xiàn)轉(zhuǎn)賬
并發(fā)轉(zhuǎn)賬的例子,示例代碼如下。簡(jiǎn)單地使用 synchronized 將 transfer() 方法變成同步方法并不能解決并發(fā)問(wèn)題,因?yàn)檫€存在死鎖問(wèn)題。
class UnsafeAccount {//余額private long balance;//構(gòu)造函數(shù)public UnsafeAccount(long balance) {this.balance = balance;}//轉(zhuǎn)賬void transfer(UnsafeAccount target, long amt){if (this.balance > amt) {this.balance -= amt;target.balance += amt;}} }該轉(zhuǎn)賬操作若使用數(shù)據(jù)庫(kù)事務(wù)就會(huì)非常簡(jiǎn)單,如下面的示例代碼所示。如果所有 SQL 都正常執(zhí)行,則通過(guò) commit() 方法提交事務(wù);如果 SQL 在執(zhí)行過(guò)程中有異常,則通過(guò) rollback() 方法回滾事務(wù)。數(shù)據(jù)庫(kù)保證在并發(fā)情況下不會(huì)有死鎖,而且還能保證前面我們說(shuō)的原子性、一致性、隔離性和持久性,也就是 ACID。
Connection conn = null; try{//獲取數(shù)據(jù)庫(kù)連接conn = DriverManager.getConnection();//設(shè)置手動(dòng)提交事務(wù)conn.setAutoCommit(false);//執(zhí)行轉(zhuǎn)賬SQL......//提交事務(wù)conn.commit(); } catch (Exception e) {//出現(xiàn)異常回滾事務(wù)conn.rollback(); }那如果用 STM 又該如何實(shí)現(xiàn)呢?Java 語(yǔ)言并不支持 STM,不過(guò)可以借助第三方的類(lèi)庫(kù)來(lái)支持,Multiverse就是個(gè)不錯(cuò)的選擇。下面的示例代碼就是借助 Multiverse 實(shí)現(xiàn)了線程安全的轉(zhuǎn)賬操作,相比較上面線程不安全的 UnsafeAccount,其改動(dòng)并不大,僅僅是將余額的類(lèi)型從 long 變成了 TxnLong ,將轉(zhuǎn)賬的操作放到了 atomic(()->{}) 中。
class Account{//余額private TxnLong balance;//構(gòu)造函數(shù)public Account(long balance){this.balance = StmUtils.newTxnLong(balance);}//轉(zhuǎn)賬public void transfer(Account to, int amt){//原子化操作atomic(()->{if (this.balance.get() > amt) {this.balance.decrement(amt);to.balance.increment(amt);}});} }一個(gè)關(guān)鍵的 atomic() 方法就把并發(fā)問(wèn)題解決了,這個(gè)方案看上去比傳統(tǒng)的方案的確簡(jiǎn)單了很多,那它是如何實(shí)現(xiàn)的呢?數(shù)據(jù)庫(kù)事務(wù)發(fā)展了幾十年了,目前被廣泛使用的是 MVCC(全稱(chēng)是 Multi-Version Concurrency Control),也就是多版本并發(fā)控制。
MVCC 可以簡(jiǎn)單地理解為數(shù)據(jù)庫(kù)事務(wù)在開(kāi)啟的時(shí)候,會(huì)給數(shù)據(jù)庫(kù)打一個(gè)快照,以后所有的讀寫(xiě)都是基于這個(gè)快照的。當(dāng)提交事務(wù)的時(shí)候,如果所有讀寫(xiě)過(guò)的數(shù)據(jù)在該事務(wù)執(zhí)行期間沒(méi)有發(fā)生過(guò)變化,那么就可以提交;如果發(fā)生了變化,說(shuō)明該事務(wù)和有其他事務(wù)讀寫(xiě)的數(shù)據(jù)沖突了,這個(gè)時(shí)候是不可以提交的。
為了記錄數(shù)據(jù)是否發(fā)生了變化,可以給每條數(shù)據(jù)增加一個(gè)版本號(hào),這樣每次成功修改數(shù)據(jù)都會(huì)增加版本號(hào)的值。MVCC 的工作原理和樂(lè)觀鎖非常相似。有不少 STM 的實(shí)現(xiàn)方案都是基于 MVCC 的,例如知名的 Clojure STM。
下面我們就用最簡(jiǎn)單的代碼基于 MVCC 實(shí)現(xiàn)一個(gè)簡(jiǎn)版的 STM,這樣你會(huì)對(duì) STM 以及 MVCC 的工作原理有更深入的認(rèn)識(shí)。
(1)自己實(shí)現(xiàn) STM
我們首先要做的,就是讓 Java 中的對(duì)象有版本號(hào),在下面的示例代碼中,VersionedRef 這個(gè)類(lèi)的作用就是將對(duì)象 value 包裝成帶版本號(hào)的對(duì)象。按照 MVCC 理論,數(shù)據(jù)的每一次修改都對(duì)應(yīng)著一個(gè)唯一的版本號(hào),所以不存在僅僅改變 value 或者 version 的情況,用不變性模式就可以很好地解決這個(gè)問(wèn)題,所以 VersionedRef 這個(gè)類(lèi)被我們?cè)O(shè)計(jì)成了不可變的。
所有對(duì)數(shù)據(jù)的讀寫(xiě)操作,一定是在一個(gè)事務(wù)里面,TxnRef 這個(gè)類(lèi)負(fù)責(zé)完成事務(wù)內(nèi)的讀寫(xiě)操作,讀寫(xiě)操作委托給了接口 Txn,Txn 代表的是讀寫(xiě)操作所在的當(dāng)前事務(wù), 內(nèi)部持有的 curRef 代表的是系統(tǒng)中的最新值。
//帶版本號(hào)的對(duì)象引用 public final class VersionedRef<T> {final T value;final long version;//構(gòu)造方法public VersionedRef(T value, long version) {this.value = value;this.version = version;} } //支持事務(wù)的引用 public class TxnRef<T> {//當(dāng)前數(shù)據(jù),帶版本號(hào)volatile VersionedRef curRef;//構(gòu)造方法public TxnRef(T value) {this.curRef = new VersionedRef(value, 0L);}//獲取當(dāng)前事務(wù)中的數(shù)據(jù)public T getValue(Txn txn) {return txn.get(this);}//在當(dāng)前事務(wù)中設(shè)置數(shù)據(jù)public void setValue(T value, Txn txn) {txn.set(this, value);} }STMTxn 是 Txn 最關(guān)鍵的一個(gè)實(shí)現(xiàn)類(lèi),事務(wù)內(nèi)對(duì)于數(shù)據(jù)的讀寫(xiě),都是通過(guò)它來(lái)完成的。STMTxn 內(nèi)部有兩個(gè) Map:inTxnMap,用于保存當(dāng)前事務(wù)中所有讀寫(xiě)的數(shù)據(jù)的快照;writeMap,用于保存當(dāng)前事務(wù)需要寫(xiě)入的數(shù)據(jù)。每個(gè)事務(wù)都有一個(gè)唯一的事務(wù) ID txnId,這個(gè) txnId 是全局遞增的。
STMTxn 有三個(gè)核心方法,分別是讀數(shù)據(jù)的 get() 方法、寫(xiě)數(shù)據(jù)的 set() 方法和提交事務(wù)的 commit() 方法。其中,get() 方法將要讀取數(shù)據(jù)作為快照放入 inTxnMap,同時(shí)保證每次讀取的數(shù)據(jù)都是一個(gè)版本。set() 方法會(huì)將要寫(xiě)入的數(shù)據(jù)放入 writeMap,但如果寫(xiě)入的數(shù)據(jù)沒(méi)被讀取過(guò),也會(huì)將其放入 inTxnMap。
至于 commit() 方法,我們?yōu)榱撕?jiǎn)化實(shí)現(xiàn),使用了互斥鎖,所以事務(wù)的提交是串行的。commit() 方法的實(shí)現(xiàn)很簡(jiǎn)單,首先檢查 inTxnMap 中的數(shù)據(jù)是否發(fā)生過(guò)變化,如果沒(méi)有發(fā)生變化,那么就將 writeMap 中的數(shù)據(jù)寫(xiě)入(這里的寫(xiě)入其實(shí)就是 TxnRef 內(nèi)部持有的 curRef);如果發(fā)生過(guò)變化,那么就不能將 writeMap 中的數(shù)據(jù)寫(xiě)入了。
//事務(wù)接口 public interface Txn {<T> T get(TxnRef<T> ref);<T> void set(TxnRef<T> ref, T value); } //STM事務(wù)實(shí)現(xiàn)類(lèi) public final class STMTxn implements Txn {//事務(wù)ID生成器private static AtomicLong txnSeq = new AtomicLong(0);//當(dāng)前事務(wù)所有的相關(guān)數(shù)據(jù)private Map<TxnRef, VersionedRef> inTxnMap = new HashMap<>();//當(dāng)前事務(wù)所有需要修改的數(shù)據(jù)private Map<TxnRef, Object> writeMap = new HashMap<>();//當(dāng)前事務(wù)IDprivate long txnId;//構(gòu)造函數(shù),自動(dòng)生成當(dāng)前事務(wù)IDSTMTxn() {txnId = txnSeq.incrementAndGet();}//獲取當(dāng)前事務(wù)中的數(shù)據(jù)@Overridepublic <T> T get(TxnRef<T> ref) {//將需要讀取的數(shù)據(jù),加入inTxnMapif (!inTxnMap.containsKey(ref)) {inTxnMap.put(ref, ref.curRef);}return (T) inTxnMap.get(ref).value;}//在當(dāng)前事務(wù)中修改數(shù)據(jù)@Overridepublic <T> void set(TxnRef<T> ref, T value) {//將需要修改的數(shù)據(jù),加入inTxnMapif (!inTxnMap.containsKey(ref)) {inTxnMap.put(ref, ref.curRef);}writeMap.put(ref, value);}//提交事務(wù)boolean commit() {synchronized (STM.commitLock) {//是否校驗(yàn)通過(guò)boolean isValid = true;//校驗(yàn)所有讀過(guò)的數(shù)據(jù)是否發(fā)生過(guò)變化for(Map.Entry<TxnRef, VersionedRef> entry : inTxnMap.entrySet()){VersionedRef curRef = entry.getKey().curRef;VersionedRef readRef = entry.getValue();//通過(guò)版本號(hào)來(lái)驗(yàn)證數(shù)據(jù)是否發(fā)生過(guò)變化if (curRef.version != readRef.version) {isValid = false;break;}}//如果校驗(yàn)通過(guò),則所有更改生效if (isValid) {writeMap.forEach((k, v) -> {k.curRef = new VersionedRef(v, txnId);});}return isValid;} }下面我們來(lái)模擬實(shí)現(xiàn) Multiverse 中的原子化操作 atomic()。atomic() 方法中使用了類(lèi)似于 CAS 的操作,如果事務(wù)提交失敗,那么就重新創(chuàng)建一個(gè)新的事務(wù),重新執(zhí)行。
@FunctionalInterface public interface TxnRunnable {void run(Txn txn); } //STM public final class STM {//私有化構(gòu)造方法private STM() {//提交數(shù)據(jù)需要用到的全局鎖 static final Object commitLock = new Object();//原子化提交方法public static void atomic(TxnRunnable action) {boolean committed = false;//如果沒(méi)有提交成功,則一直重試while (!committed) {//創(chuàng)建新的事務(wù)STMTxn txn = new STMTxn();//執(zhí)行業(yè)務(wù)邏輯action.run(txn);//提交事務(wù)committed = txn.commit();}} }}就這樣,我們自己實(shí)現(xiàn)了 STM,并完成了線程安全的轉(zhuǎn)賬操作,使用方法和 Multiverse 差不多,這里就不贅述了,具體代碼如下面所示。
class Account {//余額private TxnRef<Integer> balance;//構(gòu)造方法public Account(int balance) {this.balance = new TxnRef<Integer>(balance);}//轉(zhuǎn)賬操作public void transfer(Account target, int amt){STM.atomic((txn)->{Integer from = balance.getValue(txn);balance.setValue(from-amt, txn);Integer to = target.balance.getValue(txn);target.balance.setValue(to+amt, txn);});} }STM 借鑒的是數(shù)據(jù)庫(kù)的經(jīng)驗(yàn),數(shù)據(jù)庫(kù)雖然復(fù)雜,但僅僅存儲(chǔ)數(shù)據(jù),而編程語(yǔ)言除了有共享變量之外,還會(huì)執(zhí)行各種 I/O 操作,很顯然 I/O 操作是很難支持回滾的。所以,STM 也不是萬(wàn)能的。目前支持 STM 的編程語(yǔ)言主要是函數(shù)式語(yǔ)言,函數(shù)式語(yǔ)言里的數(shù)據(jù)天生具備不可變性,利用這種不可變性實(shí)現(xiàn) STM 相對(duì)來(lái)說(shuō)更簡(jiǎn)單。
19、協(xié)程:更輕量級(jí)的線程
Java 語(yǔ)言里解決并發(fā)問(wèn)題靠的是多線程,但線程是個(gè)重量級(jí)的對(duì)象,不能頻繁創(chuàng)建、銷(xiāo)毀,而且線程切換的成本也很高,為了解決這些問(wèn)題,Java SDK 提供了線程池。然而用好線程池并不容易,Java 圍繞線程池提供了很多工具類(lèi),這些工具類(lèi)學(xué)起來(lái)也不容易。那有沒(méi)有更好的解決方案呢?Java 語(yǔ)言里目前還沒(méi)有,但是其他語(yǔ)言里有,這個(gè)方案就是協(xié)程(Coroutine)。
我們可以把協(xié)程簡(jiǎn)單地理解為一種輕量級(jí)的線程。從操作系統(tǒng)的角度來(lái)看,線程是在內(nèi)核態(tài)中調(diào)度的,而協(xié)程是在用戶(hù)態(tài)調(diào)度的,所以相對(duì)于線程來(lái)說(shuō),協(xié)程切換的成本更低。協(xié)程雖然也有自己的棧,但是相比線程棧要小得多,典型的線程棧大小差不多有 1M,而協(xié)程棧的大小往往只有幾 K 或者幾十 K。所以,無(wú)論是從時(shí)間維度還是空間維度來(lái)看,協(xié)程都比線程輕量得多。
支持協(xié)程的語(yǔ)言還是挺多的,例如 Golang、Python、Lua、Kotlin 等都支持協(xié)程。下面我們就以 Golang 為代表,看看協(xié)程是如何在 Golang 中使用的。
(1)Golang 中的協(xié)程
在 Golang 中創(chuàng)建協(xié)程非常簡(jiǎn)單,在下面的示例代碼中,要讓 hello() 方法在一個(gè)新的協(xié)程中執(zhí)行,只需要go hello("World") 這一行代碼就搞定了。你可以對(duì)比著想想在 Java 里是如何“辛勤”地創(chuàng)建線程和線程池的吧,我的感覺(jué)一直都是:每次寫(xiě)完 Golang 的代碼,就再也不想寫(xiě) Java 代碼了。
import ("fmt""time" ) func hello(msg string) {fmt.Println("Hello " + msg) } func main() {//在新的協(xié)程中執(zhí)行hello方法go hello("World")fmt.Println("Run in main")//等待100毫秒讓協(xié)程執(zhí)行結(jié)束time.Sleep(100 * time.Millisecond) }利用協(xié)程能夠很好地實(shí)現(xiàn) Thread-Per-Message 模式。Thread-Per-Message 模式非常簡(jiǎn)單,其實(shí)越是簡(jiǎn)單的模式,功能上就越穩(wěn)定,可理解性也越好。
下面的示例代碼是用 Golang 實(shí)現(xiàn)的 echo 程序的服務(wù)端,用的是 Thread-Per-Message 模式,為每個(gè)成功建立連接的 socket 分配一個(gè)協(xié)程,相比 Java 線程池的實(shí)現(xiàn)方案,Golang 中協(xié)程的方案更簡(jiǎn)單。
import ("log""net" )func main() {//監(jiān)聽(tīng)本地9090端口socket, err := net.Listen("tcp", "127.0.0.1:9090")if err != nil {log.Panicln(err)}defer socket.Close()for {//處理連接請(qǐng)求 conn, err := socket.Accept()if err != nil {log.Panicln(err)}//處理已經(jīng)成功建立連接的請(qǐng)求go handleRequest(conn)} } //處理已經(jīng)成功建立連接的請(qǐng)求 func handleRequest(conn net.Conn) {defer conn.Close()for {buf := make([]byte, 1024)//讀取請(qǐng)求數(shù)據(jù)size, err := conn.Read(buf)if err != nil {return}//回寫(xiě)相應(yīng)數(shù)據(jù) conn.Write(buf[:size])} }(2)利用協(xié)程實(shí)現(xiàn)同步
其實(shí)協(xié)程并不僅限于實(shí)現(xiàn) Thread-Per-Message 模式,它還可以將異步模式轉(zhuǎn)換為同步模式。異步編程雖然近幾年取得了長(zhǎng)足發(fā)展,但是異步的思維模式對(duì)于普通人來(lái)講畢竟是有難度的,只有線性的思維模式才是適合所有人的。而線性的思維模式反映到編程世界,就是同步。
在 Java 里使用多線程并發(fā)地處理 I/O,基本上用的都是異步非阻塞模型,這種模型的異步主要是靠注冊(cè)回調(diào)函數(shù)實(shí)現(xiàn)的,那能否都使用同步處理呢?顯然是不能的。因?yàn)橥揭馕吨却?#xff0c;而線程等待,本質(zhì)上就是一種嚴(yán)重的浪費(fèi)。不過(guò)對(duì)于協(xié)程來(lái)說(shuō),等待的成本就沒(méi)有那么高了,所以基于協(xié)程實(shí)現(xiàn)同步非阻塞是一個(gè)可行的方案。
OpenResty 里實(shí)現(xiàn)的 cosocket 就是一種同步非阻塞方案,借助 cosocket 我們可以用線性的思維模式來(lái)編寫(xiě)非阻塞的程序。下面的示例代碼是用 cosocket 實(shí)現(xiàn)的 socket 程序的客戶(hù)端,建立連接、發(fā)送請(qǐng)求、讀取響應(yīng)所有的操作都是同步的,由于 cosocket 本身是非阻塞的,所以這些操作雖然是同步的,但是并不會(huì)阻塞。
-- 創(chuàng)建socket local sock = ngx.socket.tcp() -- 設(shè)置socket超時(shí)時(shí)間 sock:settimeouts(connect_timeout, send_timeout, read_timeout) -- 連接到目標(biāo)地址 local ok, err = sock:connect(host, port) if not ok then - -- 省略異常處理 end -- 發(fā)送請(qǐng)求 local bytes, err = sock:send(request_data) if not bytes then-- 省略異常處理 end -- 讀取響應(yīng) local line, err = sock:receive() if err then-- 省略異常處理 end -- 關(guān)閉socket sock:close() -- 處理讀取到的數(shù)據(jù)line handle(line)(3)結(jié)構(gòu)化并發(fā)編程
Golang 中的 go 語(yǔ)句讓協(xié)程用起來(lái)太簡(jiǎn)單了,但是這種簡(jiǎn)單也蘊(yùn)藏著風(fēng)險(xiǎn)。要深入了解這個(gè)風(fēng)險(xiǎn)是什么,就需要先了解一下 goto 語(yǔ)句的前世今生。
在我上學(xué)的時(shí)候,各種各樣的編程語(yǔ)言書(shū)籍中都會(huì)談到不建議使用 goto 語(yǔ)句,原因是 goto 語(yǔ)句會(huì)讓程序變得混亂,當(dāng)時(shí)對(duì)于這個(gè)問(wèn)題我也沒(méi)有多想,不建議用那就不用了。那為什么 goto 語(yǔ)句會(huì)讓程序變得混亂呢?混亂具體指的又是什么呢?多年之后,我才了解到所謂的混亂指的是代碼的書(shū)寫(xiě)順序和執(zhí)行順序不一致。代碼的書(shū)寫(xiě)順序,代表的是我們的思維過(guò)程,如果思維的過(guò)程與代碼執(zhí)行的順序不一致,那就會(huì)干擾我們對(duì)代碼的理解。我們的思維是線性的,傻傻地一條道兒跑到黑,而 goto 語(yǔ)句太靈活,隨時(shí)可以穿越時(shí)空,實(shí)在是太“混亂”了。
首先發(fā)現(xiàn) goto 語(yǔ)句是“毒藥”的人是著名的計(jì)算機(jī)科學(xué)家艾茲格·迪科斯徹(Edsger Dijkstra),同時(shí)他還提出了結(jié)構(gòu)化程序設(shè)計(jì)。在結(jié)構(gòu)化程序設(shè)計(jì)中,可以使用三種基本控制結(jié)構(gòu)來(lái)代替 goto,這三種基本的控制結(jié)構(gòu)就是今天我們廣泛使用的順序結(jié)構(gòu)、選擇結(jié)構(gòu)和循環(huán)結(jié)構(gòu)。
這三種基本的控制結(jié)構(gòu)奠定了今天高級(jí)語(yǔ)言的基礎(chǔ),如果仔細(xì)觀察這三種結(jié)構(gòu),你會(huì)發(fā)現(xiàn)它們的入口和出口只有一個(gè),這意味它們是可組合的,而且組合起來(lái)一定是線性的,整體來(lái)看,代碼的書(shū)寫(xiě)順序和執(zhí)行順序也是一致的。
我們以前寫(xiě)的并發(fā)程序,是否違背了結(jié)構(gòu)化程序設(shè)計(jì)呢?這個(gè)問(wèn)題以前并沒(méi)有被關(guān)注,但是最近兩年,隨著并發(fā)編程的快速發(fā)展,已經(jīng)開(kāi)始有人關(guān)注了,而且劍指 Golang 中的 go 語(yǔ)句,指其為“毒藥”,類(lèi)比的是 goto 語(yǔ)句。詳情可以參考相關(guān)的文章。
Golang 中的 go 語(yǔ)句不過(guò)是快速創(chuàng)建協(xié)程的方法而已,這篇文章本質(zhì)上并不僅僅在批判 Golang 中的 go 語(yǔ)句,而是在批判開(kāi)啟新的線程(或者協(xié)程)異步執(zhí)行這種粗糙的做法,違背了結(jié)構(gòu)化程序設(shè)計(jì),Java 語(yǔ)言其實(shí)也在其列。
當(dāng)開(kāi)啟一個(gè)新的線程時(shí),程序會(huì)并行地出現(xiàn)兩個(gè)分支,主線程一個(gè)分支,子線程一個(gè)分支,這兩個(gè)分支很多情況下都是天各一方、永不相見(jiàn)。而結(jié)構(gòu)化的程序,可以有分支,但是最終一定要匯聚,不能有多個(gè)出口,因?yàn)橹挥羞@樣它們組合起來(lái)才是線性的。
最近幾年支持協(xié)程的開(kāi)發(fā)語(yǔ)言越來(lái)越多了,Java OpenSDK 中 Loom 項(xiàng)目的目標(biāo)就是支持協(xié)程,相信不久的將來(lái),Java 程序員也可以使用協(xié)程來(lái)解決并發(fā)問(wèn)題了。
計(jì)算機(jī)里很多面向開(kāi)發(fā)人員的技術(shù),大多數(shù)都是在解決一個(gè)問(wèn)題:易用性。協(xié)程作為一項(xiàng)并發(fā)編程技術(shù),本質(zhì)上也不過(guò)是解決并發(fā)工具的易用性問(wèn)題而已。對(duì)于易用性,我覺(jué)得最重要的就是要適應(yīng)我們的思維模式,在工作的前幾年,我并沒(méi)有怎么關(guān)注它,但是最近幾年思維模式已成為我重點(diǎn)關(guān)注的對(duì)象。因?yàn)樗季S模式對(duì)工作的很多方面都會(huì)產(chǎn)生影響,例如質(zhì)量。
一個(gè)軟件產(chǎn)品是否能夠活下去,從質(zhì)量的角度看,最核心的就是代碼寫(xiě)得好。那什么樣的代碼是好代碼呢?我覺(jué)得,最根本的是可讀性好。可讀性好的代碼,意味著大家都可以上手,而且上手后不會(huì)大動(dòng)干戈。那如何讓代碼的可讀性好呢?很簡(jiǎn)單,換位思考,用大眾、普通的思維模式去寫(xiě)代碼,而不是炫耀自己的各種設(shè)計(jì)能力。我覺(jué)得好的代碼,就像人民的藝術(shù)一樣,應(yīng)該是為人民群眾服務(wù)的,只有根植于廣大群眾之中,才有生命力。
20、CSP模型:Golang的主力隊(duì)員
Golang 是一門(mén)號(hào)稱(chēng)從語(yǔ)言層面支持并發(fā)的編程語(yǔ)言,支持并發(fā)是 Golang 一個(gè)非常重要的特性,Golang 支持協(xié)程,協(xié)程可以類(lèi)比 Java 中的線程,解決并發(fā)問(wèn)題的難點(diǎn)就在于線程(協(xié)程)之間的協(xié)作。
那 Golang 是如何解決協(xié)作問(wèn)題的呢?
總的來(lái)說(shuō),Golang 提供了兩種不同的方案:一種方案支持協(xié)程之間以共享內(nèi)存的方式通信,Golang 提供了管程和原子類(lèi)來(lái)對(duì)協(xié)程進(jìn)行同步控制,這個(gè)方案與 Java 語(yǔ)言類(lèi)似;另一種方案支持協(xié)程之間以消息傳遞(Message-Passing)的方式通信,本質(zhì)上是要避免共享,Golang 的這個(gè)方案是基于 CSP(Communicating Sequential Processes)模型實(shí)現(xiàn)的。Golang 比較推薦的方案是后者。
(1)什么是 CSP 模型
Actor 模型中 Actor 之間就是不能共享內(nèi)存的,彼此之間通信只能依靠消息傳遞的方式。Golang 實(shí)現(xiàn)的 CSP 模型和 Actor 模型看上去非常相似,Golang 程序員中有句格言:“不要以共享內(nèi)存方式通信,要以通信方式共享內(nèi)存(Don’t communicate by sharing memory, share memory by communicating)。”雖然 Golang 中協(xié)程之間,也能夠以共享內(nèi)存的方式通信,但是并不推薦;而推薦的以通信的方式共享內(nèi)存,實(shí)際上指的就是協(xié)程之間以消息傳遞方式來(lái)通信。
下面我們先結(jié)合一個(gè)簡(jiǎn)單的示例,看看 Golang 中協(xié)程之間是如何以消息傳遞的方式實(shí)現(xiàn)通信的。我們示例的目標(biāo)是打印從 1 累加到 100 億的結(jié)果,如果使用單個(gè)協(xié)程來(lái)計(jì)算,大概需要 4 秒多的時(shí)間。單個(gè)協(xié)程,只能用到 CPU 中的一個(gè)核,為了提高計(jì)算性能,我們可以用多個(gè)協(xié)程來(lái)并行計(jì)算,這樣就能發(fā)揮多核的優(yōu)勢(shì)了。
在下面的示例代碼中,我們用了 4 個(gè)子協(xié)程來(lái)并行執(zhí)行,這 4 個(gè)子協(xié)程分別計(jì)算[1, 25 億]、(25 億, 50 億]、(50 億, 75 億]、(75 億, 100 億],最后再在主協(xié)程中匯總 4 個(gè)子協(xié)程的計(jì)算結(jié)果。主協(xié)程要匯總 4 個(gè)子協(xié)程的計(jì)算結(jié)果,勢(shì)必要和 4 個(gè)子協(xié)程之間通信,Golang 中協(xié)程之間通信推薦的是使用 channel,channel 你可以形象地理解為現(xiàn)實(shí)世界里的管道。另外,calc() 方法的返回值是一個(gè)只能接收數(shù)據(jù)的 channel ch,它創(chuàng)建的子協(xié)程會(huì)把計(jì)算結(jié)果發(fā)送到這個(gè) ch 中,而主協(xié)程也會(huì)將這個(gè)計(jì)算結(jié)果通過(guò) ch 讀取出來(lái)。
import ("fmt""time" )func main() {// 變量聲明var result, i uint64// 單個(gè)協(xié)程執(zhí)行累加操作start := time.Now()for i = 1; i <= 10000000000; i++ {result += i}// 統(tǒng)計(jì)計(jì)算耗時(shí)elapsed := time.Since(start)fmt.Printf("執(zhí)行消耗的時(shí)間為:", elapsed)fmt.Println(", result:", result)// 4個(gè)協(xié)程共同執(zhí)行累加操作start = time.Now()ch1 := calc(1, 2500000000)ch2 := calc(2500000001, 5000000000)ch3 := calc(5000000001, 7500000000)ch4 := calc(7500000001, 10000000000)// 匯總4個(gè)協(xié)程的累加結(jié)果result = <-ch1 + <-ch2 + <-ch3 + <-ch4// 統(tǒng)計(jì)計(jì)算耗時(shí)elapsed = time.Since(start)fmt.Printf("執(zhí)行消耗的時(shí)間為:", elapsed)fmt.Println(", result:", result) } // 在協(xié)程中異步執(zhí)行累加操作,累加結(jié)果通過(guò)channel傳遞 func calc(from uint64, to uint64) <-chan uint64 {// channel用于協(xié)程間的通信ch := make(chan uint64)// 在協(xié)程中執(zhí)行累加操作go func() {result := fromfor i := from + 1; i <= to; i++ {result += i}// 將結(jié)果寫(xiě)入channelch <- result}()// 返回結(jié)果是用于通信的channelreturn ch }(2)CSP 模型與生產(chǎn)者 - 消費(fèi)者模式
你可以簡(jiǎn)單地把 Golang 實(shí)現(xiàn)的 CSP 模型類(lèi)比為生產(chǎn)者 - 消費(fèi)者模式,而 channel 可以類(lèi)比為生產(chǎn)者 - 消費(fèi)者模式中的阻塞隊(duì)列。不過(guò),需要注意的是 Golang 中 channel 的容量可以是 0,容量為 0 的 channel 在 Golang 中被稱(chēng)為無(wú)緩沖的 channel,容量大于 0 的則被稱(chēng)為有緩沖的 channel。
無(wú)緩沖的 channel 類(lèi)似于 Java 中提供的 SynchronousQueue,主要用途是在兩個(gè)協(xié)程之間做數(shù)據(jù)交換。比如上面累加器的示例代碼中,calc() 方法內(nèi)部創(chuàng)建的 channel 就是無(wú)緩沖的 channel。
而創(chuàng)建一個(gè)有緩沖的 channel 也很簡(jiǎn)單,在下面的示例代碼中,我們創(chuàng)建了一個(gè)容量為 4 的 channel,同時(shí)創(chuàng)建了 4 個(gè)協(xié)程作為生產(chǎn)者、4 個(gè)協(xié)程作為消費(fèi)者。
// 創(chuàng)建一個(gè)容量為4的channel ch := make(chan int, 4) // 創(chuàng)建4個(gè)協(xié)程,作為生產(chǎn)者 for i := 0; i < 4; i++ {go func() {ch <- 7}() } // 創(chuàng)建4個(gè)協(xié)程,作為消費(fèi)者 for i := 0; i < 4; i++ {go func() {o := <-chfmt.Println("received:", o)}() }Golang 中的 channel 是語(yǔ)言層面支持的,所以可以使用一個(gè)左向箭頭(<-)來(lái)完成向 channel 發(fā)送數(shù)據(jù)和讀取數(shù)據(jù)的任務(wù),使用上還是比較簡(jiǎn)單的。Golang 中的 channel 是支持雙向傳輸?shù)?#xff0c;所謂雙向傳輸,指的是一個(gè)協(xié)程既可以通過(guò)它發(fā)送數(shù)據(jù),也可以通過(guò)它接收數(shù)據(jù)。
不僅如此,Golang 中還可以將一個(gè)雙向的 channel 變成一個(gè)單向的 channel,在累加器的例子中,calc() 方法中創(chuàng)建了一個(gè)雙向 channel,但是返回的就是一個(gè)只能接收數(shù)據(jù)的單向 channel,所以主協(xié)程中只能通過(guò)它接收數(shù)據(jù),而不能通過(guò)它發(fā)送數(shù)據(jù),如果試圖通過(guò)它發(fā)送數(shù)據(jù),編譯器會(huì)提示錯(cuò)誤。對(duì)比之下,雙向變單向的功能,如果以 SDK 方式實(shí)現(xiàn),還是很困難的。
(3)CSP 模型與 Actor 模型的區(qū)別
同樣是以消息傳遞的方式來(lái)避免共享,那 Golang 實(shí)現(xiàn)的 CSP 模型和 Actor 模型有什么區(qū)別呢?
第一個(gè)最明顯的區(qū)別就是:Actor 模型中沒(méi)有 channel。雖然 Actor 模型中的 mailbox 和 channel 非常像,看上去都像個(gè) FIFO 隊(duì)列,但是區(qū)別還是很大的。Actor 模型中的 mailbox 對(duì)于程序員來(lái)說(shuō)是“透明”的,mailbox 明確歸屬于一個(gè)特定的 Actor,是 Actor 模型中的內(nèi)部機(jī)制;而且 Actor 之間是可以直接通信的,不需要通信中介。但 CSP 模型中的 channel 就不一樣了,它對(duì)于程序員來(lái)說(shuō)是“可見(jiàn)”的,是通信的中介,傳遞的消息都是直接發(fā)送到 channel 中的。
第二個(gè)區(qū)別是:Actor 模型中發(fā)送消息是非阻塞的,而 CSP 模型中是阻塞的。Golang 實(shí)現(xiàn)的 CSP 模型,channel 是一個(gè)阻塞隊(duì)列,當(dāng)阻塞隊(duì)列已滿(mǎn)的時(shí)候,向 channel 中發(fā)送數(shù)據(jù),會(huì)導(dǎo)致發(fā)送消息的協(xié)程阻塞。
第三個(gè)區(qū)別則是關(guān)于消息送達(dá)的。 Actor 模型理論上不保證消息百分百送達(dá),而在 Golang 實(shí)現(xiàn)的 CSP 模型中,是能保證消息百分百送達(dá)的。不過(guò)這種百分百送達(dá)也是有代價(jià)的,那就是有可能會(huì)導(dǎo)致死鎖。
比如,下面這段代碼就存在死鎖問(wèn)題,在主協(xié)程中,我們創(chuàng)建了一個(gè)無(wú)緩沖的 channel ch,然后從 ch 中接收數(shù)據(jù),此時(shí)主協(xié)程阻塞,main() 方法中的主協(xié)程阻塞,整個(gè)應(yīng)用就阻塞了。這就是 Golang 中最簡(jiǎn)單的一種死鎖。
func main() {// 創(chuàng)建一個(gè)無(wú)緩沖的channel ch := make(chan int)// 主協(xié)程會(huì)阻塞在此處,發(fā)生死鎖<- ch }Golang 中雖然也支持傳統(tǒng)的共享內(nèi)存的協(xié)程間通信方式,但是推薦的還是使用 CSP 模型,以通信的方式共享內(nèi)存。
Golang 中實(shí)現(xiàn)的 CSP 模型功能上還是很豐富的,例如支持 select 語(yǔ)句,select 語(yǔ)句類(lèi)似于網(wǎng)絡(luò)編程里的多路復(fù)用函數(shù) select(),只要有一個(gè) channel 能夠發(fā)送成功或者接收到數(shù)據(jù)就可以跳出阻塞狀態(tài)。鑒于篇幅原因,我就點(diǎn)到這里,不詳細(xì)介紹那么多了。
CSP 模型是托尼·霍爾(Tony Hoare)在 1978 年提出的,不過(guò)這個(gè)模型這些年一直都在發(fā)展,其理論遠(yuǎn)比 Golang 的實(shí)現(xiàn)復(fù)雜得多,如果你感興趣,可以參考霍爾寫(xiě)的Communicating Sequential Processes這本電子書(shū)。另外,霍爾在并發(fā)領(lǐng)域還有一項(xiàng)重要成就,那就是提出了霍爾管程模型,這個(gè)你應(yīng)該很熟悉了,Java 領(lǐng)域解決并發(fā)問(wèn)題的理論基礎(chǔ)就是它。
Java 領(lǐng)域可以借助第三方的類(lèi)庫(kù)JCSP來(lái)支持 CSP 模型,相比 Golang 的實(shí)現(xiàn),JCSP 更接近理論模型,如果你感興趣,可以下載學(xué)習(xí)。不過(guò)需要注意的是,JCSP 并沒(méi)有經(jīng)過(guò)廣泛的生產(chǎn)環(huán)境檢驗(yàn),所以并不建議你在生產(chǎn)環(huán)境中使用。
總結(jié)
- 上一篇: 【JZOJ 4623】搬运干草捆
- 下一篇: 关于数据埋点的认识以及在流量分析系统中的