Spring源码解析-事务

 2023-02-04
原文作者:Xiao镔 原文地址:https://juejin.cn/post/7095263419552497672

前言

事务可以说是我们平时工作中使用得最多的技能,下面我们来看看Spring的事务究竟是怎么实现的, 建议先看juejin.cn/post/707971…

@EnableTransactionManagement

项目中使用事务时,都需要加上@EnableTransactionManagement这个注解,加上这个注解,代表开启事务,这个开启事务是什么意思呢?(SpringBoot是不需要加这个注解的,SpringBoot内部已经帮我们做了这个动作)

点击进去EnableTransactionManagement,会发现EnableTransactionManagement Import 了TransactionManagementConfigurationSelector

    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Import(TransactionManagementConfigurationSelector.class)
    public @interface EnableTransactionManagement {
       // 强制使用cglib代理
       boolean proxyTargetClass() default false;
       // 拦截模式
       AdviceMode mode() default AdviceMode.PROXY;
       // 加载顺序
       int order() default Ordered.LOWEST_PRECEDENCE;
    ​
    }

在之前的Spring文章中,我曾写过,@Import这个注解可以引入三种类型的类

  • ImportSelector类型
  • ImportBeanDefinitionRegistrar类型
  • 普通类

不管哪种类型,都是注册BeanDefinition的一种方式,此处Import的是ImportSelector,在TransactionManagementConfigurationSelector中,会根据拦截模式加载不同的类,由于默认的拦截模式是PROXY,所以会加载 AutoProxyRegistrarProxyTransactionManagementConfiguration

    public class TransactionManagementConfigurationSelector extends AdviceModeImportSelector<EnableTransactionManagement> {
    ​
       @Override
       protected String[] selectImports(AdviceMode adviceMode) {
          switch (adviceMode) {
             case PROXY:
                return new String[] {AutoProxyRegistrar.class.getName(),
                      ProxyTransactionManagementConfiguration.class.getName()};
             case ASPECTJ:
                return new String[] {determineTransactionAspectClass()};
             default:
                return null;
          }
       }
    }

ProxyTransactionManagementConfiguration 这个类比较重要,这个类主要注入了BeanFactoryTransactionAttributeSourceAdvisor这个Bean(TransactionAttributeSourceTransactionInterceptor是其属性)

    public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {
      
       @Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME)
       @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
       public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor(
             TransactionAttributeSource transactionAttributeSource, TransactionInterceptor transactionInterceptor) {
    ​
          BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor();
          // 可以看到下面两个bean都是BeanFactoryTransactionAttributeSourceAdvisor的属性
          advisor.setTransactionAttributeSource(transactionAttributeSource);
          advisor.setAdvice(transactionInterceptor);
          if (this.enableTx != null) {
             advisor.setOrder(this.enableTx.<Integer>getNumber("order"));
          }
          return advisor;
       }
    ​
       @Bean
       @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
       public TransactionAttributeSource transactionAttributeSource() {
          return new AnnotationTransactionAttributeSource();
       }
    ​
       @Bean
       @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
       public TransactionInterceptor transactionInterceptor(TransactionAttributeSource transactionAttributeSource) {
          TransactionInterceptor interceptor = new TransactionInterceptor();
          interceptor.setTransactionAttributeSource(transactionAttributeSource);
          if (this.txManager != null) {
             interceptor.setTransactionManager(this.txManager);
          }
          return interceptor;
       }
    ​
    }

我们看看BeanFactoryTransactionAttributeSourceAdvisor这个类的继承图,会发现它是一个Advisor,每一个增强器都会有一个Advice,TransactionInterceptor就是它的Advice,这个通知的作用很多,后面详细讲,AnnotationTransactionAttributeSource 中很多事务相关的属性

202301012021395791.png

一些重要的属性

