SpringBoot-事务源码分析

复习总结:

  • 注解式事务是通过aop实现的,底层是cglib动态代理实现的
    • 事务aop场景中的Advice是TransactionInterceptor
    • PointCut是TransactionAttributeSourcePointcut,它通过TransactionAttributeSource来检测@Transactional注解,因此注解可以理解为切面
  • TransactionInterceptor作为事务增强逻辑的封装类,需要持有逻辑事务管理器引用,不同的数据持久化平台都实现了自己的事务管理器
  • 项目中使用多种持久化平台时,建议在事务注解中显示的指定事务管理器

1. springboot事务自动装配

spring-boot-autoconfigurae 包中对事物有自动配置的支持,自动配置类为TransactionAutoConfiguration,在spring.factories文件中可以找到这条自动配置项,源码分析先从自动装配开始,看看spring-boot装配了哪些事务相关的bean。

TransactionAutoConfiguration中只显示定义了以下的三个bean,辅助bean

1
2
3
4
5
6
7
/**
* 这是一个PlatformTransactionManagerCustomizer列表
* springboot提供的扩展点,使用它可以对事务管理器进行一些配置,
**/
TransactionManagerCustomizers
TransactionalOperator //简化程序化事务界定和事务异常处理的操作符类。
TransactionTemplate // 简化程序化事务界定和事务异常处理的模板类。

TransactionAutoConfiguration里的重点配置是在内部类EnableTransactionManagementConfiguration

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Configuration(proxyBeanMethods = false)
@ConditionalOnBean(TransactionManager.class)
@ConditionalOnMissingBean(AbstractTransactionManagementConfiguration.class)
public static class EnableTransactionManagementConfiguration {

@Configuration(proxyBeanMethods = false)
@EnableTransactionManagement(proxyTargetClass = false)
@ConditionalOnProperty(prefix = "spring.aop", name = "proxy-target-class", havingValue = "false",
matchIfMissing = false)
public static class JdkDynamicAutoProxyConfiguration {

}

@Configuration(proxyBeanMethods = false)
@EnableTransactionManagement(proxyTargetClass = true)
@ConditionalOnProperty(prefix = "spring.aop", name = "proxy-target-class", havingValue = "true",
matchIfMissing = true)
public static class CglibAutoProxyConfiguration {

}

}

该类对Jdk动态代理和CGlib动态代理两种方式分别作了配置,此类中并没有配置事务相关的bean,其关键是在@EnableTransactionManagement注解中

1.1 EnableTransactionManagement
1
2
3
4
5
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(TransactionManagementConfigurationSelector.class)
public @interface EnableTransactionManagement {...}

EnableTransactionManagement使用@Import注解导入了一个TransactionManagementConfigurationSelector,这种用法已经不陌生,常用于动态的装配bean,也就是根据应用的环境或配置装载不同类型的bean,进入该选择器代码:

1
2
3
4
5
6
7
8
9
10
11
12
// `TransactionManagementConfigurationSelector`
protected String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
case PROXY:
return new String[] {AutoProxyRegistrar.class.getName(),
ProxyTransactionManagementConfiguration.class.getName()};
case ASPECTJ:
return new String[] {determineTransactionAspectClass()};
default:
return null;
}
}

springboot启动时会执行者这段逻辑,当adviceMode是动态代理模式,装配的是TransactionManagementConfiguration,当adviceMode是aspectj织入模式,装配的是其他的,本文只讨论动态代理的情况。

TransactionManagementConfiguration是此时真正的配置类,那么配置了哪些bean呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor(
...
}

@Bean
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public TransactionAttributeSource transactionAttributeSource() {
return new AnnotationTransactionAttributeSource();
}

@Bean
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public TransactionInterceptor transactionInterceptor(TransactionAttributeSource transactionAttributeSource) {
TransactionInterceptor interceptor = new TransactionInterceptor();
interceptor.setTransactionAttributeSource(transactionAttributeSource);
if (this.txManager != null) {
interceptor.setTransactionManager(this.txManager);
}
return interceptor;
}

