欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

深入理解Spring Transaction (事务管理)源码剖析(第一篇)

最编程 2024-02-22 16:14:51
...

引言

  Spring提供的声明式事务想必大家都很熟悉了,简简单单一个@Transactional注解便能提供如此强大的功能,那么它是如何实现的呢?带着这点好奇心,我们一起扒拉扒拉spring-tx-5.2.6.RELEASE的源码吧。

  所谓工欲善其事,必先利其器,深入源码之前了解清楚spring-tx的相关概念还是很有必要的。本篇算是Spring AOP源码解析的姊妹篇,毕竟声明式事务就是AOP思想的一个实际应用嘛。

Spring Tx Concepts

PlatformTransactionManager

  PlatformTransactionManager是Spring事务管理的核心接口,它规范了应用程序操作事务的方式。

public interface PlatformTransactionManager extends TransactionManager {
    /**
     * 获取事务的状态信息(依据传播行为的不同,可能返回一个已激活的事务或创建一个新的独立事务)
     */
    TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException;

    /**
     * 提交当前事务,依据事务的当前状态,也可能会进行回滚,比如标记为rollback-only的事务
     */
    void commit(TransactionStatus status) throws TransactionException;

    /**
     * 回滚当前事务,依据传播行为的不同,非独立的内部事务仅会打上rollback-only标记
     */
    void rollback(TransactionStatus status) throws TransactionException;
}

TransactionDefinition

  TransactionDefinition描述了事务的相关属性,比如事务的隔离级别、传播行为,亦或是超时时长是多少等等。

public interface TransactionDefinition {

    /**
     * 返回事务的传播行为
     */
    default int getPropagationBehavior() {
        return PROPAGATION_REQUIRED;
    }

    /**
     * 返回事务的隔离级别
     */
    default int getIsolationLevel() {
        return ISOLATION_DEFAULT;
    }

    /**
     * 获取事务的超时时长
     */
    default int getTimeout() {
        return TIMEOUT_DEFAULT;
    }

    /**
     * 是否是只读事务
     */
    default boolean isReadOnly() {
        return false;
    }

    /**
     * 获取事务名称,多用于debug
     */
    @Nullable
    default String getName() {
        return null;
    }
}

  事务的隔离级别想必不用多做解释,传播行为(Propagation Behavior)是什么概念呢?简单来说,当一个事务方法被另一个事务方法调用时,传播行为可以控制是否需要创建事务以及如何创建事务,spring-tx中定义了7种事务传播行为:

  1. PROPAGATION_REQUIRED: 表示方法必须运行在事务中,如果当前事务存在,方法将会在该事务中运行,否则将开启一个新的独立事务
  2. PROPAGATION_SUPPORTS:表示方法支持在事务中运行,如果当前事务存在,方法将会在该事务中运行,否则以非事务方式运行
  3. PROPAGATION_MANDATORY:表示方法必须运行在事务中,如果当前事务不存在,抛出异常
  4. PROPAGATION_REQUIRES_NEW:表示方法必须运行在独立事务中,无论当前是否存在事务,该级别总会开启一个新的独立事务
  5. PROPAGATION_NOT_SUPPORTED:表示方法不支持在事务中运行,如果当前事务存在,挂起当前事务从而以非事务方式运行
  6. PROPAGATION_NEVER:表示方法不支持在事务中运行,如果当前事务存在,抛出异常
  7. PROPAGATION_NESTED:表示方法必须运行在事务中,如果当前事务存在,开启一个嵌套事务(Savepoint),否则等同于PROPAGATION_REQUIRED

  TransactionDefition更多的是表达通用的概念,它的子接口TransactionAttribute添加了基于AOP的rollbackOn(...)操作。

public interface TransactionAttribute extends TransactionDefinition {
   /**
    * 用于在Spring IoC Container中获取PlatformTransactionManager
    */
   @Nullable
   String getQualifier();

   /**
    * 判断是否在遇到指定类型的异常时进行回滚
    */
   boolean rollbackOn(Throwable ex);
}

再看一眼@Transactional注解的定义。

public @interface Transactional {

    @AliasFor("transactionManager")
    String value() default "";

    @AliasFor("value")
    String transactionManager() default "";

    Propagation propagation() default Propagation.REQUIRED;

    Isolation isolation() default Isolation.DEFAULT;

    int timeout() default TransactionDefinition.TIMEOUT_DEFAULT;

    boolean readOnly() default false;

    Class<? extends Throwable>[] rollbackFor() default {};

    String[] rollbackForClassName() default {};

