Java并发编程实战~软件事务内存
很多同學反饋說,工作了挺長時間但是沒有機會接觸并發編程,實際上我們天天都在寫并發程序,只不過并發相關的問題都被類似 Tomcat 這樣的 Web 服務器以及 MySQL 這樣的數據庫解決了。尤其是數據庫,在解決并發問題方面,可謂成績斐然,它的事務機制非常簡單易用,能甩 Java 里面的鎖、原子類十條街。技術無邊界,很顯然要借鑒一下。
其實很多編程語言都有從數據庫的事務管理中獲得靈感,并且總結出了一個新的并發解決方案:軟件事務內存(Software Transactional Memory,簡稱 STM)。傳統的數據庫事務,支持 4 個特性:原子性(Atomicity)、一致性(Consistency)、隔離性(Isolation)和持久性(Durability),也就是大家常說的 ACID,STM 由于不涉及到持久化,所以只支持 ACI。
STM 的使用很簡單,下面我們以經典的轉賬操作為例,看看用 STM 該如何實現。
用 STM 實現轉賬
我們曾經在《一不小心就死鎖了》這篇文章中,講到了并發轉賬的例子,示例代碼如下。簡單地使用 synchronized 將 transfer() 方法變成同步方法并不能解決并發問題,因為還存在死鎖問題。
class UnsafeAccount {//余額private long balance;//構造函數public UnsafeAccount(long balance) {this.balance = balance;}//轉賬void transfer(UnsafeAccount target, long amt){if (this.balance > amt) {this.balance -= amt;target.balance += amt;}} }該轉賬操作若使用數據庫事務就會非常簡單,如下面的示例代碼所示。如果所有 SQL 都正常執行,則通過 commit() 方法提交事務;如果 SQL 在執行過程中有異常,則通過 rollback() 方法回滾事務。數據庫保證在并發情況下不會有死鎖,而且還能保證前面我們說的原子性、一致性、隔離性和持久性,也就是 ACID。
Connection conn = null; try{//獲取數據庫連接conn = DriverManager.getConnection();//設置手動提交事務conn.setAutoCommit(false);//執行轉賬SQL......//提交事務conn.commit(); } catch (Exception e) {//出現異常回滾事務conn.rollback(); }那如果用 STM 又該如何實現呢?Java 語言并不支持 STM,不過可以借助第三方的類庫來支持,Multiverse就是個不錯的選擇。下面的示例代碼就是借助 Multiverse 實現了線程安全的轉賬操作,相比較上面線程不安全的 UnsafeAccount,其改動并不大,僅僅是將余額的類型從 long 變成了 TxnLong ,將轉賬的操作放到了 atomic(()->{}) 中。
class Account{//余額private TxnLong balance;//構造函數public Account(long balance){this.balance = StmUtils.newTxnLong(balance);}//轉賬public void transfer(Account to, int amt){//原子化操作atomic(()->{if (this.balance.get() > amt) {this.balance.decrement(amt);to.balance.increment(amt);}});} }一個關鍵的 atomic() 方法就把并發問題解決了,這個方案看上去比傳統的方案的確簡單了很多,那它是如何實現的呢?數據庫事務發展了幾十年了,目前被廣泛使用的是 MVCC(全稱是 Multi-Version Concurrency Control),也就是多版本并發控制。
MVCC 可以簡單地理解為數據庫事務在開啟的時候,會給數據庫打一個快照,以后所有的讀寫都是基于這個快照的。當提交事務的時候,如果所有讀寫過的數據在該事務執行期間沒有發生過變化,那么就可以提交;如果發生了變化,說明該事務和有其他事務讀寫的數據沖突了,這個時候是不可以提交的。
為了記錄數據是否發生了變化,可以給每條數據增加一個版本號,這樣每次成功修改數據都會增加版本號的值。MVCC 的工作原理和我們曾經在《StampedLock》中提到的樂觀鎖非常相似。有不少 STM 的實現方案都是基于 MVCC 的,例如知名的 Clojure STM。
下面我們就用最簡單的代碼基于 MVCC 實現一個簡版的 STM,這樣你會對 STM 以及 MVCC 的工作原理有更深入的認識。
自己實現 STM
我們首先要做的,就是讓 Java 中的對象有版本號,在下面的示例代碼中,VersionedRef 這個類的作用就是將對象 value 包裝成帶版本號的對象。按照 MVCC 理論,數據的每一次修改都對應著一個唯一的版本號,所以不存在僅僅改變 value 或者 version 的情況,用不變性模式就可以很好地解決這個問題,所以 VersionedRef 這個類被我們設計成了不可變的。
所有對數據的讀寫操作,一定是在一個事務里面,TxnRef 這個類負責完成事務內的讀寫操作,讀寫操作委托給了接口 Txn,Txn 代表的是讀寫操作所在的當前事務, 內部持有的 curRef 代表的是系統中的最新值。
//帶版本號的對象引用 public final class VersionedRef<T> {final T value;final long version;//構造方法public VersionedRef(T value, long version) {this.value = value;this.version = version;} } //支持事務的引用 public class TxnRef<T> {//當前數據,帶版本號volatile VersionedRef curRef;//構造方法public TxnRef(T value) {this.curRef = new VersionedRef(value, 0L);}//獲取當前事務中的數據public T getValue(Txn txn) {return txn.get(this);}//在當前事務中設置數據public void setValue(T value, Txn txn) {txn.set(this, value);} }STMTxn 是 Txn 最關鍵的一個實現類,事務內對于數據的讀寫,都是通過它來完成的。STMTxn 內部有兩個 Map:inTxnMap,用于保存當前事務中所有讀寫的數據的快照;writeMap,用于保存當前事務需要寫入的數據。每個事務都有一個唯一的事務 ID txnId,這個 txnId 是全局遞增的。
STMTxn 有三個核心方法,分別是讀數據的 get() 方法、寫數據的 set() 方法和提交事務的 commit() 方法。其中,get() 方法將要讀取數據作為快照放入 inTxnMap,同時保證每次讀取的數據都是一個版本。set() 方法會將要寫入的數據放入 writeMap,但如果寫入的數據沒被讀取過,也會將其放入 inTxnMap。
至于 commit() 方法,我們為了簡化實現,使用了互斥鎖,所以事務的提交是串行的。commit() 方法的實現很簡單,首先檢查 inTxnMap 中的數據是否發生過變化,如果沒有發生變化,那么就將 writeMap 中的數據寫入(這里的寫入其實就是 TxnRef 內部持有的 curRef);如果發生過變化,那么就不能將 writeMap 中的數據寫入了。
//事務接口 public interface Txn {<T> T get(TxnRef<T> ref);<T> void set(TxnRef<T> ref, T value); } //STM事務實現類 public final class STMTxn implements Txn {//事務ID生成器private static AtomicLong txnSeq = new AtomicLong(0);//當前事務所有的相關數據private Map<TxnRef, VersionedRef> inTxnMap = new HashMap<>();//當前事務所有需要修改的數據private Map<TxnRef, Object> writeMap = new HashMap<>();//當前事務IDprivate long txnId;//構造函數,自動生成當前事務IDSTMTxn() {txnId = txnSeq.incrementAndGet();}//獲取當前事務中的數據@Overridepublic <T> T get(TxnRef<T> ref) {//將需要讀取的數據,加入inTxnMapif (!inTxnMap.containsKey(ref)) {inTxnMap.put(ref, ref.curRef);}return (T) inTxnMap.get(ref).value;}//在當前事務中修改數據@Overridepublic <T> void set(TxnRef<T> ref, T value) {//將需要修改的數據,加入inTxnMapif (!inTxnMap.containsKey(ref)) {inTxnMap.put(ref, ref.curRef);}writeMap.put(ref, value);}//提交事務boolean commit() {synchronized (STM.commitLock) {//是否校驗通過boolean isValid = true;//校驗所有讀過的數據是否發生過變化for(Map.Entry<TxnRef, VersionedRef> entry : inTxnMap.entrySet()){VersionedRef curRef = entry.getKey().curRef;VersionedRef readRef = entry.getValue();//通過版本號來驗證數據是否發生過變化if (curRef.version != readRef.version) {isValid = false;break;}}//如果校驗通過,則所有更改生效if (isValid) {writeMap.forEach((k, v) -> {k.curRef = new VersionedRef(v, txnId);});}return isValid;} }下面我們來模擬實現 Multiverse 中的原子化操作 atomic()。atomic() 方法中使用了類似于 CAS 的操作,如果事務提交失敗,那么就重新創建一個新的事務,重新執行。
@FunctionalInterface public interface TxnRunnable {void run(Txn txn); } //STM public final class STM {//私有化構造方法private STM() {//提交數據需要用到的全局鎖 static final Object commitLock = new Object();//原子化提交方法public static void atomic(TxnRunnable action) {boolean committed = false;//如果沒有提交成功,則一直重試while (!committed) {//創建新的事務STMTxn txn = new STMTxn();//執行業務邏輯action.run(txn);//提交事務committed = txn.commit();}} }}就這樣,我們自己實現了 STM,并完成了線程安全的轉賬操作,使用方法和 Multiverse 差不多,這里就不贅述了,具體代碼如下面所示。
class Account {//余額private TxnRef<Integer> balance;//構造方法public Account(int balance) {this.balance = new TxnRef<Integer>(balance);}//轉賬操作public void transfer(Account target, int amt){STM.atomic((txn)->{Integer from = balance.getValue(txn);balance.setValue(from-amt, txn);Integer to = target.balance.getValue(txn);target.balance.setValue(to+amt, txn);});} }總結
STM 借鑒的是數據庫的經驗,數據庫雖然復雜,但僅僅存儲數據,而編程語言除了有共享變量之外,還會執行各種 I/O 操作,很顯然 I/O 操作是很難支持回滾的。所以,STM 也不是萬能的。目前支持 STM 的編程語言主要是函數式語言,函數式語言里的數據天生具備不可變性,利用這種不可變性實現 STM 相對來說更簡單。
另外,需要說明的是,文中的“自己實現 STM”部分我參考了Software Transactional Memory in Scala這篇博文以及一個 GitHub 項目,目前還很粗糙,并不是一個完備的 MVCC。如果你對這方面感興趣,可以參考Improving the STM: Multi-Version Concurrency Control 這篇博文,里面講到了如何優化,你可以嘗試學習下。
總結
以上是生活随笔為你收集整理的Java并发编程实战~软件事务内存的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Struts2源码阅读(四)_Dispa
- 下一篇: Spring Data JPA 从入门到