第一个是BeanFactoryTransactionAttributeSourceAdvisor。它以Advisor结尾说明它是Spring AOP范畴里的东西。在AOP里,Advisor = Pointcut + Advice,Pointcut是切入点,表示要拦截的方法,Advice是增强,表示要加进去的事物功能。在BeanFactoryTransactionAttributeSourceAdvisor中有一个成员TransactionAttributeSourcePointcut,看名字就知道这是一个PointCut,它通过第三个bean—TransactionAttributeSource检测一个类的方法上是否有@Transactional注解,来确定该方法是否需要事物增强。

第二个bean是TransactionInterceptor,他就是一个Advice,因为它实现了Advice接口,包含了把事物加进去的逻辑。

1.2 TransactionInterceptor

事务拦截器TransactionInterceptor用于拦截添加了事务注解的方法或类,对其执行事务管理器txManager的相关操作,那么事务管理器又是什么时候装载的呢? 我们看到该配置类的父类中自动注入了一个TransactionManager,这里注入的是默认的事务管理器。当我们项目用到不止一个事务管理器的时候,开启事务时需要指定使用哪一个事务管理器,此时可以实现TransactionManagementConfigurer接口来指定一个默认的事务管理器,对有的系统,为了避免不必要的问题,在业务中必须要明确指定 @Transactional 的 value 值的情况下。不建议实现接口 TransactionManagementConfigurer,这样控制台会明确抛出异常,开发人员就不会忘记主动指定。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public abstract class AbstractTransactionManagementConfiguration implements ImportAware {

@Nullable
protected TransactionManager txManager;
@Autowired(required = false)
void setConfigurers(Collection<TransactionManagementConfigurer> configurers) {
if (CollectionUtils.isEmpty(configurers)) {
return;
}
if (configurers.size() > 1) {
throw new IllegalStateException("Only one TransactionManagementConfigurer may exist");
}
TransactionManagementConfigurer configurer = configurers.iterator().next();
this.txManager = configurer.annotationDrivenTransactionManager();
}
}

当没有实现TransactionManagementConfigurer指定默认管理器时,上面的自动注入可能为空。

实时上关于事务管理器,不管是JPA还是JDBC等都实现自接口 PlatformTransactionManager 如果你添加的是 spring-boot-starter-jdbc 依赖,框架会默认注入 DataSourceTransactionManager 实例。如果你添加的是 spring-boot-starter-data-jpa 依赖,框架会默认注入 JpaTransactionManager 实例。

TransactionInterceptor的核心逻辑是invoke()方法,具体的执行逻辑我会在下文执行流程中分析。

2. spring事物的执行流程

spring-boot完成了事务bean的自动装配之后,下面来分析事务方法执行时,事务管理器的执行过程。

定义事务管理器的接口是PlatformTransactionManager,这个接口定义了三个方法,官方给出的注释很详细:

1
2
3
4
5
6
7
8
9
public interface PlatformTransactionManager extends TransactionManager {
//根据指定的传播行为,返回当前活动的事务或创建新的事务。
TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException;
//提交给定事务的状态。如果事务以编程方式标记为仅回滚,则执行回滚。
void commit(TransactionStatus status) throws TransactionException;
//执行给定事务的回滚。
void rollback(TransactionStatus status) throws TransactionException;
}

接口PlatformTransactionManager的一个抽象实现类就是AbstractPlatformTransactionManager,这个类实现了事务管理的整个逻辑关系流程,但是把与具体事务打交道的东西又定义为抽象方法让子类去实现(模板设计模式)。

2.1 事务管理器涉及的关键对象

事务定义 TransactionDefinition

这是一个接口,然而并没有需要实现的方法,接口中定义了spring事务支持的传播属性、隔离级别、超时时间、只读标记等事务属性的可选值,并制定了默认值。其派生接口为TransactionAttribute

事务对象 SmartTransactionObject

不同数据源框架定义的事务对象不同,SmartTransactionObject只是spring-tx包定义的接口标准,不同框架对其都有自己的实现,这里我们以DataSourceTransactionObject为例,它是jdbc定义的事务对象

DataSourceTransactionObjectDataSourceTransactionManager的内部类,看下面简单的类继承图

这个事务对象中最重要的成员是抽象类JdbcTransactionObjectSupport中定义的ConnectionHolder对象,它存储了该执行逻辑事务的数据库连接,也就是物理事务。

事务状态 TransactionStatus