    Class<? extends Throwable>[] noRollbackFor() default {};

    String[] noRollbackForClassName() default {};

}

想必你也看出来了,TransactionAttribute正是对运行时获取到的@Transactional注解的封装。

TransctionStatus

  TransctionStatus描述了某一时间点上事务的状态信息,比如是否是新开启的独立事务、是否已完成以及是否打上了rollback-only标记等等,并且为了支持嵌套事务,TransctionStatus还额外提供了对保存点(Savepoint)的支持。

// 管理Savepoint
public interface SavepointManager {
    /**
     * 创建一个新的Savepoint,后续可以回滚到指定的SavePoint
     */
    Object createSavepoint() throws TransactionException;

    /**
     * 回滚到指定的Savepoint
     */
    void rollbackToSavepoint(Object savepoint) throws TransactionException;

    /**
     * 释放指定Savepoint
     */
    void releaseSavepoint(Object savepoint) throws TransactionException;
}

// 代表事务的当前状态
public interface TransactionExecution {
    /**
     * 检查当前事务是否是一个独立事务,返回false表示是加入的一个已存在的事务或以非事务方式运行
     */
    boolean isNewTransaction();

    /**
     * 给当前事务打上rollback-only标记,被标记为rollback-only的事务只能被回滚而不会被提交
     */
    void setRollbackOnly();

    /**
     * 检查当前事务是否打上了rollback-only标记
     */
    boolean isRollbackOnly();

    /**
   * 检查当前事务是否已完成,已提交或已回滚都认为是已完成
     */
    boolean isCompleted();
}

public interface TransactionStatus extends TransactionExecution, SavepointManager, Flushable {
    /**
     * 检查当前事务是否携带有Savepoint,也就是说是否创建了嵌套事务
     */
    boolean hasSavepoint();

    /**
     * 如果底层TxManager支持的话就有用,比如Hibernate Session,
     * 而对于JDBC DataSource/Connection来讲是没有flush这个概念的,基本上是no-op
     */
    @Override
    void flush();
}

Related Concepts

ResourceHolder

  当一个事务方法 A 调用另一个事务方法 B 的时候,如何保证这两个方法运行在同一个事务中呢?如果方法 A 和 B 使用不同的java.sql.Connection来操作数据库,能保证它们运行在同一个事务中吗?很明显,是不能的。多个方法运行在同一个事务中的前提是它们必须使用同一个java.sql.Connection,以伪代码的形式就是:

try {
  connection.setAutoCommit(false)
    // 方法 A 执行 sql
  methodA(connection);
    // 方法 B 执行 sql
  methodB(connection);
    connection.commit() 
} catch (Exception ex) {
  connection.rollback() 
}

java.sql.Connection实例必须传递给方法 A 和方法 B,这样才能保证它们使用同一个连接对象。实际开发中,我们并没有像这样传递过连接对象,spring-tx将我们从这些细节中解放了出来。

  ResouceHolder就是设计来包裹底层连接资源的,spring-tx内部会使用线程私有存储ThreadLocal在同一个线程中进行传递,对方法 A 和 方法 B 来说,只要它们运行在同一个线程中,就能使用上同一个连接对象。当然了,ResourceHolder并非只能携带java.sql.Connection,对于使用MyBatis的用户来说,它携带的就是SqlSession了。

public interface ResourceHolder {
    /**
     * 重置
     */
    void reset();

    /**
     * 解绑连接资源
     */
    void unbound();

    /**
   * 检查此Holder是否携带有连接资源
     */
    boolean isVoid();
}
TransactionSynchronization

  由于spring-tx全盘接管了事务管理,那么它自然可以管理事务的生命周期。 TransactionSynchronization就是这样一个回调接口,它为我们揭示了事务运行时的各个阶段,如果我们需要在事务执行前后做一些额外的操作,使用它就再好不过了。

public interface TransactionSynchronization extends Flushable {
    /** 事务已提交 */
    int STATUS_COMMITTED = 0;
    /** 事务已回滚 */
    int STATUS_ROLLED_BACK = 1;
    /** 状态未知 */
    int STATUS_UNKNOWN = 2;

    /**
     * 事务被挂起时回调
     */
    default void suspend() {
    }

    /**
   * 事务恢复时回调
     */
    default void resume() {
    }

    @Override
    default void flush() {
    }

    /**
     * 事务提交前回调
     */
    default void beforeCommit(boolean readOnly) {
    }

    /**
   * 事务完成前回调,也就是在事务管理器 commit/rollback 之前
     */
    default void beforeCompletion() {
    }