Spring事务使用ThreadLocal来进行资源与事务的生命周期的同步管理,什么是资源的同步呢?在一个事务中,我们往往会执行多条sql(如果是单条sql其实没必要使用事务),为了保证所有sql都使用同一个数据库连接,我们需要将数据库连接这个资源来与事务进行同步,包括这个资源的获取与归还,下面看看是如何进行同步的

    // 这些属性位于TransactionSynchronizationManager这个类中
    ​
    // 保存事务的相关资源,一般都是保存事务的Connection
    private static final ThreadLocal<Map<Object, Object>> resources =
          new NamedThreadLocal<>("Transactional resources");
    // 这个是用于定制一些事务执行过程中的回调行为,暂时不用理会
    private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
          new NamedThreadLocal<>("Transaction synchronizations");
    // 事务的名称
    private static final ThreadLocal<String> currentTransactionName =
          new NamedThreadLocal<>("Current transaction name");
    // 事务是否被标记成只读
    private static final ThreadLocal<Boolean> currentTransactionReadOnly =
          new NamedThreadLocal<>("Current transaction read-only status");
    // 事务的隔离级别
    private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
          new NamedThreadLocal<>("Current transaction isolation level");
    // 是否开启了事务
    private static final ThreadLocal<Boolean> actualTransactionActive =
          new NamedThreadLocal<>("Actual transaction active");
    ​
    // 当然,是否同步也是可以进行控制的,一个框架想要具有普适性,必须有开放的能力
    // 这是属性transactionSynchronization的三个取值,决定着是否开启资源与事务的同步
    ​
    // 任何情况下都开启(不管实际情况下有没有事务)
    public static final int SYNCHRONIZATION_ALWAYS = 0;
    // 实际情况下存在事务才开启
    public static final int SYNCHRONIZATION_ON_ACTUAL_TRANSACTION = 1;
    // 任何时候都不开启
    public static final int SYNCHRONIZATION_NEVER = 2;
    ​
    ​
    // 我们来看一段代码感受一下是如何维护事务的同步的
    // 这段方法位于AbstractPlatformTransactionManager这个类中
    protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
      // 是否决定进行同步
      if (status.isNewSynchronization()) {
        // 记录是否开启了事务
        TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
        // 记录事务的隔离级别
        TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
          definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
          definition.getIsolationLevel() : null);
        // 记录是否被标记成只读
        TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
        // 记录事务的名字
        TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
        TransactionSynchronizationManager.initSynchronization();
      }
    }

一些重要的类

Spring是一个框架,为了兼容不同的组件,Spring进行了统一的抽象,比如PlatformTransactionManager,它定义基本的事务操作方法,这些事务操作方法都是平台无关的,具体的实现由子类实现

事务管理器
    public interface PlatformTransactionManager extends TransactionManager {
        // 获取事务
        TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException;
        // 提交事务
        void commit(TransactionStatus status) throws TransactionException;
        // 回滚事务
        void rollback(TransactionStatus status) throws TransactionException;
    }
    ​
    ​
    ​
    // AbstractPlatformTransactionManager是实现PlatformTransactionManager的一个抽象类,可以说是一个骨架,封装了事务管理
    // 中的基本流程,组件接入Spring事务时,继承这个类,实现其中的各种模版方法即可
    public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
      // 以下几个方法都是其中比较重要的模版方法,一看方法名就知道是什么意思了
      protected abstract void doBegin(Object transaction, TransactionDefinition definition)
          throws TransactionException;
      
      protected Object doSuspend(Object transaction) throws TransactionException {
        throw new TransactionSuspensionNotSupportedException(
            "Transaction manager [" + getClass().getName() + "] does not support transaction suspension");
      }
    ​
      protected void doResume(@Nullable Object transaction, Object suspendedResources) throws TransactionException {
        throw new TransactionSuspensionNotSupportedException(
            "Transaction manager [" + getClass().getName() + "] does not support transaction suspension");
      }
    ​
      protected boolean shouldCommitOnGlobalRollbackOnly() {
        return false;
      }
    ​
      protected void prepareForCommit(DefaultTransactionStatus status) {
      }
    ​
      protected abstract void doCommit(DefaultTransactionStatus status) throws TransactionException;
    ​
      protected abstract void doRollback(DefaultTransactionStatus status) throws TransactionException;
    }
    ​
    ​
    ​