TransactionStatus是一个接口,它继承了其他的三个接口:

常用的实现类是DefaultTransactionStatus,继承自AbstractTransactionStatus

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class DefaultTransactionStatus extends AbstractTransactionStatus {
@Nullable
private final Object transaction; // 事务对象
private final boolean newTransaction; // 标记--是否为新创建的事务
private final boolean newSynchronization; // 标记--是否开启了新事务的同步
private final boolean readOnly; // 标记--是否为只读事务
private final boolean debug;
@Nullable
private final Object suspendedResources; // 当事务被挂起后存放挂起事务状态
// 继承的属性
private boolean rollbackOnly = false; //事务是否只支持回滚
private boolean completed = false; // 事务是否已完成
@Nullable
private Object savepoint; // 保存点
}

重要的属性:

  • 持有一个事务对象的引用Object transaction,还记录了一些事务的状态信息
  • Object suspendedResources ,保存被挂起事务的状态,后文会详细分析
2.2 事务执行流程分析
2.2.1 aop动态代理阶段

当调用的方法注解了事务,调用之前会通过cglib产生一个代理对象,cglib的动态代理的关键是实现MethodInterceptor接口,找到这个类,它是CglibAopProxy的一个私有的内部类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// CglibAopProxy
private static class DynamicAdvisedInterceptor implements MethodInterceptor, Serializable {
private final AdvisedSupport advised;
public DynamicAdvisedInterceptor(AdvisedSupport advised) {
this.advised = advised;
}

public Object intercept(Object proxy, Method method, Object[] args, MethodProxy methodProxy)
throws Throwable{
...
retVal = new CglibMethodInvocation(proxy, target, method, args,
targetClass, chain, methodProxy).proceed();
}
}

intercept() 是spring aop 进行拦截的方法,spring事务基于aop实现,所以事务增强会被aop拦截,执行intercept方法

intercept方法中不会直接开启事务或关闭事务,因为事务不一定是方法上唯一的增强点(可能存在其他增强,比如记录审计日志、开启分布式锁等等),所以事务增强逻辑肯定是封装成了一个Advisor,交给aop统一管理。

那么我们就去找这个Advisor

上面的代码创建了CglibMethodInvocation对象后执行了proceed()方法,进入该方法发现他调用了父类ReflectiveMethodInvocationproceed(),这是一个反射方法调用器的执行过程,

由于在同一个切点可能会存在多个增强,所以动态代理也可能代理多次,每次代理都将增强的逻辑存放到一个容器中,在执行时逐个执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// ReflectiveMethodInvocation
public Object proceed() throws Throwable {
// interceptorsAndDynamicMethodMatchers是拦截器链,同一个切点可能被多个advice拦截
// currentInterceptorIndex初始为-1,这条判断其实是若拦截器链中没有拦截器,直接调用invokeJoinpoint
if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) {
return invokeJoinpoint();
}
// currentInterceptorIndex先加1,再获取拦截器,此处获取到的就是事务拦截器
Object interceptorOrInterceptionAdvice =
this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex);
if (interceptorOrInterceptionAdvice instanceof InterceptorAndDynamicMethodMatcher) {
...
}
else {
// 事务拦截会执行到这里
return ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this);
}
}

跟代码可以看到获取到的拦截器是TransactionInterceptor类型的,中间的判断都不满足,直接运行到代码的最后一行,调用Invoke(this)

注意:TransactionInterceptor 实现的MethodInterceptor接口与上面cglib的不是同一个:

  • cglib的MethodInterceptor接口是为了让代理对象执行代理逻辑,需要实现的是intercept方法
  • 此处的MethodInterceptor接口是为了调用代理逻辑中的增强方法,需要实现的是invoke方法,因为增强逻辑是被封装过的。
2.2.2 TransactionInterceptor.invoke方法
1
2
3
4
5
6
7
8
9
// TransactionInterceptor
public Object invoke(MethodInvocation invocation) throws Throwable {
// 参数invocation就是上面创建的CglibMethodInvocation
// 获取被代理对象的类型
Class<?> targetClass = (invocation.getThis() != null ?
AopUtils.getTargetClass(invocation.getThis()) : null);
// 终于开始进入正题了,在事务中调用方法>>
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}