    /**
     * 事务成功提交后回调
     */
    default void afterCommit() {
    }

    /**
     * 事务完成后回调,status揭示了事务当前状态
     */
    default void afterCompletion(int status) {
    }
}
TransactionSynchronizationManager

  spring-txResourceHolder的绑定和传递、TransactionSynchronization的注册和获取,均是代理给TransactionSynchronizationManager来完成的。

public abstract class TransactionSynchronizationManager {

   private static final Log logger = LogFactory.getLog(TransactionSynchronizationManager.class);

   // ResourceHolder的绑定关系
   // 比如在DataSourceTransactionManager中key是java.sql.DataSource,value是ConnectionHolder
   private static final ThreadLocal<Map<Object, Object>> resources =
         new NamedThreadLocal<>("Transactional resources");
     // 已注册的TransactionSynchronization
   private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
         new NamedThreadLocal<>("Transaction synchronizations");
     // 当前的事务名称
   private static final ThreadLocal<String> currentTransactionName =
         new NamedThreadLocal<>("Current transaction name");
     // 当前事务是否是只读事务
   private static final ThreadLocal<Boolean> currentTransactionReadOnly =
         new NamedThreadLocal<>("Current transaction read-only status");
   // 当前事务的隔离级别
   private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
         new NamedThreadLocal<>("Current transaction isolation level");
     // 事务是否真的被激活
   private static final ThreadLocal<Boolean> actualTransactionActive =
         new NamedThreadLocal<>("Actual transaction active");

   //-------------------------------------------------------------------------
   // Management of transaction-associated resource handles
   //-------------------------------------------------------------------------