其实现类还是蛮多的

202301012021402342.png

事务状态
    public interface TransactionStatus extends TransactionExecution, SavepointManager, Flushable {
    ​
      boolean hasSavepoint();
      @Override
      void flush();
    ​
    }
    ​
    public interface TransactionExecution {
    ​
      // 当前事务是否是新事务
      boolean isNewTransaction();
      // 是否已被标记为回滚
      void setRollbackOnly();
      boolean isRollbackOnly();
      // 事务是否完成
      boolean isCompleted();
    ​
    }
    ​
    public interface SavepointManager {
        // 创建保存点
        Object createSavepoint() throws TransactionException;
        // 回滚到指定保存点
        void rollbackToSavepoint(Object savepoint) throws TransactionException;
        // 移除保存点
        void releaseSavepoint(Object savepoint) throws TransactionException;
    }
事务属性
    // TransactionDefinition类中定义的是事务的隔离级别和事务的传播属性,下文为了更好地理解源码会列举一下,个人认为不需要记这些
    // 需要理解的地方是,事务的隔离级别是数据库本身的事务功能,而事务的传播属性是Spring定义的
    public interface TransactionAttribute extends TransactionDefinition {
    ​
      @Nullable
      String getQualifier();
    ​
      Collection<String> getLabels();
    ​
      boolean rollbackOn(Throwable ex);
    ​
    }

源码解析

