详解 Spring 事务传播机制的实现原理

 2022-09-06
原文地址:https://blog.csdn.net/m0_43448868/article/details/112115749

什么是事务传播?

假设这样一个场景:方法A上面添加了一个@Transactional注解,在该方法中去调用另一个Service的方法B,但方法B并不需要事务,但是由于A开启了事务,导致B方法的执行也处于事务范围,那么这种情况该如何处理呢?

简单来说事务传播其实就是当一个事务方法调用其它方法时,被调用的方法可以通过@Transactional注解来决定如何应对调用方的事务,是抛出异常(Propagation.NEVER)?还是挂起调用方的事务(Propagation.NOT_SUPPORTED)?还是被调用方法自己再开启一个事务(Propagation#REQUIRES_NEW)?这点也和Spring官方文档中关于事务传播行为的解释不谋而合。

202209062323550601.png

另外需要说明的是数据库没有事务传播这个概念。

底层实现

我们在使用Spring的声明式事务时需要配置一个TransactionManager,字面翻译就是事务管理器。事务传播行为就是在该接口的抽象实现-AbstractPlatformTransactionManager的getTransaction方法中实现的。

202209062323563712.png

    @Bean
    public TransactionManager transactionManager(){
    	return new DataSourceTransactionManager(dataSource());
    }

在getTransaction方法中,首先通过doGetTransaction方法来获取事务,接下来通过isExistingTransaction方法来判断是否存在事务,如果存在事务则执行handleExistingTransaction方法来处理已存在的事务。

    public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
    			throws TransactionException {
    
    	// Use defaults if no transaction definition given.
    	TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
    
    	Object transaction = doGetTransaction();
    	boolean debugEnabled = logger.isDebugEnabled();
    
    	if (isExistingTransaction(transaction)) {
    		// Existing transaction found -> check propagation behavior to find out how to behave.
    		return handleExistingTransaction(def, transaction, debugEnabled);
    	}
    
    	// 删除与本次分析无关代码...
    	}
    }

doGetTransaction方法由AbstractPlatformTransactionManager类定义,由DataSourceTransactionManager类实现,典型的模板方法设计模式。

在该方法中,首先创建一个事务对象-DataSourceTransactionObject ,重点是接下来的设置数据库链接持有器(封装了数据库链接-Connection),通过TransactionSynchronizationManager的getResource方法来获取数据库链接持有器-ConnectionHolder,然后将获取到的数据库链接持有器设置到DataSourceTransactionObject中。 因为数据库事务是和数据库链接-Connection有关(通过Connection的rollback或commit方法来回滚事务或提交事务),因此我们只需要关注一个数据库链接如何在不同方法之间传递。

    // DataSourceTransactionManager#doGetTransaction
    protected Object doGetTransaction() {
        DataSourceTransactionManager.DataSourceTransactionObject txObject = new DataSourceTransactionManager.DataSourceTransactionObject();
        txObject.setSavepointAllowed(this.isNestedTransactionAllowed());
        ConnectionHolder conHolder = (ConnectionHolder)TransactionSynchronizationManager.getResource(this.obtainDataSource());
        txObject.setConnectionHolder(conHolder, false);
        return txObject;
    }

接下来就查看下TransactionSynchronizationManager的getResource方法,在该方法中调用TransactionSynchronizationUtils的unwrapResourceIfNecessary对传入的Key解除包装,然后调用doGetResource方法来根据解除包装后的Key获取对应的value,最后返回这个value。

    public static Object getResource(Object key) {
    	Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
    	Object value = doGetResource(actualKey);
    	if (value != null && logger.isTraceEnabled()) {
    		logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" +
    				Thread.currentThread().getName() + "]");
    	}
    	return value;
    }

在doGetResource方法中,调用resources的get方法来获取一个Map,看上去熟悉不?通过get方法不需要传入key就能获取value,你脑海中首先想到的是什么?

    private static Object doGetResource(Object actualKey) {
    	Map<Object, Object> map = resources.get();
    	if (map == null) {
    		return null;
    	}
    	Object value = map.get(actualKey);
    	// 删除与本次分析无关的代码
    	return value;
    }

Bingo,答案就是ThreadLocal!看到这里估计已经有小伙伴能明白Spring声明式事务-@Transactional注解中事务传播行为是如何实现的。就是使用ThreadLocal来解决不同事务方法之间的数据库链接问题。

    private static final ThreadLocal<Map<Object, Object>> resources =
    			new NamedThreadLocal<>("Transactional resources");