invokeWithinTransaction方法是定义在父类TransactionAspectSupport中的,此方法代码有点长,这里节选一些关键代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
...
// 获取注解属性源,此处获取到的是AnnotationTransactionAttributeSource对象
TransactionAttributeSource tas = getTransactionAttributeSource();
// 从属性源中拿到事务的属性,RuleBasedTransactionAttribute对象
final TransactionAttribute txAttr = (tas != null ?
tas.getTransactionAttribute(method, targetClass) : null);
// 根据事务属性,创建事务管理器,如果没有配置事务管理器,自适应创建
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
// com.pd.service.AccountService.transferAccount 就是方法的名称
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
// 创建事务,开启事务 >>
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);

Object retVal;
try {
// 调用方法逻辑
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// 目标方法抛出异常,结束事务。可能回滚,可能啥也不做
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
// 清理事务现场
cleanupTransactionInfo(txInfo);
}
...
// 提交事务
commitTransactionAfterReturning(txInfo);
return retVal;
}
...
}

在这里找到事务创建的方法createTransactionIfNecessary,跟进方法:

DelegatingTransactionAttributeTransactionAttribute的代理实现类,这里用静态代理。为什么要用代理类?因为TransactionAttribute是接口,不能实例化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
// 事务如果没有指定名称,用之前生成的方法切入点id作为事务名称
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}

TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
// 就是在这里创建事务开启事务了,事务的属性传进去
status = tm.getTransaction(txAttr);
}
... // 日志操作
}
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}

跟进方法getTransaction,该方法将返回一个TransactionStatus类型的结果,这个叫事务状态的返回结果持有创建的事务对象(逻辑事务),以及该事务对象的一些状态信息。

那我们来看一下这个获取事务的方法的整一个流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException {
TransactionDefinition def = (definition != null ? definition :
TransactionDefinition.withDefaults());
// doGetTransaction()从当前上下文中获取事务对象,这是一个抽象方法,子类实现 >>
Object transaction = doGetTransaction();
boolean debugEnabled = logger.isDebugEnabled();
//若被调用的事务方法已处于事务之中(事务方法A调用事务方法B)
if (isExistingTransaction(transaction)) {
// 根据当前事务的传播属性,来决定下一步处理 >>
return handleExistingTransaction(def, transaction, debugEnabled);
}

//-----------------------当前还没有事务会往下走-----------------------
// 检查超时设置是否合理
if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
}
// 若配置的事务传播属性是MANDATORY(该配置表示当前存在事务则加入,否则),抛异常
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
}
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
def, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, def); // 开启一个新事务
prepareSynchronization(status, def);
return status;
}
...
}else
...
}

在分析以上代码之前,首先需要明白几个概念,最终要搞清楚几个问题:

  1. 物理事务就是到数据库的一个物理链接(数据库连接),那么这个链接是什么时候获取的?建立连接后又是如何保存起来的?
  2. 逻辑事务就是一个带有spring事务注解的方法,它需要关联到一个物理事务上。那它们之间如何进行关联?
  3. 多个逻辑事务可以映射到一个物理事务上,逻辑事务是各自提交的,如何处理逻辑事务提交和物理事务提交间的关系呢,至少所有的逻辑事务都提交了才可以提交物理事务。

带着问题来分析获取事务的方法:

2.2.3 doGetTransaction

接下来的分析都是以数据源事务管理器为列进行的,首先来看doGetTransaction()这个方法,这个方法首先会创建一个逻辑事务对象DataSourceTransactionObject

1
2
3
4
5
6
7
8
9
10
11
12
13
// DataSourceTransactionManager
@Override
protected Object doGetTransaction() {
// 创建事务对象(逻辑事务)
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
txObject.setSavepointAllowed(isNestedTransactionAllowed());
// 从线程私有变量中获取物理事务,没有事务嵌套,获取到的是null
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
// 将获取到的物理事务 绑定给 逻辑事务对象 false表示这个物理事务不是新创建的
txObject.setConnectionHolder(conHolder, false);
return txObject;
}