如果看过我之前关于AOP的文章,AOP使用了责任链模式,链中的拦截器会逐个触发,invoke就是触发时的入口,所以我们从invoke开始看代码

    @Override
    @Nullable
    public Object invoke(MethodInvocation invocation) throws Throwable {
       // Work out the target class: may be {@code null}.
       // The TransactionAttributeSource should be passed the target class
       // as well as the method, which may be from an interface.
       Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
    ​
       // Adapt to TransactionAspectSupport's invokeWithinTransaction...
       return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() {
          @Override
          @Nullable
          public Object proceedWithInvocation() throws Throwable {
             return invocation.proceed();
          }
          @Override
          public Object getTarget() {
             return invocation.getThis();
          }
          @Override
          public Object[] getArguments() {
             return invocation.getArguments();
          }
       });
    }
    @Nullable
    protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
          final InvocationCallback invocation) throws Throwable {
    ​
       // If the transaction attribute is null, the method is non-transactional.
       TransactionAttributeSource tas = getTransactionAttributeSource();
       final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
       final TransactionManager tm = determineTransactionManager(txAttr);
    ​
       ......
    ​
       PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
       final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
    ​
       // 声明式事务走这个逻辑
       if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
          // Standard transaction demarcation with getTransaction and commit/rollback calls.
          TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
    ​
          Object retVal;
          try {
             // 执行回调,如果没有后续拦截器的话,就进入事务方法了
             retVal = invocation.proceedWithInvocation();
          }
          catch (Throwable ex) {
             // 发生异常的处理
             completeTransactionAfterThrowing(txInfo, ex);
             throw ex;
          }
          finally {
             cleanupTransactionInfo(txInfo);
          }
    ​
          if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
             // Set rollback-only in case of Vavr failure matching our rollback rules...
             TransactionStatus status = txInfo.getTransactionStatus();
             if (status != null && txAttr != null) {
                retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
             }
          }
          // 提交事务
          commitTransactionAfterReturning(txInfo);
          return retVal;
       }
       // 编程式事务走这个逻辑
       else {
          Object result;
          final ThrowableHolder throwableHolder = new ThrowableHolder();
    ​
          // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
          try {
             result = ((CallbackPreferringPlatformTransactionManager) ptm).execute(txAttr, status -> {
                TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status);
                try {
                   Object retVal = invocation.proceedWithInvocation();
                   if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
                      // Set rollback-only in case of Vavr failure matching our rollback rules...
                      retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
                   }
                   return retVal;
                }
                catch (Throwable ex) {
                   if (txAttr.rollbackOn(ex)) {
                      // A RuntimeException: will lead to a rollback.
                      if (ex instanceof RuntimeException) {
                         throw (RuntimeException) ex;
                      }
                      else {
                         throw new ThrowableHolderException(ex);
                      }
                   }
                   else {
                      // A normal return value: will lead to a commit.
                      throwableHolder.throwable = ex;
                      return null;
                   }
                }
                finally {
                   cleanupTransactionInfo(txInfo);
                }
             });
          }
          catch (ThrowableHolderException ex) {
             throw ex.getCause();
          }
          catch (TransactionSystemException ex2) {
             if (throwableHolder.throwable != null) {
                logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
                ex2.initApplicationException(throwableHolder.throwable);
             }
             throw ex2;
          }
          catch (Throwable ex2) {
             if (throwableHolder.throwable != null) {
                logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
             }
             throw ex2;
          }
    ​
          // Check result state: It might indicate a Throwable to rethrow.
          if (throwableHolder.throwable != null) {
             throw throwableHolder.throwable;
          }
          return result;
       }
    }
    protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
          @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
    ​
       // 如果没有指定名称,使用方法名作为事务名,并使用DelegatingTransactionAttribute封装txAttr
       if (txAttr != null && txAttr.getName() == null) {
          txAttr = new DelegatingTransactionAttribute(txAttr) {
             @Override
             public String getName() {
                return joinpointIdentification;
             }
          };
       }
    ​
       TransactionStatus status = null;
       if (txAttr != null) {
          if (tm != null) {
             // 根据事务属性判断是否需要开启事务,获取TransactionStatus
             status = tm.getTransaction(txAttr);
          }
          else {
             if (logger.isDebugEnabled()) {
                logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
                      "] because no transaction manager has been configured");
             }
          }
       }
       // 构建事务信息,将所有的事务信息统一记录在TransactionInfo中,一旦事务执行失败,Spring会通过TransactionInfo
       // 对象中的信息来进行回滚等后续工作
       return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
    }
    // 我们看看TransactionInfo这个对象中有什么,不外乎是我们上面讲的比较重要的那几个类
    protected static final class TransactionInfo {
        // 事务管理器
        @Nullable
        private final PlatformTransactionManager transactionManager;
        // 事务属性
        @Nullable
        private final TransactionAttribute transactionAttribute;
        // 拦截了哪一个方法
        private final String joinpointIdentification;
        // 事务的状态
        @Nullable
        private TransactionStatus transactionStatus;
        @Nullable
        private TransactionInfo oldTransactionInfo;
    }
    ​
    ​
    ​
    protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
          @Nullable TransactionAttribute txAttr, String joinpointIdentification,
          @Nullable TransactionStatus status) {
    ​
       TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
       // 事务方法
       if (txAttr != null) {
          // We need a transaction for this method...
          if (logger.isTraceEnabled()) {
             logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
          }
          // The transaction manager will flag an error if an incompatible tx already exists.
          txInfo.newTransactionStatus(status);
       }
       // 非事务方法
       else {
          if (logger.isTraceEnabled()) {
             logger.trace("No need to create transaction for [" + joinpointIdentification +
                   "]: This method is not transactional.");
          }
       }
    ​
       // 不管有没有事务,都会将TransactionInfo绑定到当前线程
       txInfo.bindToThread();
       return txInfo;
    }
    ​
    // 绑定到当前线程的方法也很简单,就是放在一个ThreadLocal属性中
    private void bindToThread() {
      this.oldTransactionInfo = transactionInfoHolder.get();
      transactionInfoHolder.set(this);
    }

