1. Synchronized
零 簡單用法
??同步關(guān)鍵字,用兩種用法,一種是加在方法簽名上,一種是里面包著一個monitor對象。
public class SyncDemo {private static int[] array = {0};private static synchronized void change(){array[0]++;}/****/public static void main(String[] args) throws InterruptedException {for (int i = 0; i < 10; i++) {new Thread(SyncDemo::change).start();}Thread.currentThread().join(1000);System.out.println(array[0]);} }一 wait與notify
??wait方法與notify是對應(yīng)的。wait方法定義在Object類中,當(dāng)對象成為monitor時可以調(diào)用。Wait,具體含義就是等待別人notify。我講一下細(xì)節(jié)Synchronized關(guān)鍵字后面有個括號,里面放個對象,這個對象就是monitor。如果沒有加括號,那么靜態(tài)方法上class對象就是monitor,非靜態(tài)方法,this就是monitor。
 ??默認(rèn)是這樣的,N個線程同步一個monitor對象。第一個線程拿到了monitor,其他線程只能等待,那么何時能主動調(diào)用wait呢?
 ??就是拿到了monitor,這時候進(jìn)入synchronized方法時,可以調(diào)用monitor對象的wait方法,也就是主動放棄辛苦拿到的monitor。而且wait方法只能由monitor去調(diào)用,否則會拋java.lang.IllegalMonitorStateException異常。
 ??那么我們寫個demo。AB兩個線程,先讓B運行,B運行到一半時,讓A運行。A運行結(jié)束了,再運行B。那么我們需要三個對象:線程A、線程B和monitor。線程A,很簡單,如果拿到monitor就調(diào)用monitor的wait讓出去,然后再執(zhí)行自己的方法。執(zhí)行完了之后調(diào)用notify。線程B,拿到monitor之后,執(zhí)行自己的任務(wù),執(zhí)行到一半,再notify,然后繼續(xù)wait。
 ??流程如下
 ??A等待B運行 -> b 通知A -> b等待 -> A運行 -> A通知B -> B繼續(xù)運行。
 ??A 的代碼:
??B的代碼
public class ThreadB extends Thread {private Object monitor;public ThreadB(Object monitor) {this.monitor = monitor;}@Overridepublic void run() {synchronized (monitor) {for (int i = 1; i <= 100; i++) {try {System.out.print("\rB is running. " + i + " %.");if (i == 50) {System.out.println();monitor.notify();monitor.wait();}Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}}}}}??測試類
public static void main(String[] args) {Object lock = new Object();new ThreadA(lock).start();new ThreadB(lock).start(); }??運行結(jié)果
B is running. 50 %. A is running B is running. 100 %.??源碼網(wǎng)址:https://e.coding.net/buildt/learn/thread.git
二 wait與interrupt
??除notify外,interrupt也可以中斷wait的狀態(tài),但是會引發(fā)InterruptedException異常。所以需要在wait代碼中catch這個異常。我覺得interrupt適用于獲取不到monitor的情況。此外,java的debug框架,比如說IDEA的debug,可以手動中斷waiting中的線程,如下圖所示:
 
 ??需要注意的是,catch代碼塊需要重復(fù)中斷一次以維護(hù)線程的中斷標(biāo)志位。如以下代碼:
