多线程小抄集(新编二)
歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。
歡迎跳轉到本文的原文鏈接:https://honeypps.com/java/java-concurrent-note-list-new2/
ThreadLocal
ThreadLocal可以實現每個線程綁定自己的值,即每個線程有各自獨立的副本而互相不受影響。一共有四個方法:get, set, remove, initialValue。可以重寫initialValue()方法來為ThreadLocal賦初值。SimpleDateFormat不是線程安全的,可以通過如下的方式讓每個線程單獨擁有這個對象:
private static final ThreadLocal<DateFormat> DATE_FORMAT = new ThreadLocal<DateFormat>(){@Override protected DateFormat initialValue(){return new SimpleDateFormat("yyyy-mm-dd");} }; private static final ThreadLocal<StringBuilder> stringBuilder = new ThreadLocal<StringBuilder>(){@Override protected StringBuilder initialValue(){return new StringBuilder(256);} };ThreadLocal不是用來解決對象共享訪問問題的,而主要提供了線程保持對象的方法和避免參數傳遞的方便的對象訪問方式。ThreadLocal使用場合主要解決多線程中數據因并發產生不一致的問題。ThreadLocal為每個線程中的并發訪問的數據提供一個副本,通過訪問副本來運行業務,這樣的結果是耗費了內存,但大大減少了線程同步所帶來的線程消耗,也減少了線程并發控制的復雜度。
通過ThreadLocal.set()將新創建的對象的引用保存到各線程的自己的一個map(Thread類中的ThreadLocal.ThreadLocalMap的變量)中,每個線程都有這樣一個map,執行ThreadLocal.get()時,各線程從自己的map中取出放進去的對象,因此取出來的是各自自己線程中的對象,ThreadLocal實例是作為map的key來使用的。ThreadLocalMap初始容量為16的數組table,如果個數超過threshold(capacity*2/3)之后就會按照原來容量的2倍進行擴容。每當有新的ThreadLocal添加到ThreadLocalMap的時候會通過nextHashCode()方法計算hash值:
nextHashCode.getAndAdd(HASH_INCREMENT);//HASH_INCREMENT = 0x61c88647;然后根據key.threadLocalHashCode & (table.length - 1);計算出在table中的位置i,如果發生沖突,那就根據((i + 1 < len) ? i + 1 : 0)計算出新的位置i。只是找下一個可用空間并在其中插入元素(線性探測法)。其中0x61c88647為((根號5 -1 )/ 2)* 2的32次方,與斐波那契額和黃金分割有關,為了讓哈希碼能均勻地分布在2的n次方的數組內。
ThreadLocalMap中每個Entry的key(ThreadLocal實例)是弱引用,value是強引用(這點類似于WeakHashMap)。當把threadLocal實例置為null以后,沒有任何強引用指向threadLocal實例,所以threadLocal將會被gc回收,但是value卻不能被回收,因為其還存在于ThreadLocalMap的對象的Entry之中。只有當前Thread結束之后,所有與當前線程有關的資源才會被GC回收。所以如果在線程池中使用ThreadLocal,由于線程會復用,而又沒有顯示的調用remove的話的確是會有可能發生內存泄露的問題。線程復用還會產生臟數據。由于線程池會重用Thread對象,那么與Thread綁定時的類的靜態屬性ThreadLocal變量也會被重用。如果在實現的線程run()方法中不顯式地調用remove()清理與線程相關的ThreadLocal信息,那么倘若下一個線程不調用set設置的初始值,就可能get到重用的線程信息,包括ThreadLocal所關聯的線程對象的value值。
其實在ThreadLocalMap的get或者set方法中會探測其中的key是否被回收(調用expungeStaleEntry方法),然后將其value設置為null,這個功能幾乎和WeakHashMap中的expungeStaleEntries()方法一樣。因此value在key被gc后可能還會存活一段時間,但最終也會被回收,但是若不再調用get或者set方法時,那么這個value就在線程存活期間無法被釋放。
ThreadLocal建議
InheritableThreadLocal
使用類InheritableThreadLocal可以在子線程中取得父線程繼承下來的值。可以采用重寫childValue(Object parentValue)方法來更改繼承的值。InheritableThreadLocal是ThreadLocal的子類,代碼量很少,可以看一下:
public class InheritableThreadLocal<T> extends ThreadLocal<T> {protected T childValue(T parentValue) {return parentValue;}ThreadLocalMap getMap(Thread t) {return t.inheritableThreadLocals;}void createMap(Thread t, T firstValue) {t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);} }InheritableThreadLocal的使用示例:
private static final InheritableThreadLocal<Object> itl = new InheritableThreadLocal<Object>(){@Override protected Object initialValue(){return System.currentTimeMillis();}@Overrideprotected Object childValue(Object parentValue) {return "get value from father: " + parentValue;} };public static void main(String[] args) {System.out.println(itl.get());new Thread(){@Override public void run(){System.out.println(itl.get());new Thread(){@Override public void run(){System.out.println(itl.get());}}.start();}}.start(); }程序輸出:
1542475428082 get value from father: 1542475428082 get value from father: get value from father: 1542475428082ThreadLocalRandom
我們要避免Random實例被多線程使用,雖然共享該實例是線程安全的,但是會因競爭同一seed而導致性能下降。ThreadLocalRandom類是JDK7在JUC包下新增的隨機數生成器,它解決了Random類在多線程下多個線程競爭內部唯一的原子性種子變量而導致大量線程自旋重試的不足。
CountDownLatch
CountDownLatch允許一個或多個線程等待其他線程完成操作。CountDownLatch的構造函數接收一個int類型的參數作為計數器,如果你想等待N個點完成,這里就傳入N(CountDownLatch(int count))。CountDownLatch的方法有:await(), await(long timeout, TimeUnit unit), countDown(), getCount()等。計數器必須大于等于0,只是等于0的時候,計數器就是零,調用await方法時不會阻塞當前線程。CountDownLatch不可能重新初始化或者修改CountDownLatch對象的內部計數器的值。一個線程調用countDown方法happens-before另一個線程調用的await()方法。
CyclicBarrier
讓一組線程達到一個屏障時被阻塞,直到最后一個線程到達屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續運行。CyclicBarrier默認的構造方法是CyclicBarrier(int parties),其參數表示屏障攔截的線程數量,每個線程調用await方法告訴CyclicBarrier我已經達到了屏障,然后當前線程被阻塞。CyclicBarrier還提供了一個更高級的構造函數CyclicBarrier(int parties, Runnable barrierAction)用于在線程達到屏障時,優先執行barrierAction,方便處理更復雜的業務場景。
示例:
CyclicBarrier與CountDownLatch的區別
Semaphore
Semaphore(信號量)是用來控制同時訪問特定資源的線程數量,它協調各個線程,以保證合理的使用公共資源。Semaphore有兩個構造函數:Semaphore(int permits)默認是非公平的,Semaphore(int permits, boolean fair)可以設置為公平的。
Exchanger
用于線程間協作的工具類(線程間的數據交換)。它提供一個同步點,在這個同步點,兩個線程可以交換彼此的數據。這兩個線程通過exchange方法交換數據,如果第一個線程先執行exchange()方法,它會一直等待第二個線程也執行exchange方法,當兩個線程都達到同步點時,這兩個線程就可以交換數據,將本線程生產出來的數據傳遞給對方。
Fork/Join框架
Fork/Join框架是JDK7提供的一個用于并行執行任務的框架,是一個把大任務切分為若干子任務并行的執行,最終匯總每個小任務后得到大任務結果的框架。我們再通過Fork和Join來理解下Fork/Join框架。Fork就是把一個大任務劃分成為若干子任務并行的執行,Join就是合并這些子任務的執行結果,最后得到這個大任務的結果。使用Fork/Join框架時,首先需要創建一個ForkJoin任務,它提供在任務中執行fork()和join()操作的機制。通常情況下,我們不需要直接繼承ForkJoinTask,只需要繼承它的子類,Fork/Join框架提供了兩個子類:RecursiveAction用于沒有返回結果的任務;RecursiveTask用于有返回結果的任務。ForkJoinTask需要通過ForkJoinPool來執行。任務分割出的子任務會添加到當前工作線程所維護的雙端隊列中,進入隊列的頭部。當一個工作線程的隊列里暫時沒有任務時,它會隨機從其他工作線程的隊列的尾部獲取一個任務。(工作竊取算法work-stealing)。
工作竊取算法是指某個線程從其他隊列里竊取任務來執行。在生產-消費者設計中,所有消費者有一個共享的工作隊列,而在work-stealing設計中,每個消費者都有各自的雙端隊列,如果一個消費者完成了自己雙端隊列中的全部任務,那么它可以從其他消費者雙端隊列末尾秘密地獲取工作。優點:充分利用線程進行并行計算,減少了線程間的競爭。 缺點:在某些情況下還是存在競爭,比如雙端隊列(Deque)里只有一個任務時。并且該算法會消耗了更多的系統資源,比如創建多個線程和多個雙端隊列。
示例:
import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.RecursiveTask;public class CountTask extends RecursiveTask<Integer> {private static final int THRESHOLD = 10;private int start;private int end;public CountTask(int start, int end){super();this.start = start;this.end = end;}@Overrideprotected Integer compute(){int sum = 0;boolean canCompute = (end-start) <= THRESHOLD;if(canCompute){for(int i=start;i<=end;i++){sum += i;}}else{int middle = (start+end)/2;CountTask leftTask = new CountTask(start,middle);CountTask rightTask = new CountTask(middle+1,end);leftTask.fork();rightTask.fork();int leftResult = leftTask.join();int rightResult = rightTask.join();sum = leftResult+rightResult;}return sum;}public static void main(String[] args){ForkJoinPool forkJoinPool = new ForkJoinPool();CountTask task = new CountTask(1,100);Future<Integer> result = forkJoinPool.submit(task);try{System.out.println(result.get());}catch (InterruptedException | ExecutionException e){e.printStackTrace();}if(task.isCompletedAbnormally()){System.out.println(task.getException());}} }歡迎跳轉到本文的原文鏈接:https://honeypps.com/java/java-concurrent-note-list-new2/
歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。
總結
以上是生活随笔為你收集整理的多线程小抄集(新编二)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 多线程小抄集(新编一)
- 下一篇: 多线程小抄集(新编三)