下面的代码是对不同的事务传播属性进行不同的处理,为了让大家有一个更好的认识,先列举下不同的事务传播属性及其场景

事务传播机制 场景
PROPAGATION_REQUIRED Spring默认的事务隔离级别,如果当前上下⽂已经存在事务,那么加⼊到事务中执⾏,如果当前上下⽂不存在事务,则新建事务执⾏
PROPAGATION_SUPPORTS 如果当前上下⽂存在事务,加⼊事务,如果没有事务,则使⽤⾮事务的⽅式执⾏
PROPAGATION_MANDATORY 当前上下文必须存在事务,否则抛出异常
PROPAGATION_REQUIRES_NEW 创建一个新的事物,如果当前事务存在,挂起当前事务
PROPAGATION_NOT_SUPPORTED 如果上下⽂中存在事务,则挂起事务
PROPAGATION_NEVER 当前上下文不能存在事务,否则抛出异常
PROPAGATION_NESTED 如果当前上下文存在事务,则嵌套事务执行,如果不存在事务,则新建事务

这几种事务传播属性都蛮好理解的,比较容易混淆的是PROPAGATION_REQUIRES_NEW和PROPAGATION_NESTED,大家有兴趣的可以去搜索下

    @Override
    public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
          throws TransactionException {
    ​
       // 如果没有设置TransactionDefinition,则使用默认的设置
       TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
    ​
       Object transaction = doGetTransaction();
       boolean debugEnabled = logger.isDebugEnabled();
       // 判断是否存在事务
       if (isExistingTransaction(transaction)) {
          // 事务存在的情况下,走这个分支
          return handleExistingTransaction(d ef, transaction, debugEnabled);
       }
       // 下面的判断都是在事务不存在的情况下做的判断
       // 事务超时时间验证,timeout如果为-1,表示没有时间限制
       if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
          throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
       }
    ​
       // 如果事务传播级别为PROPAGATION_MANDATORY,但是当前线程不存在事务,直接抛出异常
       // PROPAGATION_MANDATORY这个事务传播级别是为了防止某段代码调用的时候,上下文漏添加事务的保证手段
       if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
          throw new IllegalTransactionStateException(
                "No existing transaction found for transaction marked with propagation 'mandatory'");
       }
       // 事务传播级别为PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW、PROPAGATION_NESTED都需要新建事务
       else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
             def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
             def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
          // 当前不存在事务,挂起空事务
          SuspendedResourcesHolder suspendedResources = suspend(null);
          if (debugEnabled) {
             logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
          }
          try {
             // 创建事务
             return startTransaction(def, transaction, debugEnabled, suspendedResources);
          }
          catch (RuntimeException | Error ex) {
             resume(null, suspendedResources);
             throw ex;
          }
       }
       // 走到这里说明事务传播级别是PROPAGATION_SUPPORTS、PROPAGATION_NOT_SUPPORTED或者PROPAGATION_NEVER
       // 在没有事务的情况下,不影响逻辑
       else {
          // Create "empty" transaction: no actual transaction, but potentially synchronization.
          if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
             logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                   "isolation level will effectively be ignored: " + def);
          }
          boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
          // 注意:这里事务的值传的是null
          return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
       }
    }
    private TransactionStatus handleExistingTransaction(
          TransactionDefinition definition, Object transaction, boolean debugEnabled)
          throws TransactionException {
       // 在当前上下文已存在事务的情况下,事务传播级别是PROPAGATION_NEVER,抛出异常
       if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
          throw new IllegalTransactionStateException(
                "Existing transaction found for transaction marked with propagation 'never'");
       }
       // 事务传播级别是PROPAGATION_NOT_SUPPORTED
       if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
          if (debugEnabled) {
             logger.debug("Suspending current transaction");
          }
          // 将原来的事务挂起
          Object suspendedResources = suspend(transaction);
          boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
          // 第二个参数transaction传了个空事务
          // 第三个参数false为旧标记
          // 最后一个参数是挂起的对象
          return prepareTransactionStatus(
                definition, null, false, newSynchronization, debugEnabled, suspendedResources);
       }
       // 事务传播级别是PROPAGATION_REQUIRES_NEW
       if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
          if (debugEnabled) {
             logger.debug("Suspending current transaction, creating new transaction with name [" +
                   definition.getName() + "]");
          }
          // 挂起原来的事务
          SuspendedResourcesHolder suspendedResources = suspend(transaction);
          try {
             // 创建新事务
             return startTransaction(definition, transaction, debugEnabled, suspendedResources);
          }
          catch (RuntimeException | Error beginEx) {
             resumeAfterBeginException(transaction, suspendedResources, beginEx);
             throw beginEx;
          }
       }
       // 事务传播级别是PROPAGATION_NESTED
       if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
          // 如果不支持嵌套事务,直接抛出异常
          if (!isNestedTransactionAllowed()) {
             throw new NestedTransactionNotSupportedException(
                   "Transaction manager does not allow nested transactions by default - " +
                   "specify 'nestedTransactionAllowed' property with value 'true'");
          }
          if (debugEnabled) {
             logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
          }
          if (useSavepointForNestedTransaction()) {
             // Create savepoint within existing Spring-managed transaction,
             // through the SavepointManager API implemented by TransactionStatus.
             // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
             DefaultTransactionStatus status =
                   prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
             status.createAndHoldSavepoint();
             return status;
          }
          else {
             // Nested transaction through nested begin and commit/rollback calls.
             // Usually only for JTA: Spring synchronization might get activated here
             // in case of a pre-existing JTA transaction.
             return startTransaction(definition, transaction, debugEnabled, null);
          }
       }
    ​
       // 代码走到这里,剩下PROPAGATION_SUPPORTS、PROPAGATION_REQUIRED、PROPAGATION_MANDATORY三个事务隔离级别
       if (debugEnabled) {
          logger.debug("Participating in existing transaction");
       }
       if (isValidateExistingTransaction()) {
          // 自定义了隔离级别与已有事务隔离级别是否匹配
          if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
             Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
             if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                Constants isoConstants = DefaultTransactionDefinition.constants;
                throw new IllegalTransactionStateException("Participating transaction with definition [" +
                      definition + "] specifies isolation level which is incompatible with existing transaction: " +
                      (currentIsolationLevel != null ?
                            isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
                            "(unknown)"));
             }
          }
          // 只读属性是否一致
          if (!definition.isReadOnly()) {
             if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                throw new IllegalTransactionStateException("Participating transaction with definition [" +
                      definition + "] is not marked as read-only but existing transaction is");
             }
          }
       }
       // 存在事务则加入事务
       boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
       return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
    }