??我寫了個簡單的main函數(shù)中斷這個程序。可以通過控制臺輸入1或者debug啟動鼠標(biāo)點擊線程來中斷這個waiting的線程。
public class WaitInterruptDemo {public static void main(String[] args) throws IOException {final Thread thead = new Thread(new WaitingThead());thead.start();final int read = System.in.read();if (read == '1') {if (!thead.isInterrupted()) {System.out.println((char) read);thead.interrupt();}}System.out.println(thead.isInterrupted());} }三 線程的狀態(tài)
??由上面的例子,我們可以研究線程的狀態(tài)。線程總共6種狀態(tài),在線程類的枚舉java.lang.Thread.State中。總共有以下六種狀態(tài):NEW、RUNNABLE、BLOCKED、 WAITING、TIMED_WAITING、TERMINATED。
 ??New是線程start之前的狀態(tài)。
 ??Runnable是線程start之后的狀態(tài)。
 ??Terminated是線程結(jié)束之后的狀態(tài)
 ??如果遇到monitor,這些狀態(tài)就復(fù)雜了,于是乎就多了三種狀態(tài)。
 ??兩個線程同步同一個Monitor之后,其中一個線程獲取鎖,進(jìn)入運行中之后,另一個線程進(jìn)入BLOCKED狀態(tài)。
 ??只有當(dāng)線程調(diào)用了WAIT之后才會進(jìn)入WAITING狀態(tài)。
 ??線程調(diào)用了sleep,或者調(diào)用了monitor的帶時間參數(shù)的wait方法之后就進(jìn)入了TIMED_WAITING狀態(tài)。
 ??Blocked與Waiting的相同點是,都處于不能運行的狀態(tài)。區(qū)別在于,當(dāng)monitor被釋放后。Blocked狀態(tài)的線程會馬上進(jìn)入運行狀態(tài),而waiting狀態(tài)的線程繼續(xù)waiting。
 ??只有被notify之后,waiting狀態(tài)的線程才會變成runnable。
四 synchronized實現(xiàn)原理
??網(wǎng)上的說法是三級結(jié)構(gòu),偏向鎖->輕量級鎖->重量級鎖。一步一步鎖升級,一步一步鎖膨脹。首先必須得看看源碼,源碼路徑為:hotspot/src/share/vm/runtime/synchronizer.hpp。
 ??源碼里定義了三種進(jìn)入方法:
