Java多线程并发编程
一、線程池
1.1、什么是線程池
? ? ? ?線程池是一種多線程的處理方式,利用已有線程對象繼續服務新的任務(按照一定的執行策略),而不是頻繁地創建銷毀線程對象,由此提高服務的吞吐能力,減少CPU的閑置時間。具體組成部分包括:
(1)、線程池管理器(ThreadPool)用于創建和管理線程池,包括創建線程池、銷毀線程池,添加新任務。
(2)、工作線程(Worker)線程池中的線程,閑置的時候處于等待狀態,可以循環回收利用。
(3)、任務接口(Task)每個任務必須實現的接口類,為工作線程提供調用,主要規定了任務的入口、任務完成的收尾工作、任務的狀態。
(4)、等待隊列(Queue)存放等待處理的任務,提供緩沖機制。
1.2、線程池的種類
(1)、FixedThreadPool:固定數量的線程池,線程池中的線程數量是固定的,不會改變。
(2)、SingleThreadExecutor:單一線程池,線程池中只有一個線程。
(3)、CachedThreadPool:緩存線程池,線程池中的線程數量不固定,會根據需求的大小進行改變。
(4)、ScheduledThreadPool:計劃任務調度的線程池,用于執行計劃任務,比如每隔5分鐘怎么樣。
ThreadPoolExecutor構造函數中參數的含義:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}(1)、corePoolSize:線程池中核心線程數的數目
(2)、maximumPoolSize:線程池中最多能容納多少個線程
(3)、keepAliveTime:當現在線程數目大于corePoolSize時,超過keepAliveTime時間后,多出corePoolSize的那些線程將被終結。
(4)、unit:keepAliveTime的單位
(5)、workQueue:當任務數量很大,線程池中線程無法滿足時,提交的任務會被放到阻塞隊列中,線程空閑下來則會不斷從阻塞隊列中取數據。
(6)、threadFactory:執行程序創建新線程時使用的工廠。
(7)、handler:由于超出線程范圍和隊列容量而使執行被阻塞時所使用的處理程序。
RejectedExecutionHandler拒絕的4種策略:
(1)、AbortPolicy:如果不能接受任務了,則拋出異常。
(2)、CallerRunsPolicy:如果不能接受任務了,則讓調用的線程去完成。
(3)、DiscardOldestPolicy:如果不能接受任務了,則丟棄最老的一個任務,由一個隊列來維護。
(4)、DiscardPolicy:如果不能接受任務了,則丟棄任務。
1.3、攜帶結果的任務Callable和Future/FutureTask
(1)、Callable:解決Runnable接口不能返回一個值或受檢查的異常,可以采用Callable接口實現一個任務。
(2)、Future:表示異步計算的結果,可以對于具體的Runnable或者Callable任務進行查詢是否完成,查詢是否取消,獲取執行結果,取消任務等操作。
(3)、FutureTask:是一個RunnableFuture<V>,而RunnableFuture實現了Runnbale又實現了Futrue<V>這兩個接口。
1.4、Fork/Join框架
???????Fork/Join框架是Java7提供了的一個用于并行執行任務的框架, 是一個把大任務分割成若干個小任務,最終匯總每個小任務結果后得到大任務結果的框架。
? ? ? ?Fork/Join類似MapReduce算法,兩者區別是:Fork/Join 只有在必要時如任務非常大的情況下才分割成一個個小任務,而 MapReduce總是在開始執行第一步進行分割。看來,Fork/Join更適合一個JVM內線程級別,而MapReduce適合分布式系統。
(1)、工作竊取算法
? ? ? ?工作竊取(work-stealing)算法是指某個線程從其他隊列里竊取任務來執行。工作竊取的運行流程圖如下:
? ? ? ?那么為什么需要使用工作竊取算法呢?
? ? ? ?假如我們需要做一個比較大的任務,我們可以把這個任務分割為若干互不依賴的子任務,為了減少線程間的競爭,于是把這些子任務分別放到不同的隊列里,并為每個隊列創建一個單獨的線程來執行隊列里的任務,線程和隊列一一對應,比如A線程負責處理A隊列里的任務。但是有的線程會先把自己隊列里的任務干完,而其他線程對應的隊列里還有任務等待處理。干完活的線程與其等著,不如去幫其他線程干活,于是它就去其他線程的隊列里竊取一個任務來執行。而在這時它們會訪問同一個隊列,所以為了減少竊取任務線程和被竊取任務線程之間的競爭,通常會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執行。
? ? ? ?工作竊取算法的優點是充分利用線程進行并行計算,并減少了線程間的競爭,其缺點是在某些情況下還是存在競爭,比如雙端隊列里只有一個任務時。并且消耗了更多的系統資源,比如創建多個線程和多個雙端隊列。
(2)、如何設計一個Fork/Join框架?
? ? ? ?第一步分割任務。首先我們需要有一個fork類來把大任務分割成子任務,有可能子任務還是很大,所以還需要不停的分割,直到分割出的子任務足夠小。
? ? ? ?第二步執行任務并合并結果。分割的子任務分別放在雙端隊列里,然后幾個啟動線程分別從雙端隊列里獲取任務執行。子任務執行完的結果都統一放在一個隊列里,啟動一個線程從隊列里拿數據,然后合并這些數據。
? ? ? ?Fork/Join使用兩個類來完成以上兩件事情:
? ? ? ?1)、ForkJoinTask:我們要使用ForkJoin框架,必須首先創建一個ForkJoin任務。它提供在任務中執行fork()和join()操作的機制,通常情況下我們不需要直接繼承ForkJoinTask類,而只需要繼承它的子類,Fork/Join框架提供了以下兩個子類:
? ? ? ?RecursiveAction:用于沒有返回結果的任務。
? ? ? ?RecursiveTask :用于有返回結果的任務。
? ? ? ?2)、ForkJoinPool :ForkJoinTask需要通過ForkJoinPool來執行,任務分割出的子任務會添加到當前工作線程所維護的雙端隊列中,進入隊列的頭部。當一個工作線程的隊列里暫時沒有任務時,它會隨機從其他工作線程的隊列的尾部獲取一個任務。
(2)、Fork/Join框架的實現原理
? ? ? ?ForkJoinPool由ForkJoinTask數組和ForkJoinWorkerThread數組組成,ForkJoinTask數組負責存放程序提交給ForkJoinPool的任務,而ForkJoinWorkerThread數組負責執行這些任務。
? ? ? ?ForkJoinTask的fork方法實現原理:當我們調用ForkJoinTask的fork方法時,程序會調用ForkJoinWorkerThread的pushTask方法異步的執行這個任務,然后立即返回結果。pushTask方法把當前任務存放在ForkJoinTask 數組queue里。然后再調用ForkJoinPool的signalWork()方法喚醒或創建一個工作線程來執行任務。
? ? ? ?ForkJoinTask的join方法實現原理:Join方法的主要作用是阻塞當前線程并等待獲取結果。
(4)、Fork/Join框架的異常處理
? ? ? ?ForkJoinTask在執行的時候可能會拋出異常,但是我們沒辦法在主線程里直接捕獲異常,所以ForkJoinTask提供了isCompletedAbnormally()方法來檢查任務是否已經拋出異常或已經被取消了,并且可以通過ForkJoinTask的getException方法獲取異常。
二、JDK并發包
2.1、同步控制工具類
(1)、Semaphore
???????計數信號量,常用于限制可以訪問某些資源(物理或邏輯的)線程數目。是一個共享鎖,允許N個線程同時進入臨界區, 但是超出許可范圍的只能等待;如果N = 1, 則類似于lock
(2)、ReentrantLock
? ? ? ?簡而言之, 就是自由度更高的synchronized, 主要具備以下優點
? ? ? ?可重入:單線程可以重復進入,但要重復退出
? ? ? ?可中斷:lock.lockInterruptibly()
? ? ? ?可限時:超時不能獲得鎖,就返回false,不會永久等待構成死鎖
? ? ? ?公平鎖:先來先得,public ReentrantLock(boolean fair),默認鎖不公平的, 根據線程優先級競爭
(3)、Condition
? ? ? ?類似于 Object.wait()和Object.notify(), 需要與ReentrantLock結合使用.
(4)、ReadWriteLock
? ? ? ?讀寫分離鎖, 可以大幅提升系統并行度.
? ? ? ?讀-讀不互斥:讀讀之間不阻塞。
? ? ? ?讀-寫互斥:讀阻塞寫,寫也會阻塞讀。
? ? ? ?寫-寫互斥:寫寫阻塞。
(5)、CountDownLatch倒數計時器
? ? ? ?一種典型的場景就是火箭發射。在火箭發射前,為了保證萬無一失,往往還要進行各項設備、儀器的檢查。
只有等所有檢查完畢后,引擎才能點火。這種場景就非常適合使用CountDownLatch。它可以使得點火線程,?
等待所有檢查線程全部完工后,再執行
(6)、CyclicBarrier循環柵欄
? ? ? ?Cyclic意為循環,也就是說這個計數器可以反復使用。比如,假設我們將計數器設置為10。那么湊齊
第一批10個線程后,計數器就會歸零,然后接著湊齊下一批10個線程
(7)、LockSupport
? ? ? ?一個線程阻塞工具, 可以在任意位置讓線程阻塞。與wait/notify比較,如果unpark發生在park之前,?并不會導致線程凍結,也不需要獲取鎖
? ? ? ?LockSupport比Object的wait/notify有兩大優勢:
? ? ? ?1)、LockSupport不需要在同步代碼塊里 。所以線程間也不需要維護一個共享的同步對象了,實現了線程間的解耦
? ? ? ?2)、unpark函數可以先于park調用,所以不需要擔心線程間的執行的先后順序
2.2、并發容器
(1)、Collections.synchronizedMap
? ? ? ?其本質是在讀寫map操作上都加了鎖,因此不推薦在高并發場景使用
(2)、ConcurrentHashMap
? ? ? ?內部使用分區Segment來表示不同的部分, 每個分區其實就是一個小的hashtable,各自有自己的鎖,只要多個修改發生在不同的分區,他們就可以并發的進行。把一個整體分成了16個Segment,最高支持16個線程并發修改
(3)、BlockingQueue
? ? ? ?阻塞隊列, 主要用于多線程之間共享數據;當一個線程讀取數據時,如果隊列是空的,則當前線程會進入等待狀態;如果隊列滿了,當一個線程嘗試寫入數據時,同樣會進入等待狀態;適用于生產消費者模型,因為BlockingQueue在put、take等操作有鎖,因此非高性能容器;如果需要高并發支持的隊列,則可以使用ConcurrentLinkedQueue,他內部運用了大量無鎖操作
(4)、CopyOnWriteArrayList
? ? ? ?CopyOnWriteArrayList通過在新增元素時,復制一份新的數組出來,并在其中寫入數據,之后將原數組引用指向到新數組,其Add操作是在內部通過ReentrantLock進行鎖保護,防止多線程場景復制多份數組,而Read操作內部無鎖,直接返回數組引用,并發下效率高,因此適用于讀多寫少的場景
參考文章:
(1)、Java多線程并發編程一覽筆錄
?
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的Java多线程并发编程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 会议交流 | 最新NLP核心技术与前沿实
- 下一篇: 开源开放 | 疾病科室、心血管系统疾病知