前言
事务可以说是我们平时工作中使用得最多的技能,下面我们来看看Spring的事务究竟是怎么实现的, 建议先看juejin.cn/post/707971…
@EnableTransactionManagement
项目中使用事务时,都需要加上@EnableTransactionManagement
这个注解,加上这个注解,代表开启事务,这个开启事务是什么意思呢?(SpringBoot是不需要加这个注解的,SpringBoot内部已经帮我们做了这个动作)
点击进去EnableTransactionManagement
,会发现EnableTransactionManagement
Import 了TransactionManagementConfigurationSelector
类
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(TransactionManagementConfigurationSelector.class)
public @interface EnableTransactionManagement {
// 强制使用cglib代理
boolean proxyTargetClass() default false;
// 拦截模式
AdviceMode mode() default AdviceMode.PROXY;
// 加载顺序
int order() default Ordered.LOWEST_PRECEDENCE;
}
在之前的Spring文章中,我曾写过,@Import这个注解可以引入三种类型的类
- ImportSelector类型
- ImportBeanDefinitionRegistrar类型
- 普通类
不管哪种类型,都是注册BeanDefinition的一种方式,此处Import的是ImportSelector,在TransactionManagementConfigurationSelector
中,会根据拦截模式加载不同的类,由于默认的拦截模式是PROXY,所以会加载 AutoProxyRegistrar
、ProxyTransactionManagementConfiguration
public class TransactionManagementConfigurationSelector extends AdviceModeImportSelector<EnableTransactionManagement> {
@Override
protected String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
case PROXY:
return new String[] {AutoProxyRegistrar.class.getName(),
ProxyTransactionManagementConfiguration.class.getName()};
case ASPECTJ:
return new String[] {determineTransactionAspectClass()};
default:
return null;
}
}
}
ProxyTransactionManagementConfiguration
这个类比较重要,这个类主要注入了BeanFactoryTransactionAttributeSourceAdvisor
这个Bean(TransactionAttributeSource
、TransactionInterceptor
是其属性)
public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {
@Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor(
TransactionAttributeSource transactionAttributeSource, TransactionInterceptor transactionInterceptor) {
BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor();
// 可以看到下面两个bean都是BeanFactoryTransactionAttributeSourceAdvisor的属性
advisor.setTransactionAttributeSource(transactionAttributeSource);
advisor.setAdvice(transactionInterceptor);
if (this.enableTx != null) {
advisor.setOrder(this.enableTx.<Integer>getNumber("order"));
}
return advisor;
}
@Bean
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public TransactionAttributeSource transactionAttributeSource() {
return new AnnotationTransactionAttributeSource();
}
@Bean
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public TransactionInterceptor transactionInterceptor(TransactionAttributeSource transactionAttributeSource) {
TransactionInterceptor interceptor = new TransactionInterceptor();
interceptor.setTransactionAttributeSource(transactionAttributeSource);
if (this.txManager != null) {
interceptor.setTransactionManager(this.txManager);
}
return interceptor;
}
}
我们看看BeanFactoryTransactionAttributeSourceAdvisor
这个类的继承图,会发现它是一个Advisor,每一个增强器都会有一个Advice,TransactionInterceptor
就是它的Advice,这个通知的作用很多,后面详细讲,AnnotationTransactionAttributeSource
中很多事务相关的属性
一些重要的属性
Spring事务使用ThreadLocal来进行资源与事务的生命周期的同步管理,什么是资源的同步呢?在一个事务中,我们往往会执行多条sql(如果是单条sql其实没必要使用事务),为了保证所有sql都使用同一个数据库连接,我们需要将数据库连接这个资源来与事务进行同步,包括这个资源的获取与归还,下面看看是如何进行同步的
// 这些属性位于TransactionSynchronizationManager这个类中
// 保存事务的相关资源,一般都是保存事务的Connection
private static final ThreadLocal<Map<Object, Object>> resources =
new NamedThreadLocal<>("Transactional resources");
// 这个是用于定制一些事务执行过程中的回调行为,暂时不用理会
private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
new NamedThreadLocal<>("Transaction synchronizations");
// 事务的名称
private static final ThreadLocal<String> currentTransactionName =
new NamedThreadLocal<>("Current transaction name");
// 事务是否被标记成只读
private static final ThreadLocal<Boolean> currentTransactionReadOnly =
new NamedThreadLocal<>("Current transaction read-only status");
// 事务的隔离级别
private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
new NamedThreadLocal<>("Current transaction isolation level");
// 是否开启了事务
private static final ThreadLocal<Boolean> actualTransactionActive =
new NamedThreadLocal<>("Actual transaction active");
// 当然,是否同步也是可以进行控制的,一个框架想要具有普适性,必须有开放的能力
// 这是属性transactionSynchronization的三个取值,决定着是否开启资源与事务的同步
// 任何情况下都开启(不管实际情况下有没有事务)
public static final int SYNCHRONIZATION_ALWAYS = 0;
// 实际情况下存在事务才开启
public static final int SYNCHRONIZATION_ON_ACTUAL_TRANSACTION = 1;
// 任何时候都不开启
public static final int SYNCHRONIZATION_NEVER = 2;
// 我们来看一段代码感受一下是如何维护事务的同步的
// 这段方法位于AbstractPlatformTransactionManager这个类中
protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
// 是否决定进行同步
if (status.isNewSynchronization()) {
// 记录是否开启了事务
TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
// 记录事务的隔离级别
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
definition.getIsolationLevel() : null);
// 记录是否被标记成只读
TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
// 记录事务的名字
TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
TransactionSynchronizationManager.initSynchronization();
}
}
一些重要的类
Spring是一个框架,为了兼容不同的组件,Spring进行了统一的抽象,比如PlatformTransactionManager,它定义基本的事务操作方法,这些事务操作方法都是平台无关的,具体的实现由子类实现
事务管理器
public interface PlatformTransactionManager extends TransactionManager {
// 获取事务
TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException;
// 提交事务
void commit(TransactionStatus status) throws TransactionException;
// 回滚事务
void rollback(TransactionStatus status) throws TransactionException;
}
// AbstractPlatformTransactionManager是实现PlatformTransactionManager的一个抽象类,可以说是一个骨架,封装了事务管理
// 中的基本流程,组件接入Spring事务时,继承这个类,实现其中的各种模版方法即可
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
// 以下几个方法都是其中比较重要的模版方法,一看方法名就知道是什么意思了
protected abstract void doBegin(Object transaction, TransactionDefinition definition)
throws TransactionException;
protected Object doSuspend(Object transaction) throws TransactionException {
throw new TransactionSuspensionNotSupportedException(
"Transaction manager [" + getClass().getName() + "] does not support transaction suspension");
}
protected void doResume(@Nullable Object transaction, Object suspendedResources) throws TransactionException {
throw new TransactionSuspensionNotSupportedException(
"Transaction manager [" + getClass().getName() + "] does not support transaction suspension");
}
protected boolean shouldCommitOnGlobalRollbackOnly() {
return false;
}
protected void prepareForCommit(DefaultTransactionStatus status) {
}
protected abstract void doCommit(DefaultTransactionStatus status) throws TransactionException;
protected abstract void doRollback(DefaultTransactionStatus status) throws TransactionException;
}
其实现类还是蛮多的
事务状态
public interface TransactionStatus extends TransactionExecution, SavepointManager, Flushable {
boolean hasSavepoint();
@Override
void flush();
}
public interface TransactionExecution {
// 当前事务是否是新事务
boolean isNewTransaction();
// 是否已被标记为回滚
void setRollbackOnly();
boolean isRollbackOnly();
// 事务是否完成
boolean isCompleted();
}
public interface SavepointManager {
// 创建保存点
Object createSavepoint() throws TransactionException;
// 回滚到指定保存点
void rollbackToSavepoint(Object savepoint) throws TransactionException;
// 移除保存点
void releaseSavepoint(Object savepoint) throws TransactionException;
}
事务属性
// TransactionDefinition类中定义的是事务的隔离级别和事务的传播属性,下文为了更好地理解源码会列举一下,个人认为不需要记这些
// 需要理解的地方是,事务的隔离级别是数据库本身的事务功能,而事务的传播属性是Spring定义的
public interface TransactionAttribute extends TransactionDefinition {
@Nullable
String getQualifier();
Collection<String> getLabels();
boolean rollbackOn(Throwable ex);
}
源码解析
如果看过我之前关于AOP的文章,AOP使用了责任链模式,链中的拦截器会逐个触发,invoke就是触发时的入口,所以我们从invoke开始看代码
@Override
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {
// Work out the target class: may be {@code null}.
// The TransactionAttributeSource should be passed the target class
// as well as the method, which may be from an interface.
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// Adapt to TransactionAspectSupport's invokeWithinTransaction...
return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() {
@Override
@Nullable
public Object proceedWithInvocation() throws Throwable {
return invocation.proceed();
}
@Override
public Object getTarget() {
return invocation.getThis();
}
@Override
public Object[] getArguments() {
return invocation.getArguments();
}
});
}
@Nullable
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
// If the transaction attribute is null, the method is non-transactional.
TransactionAttributeSource tas = getTransactionAttributeSource();
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
final TransactionManager tm = determineTransactionManager(txAttr);
......
PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
// 声明式事务走这个逻辑
if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
// Standard transaction demarcation with getTransaction and commit/rollback calls.
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
Object retVal;
try {
// 执行回调,如果没有后续拦截器的话,就进入事务方法了
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// 发生异常的处理
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
cleanupTransactionInfo(txInfo);
}
if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
// Set rollback-only in case of Vavr failure matching our rollback rules...
TransactionStatus status = txInfo.getTransactionStatus();
if (status != null && txAttr != null) {
retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
}
}
// 提交事务
commitTransactionAfterReturning(txInfo);
return retVal;
}
// 编程式事务走这个逻辑
else {
Object result;
final ThrowableHolder throwableHolder = new ThrowableHolder();
// It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
try {
result = ((CallbackPreferringPlatformTransactionManager) ptm).execute(txAttr, status -> {
TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status);
try {
Object retVal = invocation.proceedWithInvocation();
if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
// Set rollback-only in case of Vavr failure matching our rollback rules...
retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
}
return retVal;
}
catch (Throwable ex) {
if (txAttr.rollbackOn(ex)) {
// A RuntimeException: will lead to a rollback.
if (ex instanceof RuntimeException) {
throw (RuntimeException) ex;
}
else {
throw new ThrowableHolderException(ex);
}
}
else {
// A normal return value: will lead to a commit.
throwableHolder.throwable = ex;
return null;
}
}
finally {
cleanupTransactionInfo(txInfo);
}
});
}
catch (ThrowableHolderException ex) {
throw ex.getCause();
}
catch (TransactionSystemException ex2) {
if (throwableHolder.throwable != null) {
logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
ex2.initApplicationException(throwableHolder.throwable);
}
throw ex2;
}
catch (Throwable ex2) {
if (throwableHolder.throwable != null) {
logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
}
throw ex2;
}
// Check result state: It might indicate a Throwable to rethrow.
if (throwableHolder.throwable != null) {
throw throwableHolder.throwable;
}
return result;
}
}
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
// 如果没有指定名称,使用方法名作为事务名,并使用DelegatingTransactionAttribute封装txAttr
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
// 根据事务属性判断是否需要开启事务,获取TransactionStatus
status = tm.getTransaction(txAttr);
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
"] because no transaction manager has been configured");
}
}
}
// 构建事务信息,将所有的事务信息统一记录在TransactionInfo中,一旦事务执行失败,Spring会通过TransactionInfo
// 对象中的信息来进行回滚等后续工作
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
// 我们看看TransactionInfo这个对象中有什么,不外乎是我们上面讲的比较重要的那几个类
protected static final class TransactionInfo {
// 事务管理器
@Nullable
private final PlatformTransactionManager transactionManager;
// 事务属性
@Nullable
private final TransactionAttribute transactionAttribute;
// 拦截了哪一个方法
private final String joinpointIdentification;
// 事务的状态
@Nullable
private TransactionStatus transactionStatus;
@Nullable
private TransactionInfo oldTransactionInfo;
}
protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, String joinpointIdentification,
@Nullable TransactionStatus status) {
TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
// 事务方法
if (txAttr != null) {
// We need a transaction for this method...
if (logger.isTraceEnabled()) {
logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
// The transaction manager will flag an error if an incompatible tx already exists.
txInfo.newTransactionStatus(status);
}
// 非事务方法
else {
if (logger.isTraceEnabled()) {
logger.trace("No need to create transaction for [" + joinpointIdentification +
"]: This method is not transactional.");
}
}
// 不管有没有事务,都会将TransactionInfo绑定到当前线程
txInfo.bindToThread();
return txInfo;
}
// 绑定到当前线程的方法也很简单,就是放在一个ThreadLocal属性中
private void bindToThread() {
this.oldTransactionInfo = transactionInfoHolder.get();
transactionInfoHolder.set(this);
}
下面的代码是对不同的事务传播属性进行不同的处理,为了让大家有一个更好的认识,先列举下不同的事务传播属性及其场景
事务传播机制 | 场景 |
---|---|
PROPAGATION_REQUIRED | Spring默认的事务隔离级别,如果当前上下⽂已经存在事务,那么加⼊到事务中执⾏,如果当前上下⽂不存在事务,则新建事务执⾏ |
PROPAGATION_SUPPORTS | 如果当前上下⽂存在事务,加⼊事务,如果没有事务,则使⽤⾮事务的⽅式执⾏ |
PROPAGATION_MANDATORY | 当前上下文必须存在事务,否则抛出异常 |
PROPAGATION_REQUIRES_NEW | 创建一个新的事物,如果当前事务存在,挂起当前事务 |
PROPAGATION_NOT_SUPPORTED | 如果上下⽂中存在事务,则挂起事务 |
PROPAGATION_NEVER | 当前上下文不能存在事务,否则抛出异常 |
PROPAGATION_NESTED | 如果当前上下文存在事务,则嵌套事务执行,如果不存在事务,则新建事务 |
这几种事务传播属性都蛮好理解的,比较容易混淆的是PROPAGATION_REQUIRES_NEW和PROPAGATION_NESTED,大家有兴趣的可以去搜索下
@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException {
// 如果没有设置TransactionDefinition,则使用默认的设置
TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
Object transaction = doGetTransaction();
boolean debugEnabled = logger.isDebugEnabled();
// 判断是否存在事务
if (isExistingTransaction(transaction)) {
// 事务存在的情况下,走这个分支
return handleExistingTransaction(d ef, transaction, debugEnabled);
}
// 下面的判断都是在事务不存在的情况下做的判断
// 事务超时时间验证,timeout如果为-1,表示没有时间限制
if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
}
// 如果事务传播级别为PROPAGATION_MANDATORY,但是当前线程不存在事务,直接抛出异常
// PROPAGATION_MANDATORY这个事务传播级别是为了防止某段代码调用的时候,上下文漏添加事务的保证手段
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
// 事务传播级别为PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW、PROPAGATION_NESTED都需要新建事务
else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
// 当前不存在事务,挂起空事务
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
}
try {
// 创建事务
return startTransaction(def, transaction, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
}
// 走到这里说明事务传播级别是PROPAGATION_SUPPORTS、PROPAGATION_NOT_SUPPORTED或者PROPAGATION_NEVER
// 在没有事务的情况下,不影响逻辑
else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + def);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
// 注意:这里事务的值传的是null
return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
}
}
private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {
// 在当前上下文已存在事务的情况下,事务传播级别是PROPAGATION_NEVER,抛出异常
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
// 事务传播级别是PROPAGATION_NOT_SUPPORTED
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
if (debugEnabled) {
logger.debug("Suspending current transaction");
}
// 将原来的事务挂起
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
// 第二个参数transaction传了个空事务
// 第三个参数false为旧标记
// 最后一个参数是挂起的对象
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
// 事务传播级别是PROPAGATION_REQUIRES_NEW
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
if (debugEnabled) {
logger.debug("Suspending current transaction, creating new transaction with name [" +
definition.getName() + "]");
}
// 挂起原来的事务
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
// 创建新事务
return startTransaction(definition, transaction, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error beginEx) {
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
}
// 事务传播级别是PROPAGATION_NESTED
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
// 如果不支持嵌套事务,直接抛出异常
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
if (debugEnabled) {
logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
}
if (useSavepointForNestedTransaction()) {
// Create savepoint within existing Spring-managed transaction,
// through the SavepointManager API implemented by TransactionStatus.
// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
status.createAndHoldSavepoint();
return status;
}
else {
// Nested transaction through nested begin and commit/rollback calls.
// Usually only for JTA: Spring synchronization might get activated here
// in case of a pre-existing JTA transaction.
return startTransaction(definition, transaction, debugEnabled, null);
}
}
// 代码走到这里,剩下PROPAGATION_SUPPORTS、PROPAGATION_REQUIRED、PROPAGATION_MANDATORY三个事务隔离级别
if (debugEnabled) {
logger.debug("Participating in existing transaction");
}
if (isValidateExistingTransaction()) {
// 自定义了隔离级别与已有事务隔离级别是否匹配
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] specifies isolation level which is incompatible with existing transaction: " +
(currentIsolationLevel != null ?
isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
"(unknown)"));
}
}
// 只读属性是否一致
if (!definition.isReadOnly()) {
if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] is not marked as read-only but existing transaction is");
}
}
}
// 存在事务则加入事务
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
看到这里,整个事务大体的框架源码都已经看完了,对不同的事务传播属性进行处理,当然,不同的传播属性也有一些共性的代码,比如挂起事务、提交事务、回滚事务,下面我们来看看
挂起事务
@Nullable
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
if (TransactionSynchronizationManager.isSynchronizationActive()) {
List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
try {
Object suspendedResources = null;
if (transaction != null) {
// 挂起当前事务,具体的实现由子类实现
suspendedResources = doSuspend(transaction);
}
// 将当前线程绑定的事务名、是否只读、隔离级别,是否有事务等信息,与刚才的suspendedSynchronizations
// 包在一起返回
String name = TransactionSynchronizationManager.getCurrentTransactionName();
TransactionSynchronizationManager.setCurrentTransactionName(null);
boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
TransactionSynchronizationManager.setActualTransactionActive(false);
return new SuspendedResourcesHolder(
suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
}
catch (RuntimeException | Error ex) {
// 挂起失败,进行恢复
doResumeSynchronization(suspendedSynchronizations);
throw ex;
}
}
else if (transaction != null) {
// 挂起当前事务
Object suspendedResources = doSuspend(transaction);
return new SuspendedResourcesHolder(suspendedResources);
}
else {
// Neither transaction nor synchronization active.
return null;
}
}
// 真正挂起的动作都是子类实现的,因为不同的组件实现方式不一样,以下是JPA的一个实现,实现起来并不复杂,就是把绑定的connection
// 置空
@Override
protected Object doSuspend(Object transaction) {
JpaTransactionObject txObject = (JpaTransactionObject) transaction;
txObject.setEntityManagerHolder(null, false);
EntityManagerHolder entityManagerHolder = (EntityManagerHolder)
TransactionSynchronizationManager.unbindResource(obtainEntityManagerFactory());
txObject.setConnectionHolder(null);
ConnectionHolder connectionHolder = null;
if (getDataSource() != null && TransactionSynchronizationManager.hasResource(getDataSource())) {
connectionHolder = (ConnectionHolder) TransactionSynchronizationManager.unbindResource(getDataSource());
}
return new SuspendedResourcesHolder(entityManagerHolder, connectionHolder);
}
提交事务
@Override
public final void commit(TransactionStatus status) throws TransactionException {
// 如果事务已经完成,抛出异常
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
// 如果事务链已经被标记回滚,进行回滚
if (defStatus.isLocalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Transactional code has requested rollback");
}
processRollback(defStatus, false);
return;
}
if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
}
processRollback(defStatus, true);
return;
}
// 如果没有被标记回滚,此处提交
processCommit(defStatus);
}
事务回滚
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
try {
boolean unexpectedRollback = unexpected;
try {
// 扩展接口,回调TransactionSynchronization类中的beforeCompletion方法
triggerBeforeCompletion(status);
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Rolling back transaction to savepoint");
}
status.rollbackToHeldSavepoint();
}
// 最外层事务回滚,status中有属性判断这个事务是否是一个新事务,在创建事务时就定好了
// 在事务传播级别中,PROPAGATION_REQUIRES_NEW的情况下,如果子事务出问题,只会回滚那个子事务
// 正好在这里对应上
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction rollback");
}
doRollback(status);
}
else {
// 整个大事务回滚
if (status.hasTransaction()) {
// globalRollbackOnParticipationFailure这个属性默认为true
// rollbackOnly 这个值默认为false
// globalRollbackOnParticipationFailure在这个值存在的情况下,内层事务出现问题,最终必然是全事务回滚的
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
if (status.isDebug()) {
logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
}
doSetRollbackOnly(status);
}
else {
if (status.isDebug()) {
logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
}
}
}
else {
logger.debug("Should roll back transaction but cannot - no transaction available");
}
// Unexpected rollback only matters here if we're asked to fail early
if (!isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = false;
}
}
}
......
}
......
}