??看快速進(jìn)入方法,這里首先判斷是否只有一個線程,如果是則直接返回,否則使用慢速進(jìn)入方法,也就是我們說的輕量級鎖。
void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS) {if (UseBiasedLocking) {if (!SafepointSynchronize::is_at_safepoint()) {BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD);if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) {return;}} else {assert(!attempt_rebias, "can not rebias toward VM thread");BiasedLocking::revoke_at_safepoint(obj);}assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");}slow_enter (obj, lock, THREAD) ; }??再看看輕量級鎖代碼,注意mark->is_neutral(),這個判斷是否為“中性”,就是輕量級鎖的條件,而這個的判斷是讀取對象頭判斷的。如果不滿足這個條件,就進(jìn)入重量級鎖。也就是enter方法。
void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) {markOop mark = obj->mark();assert(!mark->has_bias_pattern(), "should not see bias pattern here");if (mark->is_neutral()) {// Anticipate successful CAS -- the ST of the displaced mark must// be visible <= the ST performed by the CAS.lock->set_displaced_header(mark);if (mark == (markOop) Atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark)) {TEVENT (slow_enter: release stacklock) ;return ;}// Fall through to inflate() ...} elseif (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) {assert(lock != mark->locker(), "must not re-lock the same lock");assert(lock != (BasicLock*)obj->mark(), "don't relock with same BasicLock");lock->set_displaced_header(NULL);return;}// The object header will never be displaced to this lock,// so it does not matter what the value is, except that it// must be non-zero to avoid looking like a re-entrant lock,// and must not look locked either.lock->set_displaced_header(markOopDesc::unused_mark());ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD); }// This routine is used to handle interpreter/compiler slow case // We don't need to use fast path here, because it must have // failed in the interpreter/compiler code. Simply use the heavy // weight monitor should be ok, unless someone find otherwise. void ObjectSynchronizer::slow_exit(oop object, BasicLock* lock, TRAPS) {fast_exit (object, lock, THREAD) ; }??而JNI,也就是編譯后的鎖進(jìn)入方法,也是在JIT編譯器編譯后的代碼執(zhí)行的邏輯。從代碼可以看出,直接膨脹為重量級鎖。
void ObjectSynchronizer::jni_enter(Handle obj, TRAPS) { // possible entry from jni enter// the current locking is from JNI instead of Java codeTEVENT (jni_enter) ;if (UseBiasedLocking) {BiasedLocking::revoke_and_rebias(obj, false, THREAD);assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");}THREAD->set_current_pending_monitor_is_from_java(false);ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);THREAD->set_current_pending_monitor_is_from_java(true); }??所以總結(jié)一下,分為三種模式,JIT編譯器編譯為本地多了個鎖消除優(yōu)化,也就是無鎖狀態(tài),連偏向鎖都不需要了。
| 解釋器 | 偏向鎖->輕量級鎖->重量級鎖 | 
| 編譯器 | 偏向鎖或鎖消除->輕量級鎖->重量級鎖 | 
| JNI | 偏向鎖->重量級鎖 | 
五 偏向鎖升級過程
??首先看解釋器的場景。解釋器先會嘗試,如果能獲取偏向鎖,也就是發(fā)現(xiàn)沒有競爭,就直接執(zhí)行方法了,也就是不執(zhí)行虛擬機(jī)monitorenter指令,解釋器代碼在hotspot/src/share/vm/interpreter/bytecodeInterpreter.cpp的BytecodeInterpreter::run(interpreterState istate) 方法中的case method_entry代碼塊。執(zhí)行monitorenter指令時,就會調(diào)用fast_enter還會再嘗試一次調(diào)用偏向鎖。如果這時候還未成功,才會變成輕量級鎖,這就是revoke(再調(diào)用)的re(再)的由來。解釋器部分代碼如下:
if (!success) {markOop displaced = rcvr->mark()->set_unlocked();mon->lock()->set_displaced_header(displaced);if (Atomic::cmpxchg_ptr(mon, rcvr->mark_addr(), displaced) != displaced) {// Is it simple recursive case?if (THREAD->is_lock_owned((address) displaced->clear_lock_bits())) {mon->lock()->set_displaced_header(NULL);} else {CALL_VM(InterpreterRuntime::monitorenter(THREAD, mon), handle_exception);}}}??再看看編譯器的場景。源碼位于hotspot/src/share/vm/c1/c1_Runtime1.cpp。但我找到的源碼是編譯器模式monitorenter實現(xiàn),內(nèi)部直接就調(diào)用了fast_enter方法。但是我不確定編譯器模式有沒有做像解釋器那樣連monifterenter指令都不調(diào)用的偏向鎖優(yōu)化。
JRT_ENTRY_NO_ASYNC(void, Runtime1::monitorenter(JavaThread* thread, oopDesc* obj, BasicObjectLock* lock))NOT_PRODUCT(_monitorenter_slowcase_cnt++;)if (PrintBiasedLockingStatistics) {Atomic::inc(BiasedLocking::slow_path_entry_count_addr());}Handle h_obj(thread, obj);assert(h_obj()->is_oop(), "must be NULL or an object");if (UseBiasedLocking) {// Retry fast entry if bias is revoked to avoid unnecessary inflationObjectSynchronizer::fast_enter(h_obj, lock->lock(), true, CHECK);} else {if (UseFastLocking) {// When using fast locking, the compiled code has already tried the fast caseassert(obj == lock->obj(), "must match");ObjectSynchronizer::slow_enter(h_obj, lock->lock(), THREAD);} else {lock->set_obj(obj);ObjectSynchronizer::fast_enter(h_obj, lock->lock(), false, THREAD);}} JRT_END??偏向鎖的升級代碼在hotspot/src/share/vm/runtime/biasedLocking.cpp文件的revoke_bias函數(shù)中。這段代碼有點長啊。這段就是網(wǎng)上常說的,如果線程1存活,線程2競爭不到就升級為輕量級鎖。注釋我翻譯一下:
 ??偏向線程存活。繼續(xù)檢查它是否擁有鎖,如果擁有,那就將頭寫入線程的棧中,否則,將對象頭恢復(fù)到無鎖或無偏向狀態(tài)。
??這個操作還是在fast_enter里判斷了返回值,因為這種場景返回值時BIAS_REVOKED,這個枚舉不包含偏向,所以進(jìn)入slow_enter,也就是輕量級鎖代碼。
五 重量級鎖
??重量級鎖的方法,不在這這個cpp文件里定義。文件為hotspot/src/share/vm/runtime/objectMonitor.cpp。重量級鎖,字段很多,但是最重要的是三個集合:_cxq,_EntryList和_WaitSet。cxq的定義是最近到達(dá)線程,在競爭時,線程先加入到cxq。后序再從cxq遷移到EntryList。而WaitSet是調(diào)用了wait方法的線程集合。結(jié)構(gòu)上cxq是單鏈表,而EntryList是雙鏈表。那程序員要關(guān)注的重點是什么?畢竟重量級鎖這么復(fù)雜。
 ??首先inflate方法,只是把輕量級鎖轉(zhuǎn)為重量級鎖。
??在重量級鎖獲取不到時,會調(diào)用調(diào)用Self->_ParkEvent->park()函數(shù)進(jìn)入BLOCKED狀態(tài),以下代碼位于void ATTR ObjectMonitor::EnterI (TRAPS)函數(shù),源碼在hotspot/src/share/vm/runtime/objectMonitor.cpp中:
for (;;) {if (TryLock (Self) > 0) break ;assert (_owner != Self, "invariant") ;if ((SyncFlags & 2) && _Responsible == NULL) {Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;}// park selfif (_Responsible == Self || (SyncFlags & 1)) {TEVENT (Inflated enter - park TIMED) ;Self->_ParkEvent->park ((jlong) RecheckInterval) ;// Increase the RecheckInterval, but clamp the value.RecheckInterval *= 8 ;if (RecheckInterval > 1000) RecheckInterval = 1000 ;} else {TEVENT (Inflated enter - park UNTIMED) ;Self->_ParkEvent->park() ;}if (TryLock(Self) > 0) break ;// The lock is still contested.// Keep a tally of the # of futile wakeups.// Note that the counter is not protected by a lock or updated by atomics.// That is by design - we trade "lossy" counters which are exposed to// races during updates for a lower probe effect.TEVENT (Inflated enter - Futile wakeup) ;if (ObjectMonitor::_sync_FutileWakeups != NULL) {ObjectMonitor::_sync_FutileWakeups->inc() ;}++ nWakeups ;// Assuming this is not a spurious wakeup we'll normally find _succ == Self.// We can defer clearing _succ until after the spin completes// TrySpin() must tolerate being called with _succ == Self.// Try yet another round of adaptive spinning.if ((Knob_SpinAfterFutile & 1) && TrySpin (Self) > 0) break ;// We can find that we were unpark()ed and redesignated _succ while// we were spinning. That's harmless. If we iterate and call park(),// park() will consume the event and return immediately and we'll// just spin again. This pattern can repeat, leaving _succ to simply// spin on a CPU. Enable Knob_ResetEvent to clear pending unparks().// Alternately, we can sample fired() here, and if set, forgo spinning// in the next iteration.if ((Knob_ResetEvent & 1) && Self->_ParkEvent->fired()) {Self->_ParkEvent->reset() ;OrderAccess::fence() ;}if (_succ == Self) _succ = NULL ;// Invariant: after clearing _succ a thread *must* retry _owner before parking.OrderAccess::fence() ;}??而park的實現(xiàn)各個操作系統(tǒng)不一樣,以Windows為例子,調(diào)用了Windows系統(tǒng)獨有的方法WaitForSingleObject,源代碼處于hotspot/src/os/windows/vm/os_windows.cpp中:
void os::PlatformEvent::park () {guarantee (_ParkHandle != NULL, "Invariant") ;// Invariant: Only the thread associated with the Event/PlatformEvent// may call park().int v ;for (;;) {v = _Event ;if (Atomic::cmpxchg (v-1, &_Event, v) == v) break ;}guarantee ((v == 0) || (v == 1), "invariant") ;if (v != 0) return ;// Do this the hard way by blocking ...// TODO: consider a brief spin here, gated on the success of recent// spin attempts by this thread.while (_Event < 0) {DWORD rv = ::WaitForSingleObject (_ParkHandle, INFINITE) ;assert (rv == WAIT_OBJECT_0, "WaitForSingleObject failed") ;}// Usually we'll find _Event == 0 at this point, but as// an optional optimization we clear it, just in case can// multiple unpark() operations drove _Event up to 1._Event = 0 ;OrderAccess::fence() ;guarantee (_Event >= 0, "invariant") ; }總結(jié)
以上是生活随笔為你收集整理的1. Synchronized的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
                            
                        - 上一篇: 平衡记分卡(BSC)初探
 - 下一篇: 战略绩效管理工具:平衡计分卡简介