那么TransactionSynchronizationManager是什么东西呢?查看该类源码可以发现就是定义了六个ThreadLocal用于存放和事务相关的数据,例如前面我们看到的存放事务对象的-resources,存放当前事务名称的currentTransactionName 等等,其就是一个事物同步管理器。

    public abstract class TransactionSynchronizationManager {
    
    	private static final Log logger = LogFactory.getLog(TransactionSynchronizationManager.class);
    	// 用于存放当前事务
    	private static final ThreadLocal<Map<Object, Object>> resources =
    			new NamedThreadLocal<>("Transactional resources");
    	// 用于存放同步回调接口实现类
    	private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
    			new NamedThreadLocal<>("Transaction synchronizations");
    	// 当前事务名称
    	private static final ThreadLocal<String> currentTransactionName =
    			new NamedThreadLocal<>("Current transaction name");
    	// 当前事务是否只读
    	private static final ThreadLocal<Boolean> currentTransactionReadOnly =
    			new NamedThreadLocal<>("Current transaction read-only status");
    	// 当前事务的隔离级别
    	private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
    			new NamedThreadLocal<>("Current transaction isolation level");
    	// 当前事务是否活跃
    	private static final ThreadLocal<Boolean> actualTransactionActive =
    			new NamedThreadLocal<>("Actual transaction active");
    	// ......
    }

202209062323589563.png

OK,明白Spring是如何将一个数据库链接在不同方法之间传递后,那再阅读isExistingTransaction(判断是否存在事务)和handleExistingTransaction(处理已存在事务)方法就会很简单。

在isExistingTransaction方法中就是通过判断传入的DataSourceTransactionObject对象(该对象由前面的doGetTransaction方法返回)持有的ConnectionHolder是否不等于空并且是否活跃。如果两个条件都满足则返回true,否则false。

     // DataSourceTransactionManager#isExistingTransaction
     protected boolean isExistingTransaction(Object transaction) {
        DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)transaction;
        return txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive();
    }

handleExistingTransaction方法需要着重分析下,因为在该方法中完成了事务传播行为的实现,首先根据事务定义信息-TransactionDefinition (根据@Transactional注解创建)来获取事务传播行为。

    // AbstractPlatformTransactionManager#handleExistingTransaction
    private TransactionStatus handleExistingTransaction(
    		TransactionDefinition definition, Object transaction, boolean debugEnabled)
    		throws TransactionException {
    	// 当前方法上的@Transactional(propagation = Propagation.NEVER)配置是这样,则抛出异常
    	if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
    		throw new IllegalTransactionStateException(
    				"Existing transaction found for transaction marked with propagation 'never'");
    	}
    	// 当前方法上的@Transactional(propagation = Propagation.NOT_SUPPORTED)配置是这样
    	if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
    		if (debugEnabled) {
    			logger.debug("Suspending current transaction");
    		}
    		Object suspendedResources = suspend(transaction); // 调用suspend方法挂起事务
    		boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
    		return prepareTransactionStatus(
    				definition, null, false, newSynchronization, debugEnabled, suspendedResources);
    	}
    	// 当前方法上的@Transactional(propagation = Propagation.REQUIRES_NEW)配置是这样
    	if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
    		if (debugEnabled) {
    			logger.debug("Suspending current transaction, creating new transaction with name [" +
    					definition.getName() + "]");
    		}
    		SuspendedResourcesHolder suspendedResources = suspend(transaction); // 挂起已经存在的事务
    		try { // 调用startTransaction方法开启一个新的事务,其实就是重新从数据源中获取一个数据库链接,不使用原有的数据库链接
    			return startTransaction(definition, transaction, debugEnabled, suspendedResources);
    		} catch (RuntimeException | Error beginEx) {
    			resumeAfterBeginException(transaction, suspendedResources, beginEx);
    			throw beginEx;
    		}
    	}
    
    	if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
    		// 嵌套事务处理这里就不再展开分析
    	}
    
    	// 删除与本次分析无关源码...
    }

这里我们就以事务传播行为中Propagation.REQUIRES_NEW来分析下如果存在当前事务,是如何挂起当前事务的,其实就是分析suspend方法的执行逻辑。

    // AbstractPlatformTransactionManager#suspend
    protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
    	if (TransactionSynchronizationManager.isSynchronizationActive()) {
    		List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
    		try {
    			Object suspendedResources = null;
    			if (transaction != null) {
    				suspendedResources = doSuspend(transaction); // 暂停当前事务其实就是从ThreadLocal中移除当前事务
    			}
    			String name = TransactionSynchronizationManager.getCurrentTransactionName();
    			TransactionSynchronizationManager.setCurrentTransactionName(null); // 将当前事务名称设置为null
    			boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
    			TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);// 将当前事务是否只读设置为false
    			Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
    			TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);// 将当前事务隔离级别设置为null
    			boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
    			TransactionSynchronizationManager.setActualTransactionActive(false);// 将当前事务活跃状态设置为false
    			return new SuspendedResourcesHolder(
    					suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);// 根据要挂起的事务定义信息来构建一个SuspendedResourcesHolder对象
    		}
    		catch (RuntimeException | Error ex) {
    			// doSuspend failed - original transaction is still active...
    			doResumeSynchronization(suspendedSynchronizations);
    			throw ex;
    		}
    	} else if (transaction != null) {
    		Object suspendedResources = doSuspend(transaction);
    		return new SuspendedResourcesHolder(suspendedResources);
    	} else {
    		return null;
    	}
    }