看到这里,整个事务大体的框架源码都已经看完了,对不同的事务传播属性进行处理,当然,不同的传播属性也有一些共性的代码,比如挂起事务、提交事务、回滚事务,下面我们来看看

挂起事务
    @Nullable
    protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
       if (TransactionSynchronizationManager.isSynchronizationActive()) {
          List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
          try {
             Object suspendedResources = null;
             if (transaction != null) {
                // 挂起当前事务,具体的实现由子类实现
                suspendedResources = doSuspend(transaction);
             }
             // 将当前线程绑定的事务名、是否只读、隔离级别,是否有事务等信息,与刚才的suspendedSynchronizations
             // 包在一起返回
             String name = TransactionSynchronizationManager.getCurrentTransactionName();
             TransactionSynchronizationManager.setCurrentTransactionName(null);
             boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
             TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
             Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
             TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
             boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
             TransactionSynchronizationManager.setActualTransactionActive(false);
             return new SuspendedResourcesHolder(
                   suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
          }
          catch (RuntimeException | Error ex) {
             // 挂起失败,进行恢复
             doResumeSynchronization(suspendedSynchronizations);
             throw ex;
          }
       }
       else if (transaction != null) {
          // 挂起当前事务
          Object suspendedResources = doSuspend(transaction);
          return new SuspendedResourcesHolder(suspendedResources);
       }
       else {
          // Neither transaction nor synchronization active.
          return null;
       }
    }
    // 真正挂起的动作都是子类实现的,因为不同的组件实现方式不一样,以下是JPA的一个实现,实现起来并不复杂,就是把绑定的connection
    // 置空
    @Override
    protected Object doSuspend(Object transaction) {
       JpaTransactionObject txObject = (JpaTransactionObject) transaction;
       txObject.setEntityManagerHolder(null, false);
       EntityManagerHolder entityManagerHolder = (EntityManagerHolder)
             TransactionSynchronizationManager.unbindResource(obtainEntityManagerFactory());
       txObject.setConnectionHolder(null);
       ConnectionHolder connectionHolder = null;
       if (getDataSource() != null && TransactionSynchronizationManager.hasResource(getDataSource())) {
          connectionHolder = (ConnectionHolder) TransactionSynchronizationManager.unbindResource(getDataSource());
       }
       return new SuspendedResourcesHolder(entityManagerHolder, connectionHolder);
    }