逻辑事务对象中到底有哪些内容呢

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private static class DataSourceTransactionObject extends JdbcTransactionObjectSupport {
// 新连接标志,当事务对象新获取一个数据库连接是,设为true
private boolean newConnectionHolder;
// 当数据库连接开启自动提交,逻辑事务关闭它之前,会先保存这个状态
private boolean mustRestoreAutoCommit;

/*------------- 继承到的属性----------------*/
@Nullable // 数据库连接封装类对象
private ConnectionHolder connectionHolder;
@Nullable // 隔离级别
private Integer previousIsolationLevel;
//是否允许设置保存点
private boolean savepointAllowed = false;
}

数据源事务对象中最重要的成员变量是就是ConnectionHolder,它是数据库连接的封装类,创建逻辑事务时,它为null。也就是说逻辑事务创建的时候并没有关联到物理事务(数据库连接)

现在回到doGetTransaction方法,接下来的关键点是尝试获取线程中已经获得的物理事务,在事务同步管理器TransactionSynchronizationManager中,有几个线程私有的变量(ThreadLocal):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public abstract class TransactionSynchronizationManager {
// 存储事务资源 map的key是数据源DataSource,value是ConnectionHolder(内含数据库连接)
private static final ThreadLocal<Map<Object, Object>> resources =
new NamedThreadLocal<>("Transactional resources");
private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
new NamedThreadLocal<>("Transaction synchronizations");
private static final ThreadLocal<String> currentTransactionName =
new NamedThreadLocal<>("Current transaction name");
private static final ThreadLocal<Boolean> currentTransactionReadOnly =
new NamedThreadLocal<>("Current transaction read-only status");
private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
new NamedThreadLocal<>("Current transaction isolation level");
private static final ThreadLocal<Boolean> actualTransactionActive =
new NamedThreadLocal<>("Actual transaction active");
}
  • 事务性资源是存储在Map<Object, Object>里,key就是DataSource对象,value就是ConnectionHolder对象,那么是怎么知道key和value的类型的呢?往下看会看到map的put操作

  • 事务同步这个集合Set,只有在多个数据源的分布式事务时才使用。

  • 剩下的是四个和事务相关的变量,事务名称/是否只读/隔离级别/是否激活。

此处重点关注叫resources 的ThreadLocal变量,该变量一个Map,key是DataSource对象,value是ConnectionHolder对象,那么这就意味着每个线程在同一个数据源上只能获取一个连接绑定到这个变量中

TransactionSynchronizationManager.getResource(obtainDataSource())这一句执行完如果能够获取到一个连接说明什么?说明当前已经有一个逻辑事务获取了同一个数据源的连接并绑定到了map中,说明现在正在获取的逻辑事务在另一个逻辑事务之中。

如果没有事务嵌套的话,这一句代码执行获取到的肯定是null。


2.2.4 handleExistingTransaction

获取事务对象之后,继续往下将执行isExistingTransaction()方法,该方法判断获取到的事务对象是否已经存在

1
2
3
4
5
protected boolean isExistingTransaction(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
return (txObject.hasConnectionHolder() &&
txObject.getConnectionHolder().isTransactionActive());
}

这里主要是检查事务对象中是否设置了ConnectionHolder,并判断该物理事务是不是已经被激活。

