spring 多线程 事务 源码解析(一)
大家好,我是烤鴨:
? ? 今天分享的是spring 多線程事務源碼分析。
環境:
spring-jdbc 5.0.4.REALEASE
今天分享一下spring事務的方法,這一篇還沒涉及到多線程。
簡單說一下doBegin的方法:
強轉獲取當前的事務對象,看看事務對象是否有連接保持器(是否是新的事務對象)
或者連接保持器的事務同步性(默認是新的事務對象,不同步,為false)。
滿足上面的條件,為當前連接設置一個新的連接保持器,設置當前保持器事務同步性為同步(true)。
獲取事務的之前的事務隔離級別,新事務的話,就把當前的事務隔離級別設置為當前事務的previousIsolationLevel。
當前事務是否自動提交,如果是自動提交,事務對象的 mustRestoreAutoCommit 屬性設置為true,設置自動提交為 false。
ps : mustRestoreAutoCommit這個屬性在事務完成之后的 doCleanupAfterCompletion方法中,判斷如果這個屬性為true,連接就setAutoCommit 為true。
事務開啟后準備正常的事務連接,設置事務的只讀標識為false。
設置當前事務的當前持有者激活屬性為true,表示事務在激活中。
設置連接器的超時時間,如果超時時間非默認,就設置超時時間。
※※※ 如果是新的事務連接器,用ThreadLocal的Map變量,key 是真實去包裝后的數據源,value 是當前的連接持有者。(將連接和線程綁定)
?
在詳細看一下 DataSourceTransactionManager 的doBegin方法:
需要傳兩個參數:
@0:
?? ?TransactionDefinition就是一個關于事務的定義類(常量),其中包含隔離級別,傳播類型,只讀標識,超時時間等。
@1:
?? ?transaction強轉成DataSourceTransactionObject,先去看一下 DataSourceTransactionObject 是什么類。
?? ?DataSourceTransactionObject是DataSourceTransactionManager的內部類。
? ??DataSourceTransactionObject類說明:
/*** DataSource transaction object, representing a ConnectionHolder.* Used as transaction object by DataSourceTransactionManager.*/// 數據源事務的對象,代表連接保持器,作為事務對象被DataSourceTransactionManager調用。@2:
?? ?hasConnectionHolder()是JdbcTransactionObjectSupport的方法。判斷該類的connectionHandle 是否為空。
?? ?JdbcTransactionObjectSupport 是什么鬼,看一下,抽象類 實現了 SavepointManager 和 SmartTransactionObject。
/*** Convenient base class for JDBC-aware transaction objects. Can contain a* {@link ConnectionHolder} with a JDBC {@code Connection}, and implements the* {@link SavepointManager} interface based on that {@code ConnectionHolder}.** <p>Allows for programmatic management of JDBC {@link java.sql.Savepoint Savepoints}.* Spring's {@link org.springframework.transaction.support.DefaultTransactionStatus}* automatically delegates to this, as it autodetects transaction objects which* implement the {@link SavepointManager} interface.*///便于jdbc的事務對象繼承的基類。能夠獲得jdbc連接狀態,實現了SavepointManager接口。//允許對jdbc程序化管理,spring的默認事務狀態自動委托給這個,自動檢測事務對象。public abstract class JdbcTransactionObjectSupport implements SavepointManager, SmartTransactionObject再看SavepointManager 。
仨方法: 創建/回滾到/釋放 保存點。
/*** Interface that specifies an API to programmatically manage transaction* savepoints in a generic fashion. Extended by TransactionStatus to* expose savepoint management functionality for a specific transaction.** <p>Note that savepoints can only work within an active transaction.* Just use this programmatic savepoint handling for advanced needs;* else, a subtransaction with PROPAGATION_NESTED is preferable.** <p>This interface is inspired by JDBC 3.0's Savepoint mechanism* but is independent from any specific persistence technology.**/// 是一個接口指定的API能夠在通常情況下程序化地管理事務保存點。// 對于指定的事務,擴展的事務狀態展示了功能化的保存點管理。// ps:保存點出現在活躍事務中,在高級需求中使用程序化保存點,// 當用到 PROPAGATION_NESTED 級別的子事務的時候更好。// 這個接口受啟發于jdbc3.0保存點原理,但是獨立于任何特定的持久化技術。public interface SavepointManager {/*** Create a new savepoint. You can roll back to a specific savepoint* via {@code rollbackToSavepoint}, and explicitly release a savepoint* that you don't need anymore via {@code releaseSavepoint}.* <p>Note that most transaction managers will automatically release* savepoints at transaction completion.* @return a savepoint object, to be passed into* {@link #rollbackToSavepoint} or {@link #releaseSavepoint}* @throws NestedTransactionNotSupportedException if the underlying* transaction does not support savepoints* @throws TransactionException if the savepoint could not be created,* for example because the transaction is not in an appropriate state* @see java.sql.Connection#setSavepoint*///創建保存點,能夠通過rollbackToSavepoint回滾到特定的保存點,可以在//不需要的時候通過releaseSavepoint方法釋放保存點。大部分事務管理器會在事務//完成的時候釋放保存點。Object createSavepoint() throws TransactionException;/*** Roll back to the given savepoint.* <p>The savepoint will <i>not</i> be automatically released afterwards.* You may explicitly call {@link #releaseSavepoint(Object)} or rely on* automatic release on transaction completion.* @param savepoint the savepoint to roll back to* @throws NestedTransactionNotSupportedException if the underlying* transaction does not support savepoints* @throws TransactionException if the rollback failed* @see java.sql.Connection#rollback(java.sql.Savepoint)*///回滾到給出的保存點。//保存點最后不會自動釋放。可以調用釋放方法或者等事務結束自動關閉。void rollbackToSavepoint(Object savepoint) throws TransactionException;/*** Explicitly release the given savepoint.* <p>Note that most transaction managers will automatically release* savepoints on transaction completion.* <p>Implementations should fail as silently as possible if proper* resource cleanup will eventually happen at transaction completion.* @param savepoint the savepoint to release* @throws NestedTransactionNotSupportedException if the underlying* transaction does not support savepoints* @throws TransactionException if the release failed* @see java.sql.Connection#releaseSavepoint*///釋放保存點//大部分事務管理器會在事務完成的時候釋放保存點。//如果正確的資源清理在事務結束時發生,這個方法實現可能失敗。void releaseSavepoint(Object savepoint) throws TransactionException;}?一句話概括一下就是,嵌套事務中有個存檔,當我們設置了存檔,出異常的事務可以回滾到存檔,不至于全部回滾。
?? ? 嵌套事務(事務隔壁級別是PROPAGATION_NESTED) 食用更佳。
SmartTransactionObject
public interface SmartTransactionObject extends Flushable {/*** Interface to be implemented by transaction objects that are able to* return an internal rollback-only marker, typically from a another* transaction that has participated and marked it as rollback-only.** <p>Autodetected by DefaultTransactionStatus, to always return a* current rollbackOnly flag even if not resulting from the current* TransactionStatus.*///被用于事務對象實現的接口能夠返回一個內部的僅回滾標記,代表有另一個事務參與并標記為僅回滾。/*** Return whether the transaction is internally marked as rollback-only.* Can, for example, check the JTA UserTransaction.* @see javax.transaction.UserTransaction#getStatus* @see javax.transaction.Status#STATUS_MARKED_ROLLBACK*///返回是否有另一個事務參與。boolean isRollbackOnly();/*** Flush the underlying sessions to the datastore, if applicable:* for example, all affected Hibernate/JPA sessions.*///如果適用的話,刷新潛在的會話回數據源,比如所有的影響的Hibernate/JPA會話。(清理緩存區的資源)@Overridevoid flush();}@3: isSynchronizedWithTransaction() 是ResourceHolderSupport 類中的方法
/*** Convenient base class for resource holders.** <p>Features rollback-only support for participating transactions.* Can expire after a certain number of seconds or milliseconds* in order to determine a transactional timeout.*///資源持有者的實用的基類//為了讓事務能夠在一定時間內過期,對于參與進來的事務提供僅回滾的支持public abstract class ResourceHolderSupport implements ResourceHolder/*** Return whether the resource is synchronized with a transaction.*/// 當前資源持有者是否與事務同步public boolean isSynchronizedWithTransaction() {return this.synchronizedWithTransaction;} }?
?
再看實現的接口 ResourceHolder
?? ??? ???
@5: 數據源工具類,這里用到的是prepareConnectionForTransaction,具體看下面的方法注釋。
/*** Helper class that provides static methods for obtaining JDBC Connections from* a {@link javax.sql.DataSource}. Includes special support for Spring-managed* transactional Connections, e.g. managed by {@link DataSourceTransactionManager}* or {@link org.springframework.transaction.jta.JtaTransactionManager}.** <p>Used internally by Spring's {@link org.springframework.jdbc.core.JdbcTemplate},* Spring's JDBC operation objects and the JDBC {@link DataSourceTransactionManager}.* Can also be used directly in application code.*///工具類提供靜態方法,從數據源中獲取jdbc連接,包括對spring管理的事務攔截的特別支撐,//在被spring的JdbcTemplate,spring的jdbc操作對象和jdbc事務管理器使用。也能被直接用在應用代碼中。 public abstract class DataSourceUtils {/*** Order value for TransactionSynchronization objects that clean up JDBC Connections.*/// TransactionSynchronization清理jdbc連接的對象的順序值public static final int CONNECTION_SYNCHRONIZATION_ORDER = 1000;private static final Log logger = LogFactory.getLog(DataSourceUtils.class);/*** Obtain a Connection from the given DataSource. Translates SQLExceptions into* the Spring hierarchy of unchecked generic data access exceptions, simplifying* calling code and making any exception that is thrown more meaningful.* <p>Is aware of a corresponding Connection bound to the current thread, for example* when using {@link DataSourceTransactionManager}. Will bind a Connection to the* thread if transaction synchronization is active, e.g. when running within a* {@link org.springframework.transaction.jta.JtaTransactionManager JTA} transaction).* @param dataSource the DataSource to obtain Connections from* @return a JDBC Connection from the given DataSource* @throws org.springframework.jdbc.CannotGetJdbcConnectionException* if the attempt to get a Connection failed* @see #releaseConnection*///通過數據源獲取連接。將sql異常拋給spring的層級中未檢查的普通數據,簡化調用代碼,讓任何異常都清晰//要知道比如使用連接池數據管理器,一致的連接是和當前線程綁定。//舉例子,如果JTA事務中,同步事務被激活,連接和線程綁定。public static Connection getConnection(DataSource dataSource) throws CannotGetJdbcConnectionException {try {return doGetConnection(dataSource);}catch (SQLException ex) {throw new CannotGetJdbcConnectionException("Failed to obtain JDBC Connection", ex);}catch (IllegalStateException ex) {throw new CannotGetJdbcConnectionException("Failed to obtain JDBC Connection: " + ex.getMessage());}}/*** Actually obtain a JDBC Connection from the given DataSource.* Same as {@link #getConnection}, but throwing the original SQLException.* <p>Is aware of a corresponding Connection bound to the current thread, for example* when using {@link DataSourceTransactionManager}. Will bind a Connection to the thread* if transaction synchronization is active (e.g. if in a JTA transaction).* <p>Directly accessed by {@link TransactionAwareDataSourceProxy}.* @param dataSource the DataSource to obtain Connections from* @return a JDBC Connection from the given DataSource* @throws SQLException if thrown by JDBC methods* @see #doReleaseConnection*///獲取當前數據源的真正連接//和getConnection方法一樣,但拋出的是原始的sql異常。//要知道比如使用連接池數據管理器,一致的連接是和當前線程綁定。//舉例子,如果JTA事務中,同步事務被激活,連接和線程綁定。public static Connection doGetConnection(DataSource dataSource) throws SQLException {Assert.notNull(dataSource, "No DataSource specified");ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {conHolder.requested();if (!conHolder.hasConnection()) {logger.debug("Fetching resumed JDBC Connection from DataSource");conHolder.setConnection(fetchConnection(dataSource));}return conHolder.getConnection();}// Else we either got no holder or an empty thread-bound holder here.logger.debug("Fetching JDBC Connection from DataSource");Connection con = fetchConnection(dataSource);if (TransactionSynchronizationManager.isSynchronizationActive()) {logger.debug("Registering transaction synchronization for JDBC Connection");// Use same Connection for further JDBC actions within the transaction.// Thread-bound object will get removed by synchronization at transaction completion.ConnectionHolder holderToUse = conHolder;if (holderToUse == null) {holderToUse = new ConnectionHolder(con);}else {holderToUse.setConnection(con);}holderToUse.requested();TransactionSynchronizationManager.registerSynchronization(new ConnectionSynchronization(holderToUse, dataSource));holderToUse.setSynchronizedWithTransaction(true);if (holderToUse != conHolder) {TransactionSynchronizationManager.bindResource(dataSource, holderToUse);}}return con;}/*** Actually fetch a {@link Connection} from the given {@link DataSource},* defensively turning an unexpected {@code null} return value from* {@link DataSource#getConnection()} into an {@link IllegalStateException}.* @param dataSource the DataSource to obtain Connections from* @return a JDBC Connection from the given DataSource (never {@code null})* @throws SQLException if thrown by JDBC methods* @throws IllegalStateException if the DataSource returned a null value* @see DataSource#getConnection()*///從數據源獲取連接,如果返回空,拋異常。private static Connection fetchConnection(DataSource dataSource) throws SQLException {Connection con = dataSource.getConnection();if (con == null) {throw new IllegalStateException("DataSource returned null from getConnection(): " + dataSource);}return con;}/*** Prepare the given Connection with the given transaction semantics.* @param con the Connection to prepare* @param definition the transaction definition to apply* @return the previous isolation level, if any* @throws SQLException if thrown by JDBC methods* @see #resetConnectionAfterTransaction*///準備給傳入的連接加事務@Nullable public static Integer prepareConnectionForTransaction(Connection con, @Nullable TransactionDefinition definition)throws SQLException {Assert.notNull(con, "No Connection specified");// Set read-only flag.if (definition != null && definition.isReadOnly()) {try {if (logger.isDebugEnabled()) {logger.debug("Setting JDBC Connection [" + con + "] read-only");}con.setReadOnly(true);}catch (SQLException | RuntimeException ex) {Throwable exToCheck = ex;while (exToCheck != null) {if (exToCheck.getClass().getSimpleName().contains("Timeout")) {// Assume it's a connection timeout that would otherwise get lost: e.g. from JDBC 4.0throw ex;}exToCheck = exToCheck.getCause();}// "read-only not supported" SQLException -> ignore, it's just a hint anyway//只讀不支持sql異常,忽略這個,只是一個提示logger.debug("Could not set JDBC Connection read-only", ex);}}// Apply specific isolation level, if any.//接受任何的隔離級別Integer previousIsolationLevel = null;if (definition != null && definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {if (logger.isDebugEnabled()) {logger.debug("Changing isolation level of JDBC Connection [" + con + "] to " +definition.getIsolationLevel());}int currentIsolation = con.getTransactionIsolation();if (currentIsolation != definition.getIsolationLevel()) {previousIsolationLevel = currentIsolation;con.setTransactionIsolation(definition.getIsolationLevel());}}return previousIsolationLevel;}/*** Reset the given Connection after a transaction,* regarding read-only flag and isolation level.* @param con the Connection to reset* @param previousIsolationLevel the isolation level to restore, if any* @see #prepareConnectionForTransaction*///重置給定連接的事務,包括只讀標識和隔離級別public static void resetConnectionAfterTransaction(Connection con, @Nullable Integer previousIsolationLevel) {Assert.notNull(con, "No Connection specified");try {// Reset transaction isolation to previous value, if changed for the transaction.// 如果事務更改,則將事務隔離重置為前值if (previousIsolationLevel != null) {if (logger.isDebugEnabled()) {logger.debug("Resetting isolation level of JDBC Connection [" +con + "] to " + previousIsolationLevel);}con.setTransactionIsolation(previousIsolationLevel);}// Reset read-only flag.if (con.isReadOnly()) {if (logger.isDebugEnabled()) {logger.debug("Resetting read-only flag of JDBC Connection [" + con + "]");}con.setReadOnly(false);}}catch (Throwable ex) {logger.debug("Could not reset JDBC Connection after transaction", ex);}}/*** Determine whether the given JDBC Connection is transactional, that is,* bound to the current thread by Spring's transaction facilities.* @param con the Connection to check* @param dataSource the DataSource that the Connection was obtained from* (may be {@code null})* @return whether the Connection is transactional*///決定給定的jdbc連接是否支持事務,spring事務設置的是和當前線程綁定。public static boolean isConnectionTransactional(Connection con, @Nullable DataSource dataSource) {if (dataSource == null) {return false;}ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);return (conHolder != null && connectionEquals(conHolder, con));}/*** Apply the current transaction timeout, if any,* to the given JDBC Statement object.* @param stmt the JDBC Statement object* @param dataSource the DataSource that the Connection was obtained from* @throws SQLException if thrown by JDBC methods* @see java.sql.Statement#setQueryTimeout*///如果有的話,對于jdbc陳述對象的,應用當前事務的聲明的超時時間。public static void applyTransactionTimeout(Statement stmt, @Nullable DataSource dataSource) throws SQLException {applyTimeout(stmt, dataSource, -1);}/*** Apply the specified timeout - overridden by the current transaction timeout,* if any - to the given JDBC Statement object.* @param stmt the JDBC Statement object* @param dataSource the DataSource that the Connection was obtained from* @param timeout the timeout to apply (or 0 for no timeout outside of a transaction)* @throws SQLException if thrown by JDBC methods* @see java.sql.Statement#setQueryTimeout*///如果有的話,對于jdbc陳述對象的,接受被當前事務所覆蓋的特定的超時時間。public static void applyTimeout(Statement stmt, @Nullable DataSource dataSource, int timeout) throws SQLException {Assert.notNull(stmt, "No Statement specified");ConnectionHolder holder = null;if (dataSource != null) {holder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);}if (holder != null && holder.hasTimeout()) {// Remaining transaction timeout overrides specified value.stmt.setQueryTimeout(holder.getTimeToLiveInSeconds());}else if (timeout >= 0) {// No current transaction timeout -> apply specified value.stmt.setQueryTimeout(timeout);}}/*** Close the given Connection, obtained from the given DataSource,* if it is not managed externally (that is, not bound to the thread).* @param con the Connection to close if necessary* (if this is {@code null}, the call will be ignored)* @param dataSource the DataSource that the Connection was obtained from* (may be {@code null})* @see #getConnection*///如果不是外部管理(非線程綁定),關閉從數據源獲取到的連接。public static void releaseConnection(@Nullable Connection con, @Nullable DataSource dataSource) {try {doReleaseConnection(con, dataSource);}catch (SQLException ex) {logger.debug("Could not close JDBC Connection", ex);}catch (Throwable ex) {logger.debug("Unexpected exception on closing JDBC Connection", ex);}}/*** Actually close the given Connection, obtained from the given DataSource.* Same as {@link #releaseConnection}, but throwing the original SQLException.* <p>Directly accessed by {@link TransactionAwareDataSourceProxy}.* @param con the Connection to close if necessary* (if this is {@code null}, the call will be ignored)* @param dataSource the DataSource that the Connection was obtained from* (may be {@code null})* @throws SQLException if thrown by JDBC methods* @see #doGetConnection*///真實關閉從數據源獲取到的連接,和釋放連接方法一樣,但是拋出一個sql異常public static void doReleaseConnection(@Nullable Connection con, @Nullable DataSource dataSource) throws SQLException {if (con == null) {return;}if (dataSource != null) {ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);if (conHolder != null && connectionEquals(conHolder, con)) {// It's the transactional Connection: Don't close it.conHolder.released();return;}}logger.debug("Returning JDBC Connection to DataSource");doCloseConnection(con, dataSource);}/*** Close the Connection, unless a {@link SmartDataSource} doesn't want us to.* @param con the Connection to close if necessary* @param dataSource the DataSource that the Connection was obtained from* @throws SQLException if thrown by JDBC methods* @see Connection#close()* @see SmartDataSource#shouldClose(Connection)*///關閉異常,除非SmartDataSource 不想讓我們關閉。public static void doCloseConnection(Connection con, @Nullable DataSource dataSource) throws SQLException {if (!(dataSource instanceof SmartDataSource) || ((SmartDataSource) dataSource).shouldClose(con)) {con.close();}}/*** Determine whether the given two Connections are equal, asking the target* Connection in case of a proxy. Used to detect equality even if the* user passed in a raw target Connection while the held one is a proxy.* @param conHolder the ConnectionHolder for the held Connection (potentially a proxy)* @param passedInCon the Connection passed-in by the user* (potentially a target Connection without proxy)* @return whether the given Connections are equal* @see #getTargetConnection*///比較兩個連接是否相同,詢問目標連接是否是代理,如果用戶在原目標連接傳遞,//當保持其中一個目標連接是代理的時候,檢測是否相同private static boolean connectionEquals(ConnectionHolder conHolder, Connection passedInCon) {if (!conHolder.hasConnection()) {return false;}Connection heldCon = conHolder.getConnection();// Explicitly check for identity too: for Connection handles that do not implement// "equals" properly, such as the ones Commons DBCP exposes).return (heldCon == passedInCon || heldCon.equals(passedInCon) ||getTargetConnection(heldCon).equals(passedInCon));}/*** Return the innermost target Connection of the given Connection. If the given* Connection is a proxy, it will be unwrapped until a non-proxy Connection is* found. Otherwise, the passed-in Connection will be returned as-is.* @param con the Connection proxy to unwrap* @return the innermost target Connection, or the passed-in one if no proxy* @see ConnectionProxy#getTargetConnection()*///返回最深層的目標連接,如果連接是代理的,將會去包裝直到非代理連接。//否則,傳遞的連接將會被返回。public static Connection getTargetConnection(Connection con) {Connection conToUse = con;while (conToUse instanceof ConnectionProxy) {conToUse = ((ConnectionProxy) conToUse).getTargetConnection();}return conToUse;}/*** Determine the connection synchronization order to use for the given* DataSource. Decreased for every level of nesting that a DataSource* has, checked through the level of DelegatingDataSource nesting.* @param dataSource the DataSource to check* @return the connection synchronization order to use* @see #CONNECTION_SYNCHRONIZATION_ORDER*///決定連接同步為了使用給定的數據源。減少數據源的嵌套層級,通過代理數據源的嵌套//檢測層級private static int getConnectionSynchronizationOrder(DataSource dataSource) {int order = CONNECTION_SYNCHRONIZATION_ORDER;DataSource currDs = dataSource;while (currDs instanceof DelegatingDataSource) {order--;currDs = ((DelegatingDataSource) currDs).getTargetDataSource();}return order;}/*** Callback for resource cleanup at the end of a non-native JDBC transaction* (e.g. when participating in a JtaTransactionManager transaction).* @see org.springframework.transaction.jta.JtaTransactionManager*///在非本地jdbc事務結束時資源清理的回調private static class ConnectionSynchronization extends TransactionSynchronizationAdapter {private final ConnectionHolder connectionHolder;private final DataSource dataSource;private int order;private boolean holderActive = true;public ConnectionSynchronization(ConnectionHolder connectionHolder, DataSource dataSource) {this.connectionHolder = connectionHolder;this.dataSource = dataSource;this.order = getConnectionSynchronizationOrder(dataSource);}@Overridepublic int getOrder() {return this.order;}@Overridepublic void suspend() {if (this.holderActive) {TransactionSynchronizationManager.unbindResource(this.dataSource);if (this.connectionHolder.hasConnection() && !this.connectionHolder.isOpen()) {// Release Connection on suspend if the application doesn't keep// a handle to it anymore. We will fetch a fresh Connection if the// application accesses the ConnectionHolder again after resume,// assuming that it will participate in the same transaction.//如果應用不再保持連接,暫停釋放資源。再應用再次獲取連接持有者后,會再次獲取新的連接,//假設仍會參與到相同的事務中releaseConnection(this.connectionHolder.getConnection(), this.dataSource);this.connectionHolder.setConnection(null);}}}@Overridepublic void resume() {if (this.holderActive) {TransactionSynchronizationManager.bindResource(this.dataSource, this.connectionHolder);}}@Overridepublic void beforeCompletion() {// Release Connection early if the holder is not open anymore// (that is, not used by another resource like a Hibernate Session// that has its own cleanup via transaction synchronization),// to avoid issues with strict JTA implementations that expect// the close call before transaction completion.// 如果連接不再開放,早早釋放連接,不像Hibernate的另一個資源在事務同步的時候有會// 自己清理,避免在事務結束前,JTA實現的問題。if (!this.connectionHolder.isOpen()) {TransactionSynchronizationManager.unbindResource(this.dataSource);this.holderActive = false;if (this.connectionHolder.hasConnection()) {releaseConnection(this.connectionHolder.getConnection(), this.dataSource);}}}@Overridepublic void afterCompletion(int status) {// If we haven't closed the Connection in beforeCompletion,// close it now. The holder might have been used for other// cleanup in the meantime, for example by a Hibernate Session.// 如果在完成前不關閉連接,現在關閉。這個保持可能同時已經被另一個清理所使用,比如Hibernate會話。if (this.holderActive) {// The thread-bound ConnectionHolder might not be available anymore,// since afterCompletion might get called from a different thread.// 這個線程綁定的連接持有者也許不再可用,自從完成之后可能會不同的線程回調。TransactionSynchronizationManager.unbindResourceIfPossible(this.dataSource);this.holderActive = false;if (this.connectionHolder.hasConnection()) {releaseConnection(this.connectionHolder.getConnection(), this.dataSource);// Reset the ConnectionHolder: It might remain bound to the thread.// 重置連接保持器:也許能夠和線程綁定。this.connectionHolder.setConnection(null);}}this.connectionHolder.reset();}}}@6 :?? prepareTransactionalConnection 方法
/*** Prepare the transactional {@code Connection} right after transaction begin.* <p>The default implementation executes a "SET TRANSACTION READ ONLY" statement* if the {@link #setEnforceReadOnly "enforceReadOnly"} flag is set to {@code true}* and the transaction definition indicates a read-only transaction.* <p>The "SET TRANSACTION READ ONLY" is understood by Oracle, MySQL and Postgres* and may work with other databases as well. If you'd like to adapt this treatment,* override this method accordingly.* @param con the transactional JDBC Connection* @param definition the current transaction definition* @throws SQLException if thrown by JDBC API* @since 4.3.7* @see #setEnforceReadOnly*/// 事務開啟后準備正常的事務連接。// 如果 enforceReadOnly 標識是true、事務定義標示一個只讀事務,默認的實現執行了"設置事務為只讀" 語句。// "設置事務只讀"是oracle,mysql和postgres和其他數據庫的理解。 如果你想適應這個方式,按照一致規則重寫這個方法。protected void prepareTransactionalConnection(Connection con, TransactionDefinition definition)throws SQLException {if (isEnforceReadOnly() && definition.isReadOnly()) {Statement stmt = con.createStatement();try {stmt.executeUpdate("SET TRANSACTION READ ONLY");}finally {stmt.close();}}}@7:?? ??? ?ConnectionHolder
/*** Connection holder, wrapping a JDBC Connection.* {@link DataSourceTransactionManager} binds instances of this class* to the thread, for a specific DataSource.** <p>Inherits rollback-only support for nested JDBC transactions* and reference count functionality from the base class.** <p>Note: This is an SPI class, not intended to be used by applications.*///連接持有者,是對jdbc連接的包裝。事務管理器是和當前類實例和線程綁定的,目的是指定連接池。//對于嵌套的hdbc事務,從基類繼承了僅回滾的支持和功能上的引用計數。//這是一個service provider interface class,不被任何應用使用。public class ConnectionHolder extends ResourceHolderSupport {public static final String SAVEPOINT_NAME_PREFIX = "SAVEPOINT_";@Nullableprivate ConnectionHandle connectionHandle;@Nullableprivate Connection currentConnection;private boolean transactionActive = false;@Nullableprivate Boolean savepointsSupported;private int savepointCounter = 0;/*** Create a new ConnectionHolder for the given ConnectionHandle.* @param connectionHandle the ConnectionHandle to hold*///創建連接持有者public ConnectionHolder(ConnectionHandle connectionHandle) {Assert.notNull(connectionHandle, "ConnectionHandle must not be null");this.connectionHandle = connectionHandle;}/*** Create a new ConnectionHolder for the given JDBC Connection,* wrapping it with a {@link SimpleConnectionHandle},* assuming that there is no ongoing transaction.* @param connection the JDBC Connection to hold* @see SimpleConnectionHandle* @see #ConnectionHolder(java.sql.Connection, boolean)*/public ConnectionHolder(Connection connection) {this.connectionHandle = new SimpleConnectionHandle(connection);}/*** Create a new ConnectionHolder for the given JDBC Connection,* wrapping it with a {@link SimpleConnectionHandle}.* @param connection the JDBC Connection to hold* @param transactionActive whether the given Connection is involved* in an ongoing transaction* @see SimpleConnectionHandle*/public ConnectionHolder(Connection connection, boolean transactionActive) {this(connection);this.transactionActive = transactionActive;}/*** Return the ConnectionHandle held by this ConnectionHolder.*/@Nullablepublic ConnectionHandle getConnectionHandle() {return this.connectionHandle;}/*** Return whether this holder currently has a Connection.*/protected boolean hasConnection() {return (this.connectionHandle != null);}/*** Set whether this holder represents an active, JDBC-managed transaction.* @see DataSourceTransactionManager*/protected void setTransactionActive(boolean transactionActive) {this.transactionActive = transactionActive;}/*** Return whether this holder represents an active, JDBC-managed transaction.*///返回當前持有者是否代表了活動的jdbc管理的事務。protected boolean isTransactionActive() {return this.transactionActive;}/*** Override the existing Connection handle with the given Connection.* Reset the handle if given {@code null}.* <p>Used for releasing the Connection on suspend (with a {@code null}* argument) and setting a fresh Connection on resume.*/protected void setConnection(@Nullable Connection connection) {if (this.currentConnection != null) {if (this.connectionHandle != null) {this.connectionHandle.releaseConnection(this.currentConnection);}this.currentConnection = null;}if (connection != null) {this.connectionHandle = new SimpleConnectionHandle(connection);}else {this.connectionHandle = null;}}/*** Return the current Connection held by this ConnectionHolder.* <p>This will be the same Connection until {@code released}* gets called on the ConnectionHolder, which will reset the* held Connection, fetching a new Connection on demand.* @see ConnectionHandle#getConnection()* @see #released()*/public Connection getConnection() {Assert.notNull(this.connectionHandle, "Active Connection is required");if (this.currentConnection == null) {this.currentConnection = this.connectionHandle.getConnection();}return this.currentConnection;}/*** Return whether JDBC 3.0 Savepoints are supported.* Caches the flag for the lifetime of this ConnectionHolder.* @throws SQLException if thrown by the JDBC driver*/public boolean supportsSavepoints() throws SQLException {if (this.savepointsSupported == null) {this.savepointsSupported = getConnection().getMetaData().supportsSavepoints();}return this.savepointsSupported;}/*** Create a new JDBC 3.0 Savepoint for the current Connection,* using generated savepoint names that are unique for the Connection.* @return the new Savepoint* @throws SQLException if thrown by the JDBC driver*/public Savepoint createSavepoint() throws SQLException {this.savepointCounter++;return getConnection().setSavepoint(SAVEPOINT_NAME_PREFIX + this.savepointCounter);}/*** Releases the current Connection held by this ConnectionHolder.* <p>This is necessary for ConnectionHandles that expect "Connection borrowing",* where each returned Connection is only temporarily leased and needs to be* returned once the data operation is done, to make the Connection available* for other operations within the same transaction.*/@Overridepublic void released() {super.released();if (!isOpen() && this.currentConnection != null) {if (this.connectionHandle != null) {this.connectionHandle.releaseConnection(this.currentConnection);}this.currentConnection = null;}}@Overridepublic void clear() {super.clear();this.transactionActive = false;this.savepointsSupported = null;this.savepointCounter = 0;}}@8:?? ?TransactionSynchronizationManager 這個類比較重要,也就是多線程事務的關鍵。
?? ?簡單來說,這個類里邊維護了很多個線程副本(ThreadLocal)。我們都知道線程副本和當前線程綁定,
?? ?通過get方法可以獲取與當前線程與key對應的值。
?? ?其中包含 資源,事務的同步狀態,名稱,只讀標識,隔離級別,和真正的活躍的事務(多數都是代理,去除代理找到真正的非代理對象)。
?? ?這里的調用的綁定資源方法key是當前數據源,value是當前連接持有者。
/*** Central delegate that manages resources and transaction synchronizations per thread.* To be used by resource management code but not by typical application code.* * <p>Supports one resource per key without overwriting, that is, a resource needs* to be removed before a new one can be set for the same key.* Supports a list of transaction synchronizations if synchronization is active.** <p>Resource management code should check for thread-bound resources, e.g. JDBC* Connections or Hibernate Sessions, via {@code getResource}. Such code is* normally not supposed to bind resources to threads, as this is the responsibility* of transaction managers. A further option is to lazily bind on first use if* transaction synchronization is active, for performing transactions that span* an arbitrary number of resources.** <p>Transaction synchronization must be activated and deactivated by a transaction* manager via {@link #initSynchronization()} and {@link #clearSynchronization()}.* This is automatically supported by {@link AbstractPlatformTransactionManager},* and thus by all standard Spring transaction managers, such as* {@link org.springframework.transaction.jta.JtaTransactionManager} and* {@link org.springframework.jdbc.datasource.DataSourceTransactionManager}.** <p>Resource management code should only register synchronizations when this* manager is active, which can be checked via {@link #isSynchronizationActive};* it should perform immediate resource cleanup else. If transaction synchronization* isn't active, there is either no current transaction, or the transaction manager* doesn't support transaction synchronization.** <p>Synchronization is for example used to always return the same resources* within a JTA transaction, e.g. a JDBC Connection or a Hibernate Session for* any given DataSource or SessionFactory, respectively.**/// 中央對每個線程委托管理資源和事務同步。 public abstract class TransactionSynchronizationManager {private static final Log logger = LogFactory.getLog(TransactionSynchronizationManager.class);private static final ThreadLocal<Map<Object, Object>> resources =new NamedThreadLocal<>("Transactional resources");private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =new NamedThreadLocal<>("Transaction synchronizations");private static final ThreadLocal<String> currentTransactionName =new NamedThreadLocal<>("Current transaction name");private static final ThreadLocal<Boolean> currentTransactionReadOnly =new NamedThreadLocal<>("Current transaction read-only status");private static final ThreadLocal<Integer> currentTransactionIsolationLevel =new NamedThreadLocal<>("Current transaction isolation level");private static final ThreadLocal<Boolean> actualTransactionActive =new NamedThreadLocal<>("Actual transaction active");//-------------------------------------------------------------------------// Management of transaction-associated resource handles//--------------------------------------------------------------------------/*** Return all resources that are bound to the current thread.* <p>Mainly for debugging purposes. Resource managers should always invoke* {@code hasResource} for a specific resource key that they are interested in.* @return a Map with resource keys (usually the resource factory) and resource* values (usually the active resource object), or an empty Map if there are* currently no resources bound* @see #hasResource*/public static Map<Object, Object> getResourceMap() {Map<Object, Object> map = resources.get();return (map != null ? Collections.unmodifiableMap(map) : Collections.emptyMap());}/*** Check if there is a resource for the given key bound to the current thread.* @param key the key to check (usually the resource factory)* @return if there is a value bound to the current thread* @see ResourceTransactionManager#getResourceFactory()*/public static boolean hasResource(Object key) {Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);Object value = doGetResource(actualKey);return (value != null);}/*** Retrieve a resource for the given key that is bound to the current thread.* @param key the key to check (usually the resource factory)* @return a value bound to the current thread (usually the active* resource object), or {@code null} if none* @see ResourceTransactionManager#getResourceFactory()*/@Nullablepublic static Object getResource(Object key) {Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);Object value = doGetResource(actualKey);if (value != null && logger.isTraceEnabled()) {logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" +Thread.currentThread().getName() + "]");}return value;}/*** Actually check the value of the resource that is bound for the given key.*/@Nullableprivate static Object doGetResource(Object actualKey) {Map<Object, Object> map = resources.get();if (map == null) {return null;}Object value = map.get(actualKey);// Transparently remove ResourceHolder that was marked as void...if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {map.remove(actualKey);// Remove entire ThreadLocal if empty...if (map.isEmpty()) {resources.remove();}value = null;}return value;}/*** Bind the given resource for the given key to the current thread.* @param key the key to bind the value to (usually the resource factory)* @param value the value to bind (usually the active resource object)* @throws IllegalStateException if there is already a value bound to the thread* @see ResourceTransactionManager#getResourceFactory()*/// 將給定的key和給定的資源和當前線程綁定。// key和value對應(通常是資源工廠)// value和(通常是活躍的資源對象)綁定// 如果當前線程已經有一個值綁定了,會拋出異常public static void bindResource(Object key, Object value) throws IllegalStateException {//將key去包裝,直到找到非包裝對象 unwrapResourceIfNecessary 中方法判斷是aop還是實現InfrastructureProxy接口的,再調各種的去除包裝的方法Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);Assert.notNull(value, "Value must not be null");Map<Object, Object> map = resources.get();// set ThreadLocal Map if none foundif (map == null) {map = new HashMap<>();resources.set(map);}Object oldValue = map.put(actualKey, value);// Transparently suppress a ResourceHolder that was marked as void...// 無效的資源所有者被標記為透明if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) {oldValue = null;}if (oldValue != null) {throw new IllegalStateException("Already value [" + oldValue + "] for key [" +actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");}if (logger.isTraceEnabled()) {logger.trace("Bound value [" + value + "] for key [" + actualKey + "] to thread [" +Thread.currentThread().getName() + "]");}}/*** Unbind a resource for the given key from the current thread.* @param key the key to unbind (usually the resource factory)* @return the previously bound value (usually the active resource object)* @throws IllegalStateException if there is no value bound to the thread* @see ResourceTransactionManager#getResourceFactory()*/public static Object unbindResource(Object key) throws IllegalStateException {Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);Object value = doUnbindResource(actualKey);if (value == null) {throw new IllegalStateException("No value for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");}return value;}/*** Unbind a resource for the given key from the current thread.* @param key the key to unbind (usually the resource factory)* @return the previously bound value, or {@code null} if none bound*/@Nullablepublic static Object unbindResourceIfPossible(Object key) {Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);return doUnbindResource(actualKey);}/*** Actually remove the value of the resource that is bound for the given key.*/@Nullableprivate static Object doUnbindResource(Object actualKey) {Map<Object, Object> map = resources.get();if (map == null) {return null;}Object value = map.remove(actualKey);// Remove entire ThreadLocal if empty...if (map.isEmpty()) {resources.remove();}// Transparently suppress a ResourceHolder that was marked as void...if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {value = null;}if (value != null && logger.isTraceEnabled()) {logger.trace("Removed value [" + value + "] for key [" + actualKey + "] from thread [" +Thread.currentThread().getName() + "]");}return value;}//-------------------------------------------------------------------------// Management of transaction synchronizations//-------------------------------------------------------------------------/*** Return if transaction synchronization is active for the current thread.* Can be called before register to avoid unnecessary instance creation.* @see #registerSynchronization*/public static boolean isSynchronizationActive() {return (synchronizations.get() != null);}/*** Activate transaction synchronization for the current thread.* Called by a transaction manager on transaction begin.* @throws IllegalStateException if synchronization is already active*/public static void initSynchronization() throws IllegalStateException {if (isSynchronizationActive()) {throw new IllegalStateException("Cannot activate transaction synchronization - already active");}logger.trace("Initializing transaction synchronization");synchronizations.set(new LinkedHashSet<>());}/*** Register a new transaction synchronization for the current thread.* Typically called by resource management code.* <p>Note that synchronizations can implement the* {@link org.springframework.core.Ordered} interface.* They will be executed in an order according to their order value (if any).* @param synchronization the synchronization object to register* @throws IllegalStateException if transaction synchronization is not active* @see org.springframework.core.Ordered*/public static void registerSynchronization(TransactionSynchronization synchronization)throws IllegalStateException {Assert.notNull(synchronization, "TransactionSynchronization must not be null");if (!isSynchronizationActive()) {throw new IllegalStateException("Transaction synchronization is not active");}synchronizations.get().add(synchronization);}/*** Return an unmodifiable snapshot list of all registered synchronizations* for the current thread.* @return unmodifiable List of TransactionSynchronization instances* @throws IllegalStateException if synchronization is not active* @see TransactionSynchronization*/public static List<TransactionSynchronization> getSynchronizations() throws IllegalStateException {Set<TransactionSynchronization> synchs = synchronizations.get();if (synchs == null) {throw new IllegalStateException("Transaction synchronization is not active");}// Return unmodifiable snapshot, to avoid ConcurrentModificationExceptions// while iterating and invoking synchronization callbacks that in turn// might register further synchronizations.if (synchs.isEmpty()) {return Collections.emptyList();}else {// Sort lazily here, not in registerSynchronization.List<TransactionSynchronization> sortedSynchs = new ArrayList<>(synchs);AnnotationAwareOrderComparator.sort(sortedSynchs);return Collections.unmodifiableList(sortedSynchs);}}/*** Deactivate transaction synchronization for the current thread.* Called by the transaction manager on transaction cleanup.* @throws IllegalStateException if synchronization is not active*/public static void clearSynchronization() throws IllegalStateException {if (!isSynchronizationActive()) {throw new IllegalStateException("Cannot deactivate transaction synchronization - not active");}logger.trace("Clearing transaction synchronization");synchronizations.remove();}//-------------------------------------------------------------------------// Exposure of transaction characteristics//-------------------------------------------------------------------------/*** Expose the name of the current transaction, if any.* Called by the transaction manager on transaction begin and on cleanup.* @param name the name of the transaction, or {@code null} to reset it* @see org.springframework.transaction.TransactionDefinition#getName()*/public static void setCurrentTransactionName(@Nullable String name) {currentTransactionName.set(name);}/*** Return the name of the current transaction, or {@code null} if none set.* To be called by resource management code for optimizations per use case,* for example to optimize fetch strategies for specific named transactions.* @see org.springframework.transaction.TransactionDefinition#getName()*/@Nullablepublic static String getCurrentTransactionName() {return currentTransactionName.get();}/*** Expose a read-only flag for the current transaction.* Called by the transaction manager on transaction begin and on cleanup.* @param readOnly {@code true} to mark the current transaction* as read-only; {@code false} to reset such a read-only marker* @see org.springframework.transaction.TransactionDefinition#isReadOnly()*/public static void setCurrentTransactionReadOnly(boolean readOnly) {currentTransactionReadOnly.set(readOnly ? Boolean.TRUE : null);}/*** Return whether the current transaction is marked as read-only.* To be called by resource management code when preparing a newly* created resource (for example, a Hibernate Session).* <p>Note that transaction synchronizations receive the read-only flag* as argument for the {@code beforeCommit} callback, to be able* to suppress change detection on commit. The present method is meant* to be used for earlier read-only checks, for example to set the* flush mode of a Hibernate Session to "FlushMode.NEVER" upfront.* @see org.springframework.transaction.TransactionDefinition#isReadOnly()* @see TransactionSynchronization#beforeCommit(boolean)*/public static boolean isCurrentTransactionReadOnly() {return (currentTransactionReadOnly.get() != null);}/*** Expose an isolation level for the current transaction.* Called by the transaction manager on transaction begin and on cleanup.* @param isolationLevel the isolation level to expose, according to the* JDBC Connection constants (equivalent to the corresponding Spring* TransactionDefinition constants), or {@code null} to reset it* @see java.sql.Connection#TRANSACTION_READ_UNCOMMITTED* @see java.sql.Connection#TRANSACTION_READ_COMMITTED* @see java.sql.Connection#TRANSACTION_REPEATABLE_READ* @see java.sql.Connection#TRANSACTION_SERIALIZABLE* @see org.springframework.transaction.TransactionDefinition#ISOLATION_READ_UNCOMMITTED* @see org.springframework.transaction.TransactionDefinition#ISOLATION_READ_COMMITTED* @see org.springframework.transaction.TransactionDefinition#ISOLATION_REPEATABLE_READ* @see org.springframework.transaction.TransactionDefinition#ISOLATION_SERIALIZABLE* @see org.springframework.transaction.TransactionDefinition#getIsolationLevel()*/public static void setCurrentTransactionIsolationLevel(@Nullable Integer isolationLevel) {currentTransactionIsolationLevel.set(isolationLevel);}/*** Return the isolation level for the current transaction, if any.* To be called by resource management code when preparing a newly* created resource (for example, a JDBC Connection).* @return the currently exposed isolation level, according to the* JDBC Connection constants (equivalent to the corresponding Spring* TransactionDefinition constants), or {@code null} if none* @see java.sql.Connection#TRANSACTION_READ_UNCOMMITTED* @see java.sql.Connection#TRANSACTION_READ_COMMITTED* @see java.sql.Connection#TRANSACTION_REPEATABLE_READ* @see java.sql.Connection#TRANSACTION_SERIALIZABLE* @see org.springframework.transaction.TransactionDefinition#ISOLATION_READ_UNCOMMITTED* @see org.springframework.transaction.TransactionDefinition#ISOLATION_READ_COMMITTED* @see org.springframework.transaction.TransactionDefinition#ISOLATION_REPEATABLE_READ* @see org.springframework.transaction.TransactionDefinition#ISOLATION_SERIALIZABLE* @see org.springframework.transaction.TransactionDefinition#getIsolationLevel()*/@Nullablepublic static Integer getCurrentTransactionIsolationLevel() {return currentTransactionIsolationLevel.get();}/*** Expose whether there currently is an actual transaction active.* Called by the transaction manager on transaction begin and on cleanup.* @param active {@code true} to mark the current thread as being associated* with an actual transaction; {@code false} to reset that marker*/public static void setActualTransactionActive(boolean active) {actualTransactionActive.set(active ? Boolean.TRUE : null);}/*** Return whether there currently is an actual transaction active.* This indicates whether the current thread is associated with an actual* transaction rather than just with active transaction synchronization.* <p>To be called by resource management code that wants to discriminate* between active transaction synchronization (with or without backing* resource transaction; also on PROPAGATION_SUPPORTS) and an actual* transaction being active (with backing resource transaction;* on PROPAGATION_REQUIRED, PROPAGATION_REQUIRES_NEW, etc).* @see #isSynchronizationActive()*/public static boolean isActualTransactionActive() {return (actualTransactionActive.get() != null);}/*** Clear the entire transaction synchronization state for the current thread:* registered synchronizations as well as the various transaction characteristics.* @see #clearSynchronization()* @see #setCurrentTransactionName* @see #setCurrentTransactionReadOnly* @see #setCurrentTransactionIsolationLevel* @see #setActualTransactionActive*/public static void clear() {synchronizations.remove();currentTransactionName.remove();currentTransactionReadOnly.remove();currentTransactionIsolationLevel.remove();actualTransactionActive.remove();} }?
總結
以上是生活随笔為你收集整理的spring 多线程 事务 源码解析(一)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: iOS AutoLayout
- 下一篇: 线程同步以及yield()、wait()