   /**
    * 根据提供的key找寻底层资源
    */
   @Nullable
   public static Object getResource(Object key) {
      Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
      Object value = doGetResource(actualKey);
      if (value != null && logger.isTraceEnabled()) {
         logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" +
               Thread.currentThread().getName() + "]");
      }
      return value;
   }

   @Nullable
   private static Object doGetResource(Object actualKey) {
      Map<Object, Object> map = resources.get();
      if (map == null) {
         return null;
      }
      Object value = map.get(actualKey);
      // 对于ResourceHolder#isVid()的情况,认为其没有实际绑定上任何资源
      if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
         map.remove(actualKey);
         if (map.isEmpty()) {
            resources.remove();
         }
         value = null;
      }
      return value;
   }

   /**
    * 绑定一个资源
    */
   public static void bindResource(Object key, Object value) throws IllegalStateException {
      Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
      Assert.notNull(value, "Value must not be null");
      Map<Object, Object> map = resources.get();
        if (map == null) {
         map = new HashMap<>();
         resources.set(map);
      }
      Object oldValue = map.put(actualKey, value);
      // ResourceHolder#isVid()等同于不存在任何资源
      if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) {
         oldValue = null;
      }
      // TransactionSynchronizationManager中的任何操作,都只能先解除再操作,而不能进行覆盖
      if (oldValue != null) {
         throw new IllegalStateException("Already value [" + oldValue + "] for key [" +
               actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
      }
      if (logger.isTraceEnabled()) {
         logger.trace("Bound value [" + value + "] for key [" + actualKey + "] to thread [" +
               Thread.currentThread().getName() + "]");
      }
   }

   /**
        * 解绑资源,如果资源不存在抛出异常
    */
   public static Object unbindResource(Object key) throws IllegalStateException {
      Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
      Object value = doUnbindResource(actualKey);
      if (value == null) {
         throw new IllegalStateException(
               "No value for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
      }
      return value;
   }

   /**
        * 解绑资源,资源不存在等于no-op
    */
   @Nullable
   public static Object unbindResourceIfPossible(Object key) {
      Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
      return doUnbindResource(actualKey);
   }

   @Nullable
   private static Object doUnbindResource(Object actualKey) {
      Map<Object, Object> map = resources.get();
      if (map == null) {
         return null;
      }
      Object value = map.remove(actualKey);
      if (map.isEmpty()) {
         resources.remove();
      }
      // ResourceHolder#isVid()等同于不存在任何资源
      if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
         value = null;
      }
      if (value != null && logger.isTraceEnabled()) {
         logger.trace("Removed value [" + value + "] for key [" + actualKey + "] from thread [" +
               Thread.currentThread().getName() + "]");
      }
      return value;
   }

   //-------------------------------------------------------------------------
   // Management of transaction synchronizations
   //-------------------------------------------------------------------------

   /**
    * 检查transaction synchronization是否已激活
    */
   public static boolean isSynchronizationActive() {
      return (synchronizations.get() != null);
   }

   /**
        * 激活transaction synchronization,如果已激活,抛出异常
    */
   public static void initSynchronization() throws IllegalStateException {
      if (isSynchronizationActive()) {
         throw new IllegalStateException("Cannot activate transaction synchronization - already active");
      }
      logger.trace("Initializing transaction synchronization");
      synchronizations.set(new LinkedHashSet<>());
   }

   /**
        * 注册一个TransactionSynchronization,如果transaction synchronization未激活,抛出异常
    */
   public static void registerSynchronization(TransactionSynchronization synchronization)
         throws IllegalStateException {
      Assert.notNull(synchronization, "TransactionSynchronization must not be null");
      Set<TransactionSynchronization> synchs = synchronizations.get();
      if (synchs == null) {
         throw new IllegalStateException("Transaction synchronization is not active");
      }
      synchs.add(synchronization);
   }

   /**
    * 返回已注册的所有TransactionSynchronization,如果transaction synchronization未激活,抛出异常
    */
   public static List<TransactionSynchronization> getSynchronizations() throws IllegalStateException {
      Set<TransactionSynchronization> synchs = synchronizations.get();
      if (synchs == null) {
         throw new IllegalStateException("Transaction synchronization is not active");
      }
      if (synchs.isEmpty()) {
         return Collections.emptyList();
      }
      else {
         List<TransactionSynchronization> sortedSynchs = new ArrayList<>(synchs);
         // 排序,TransactionSynchronization可实现Ordered接口或使用@Order注解指定优先级
         AnnotationAwareOrderComparator.sort(sortedSynchs);
         return Collections.unmodifiableList(sortedSynchs);
      }
   }

   /**
        * 关闭transaction synchronization
    */
   public static void clearSynchronization() throws IllegalStateException {
      if (!isSynchronizationActive()) {
         throw new IllegalStateException("Cannot deactivate transaction synchronization - not active");
      }
      logger.trace("Clearing transaction synchronization");
      synchronizations.remove();
   }

   //-------------------------------------------------------------------------
   // Exposure of transaction characteristics
   //-------------------------------------------------------------------------

   /**
        * 记录当前事务名称
    */
   public static void setCurrentTransactionName(@Nullable String name) {
      currentTransactionName.set(name);
   }

   /**
        * 返回当前事务名称
    */
   @Nullable
   public static String getCurrentTransactionName() {
      return currentTransactionName.get();
   }

   /**
        * 设置当前事务为只读事务
    */
   public static void setCurrentTransactionReadOnly(boolean readOnly) {
      currentTransactionReadOnly.set(readOnly ? Boolean.TRUE : null);
   }

   /**
        * 检查当前事务是否为只读事务
    */
   public static boolean isCurrentTransactionReadOnly() {
      return (currentTransactionReadOnly.get() != null);
   }

   /**
        * 设置当前事务的隔离级别
    */
   public static void setCurrentTransactionIsolationLevel(@Nullable Integer isolationLevel) {
      currentTransactionIsolationLevel.set(isolationLevel);
   }

   /**
        * 返回当前事务的隔离级别
    */
   @Nullable
   public static Integer getCurrentTransactionIsolationLevel() {
      return currentTransactionIsolationLevel.get();
   }

   /**
    * 设置当前事务已开启
    */
   public static void setActualTransactionActive(boolean active) {
      actualTransactionActive.set(active ? Boolean.TRUE : null);
   }

   /**
    * 检查当前事务已开启,也就是说是否真的存在事务而不仅仅只有transaction synchronization
    */
   public static boolean isActualTransactionActive() {
      return (actualTransactionActive.get() != null);
   }

   /**
        * 完全清除transaction synchronization各种状态信息
    */
   public static void clear() {
      synchronizations.remove();
      currentTransactionName.remove();
      currentTransactionReadOnly.remove();
      currentTransactionIsolationLevel.remove();
      actualTransactionActive.remove();
   }
}

注意,TransactionSynchronizationManager在操作ResourceHolder时是不允许直接覆盖的,旧的资源必须先解绑才能绑定新的资源。同时TransactionSynchronization只能在transaction synchronization激活时才能绑定,为此TransactionSynchronizationManager提供了initSynchronization()clearSynchronization()来分别开启开启和关闭transaction synchronization

结语

  抽象是编程的先决条件,编码不过是对抽象的具体实现。好的抽象才能带出好的代码,spring-tx也是如此,下一篇让我们一起钻到具体的实现细节里去吧~~