提交事务
    @Override
    public final void commit(TransactionStatus status) throws TransactionException {
       // 如果事务已经完成,抛出异常
       if (status.isCompleted()) {
          throw new IllegalTransactionStateException(
                "Transaction is already completed - do not call commit or rollback more than once per transaction");
       }
        
       DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
       // 如果事务链已经被标记回滚,进行回滚
       if (defStatus.isLocalRollbackOnly()) {
          if (defStatus.isDebug()) {
             logger.debug("Transactional code has requested rollback");
          }
          processRollback(defStatus, false);
          return;
       }
    ​
       if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
          if (defStatus.isDebug()) {
             logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
          }
          processRollback(defStatus, true);
          return;
       }
       // 如果没有被标记回滚,此处提交
       processCommit(defStatus);
    }
事务回滚
    private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
       try {
          boolean unexpectedRollback = unexpected;
    ​
          try {
             // 扩展接口,回调TransactionSynchronization类中的beforeCompletion方法
             triggerBeforeCompletion(status);
    ​
             if (status.hasSavepoint()) {
                if (status.isDebug()) {
                   logger.debug("Rolling back transaction to savepoint");
                }
                status.rollbackToHeldSavepoint();
             }
             // 最外层事务回滚,status中有属性判断这个事务是否是一个新事务,在创建事务时就定好了
             // 在事务传播级别中,PROPAGATION_REQUIRES_NEW的情况下,如果子事务出问题,只会回滚那个子事务
             // 正好在这里对应上
             else if (status.isNewTransaction()) {
                if (status.isDebug()) {
                   logger.debug("Initiating transaction rollback");
                }
                doRollback(status);
             }
             else {
                // 整个大事务回滚
                if (status.hasTransaction()) {
                   // globalRollbackOnParticipationFailure这个属性默认为true
                   // rollbackOnly 这个值默认为false
                   // globalRollbackOnParticipationFailure在这个值存在的情况下,内层事务出现问题,最终必然是全事务回滚的
                   if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
                      if (status.isDebug()) {
                         logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
                      }
                      doSetRollbackOnly(status);
                   }
                   else {
                      if (status.isDebug()) {
                         logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
                      }
                   }
                }
                else {
                   logger.debug("Should roll back transaction but cannot - no transaction available");
                }
                // Unexpected rollback only matters here if we're asked to fail early
                if (!isFailEarlyOnGlobalRollbackOnly()) {
                   unexpectedRollback = false;
                }
             }
          }
          ......
       }
       ......
    }

参考资料

www.cnblogs.com/micrari/p/7…

www.cnblogs.com/java-wang/p…