通过查看suspend方法,我们知道所谓挂起事务其实就是先从六个ThreadLocal中获取到当前事务的定义信息,并将这六个ThreadLocal重置为初始状态,根据当前事务的定义信息创建SuspendedResourcesHolder并返回。

那么接下来的startTransaction方法是如何处理上一步挂起的事务呢?

在startTransaction方法入参中,有一个值得注意的参数就是SuspendedResourcesHolder,它就是被挂起的当前事务定义信息。

    // AbstractPlatformTransactionManager#startTransaction
    private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
    			boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
    	
    	boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    	// 创建一个DefaultTransactionStatus实例,该对象保存了SuspendedResourcesHolder 
    	DefaultTransactionStatus status = newTransactionStatus(
    			definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
    	doBegin(transaction, definition);// 开启事务
    	prepareSynchronization(status, definition);//预刷新事务同步
    	return status;
    }

如果让你来设计,你应该如何重新开启一个新的事务呢?

    // DataSourceTransactionManager#doBegin
    protected void doBegin(Object transaction, TransactionDefinition definition) {
         DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)transaction;
         Connection con = null;
    
         try {
             if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
                 Connection newCon = this.obtainDataSource().getConnection(); // 根据数据源对象重新获取一个数据库链接
                 if (this.logger.isDebugEnabled()) {
                     this.logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
                 }
    			 // 将新获取到的数据连接设置到事务对象中
                 txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
             }
    
             txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
             con = txObject.getConnectionHolder().getConnection();
             Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition); // 设置事务隔离界别
             txObject.setPreviousIsolationLevel(previousIsolationLevel);
             txObject.setReadOnly(definition.isReadOnly()); // 设置事务是否只读
             if (con.getAutoCommit()) {
                 txObject.setMustRestoreAutoCommit(true);
                 if (this.logger.isDebugEnabled()) {
                     this.logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
                 }
                 con.setAutoCommit(false); // 关闭事务自动提交
             }
    
             this.prepareTransactionalConnection(con, definition);
             txObject.getConnectionHolder().setTransactionActive(true); // 设置事务活跃状态
             int timeout = this.determineTimeout(definition);
             if (timeout != -1) {
                 txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
             }
    
             if (txObject.isNewConnectionHolder()) {
                 TransactionSynchronizationManager.bindResource(this.obtainDataSource(), txObject.getConnectionHolder()); // 将当前事务对象设置到事务同步管理器中
             }
         } catch (Throwable var7) {
             if (txObject.isNewConnectionHolder()) {
                 DataSourceUtils.releaseConnection(con, this.obtainDataSource());
                 txObject.setConnectionHolder((ConnectionHolder)null, false);
             }
             throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", var7);
         }
     }

答案很简单,就是重新获取一个数据库链接。

附录

@Transactional注解propagation方法返回值和TransactionDefinition映射关系。详见下文:

    @Target({ElementType.TYPE, ElementType.METHOD})
    @Retention(RetentionPolicy.RUNTIME)
    @Inherited
    @Documented
    public @interface Transactional {
    	// ......
    	Propagation propagation() default Propagation.REQUIRED;
    	// ......
    }
    
    public enum Propagation {
    	// 需要事务,如果当前存在事务就是用当前事务,否则新建事务
    	REQUIRED(TransactionDefinition.PROPAGATION_REQUIRED),
    	// 支持事务,如果当前存在事务就使用当前事务,否则以非事务方式运行
    	SUPPORTS(TransactionDefinition.PROPAGATION_SUPPORTS),
    	// 支持事务,如果当前没有事务则抛出异常
    	MANDATORY(TransactionDefinition.PROPAGATION_MANDATORY),
    	// 无论当前存不存在事务,都新建一个事务
    	REQUIRES_NEW(TransactionDefinition.PROPAGATION_REQUIRES_NEW),
    	// 不支持事务,如果当前存在事务,则挂起当前事务
    	NOT_SUPPORTED(TransactionDefinition.PROPAGATION_NOT_SUPPORTED),
    	// 绝不支持事务,如果当前存在事务则抛出异常
    	NEVER(TransactionDefinition.PROPAGATION_NEVER),
    	// 支持嵌套事务
    	NESTED(TransactionDefinition.PROPAGATION_NESTED);
    
    	private final int value;
    
    	Propagation(int value) {
    		this.value = value;
    	}
    
    	public int value() {
    		return this.value;
    	}
    
    }

总结

所谓Spring事务中的传播行为其实就是当事务方法调用其它方法时,被调用的方法可以通过@Transactional来决定如何应对调用方法的事务。

熟悉JDBC的小伙伴都明白,事务是和数据库链接-Connection相关,所以无论Spring声明式事务外表多么华丽,繁杂,其本质还是基于Connection来完成,这点是毋庸置疑的,Spring也无法超脱于JDBC来另起炉灶。为了解决数据库链接跨方法传递,Spring使用了ThreadLocal来解决。