CAS
CAS 是一種樂觀鎖,syncronized 是一種悲觀鎖
AtomicInteger
AtomicInteger count
= new AtomicInteger ( 0 ) ; void m ( ) { for ( int i
= 0 ; i
< 10000 ; i
++ ) count
. incrementAndGet ( ) ; }
普通long(Syncronized),AtomicLong,LongAdder比較
注意:以下是在1000個線程下測試得到的結果。如果是較少線程的情況,LongAdder的效率未必有這么高。
Atomic: 100000000 time 6277
Sync: 100000000 time 11591
LongAdder: 100000000 time 1783
為什么會出現這種現象? Atomic比Syncronized long快:Syncronized long可能會升級為操作系統的重量級鎖,Atomic是無鎖的CAS LongAdder比Atomic快:LongAdder內部做了一個類似于分段鎖,最終將每一個向上遞增的結果加到一起。
package com
. mashibing
. juc
. c_018_00_AtomicXXX
; import java
. util
. concurrent
. TimeUnit
;
import java
. util
. concurrent
. atomic
. AtomicLong
;
import java
. util
. concurrent
. atomic
. LongAdder
; public class T02_AtomicVsSyncVsLongAdder { static long count2
= 0 L
; static AtomicLong count1
= new AtomicLong ( 0 L
) ; static LongAdder count3
= new LongAdder ( ) ; public static void main ( String
[ ] args
) throws Exception
{ Thread
[ ] threads
= new Thread [ 1000 ] ; for ( int i
= 0 ; i
< threads
. length
; i
++ ) { threads
[ i
] = new Thread ( ( ) - > { for ( int k
= 0 ; k
< 100000 ; k
++ ) count1
. incrementAndGet ( ) ; } ) ; } long start
= System
. currentTimeMillis ( ) ; for ( Thread t
: threads
) t
. start ( ) ; for ( Thread t
: threads
) t
. join ( ) ; long end
= System
. currentTimeMillis ( ) ; System
. out
. println ( "Atomic: " + count1
. get ( ) + " time " + ( end
- start
) ) ; Object lock
= new Object ( ) ; for ( int i
= 0 ; i
< threads
. length
; i
++ ) { threads
[ i
] = new Thread ( new Runnable ( ) { @Override public void run ( ) { for ( int k
= 0 ; k
< 100000 ; k
++ ) synchronized ( lock
) { count2
++ ; } } } ) ; } start
= System
. currentTimeMillis ( ) ; for ( Thread t
: threads
) t
. start ( ) ; for ( Thread t
: threads
) t
. join ( ) ; end
= System
. currentTimeMillis ( ) ; System
. out
. println ( "Sync: " + count2
+ " time " + ( end
- start
) ) ; for ( int i
= 0 ; i
< threads
. length
; i
++ ) { threads
[ i
] = new Thread ( ( ) - > { for ( int k
= 0 ; k
< 100000 ; k
++ ) count3
. increment ( ) ; } ) ; } start
= System
. currentTimeMillis ( ) ; for ( Thread t
: threads
) t
. start ( ) ; for ( Thread t
: threads
) t
. join ( ) ; end
= System
. currentTimeMillis ( ) ; System
. out
. println ( "LongAdder: " + count1
. longValue ( ) + " time " + ( end
- start
) ) ; } static void microSleep ( int m
) { try { TimeUnit
. MICROSECONDS
. sleep ( m
) ; } catch ( InterruptedException e
) { e
. printStackTrace ( ) ; } }
}
ReentrantLock 可重入鎖
reentrantlock用于替代synchronized 由于m1鎖定this,只有m1執行完畢的時候,m2才能執行 這里是復習synchronized最原始的語義 使用reentrantlock可以完成同樣的功能 需要注意的是,必須要必須要必須要手動釋放鎖 使用syn鎖定的話如果遇到異常,jvm會自動釋放鎖,但是lock必須手動釋放鎖,因此經常在finally中進行鎖的釋放 使用reentrantlock可以進行“嘗試鎖定”tryLock,這樣無法鎖定,或者在指定時間內無法鎖定,線程可以決定是否繼續等待 使用ReentrantLock還可以調用lockInterruptibly方法,可以對線程interrupt方法做出響應, 在一個線程等待鎖的過程中,可以被打斷 ReentrantLock還可以指定為公平鎖
package com
. mashibing
. juc
. c_020
; import java
. util
. concurrent
. locks
. ReentrantLock
; public class T05_ReentrantLock5 extends Thread { private static ReentrantLock lock
= new ReentrantLock ( true ) ; public void run ( ) { for ( int i
= 0 ; i
< 100 ; i
++ ) { lock
. lock ( ) ; try { System
. out
. println ( Thread
. currentThread ( ) . getName ( ) + "獲得鎖" ) ; } finally { lock
. unlock ( ) ; } } } public static void main ( String
[ ] args
) { T05_ReentrantLock5 rl
= new T05_ReentrantLock5 ( ) ; Thread th1
= new Thread ( rl
) ; Thread th2
= new Thread ( rl
) ; th1
. start ( ) ; th2
. start ( ) ; }
}
CountDownLatch
CountDownLatch和Join的對比: CountDownLatch可以更靈活,因為在一個線程中,CountDownLatch可以根據你的需要countDown很多次。而Join是等待所有join進來的線程結束之后才繼續執行被join的線程。
package com
. mashibing
. juc
. c_020
; import java
. util
. concurrent
. CountDownLatch
; public class T06_TestCountDownLatch { public static void main ( String
[ ] args
) { usingJoin ( ) ; usingCountDownLatch ( ) ; } private static void usingCountDownLatch ( ) { Thread
[ ] threads
= new Thread [ 100 ] ; CountDownLatch latch
= new CountDownLatch ( threads
. length
) ; for ( int i
= 0 ; i
< threads
. length
; i
++ ) { threads
[ i
] = new Thread ( ( ) - > { int result
= 0 ; for ( int j
= 0 ; j
< 10000 ; j
++ ) result
+= j
; latch
. countDown ( ) ; } ) ; } for ( int i
= 0 ; i
< threads
. length
; i
++ ) { threads
[ i
] . start ( ) ; } try { latch
. await ( ) ; } catch ( InterruptedException e
) { e
. printStackTrace ( ) ; } System
. out
. println ( "end latch" ) ; } private static void usingJoin ( ) { Thread
[ ] threads
= new Thread [ 100 ] ; for ( int i
= 0 ; i
< threads
. length
; i
++ ) { threads
[ i
] = new Thread ( ( ) - > { int result
= 0 ; for ( int j
= 0 ; j
< 10000 ; j
++ ) result
+= j
; } ) ; } for ( int i
= 0 ; i
< threads
. length
; i
++ ) { threads
[ i
] . start ( ) ; } for ( int i
= 0 ; i
< threads
. length
; i
++ ) { try { threads
[ i
] . join ( ) ; } catch ( InterruptedException e
) { e
. printStackTrace ( ) ; } } System
. out
. println ( "end join" ) ; }
}
CyclicBarrier 循環柵欄
CyclicBarrier意思是循環柵欄。這里有一個柵欄,什么時候人滿了,就把柵欄推倒,嘩啦嘩啦的都放出去,出去之后,柵欄又重新起來,再來人,滿了推倒,以此類推。
下面的程序有兩個參數,第二個參數不傳也是可以的,就是滿了之后不做任何事情。第一個參數是20,滿了之后幫我調用第二個參數的指定動作。我們這個指定的動作就是一個Runnable對象,打印:滿人,發車啦!然后就發車。
適用于:有一個線程,必須等其他線程結束了,才能繼續執行,這種情況。
舉例來說,CyclicBarrier的概念呢,比如說一個復雜的操作,需要訪問數據庫,需要訪問網絡,需要訪問文件,有一種方式是順序執行,挨個的都執行完,效率非常低。這是一種方式,還有一種可能性就是病發執行原來123,順序執行并。發執執行是,不同的線程去執行不同的操作,有的線程去數據庫找有的線程去網絡訪問,有的線程去讀文件。必須是這三個線程全部到位了,我才能去進行,這個時候我們就可以用CyclicBarrier。
package com
. mashibing
. juc
. c_020
; import java
. util
. concurrent
. BrokenBarrierException
;
import java
. util
. concurrent
. CyclicBarrier
; public class T07_TestCyclicBarrier { public static void main ( String
[ ] args
) { CyclicBarrier barrier
= new CyclicBarrier ( 20 , ( ) - > System
. out
. println ( "滿人,發車啦" ) ) ; for ( int i
= 0 ; i
< 100 ; i
++ ) { new Thread ( ( ) - > { try { barrier
. await ( ) ; } catch ( InterruptedException e
) { e
. printStackTrace ( ) ; } catch ( BrokenBarrierException e
) { e
. printStackTrace ( ) ; } } ) . start ( ) ; } }
}
Phaser
這個可以用來拓寬知識面,目前還沒有人面試被問到。
Phaser是按照不同的階段來對線程進行執行 。
他本身維護一個階段這樣的一個成員變量,當前我是執行到哪個階段?是第零個,還是第一個,還是第二個階段,等等。
每個階段不同的時候,這個線程都可以往前走。有的線程走到某個階段就停了,有的線程一直會走到結束你的程序中,如果說用到分好幾個階段執行,而且有的人必須得幾個共同參與的同一種情況的情況下,可能會用到這個Phaser。
p4 到達現場!
新郎 到達現場!
p2 到達現場!
p0 到達現場!
p3 到達現場!
新娘 到達現場!
p1 到達現場!
所有人到齊了!7p4 吃完!
p1 吃完!
新郎 吃完!
p2 吃完!
p0 吃完!
p3 吃完!
新娘 吃完!
所有人吃完了!7p3 離開!
新郎 離開!
p1 離開!
p0 離開!
p4 離開!
p2 離開!
新娘 離開!
所有人離開了!7新郎 洞房!
新娘 洞房!
婚禮結束!新郎新娘抱抱!2Process finished with exit code 0
package com
. mashibing
. juc
. c_020
; import java
. util
. Random
;
import java
. util
. concurrent
. Phaser
;
import java
. util
. concurrent
. TimeUnit
; public class T09_TestPhaser2 { static Random r
= new Random ( ) ; static MarriagePhaser phaser
= new MarriagePhaser ( ) ; static void milliSleep ( int milli
) { try { TimeUnit
. MILLISECONDS
. sleep ( milli
) ; } catch ( InterruptedException e
) { e
. printStackTrace ( ) ; } } public static void main ( String
[ ] args
) { phaser
. bulkRegister ( 7 ) ; for ( int i
= 0 ; i
< 5 ; i
++ ) { new Thread ( new Person ( "person" + i
) ) . start ( ) ; } new Thread ( new Person ( "新郎" ) ) . start ( ) ; new Thread ( new Person ( "新娘" ) ) . start ( ) ; } static class MarriagePhaser extends Phaser { @Override protected boolean onAdvance ( int phase
, int registeredParties
) { switch ( phase
) { case 0 : System
. out
. println ( "所有人到齊了!" + registeredParties
) ; System
. out
. println ( ) ; return false ; case 1 : System
. out
. println ( "所有人吃完了!" + registeredParties
) ; System
. out
. println ( ) ; return false ; case 2 : System
. out
. println ( "所有人離開了!" + registeredParties
) ; System
. out
. println ( ) ; return false ; case 3 : System
. out
. println ( "婚禮結束!新郎新娘抱抱!" + registeredParties
) ; return true ; default : return true ; } } } static class Person implements Runnable { String name
; public Person ( String name
) { this . name
= name
; } public void arrive ( ) { milliSleep ( r
. nextInt ( 1000 ) ) ; System
. out
. printf ( "%s 到達現場!\n" , name
) ; phaser
. arriveAndAwaitAdvance ( ) ; } public void eat ( ) { milliSleep ( r
. nextInt ( 1000 ) ) ; System
. out
. printf ( "%s 吃完!\n" , name
) ; phaser
. arriveAndAwaitAdvance ( ) ; } public void leave ( ) { milliSleep ( r
. nextInt ( 1000 ) ) ; System
. out
. printf ( "%s 離開!\n" , name
) ; phaser
. arriveAndAwaitAdvance ( ) ; } private void hug ( ) { if ( name
. equals ( "新郎" ) || name
. equals ( "新娘" ) ) { milliSleep ( r
. nextInt ( 1000 ) ) ; System
. out
. printf ( "%s 洞房!\n" , name
) ; phaser
. arriveAndAwaitAdvance ( ) ; } else { phaser
. arriveAndDeregister ( ) ; } } @Override public void run ( ) { arrive ( ) ; eat ( ) ; leave ( ) ; hug ( ) ; } }
}
ReadWriteLock
這個面試常考。
這個ReadWriteLock是讀寫鎖 。讀寫鎖的概念,其實就是共享鎖 和排他鎖
讀鎖 是共享鎖 ,寫鎖 就是排他鎖 。
那這個是什么意思? 讀寫有很多種情況,比如,你數據庫里的某條數據,你放在內存里讀的時候特別多 ,你改的次數并不多 。這時候將讀寫的鎖分開,會大大提高效率,因為讀操作本質上是可以允許多個線程同時進行的。
package com
. mashibing
. juc
. c_020
; import java
. util
. Random
;
import java
. util
. concurrent
. atomic
. LongAdder
;
import java
. util
. concurrent
. locks
. Lock
;
import java
. util
. concurrent
. locks
. ReadWriteLock
;
import java
. util
. concurrent
. locks
. ReentrantLock
;
import java
. util
. concurrent
. locks
. ReentrantReadWriteLock
; public class T10_TestReadWriteLock { static Lock lock
= new ReentrantLock ( ) ; private static int value
; static ReadWriteLock readWriteLock
= new ReentrantReadWriteLock ( ) ; static Lock readLock
= readWriteLock
. readLock ( ) ; static Lock writeLock
= readWriteLock
. writeLock ( ) ; public static void read ( Lock lock
) { try { lock
. lock ( ) ; Thread
. sleep ( 1000 ) ; System
. out
. println ( "read Finish!" ) ; } catch ( InterruptedException e
) { e
. printStackTrace ( ) ; } finally { lock
. unlock ( ) ; } } public static void write ( Lock lock
, int v
) { try { lock
. lock ( ) ; Thread
. sleep ( 1000 ) ; value
= v
; System
. out
. println ( "write Finish!" ) ; } catch ( InterruptedException e
) { e
. printStackTrace ( ) ; } finally { lock
. unlock ( ) ; } } public static void main ( String
[ ] args
) { Runnable readR
= ( ) - > read ( readLock
) ; Runnable writeR
= ( ) - > write ( writeLock
, new Random ( ) . nextInt ( ) ) ; for ( int i
= 0 ; i
< 18 ; i
++ ) new Thread ( readR
) . start ( ) ; for ( int i
= 0 ; i
< 2 ; i
++ ) new Thread ( writeR
) . start ( ) ; }
}
Semaphore 信號量
Semaphore 可以用于限流:最多允許多少個 線程同時在運行 (我理解的semaphore類似于令牌桶)
package com
. mashibing
. juc
. c_020
; import java
. util
. concurrent
. Semaphore
; public class T11_TestSemaphore { public static void main ( String
[ ] args
) { Semaphore semaphore
= new Semaphore ( 2 , true ) ; new Thread ( ( ) - > { try { semaphore
. acquire ( ) ; System
. out
. println ( "T1 running..." ) ; Thread
. sleep ( 200 ) ; System
. out
. println ( "T1 running..." ) ; } catch ( InterruptedException e
) { e
. printStackTrace ( ) ; } finally { semaphore
. release ( ) ; } } ) . start ( ) ; new Thread ( ( ) - > { try { semaphore
. acquire ( ) ; System
. out
. println ( "T2 running..." ) ; Thread
. sleep ( 200 ) ; System
. out
. println ( "T2 running..." ) ; semaphore
. release ( ) ; } catch ( InterruptedException e
) { e
. printStackTrace ( ) ; } } ) . start ( ) ; }
}
Exchanger
基本上 不會 被問到,也是拓展知識面用的。 可以想象 exchanger 是一個容器 ,用來在兩個線程之間交換變量 。
示例: (可能的場景:游戲中兩個人交換裝備)
package com
. mashibing
. juc
. c_020
; import java
. util
. concurrent
. Exchanger
; public class T12_TestExchanger { static Exchanger
< String> exchanger
= new Exchanger < > ( ) ; public static void main ( String
[ ] args
) { new Thread ( ( ) - > { String str
= "T1" ; try { str
= exchanger
. exchange ( str
) ; } catch ( InterruptedException e
) { e
. printStackTrace ( ) ; } System
. out
. println ( Thread
. currentThread ( ) . getName ( ) + " " + str
) ; } , "t1" ) . start ( ) ; new Thread ( ( ) - > { String str
= "T2" ; try { str
= exchanger
. exchange ( str
) ; } catch ( InterruptedException e
) { e
. printStackTrace ( ) ; } System
. out
. println ( Thread
. currentThread ( ) . getName ( ) + " " + str
) ; } , "t2" ) . start ( ) ; }
}
總結
以上是生活随笔 為你收集整理的多线程与高并发(三):JUC包下新的同步机制:CAS,AtomicInteger,AtomicLong,ReentrantLock,CountDownLatch,ReadWriteLock等 的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔 網站內容還不錯,歡迎將生活随笔 推薦給好友。