如果此方法返回true,表明当前获取到的物理事务已经存在,从而表明此处肯定存在逻辑事务的嵌套,这时候应该根据配置的事务传播属性来进行处理,handleExistingTransaction处理逻辑如下:

  • 如果此时事务传播特性是NEVER,则抛出异常。(never表示以非事务的方式执行,存在事务则抛异常

  • 如果此时事务的传播特性是NOT_SUPPORTED,则调用suspend(transaction)挂起当前事务,将被挂起的资源suspendedResources放入事务状态里。(not_supported表示以非事务方式运行,若存在事务则挂起事务

  • 如果此时事务状态是REQUIRES_NEW,则调用suspend(transaction)挂起当前事务,将事务对象transaction和被挂起的资源suspendedResources放入事务状态里。然后调用doBegin(transaction, definition)方法去真正打开事务。最后调用prepareSynchronization(status, definition)方法准备一下事务同步。

  • 如果此时事务的传播特性是NESTED,又分三种情况:

    • 如果不允许嵌套事务,直接抛出异常。
    • 如果使用保存点(Savepoint)来实现嵌套事务,那直接使用当前事务,创建一个保存点就可以了。
    • 如果使用新的事务来实现嵌套事务,那就调用doBegin(transaction, definition)开启新的事务,此时不需要挂起当前事务。
  • 对于剩下三种传播特性REQUIRED/MANDATORY/SUPPORTS,则不需要创建新事务,直接使用当前事务就可以了。


2.2.5 dobegin()

上面也分析了,在首次执行事务方法时,事务肯定是不存在的,因为从线程的ThreadLocal里没有取出ConnectionHolder对象。那此时就要新开一个物理事务,新开物理事务是在doBegin()方法中进行的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try { // 若事务对象没有绑定物理资源 或者 持有的物理资源还没有和事务对象同步
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
// 创建新连接,封装成物理资源赋值给事务对象
Connection newCon = obtainDataSource().getConnection();
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();

Integer previousIsolationLevel =
DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
// 若物理连接开启了自动提交(默认开启)
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true); // 存储物理连接的自动提交状态
con.setAutoCommit(false); // 关闭物理连接自动提交
}
prepareTransactionalConnection(con, definition);
// 激活物理事务
txObject.getConnectionHolder().setTransactionActive(true);
...
// 将物理事务绑定到ThreadLocal变量,这里是map的put操作
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource
(obtainDataSource(), txObject.getConnectionHolder());
}
}catch (Throwable ex) {
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, obtainDataSource());
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException(
"Could not open JDBC Connection for transaction", ex);
}
}

新事务开启之后,继续回到getTransaction()方法,这个方法返回的是一个事务状态对象,继续看返回值创建的代码:

1
2
3
4
5
6
7
8
9
10
protected DefaultTransactionStatus newTransactionStatus(
TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {

boolean actualNewSynchronization = newSynchronization &&
!TransactionSynchronizationManager.isSynchronizationActive();
return new DefaultTransactionStatus(
transaction, newTransaction, actualNewSynchronization,
definition.isReadOnly(), debug, suspendedResources);
}

这个对象有很重要的三点:

  1. 它需要包含逻辑事务对象(已关联了物理事务)。

  2. 它需要表明这个事务是一个新开启的物理事务,还是参与到已有的物理事务。

  3. 它需要包含被挂起的(上一个)物理事务连接(如果有的话)。

2.3 物理事务与逻辑事务的对应关系
  • 当事务A调用事务B并且传播属性允许两个逻辑事务合并时,用一个物理事务执行就可以执行两个逻辑事务,此时物理事务与逻辑事务是1对多的关系。

  • 事务A中调用事务B,B的传播属性为Propagation.REQUIRES_NEW,物理事务与逻辑事务是1对1的关系

  • 不存在1个逻辑事务对应多个物理事务的情况。
2.4 事务的挂起

在一些事务的传播模式下,可能会挂起当前的事务,举例:

事务A中调用事务B,B的传播属性为Propagation.REQUIRES_NEW,这时候需要将事务A挂起,创建事务B运行,创建事务B的时候会将挂起的事务A资源放到事务B的TransactionStatus对象中,事务B结束(提交或回滚)之后,恢复事务A继续执行

挂起做了哪些工作?

1
2
3
4
5
protected Object doSuspend(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
txObject.setConnectionHolder(null);
return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}
  • 将事务对象中的ConnectionHolder清空
  • 将ThreadLocal中的数据源对应的ConnectionHolder清除,并返回被清除的ConnectionHolder
  • 将返回的ConnectionHolder封装成SuspendedResourcesHolder,保存到创建的新事务对象中

为什么要这么做?

ThreadLocal资源虽然是一个map,但是这个map的数据源是key,这意味着每个线程在同一个数据源上只能使用一个连接(可以持有多个连接,但同一时间只能使用一个,单线程)。事务A在挂起的时候将物理连接保存起来,然后事务B再重新获取一个连接执行操作,事务B结束之后,再取出事务A对应的物理资源,继续执行事务A。

因此可以得出结论:逻辑事务没有结束之前,对应的物理事务是不能执行其他事务操作的,物理事务上应该是保存了事务的状态信息(未提交),不能串着用

https://mp.weixin.qq.com/s/i0QmrEDZ6aTsZFFahYC_ow

https://mp.weixin.qq.com/s/sysm3AY7PG9MV9584UTasg