前言

  • 我们知道在JDBC中处理事务,都是通过Connection完成的,同一事务中所有的操作,都在使用同一个Connection对象,Connection的三个方法与事务有关,万变不离其宗,Spring的事务也是基于JDBC来实现的,那么Spring是怎么来写这些代码的呢,这就是本章要介绍的内容。
1
2
3
4
5
6
7
8
9
10
11
try {
// 设置是否为自动提交事务,如果true(默认值为true)表示自动提交,也就是每条执行的SQL语句都是一个单独的事务,
// 如果设置为false,那么相当于开启了事务了;con.setAutoCommit(false) 表示开启事务。
con.setAutoCommit(false);
......
// 提交事务
con.commit();
} catch() {
// 回滚事务
con.rollback();
}
  • 上一章节介绍了<tx:annotation-driven>标签是开启事务的开关,配置了这个就可以使用注解@Transactional来开启事务了,下面通过一个例子看看Spring事务的执行过程

  • 下面的updateUserByRuntimeException()方法添加了@Transactional注解,这个方法先是插入一条记录,然后再更新另一条记录,如果没有开启事务的话,会执行第一条sql语句,第二条sql语句不会执行

1
2
3
4
5
6
7
8
9
10
11
12
// org.springframework.iframe.service.impl.UserServiceImpl

@Transactional
public void updateUserByRuntimeException (IUser iUser) throws NullPointerException{
log.info("开启事务");
if (userMapper.insertSelective(iUser) == 1) {
throw new NullPointerException("运行时异常");
}
IUser iUser2 = userMapper.selectByPrimaryKey(1);
iUser2.setAge(iUser2.getAge() + 1);
userMapper.updateByPrimaryKeySelective(iUser2);
}
  • 测试类及配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// org.springframework.iframe.test.transaction.TransactionTests

@Slf4j
public class TransactionTests {

private final ClassPathXmlApplicationContext xmlApplicationContext = new ClassPathXmlApplicationContext("beans/applicationContext.xml");

/**
* 运行时异常事务处理
* 事务会回滚
*/
@Test
public void runtimeExceptionTransactionTest() throws Exception {
UserService userService = xmlApplicationContext.getBean(UserService.class);
IUser iUser = new IUser();
iUser.setUsername("运行时异常 事务会回滚");
iUser.setAge(1);
userService.updateUserByRuntimeException(iUser);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// beans/applicationContext.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:aop="http://www.springframework.org/schema/aop"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd">

<!-- ComponentScanBeanDefinitionParser-->
<context:component-scan base-package = "org.springframework.iframe.*"/>

<aop:aspectj-autoproxy/>

<import resource="applicationContext-dao.xml"/>

</beans>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
// beans/applicationContext-dao.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:p="http://www.springframework.org/schema/p"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<!-- 数据库连接池 -->
<bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource"
init-method="init" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://127.0.0.1:3306/iframe?useUnicode=true&amp;characterEncoding=UTF-8"/>
<property name="username" value="root"/>
<property name="password" value="root"/>
<property name="initialSize" value="1"/>
<property name="minIdle" value="1"/>
<property name="maxActive" value="20"/>
<property name="maxWait" value="60000"/>
<property name="timeBetweenEvictionRunsMillis" value="60000"/>
<property name="minEvictableIdleTimeMillis" value="300000"/>
<property name="validationQuery" value="SELECT 1 FROM DUAL"/>
<property name="testWhileIdle" value="true"/>
<property name="testOnBorrow" value="false"/>
<property name="testOnReturn" value="false"/>
<property name="poolPreparedStatements" value="true"/>
<property name="maxPoolPreparedStatementPerConnectionSize" value="20"/>
<property name="filters" value="stat,wall,log4j"/>
<property name="connectionProperties">
<value>clientEncoding=UTF-8</value>
</property>
</bean>

<bean id="sqlSession" class="org.mybatis.spring.SqlSessionTemplate">
<constructor-arg index="0" ref="sqlSessionFactory" />
<constructor-arg index="1" value="BATCH" />
</bean>

<!-- 配置SqlSessionFactory对象 -->
<bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
<property name="dataSource" ref="dataSource"/>
<property name="configLocation" value="classpath:mybatis-config.xml"/>
<property name="typeAliasesPackage" value="org.springframework.iframe.entity"/>
<property name="mapperLocations" value="classpath:mapper/*.xml"/>
</bean>

<!-- 配置扫描Dao接口包,动态实现Dao接口,注入到spring容器中 -->
<bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
<property name="sqlSessionFactoryBeanName" value="sqlSessionFactory"/>
<property name="basePackage" value="org.springframework.iframe.mapper"></property>
</bean>

<!-- (事务管理)transaction manager, use JtaTransactionManager for global tx -->
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource"/>
</bean>

<tx:annotation-driven proxy-target-class="false" transaction-manager="transactionManager" />
</beans>

调试

  • 现在来执行测试类,执行UserService userService = xmlApplicationContext.getBean(UserService.class);见下图可以看到userServiceJdkDynamicAopProxy对象,由此可以得到Spring事务是通过AOP来实现的,由AOP来完成事务方法的织入及执行
  • 进入userService.updateUserByRuntimeException(iUser);方法,会跳到JdkDynamicAopProxy类的invoke()方法,这些逻辑和我们之前介绍AOP章节的内容是一致的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
/**
* Implementation of {@code InvocationHandler.invoke}.
* <p>Callers will see exactly the exception thrown by the target,
* unless a hook method throws an exception.
*/
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
MethodInvocation invocation;
Object oldProxy = null;
boolean setProxyContext = false;

TargetSource targetSource = this.advised.targetSource;
Class<?> targetClass = null;
Object target = null;

try {
// 如果被代理的目标对象要执行的方法是equal则执行JdkDynamicAopProxy(即代理对象的equal)方法
if (!this.equalsDefined && AopUtils.isEqualsMethod(method)) {
// The target does not implement the equals(Object) method itself.
return equals(args[0]);
}
// 如果被代理的目标对象要执行的方法是hashCode则执行JdkDynamicAopProxy(即代理对象的hashCode)方法
else if (!this.hashCodeDefined && AopUtils.isHashCodeMethod(method)) {
// The target does not implement the hashCode() method itself.
return hashCode();
}
else if (method.getDeclaringClass() == DecoratingProxy.class) {
// There is only getDecoratedClass() declared -> dispatch to proxy config.
return AopProxyUtils.ultimateTargetClass(this.advised);
}
else if (!this.advised.opaque && method.getDeclaringClass().isInterface() &&
method.getDeclaringClass().isAssignableFrom(Advised.class)) {
// Service invocations on ProxyConfig with the proxy config...
return AopUtils.invokeJoinpointUsingReflection(this.advised, method, args);
}

Object retVal;
// 有时候目标对象内部的自我调用将无法实施切面中的增强则需要通过此属性暴露代理
if (this.advised.exposeProxy) {
// Make invocation available if necessary.
oldProxy = AopContext.setCurrentProxy(proxy);
setProxyContext = true;
}

// May be null. Get as late as possible to minimize the time we "own" the target,
// in case it comes from a pool.
target = targetSource.getTarget();
if (target != null) {
targetClass = target.getClass();
}

// Get the interception chain for this method.
// 获取当前方法的拦截器链
List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);

// Check whether we have any advice. If we don't, we can fallback on direct
// reflective invocation of the target, and avoid creating a MethodInvocation.
if (chain.isEmpty()) {
// We can skip creating a MethodInvocation: just invoke the target directly
// Note that the final invoker must be an InvokerInterceptor so we know it does
// nothing but a reflective operation on the target, and no hot swapping or fancy proxying.
Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
// 如果没有发现任何拦截器那么直接调用切点方法
retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse);
}
else {
// We need to create a method invocation...
// 将拦截器封装在ReflectiveMethodInvocation,以便于使用其proceed进行链接表用拦截器
invocation = new ReflectiveMethodInvocation(proxy, target, method, args, targetClass, chain);
// Proceed to the joinpoint through the interceptor chain.
// 执行拦截器链
retVal = invocation.proceed();
}

// Massage return value if necessary.
Class<?> returnType = method.getReturnType();
if (retVal != null && retVal == target &&
returnType != Object.class && returnType.isInstance(proxy) &&
!RawTargetAccess.class.isAssignableFrom(method.getDeclaringClass())) {
// Special case: it returned "this" and the return type of the method
// is type-compatible. Note that we can't help if the target sets
// a reference to itself in another returned object.
retVal = proxy;
}
// 返回结果
else if (retVal == null && returnType != Void.TYPE && returnType.isPrimitive()) {
throw new AopInvocationException(
"Null return value from advice does not match primitive return type for: " + method);
}
return retVal;
}
finally {
if (target != null && !targetSource.isStatic()) {
// Must have come from TargetSource.
targetSource.releaseTarget(target);
}
if (setProxyContext) {
// Restore old proxy.
AopContext.setCurrentProxy(oldProxy);
}
}
}
  • 上面要注意的是List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);,获取当前方法的拦截器链,由下图可以看到这里得到的是TransactionInterceptor事务拦截器,是不是有些眉目了
  • 得到拦截器之后就是执行invocation.proceed();方法了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Override
public Object proceed() throws Throwable {
// We start with an index of -1 and increment early.
// 执行完所有增强后执行切点方法,从索引为-1的拦截器开始,并递增,如果拦截器迭代调用完成,则调用目标方法
if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) {
return invokeJoinpoint();
}
// 获取下一个要执行的拦截器 沿着拦截器链执行
Object interceptorOrInterceptionAdvice =
this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex);
if (interceptorOrInterceptionAdvice instanceof InterceptorAndDynamicMethodMatcher) {
// Evaluate dynamic method matcher here: static part will already have
// been evaluated and found to match.
// 对方法进行动态匹配,切点的匹配就在这里进行
InterceptorAndDynamicMethodMatcher dm =
(InterceptorAndDynamicMethodMatcher) interceptorOrInterceptionAdvice;
if (dm.methodMatcher.matches(this.method, this.targetClass, this.arguments)) {
return dm.interceptor.invoke(this);
}
else {
// Dynamic matching failed.
// Skip this interceptor and invoke the next in the chain.
// 不匹配则跳过这个拦截器调用下一个
return proceed();
}
}
else {
// It's an interceptor, so we just invoke it: The pointcut will have
// been evaluated statically before this object was constructed.
// 这是一个拦截器,直接调用它,将this作为参数传递以保证当前实例中调用链的执行
return ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this);
}
}
  • 进入((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this)方法执行拦截器的方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// org.springframework.transaction.interceptor.TransactionInterceptor#invoke

@Override
public Object invoke(final MethodInvocation invocation) throws Throwable {
// Work out the target class: may be {@code null}.
// The TransactionAttributeSource should be passed the target class
// as well as the method, which may be from an interface.
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

// Adapt to TransactionAspectSupport's invokeWithinTransaction...
return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() {
@Override
public Object proceedWithInvocation() throws Throwable {
return invocation.proceed();
}
});
}
  • 执行invokeWithinTransaction()进入到TransactionInterceptor的父类中TransactionAspectSupport,下面的方法可以说是Spring事务的核心代码了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
// org.springframework.transaction.interceptor.TransactionAspectSupport#invokeWithinTransaction

/**
* General delegate for around-advice-based subclasses, delegating to several other template
* methods on this class. Able to handle {@link CallbackPreferringPlatformTransactionManager}
* as well as regular {@link PlatformTransactionManager} implementations.
* @param method the Method being invoked
* @param targetClass the target class that we're invoking the method on
* @param invocation the callback to use for proceeding with the target invocation
* @return the return value of the method, if any
* @throws Throwable propagated from the target invocation
*/
protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation)
throws Throwable {

// If the transaction attribute is null, the method is non-transactional.
// 获取对应的事务属性
final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
// 获取beanFactory中的transactionManager
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
// 构造方法唯一标识(service.UserServiceImpl.save)
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

// 声明式事务处理
if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
// Standard transaction demarcation with getTransaction and commit/rollback calls.
// 创建事物 创建TransactionInfo 完成了目标方法运行前的事务准备工作
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
try {
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
// 方法调用
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// target invocation exception
// 回滚事务 注意:只对RuntimeException回滚
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
// 清除信息
cleanupTransactionInfo(txInfo);
}
// 提交事务
commitTransactionAfterReturning(txInfo);
return retVal;
}

else {
// 编程式事务处理
// It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
try {
Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr,
new TransactionCallback<Object>() {
@Override
public Object doInTransaction(TransactionStatus status) {
TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
try {
return invocation.proceedWithInvocation();
}
catch (Throwable ex) {
if (txAttr.rollbackOn(ex)) {
// A RuntimeException: will lead to a rollback.
if (ex instanceof RuntimeException) {
throw (RuntimeException) ex;
}
else {
throw new ThrowableHolderException(ex);
}
}
else {
// A normal return value: will lead to a commit.
return new ThrowableHolder(ex);
}
}
finally {
cleanupTransactionInfo(txInfo);
}
}
});

// Check result: It might indicate a Throwable to rethrow.
if (result instanceof ThrowableHolder) {
throw ((ThrowableHolder) result).getThrowable();
}
else {
return result;
}
}
catch (ThrowableHolderException ex) {
throw ex.getCause();
}
}
}
  • 上面的逻辑分为声明式事务处理及编程式事务处理,我们这里关注声明式事务处理,由上面可以看到主要逻辑分为下面几点,是不是有些上面JDBC事务代码的影子了
    • 1、创建事务
    • 2、方法调用
    • 3、如有异常回滚事务
    • 4、提交事务

1、创建事务

  • TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);这行代码是创建事务的逻辑,可以看到是封装了一个TransactionInfo对象,现在先进入createTransactionIfNecessary()方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// org.springframework.transaction.interceptor.TransactionAspectSupport#createTransactionIfNecessary

protected TransactionInfo createTransactionIfNecessary(
PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {

// If no name specified, apply method identification as transaction name.
// 如果没有名称指定则使用方法唯一标识,并使用DelegatingTransactionAttribute封装txAttr
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}

TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
// 核心 获取TransactionStatus 这里有建立事务连接
status = tm.getTransaction(txAttr);
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
"] because no transaction manager has been configured");
}
}
}
// 根据指定的属性与status准备一个TransactionInfo
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
  • 关注status = tm.getTransaction(txAttr);方法,这里是建立事务连接关键
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
// org.springframework.transaction.support.AbstractPlatformTransactionManager#getTransaction

@Override
public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
Object transaction = doGetTransaction();

// Cache debug flag to avoid repeated checks.
boolean debugEnabled = logger.isDebugEnabled();

if (definition == null) {
// Use defaults if no transaction definition given.
definition = new DefaultTransactionDefinition();
}

// 判断当前线程是否存在事务,判断依据为当前线程记录的连接不为空且连接中(connectionHolder)中的transactionActive属性不为空
if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.
// 当前线程已经存在事务
return handleExistingTransaction(definition, transaction, debugEnabled);
}

// Check definition settings for new transaction.
// 事务超时设置验证
if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
}

// No existing transaction found -> check propagation behavior to find out how to proceed.
// 如果当前线程不存在事务,propagationBehavior声明为PROPAGATION_MANDATORY 抛异常
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
// 需要新建事务
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
}
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
/**
* 构建transaction ,包括设置ConnectionHolder、隔离级别、timout
*/
doBegin(transaction, definition);
// 新同步事务的设置,针对与当前线程的设置,将事务信息记录在当前线程中
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException ex) {
resume(null, suspendedResources);
throw ex;
}
catch (Error err) {
resume(null, suspendedResources);
throw err;
}
}
else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + definition);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
}
}
  • 进入doBegin(transaction, definition);方法,可以看到Connection newCon = this.dataSource.getConnection();这里获取了JDBCConnection,然后就是通过con.setAutoCommit(false);来开启事务了,获取数据库连接之后就是通过TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());来将连接绑定到当前线程中(使用ThreadLocal来实现)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/**
* 构建transaction,包括设置ConnectionHolder、隔离级别、timeout
* 如果是新连接,绑定到当前线程,这个函数已经开始尝试了对数据库连接的获取
*
* This implementation sets the isolation level but ignores the timeout.
*/
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;

try {
// 尝试获取连接,当然并不是每次都获取新的连接,如果当前线程中ConnectionHolder已经存在,则不需要再次获取
if (txObject.getConnectionHolder() == null ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = this.dataSource.getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}

txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();

// 设置隔离级别
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);

// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
// so we don't want to do it unnecessarily (for example if we've explicitly
// configured the connection pool to set it already).
// 更改Connection自动提交设置,由Spring控制提交
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
con.setAutoCommit(false);
}

prepareTransactionalConnection(con, definition);
// 设置判断当前线程是否存在事务的依据
txObject.getConnectionHolder().setTransactionActive(true);

// 设置过期时间
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}

// Bind the connection holder to the thread.
if (txObject.isNewConnectionHolder()) {
// 将当前获取到的连接绑定到当前线程
TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
}
}

catch (Throwable ex) {
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, this.dataSource);
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}

2、方法调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 创建事物 创建TransactionInfo 完成了目标方法运行前的事务准备工作
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
try {
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
// 方法调用
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// target invocation exception
// 回滚事务 注意:只对RuntimeException回滚
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
// 清除信息
cleanupTransactionInfo(txInfo);
}
// 提交事务
commitTransactionAfterReturning(txInfo);
  • 上面获取了TransactionInfo之后就是来执行方法了,进入invocation.proceedWithInvocation();,可以看到又回到了上面的org.springframework.transaction.interceptor.TransactionAspectSupport#invokeWithinTransaction的方法中,这里执行的是invocation.proceed();方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
public Object invoke(final MethodInvocation invocation) throws Throwable {
// Work out the target class: may be {@code null}.
// The TransactionAttributeSource should be passed the target class
// as well as the method, which may be from an interface.
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

// Adapt to TransactionAspectSupport's invokeWithinTransaction...
return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() {
@Override
public Object proceedWithInvocation() throws Throwable {
return invocation.proceed();
}
});
}
  • 进入invocation.proceed();方法,又回到了ReflectiveMethodInvocation#proceed方法,只不过现在是执行的invokeJoinpoint();方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// org.springframework.aop.framework.ReflectiveMethodInvocation#proceed

public Object proceed() throws Throwable {
// We start with an index of -1 and increment early.
// 执行完所有增强后执行切点方法,从索引为-1的拦截器开始,并递增,如果拦截器迭代调用完成,则调用目标方法
if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) {
return invokeJoinpoint();
}
// 获取下一个要执行的拦截器 沿着拦截器链执行
Object interceptorOrInterceptionAdvice =
this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex);
if (interceptorOrInterceptionAdvice instanceof InterceptorAndDynamicMethodMatcher) {
// Evaluate dynamic method matcher here: static part will already have
// been evaluated and found to match.
// 对方法进行动态匹配,切点的匹配就在这里进行
InterceptorAndDynamicMethodMatcher dm =
(InterceptorAndDynamicMethodMatcher) interceptorOrInterceptionAdvice;
if (dm.methodMatcher.matches(this.method, this.targetClass, this.arguments)) {
return dm.interceptor.invoke(this);
}
else {
// Dynamic matching failed.
// Skip this interceptor and invoke the next in the chain.
// 不匹配则跳过这个拦截器调用下一个
return proceed();
}
}
else {
// It's an interceptor, so we just invoke it: The pointcut will have
// been evaluated statically before this object was constructed.
// 这是一个拦截器,直接调用它,将this作为参数传递以保证当前实例中调用链的执行
return ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this);
}
  • 进入invokeJoinpoint();方法,下面的使用了反射(method.invoke(target, args);)执行了我们的org.springframework.iframe.service.impl.UserServiceImpl#updateUserByRuntimeException方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
protected Object invokeJoinpoint() throws Throwable {
return AopUtils.invokeJoinpointUsingReflection(this.target, this.method, this.arguments);
}

public static Object invokeJoinpointUsingReflection(Object target, Method method, Object[] args)
throws Throwable {

// 使用反射执行方法
try {
ReflectionUtils.makeAccessible(method);
return method.invoke(target, args);
}
catch (InvocationTargetException ex) {
// Invoked method threw a checked exception.
// We must rethrow it. The client won't see the interceptor.
throw ex.getTargetException();
}
catch (IllegalArgumentException ex) {
throw new AopInvocationException("AOP configuration seems to be invalid: tried calling method [" +
method + "] on target [" + target + "]", ex);
}
catch (IllegalAccessException ex) {
throw new AopInvocationException("Could not access method [" + method + "]", ex);
}
}
  • 进入org.springframework.iframe.service.impl.UserServiceImpl#updateUserByRuntimeException方法,这里插入一条记录之后就抛出一个异常,之后的逻辑就是下面的了
1
2
3
4
5
6
7
8
9
10
11
@Override
@Transactional
public void updateUserByRuntimeException (IUser iUser) throws NullPointerException{
log.info("开启事务");
if (userMapper.insertSelective(iUser) == 1) {
throw new NullPointerException("运行时异常");
}
IUser iUser2 = userMapper.selectByPrimaryKey(1);
iUser2.setAge(iUser2.getAge() + 1);
userMapper.updateByPrimaryKeySelective(iUser2);
}

3、如有异常回滚事务

  • 上面的throw new NullPointerException("运行时异常");抛出了一个空指针异常,所以被我们下面的逻辑给catch到了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 创建事物 创建TransactionInfo 完成了目标方法运行前的事务准备工作
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
try {
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
// 方法调用
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// target invocation exception
// 回滚事务 注意:只对RuntimeException回滚
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
// 清除信息
cleanupTransactionInfo(txInfo);
}
// 提交事务
commitTransactionAfterReturning(txInfo);
return retVal;
  • 关注completeTransactionAfterThrowing(txInfo, ex);方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// org.springframework.transaction.interceptor.TransactionAspectSupport#completeTransactionAfterThrowing

/**
* Handle a throwable, completing the transaction.
* We may commit or roll back, depending on the configuration.
* @param txInfo information about the current transaction
* @param ex throwable encountered
*/
protected void completeTransactionAfterThrowing(TransactionInfo txInfo, Throwable ex) {
// 当抛出异常时首先判断当前是否存在事务,这是基础依据
if (txInfo != null && txInfo.hasTransaction()) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
"] after exception: " + ex);
}
// 这里判断是否回滚默认的依据是抛出的异常是否是 (ex instanceof RuntimeException || ex instanceof Error),我们熟悉的Exception默认是不处理的
if (txInfo.transactionAttribute.rollbackOn(ex)) {
try {
// 核心 根据TransactionStatus信息进行回滚处理
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
}
catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by rollback exception", ex);
ex2.initApplicationException(ex);
throw ex2;
}
catch (RuntimeException ex2) {
logger.error("Application exception overridden by rollback exception", ex);
throw ex2;
}
catch (Error err) {
logger.error("Application exception overridden by rollback error", ex);
throw err;
}
}
else {
// 如果不满足回滚条件即使抛出异常也同样会提交
// We don't roll back on this exception.
// Will still roll back if TransactionStatus.isRollbackOnly() is true.
try {
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by commit exception", ex);
ex2.initApplicationException(ex);
throw ex2;
}
catch (RuntimeException ex2) {
logger.error("Application exception overridden by commit exception", ex);
throw ex2;
}
catch (Error err) {
logger.error("Application exception overridden by commit error", ex);
throw err;
}
}
}
}
  • 下面先判断当前是否存在事务,可以看到Spring的严谨,然后就是判断抛出的异常是否是RuntimeException及Error,之后就是rollback

    • 判断抛出的异常是否是RuntimeException及Error

      • 进入txInfo.transactionAttribute.rollbackOn(ex)方法一直进入直到看到下面的代码,可以看到这里的异常判断只处理了RuntimeException及Error异常,另一个编译型异常Exception是不处理的,这个在开发过程中需要特别注意
        1
        2
        3
        4
        5
        ...      

        public boolean rollbackOn(Throwable ex) {
        return (ex instanceof RuntimeException || ex instanceof Error);
        }
    • rollback

      • 进入txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());方法之后一直进入直到进入下面的代码,可以看到这里使用txObject.getConnectionHolder().getConnection();获取了我们之前新建的Connection,得到之后就是执行con.rollback();方法了
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        // org.springframework.jdbc.datasource.DataSourceTransactionManager#doRollback

        @Override
        protected void doRollback(DefaultTransactionStatus status) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
        Connection con = txObject.getConnectionHolder().getConnection();
        if (status.isDebug()) {
        logger.debug("Rolling back JDBC transaction on Connection [" + con + "]");
        }
        try {
        con.rollback();
        }
        catch (SQLException ex) {
        throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
        }
        }

4、提交事务

  • 上面可以知道如果有异常的话是执行completeTransactionAfterThrowing(txInfo, ex);方法进行事务回滚,然后抛异常之后的逻辑都不执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 创建事物 创建TransactionInfo 完成了目标方法运行前的事务准备工作
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
try {
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
// 方法调用
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// target invocation exception
// 回滚事务 注意:只对RuntimeException回滚
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
// 清除信息
cleanupTransactionInfo(txInfo);
}
// 提交事务
commitTransactionAfterReturning(txInfo);
return retVal;
  • 如果没有异常的话会执行commitTransactionAfterReturning(txInfo);方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
protected void commitTransactionAfterReturning(TransactionInfo txInfo) {
if (txInfo != null && txInfo.hasTransaction()) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}

// 进入 org.springframework.transaction.support.AbstractPlatformTransactionManager#commit

@Override
public final void commit(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}

DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
if (defStatus.isLocalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Transactional code has requested rollback");
}
processRollback(defStatus);
return;
}
if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
}
processRollback(defStatus);
// Throw UnexpectedRollbackException only at outermost transaction boundary
// or if explicitly asked to.
if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
throw new UnexpectedRollbackException(
"Transaction rolled back because it has been marked as rollback-only");
}
return;
}
// 事务提交
processCommit(defStatus);
}

private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
// 预留口子方法执行额外逻辑
prepareForCommit(status);
triggerBeforeCommit(status);
triggerBeforeCompletion(status);

beforeCompletionInvoked = true;
boolean globalRollbackOnly = false;
if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
globalRollbackOnly = status.isGlobalRollbackOnly();
}
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Releasing transaction savepoint");
}
// 如果存在保存点则清除保存点信息
status.releaseHeldSavepoint();
}
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction commit");
}
// 如果时独立事务直接提交
doCommit(status);
}
// Throw UnexpectedRollbackException if we have a global rollback-only
// marker but still didn't get a corresponding exception from commit.
if (globalRollbackOnly) {
throw new UnexpectedRollbackException(
"Transaction silently rolled back because it has been marked as rollback-only");
}
}
catch (UnexpectedRollbackException ex) {
// can only be caused by doCommit
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
throw ex;
}
catch (TransactionException ex) {
// can only be caused by doCommit
if (isRollbackOnCommitFailure()) {
doRollbackOnCommitException(status, ex);
}
else {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
}
throw ex;
}
catch (RuntimeException ex) {
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
doRollbackOnCommitException(status, ex);
throw ex;
}
catch (Error err) {
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
// 提交过程中出现异常则回滚
doRollbackOnCommitException(status, err);
throw err;
}

// Trigger afterCommit callbacks, with an exception thrown there
// propagated to callers but the transaction still considered as committed.
try {
triggerAfterCommit(status);
}
finally {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}

}
finally {
cleanupAfterCompletion(status);
}
}
  • 关注核心方法doCommit(status);这里是提交事务的关键,当然如果提交事务的时候发生了异常也会执行回滚操作,由下图可以看到Spring由那些类实现了此方法
  • 我们这里进入的是org.springframework.jdbc.datasource.DataSourceTransactionManager#doCommit方法,终于看到我们熟悉的con.commit();方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// org.springframework.jdbc.datasource.DataSourceTransactionManager#doCommit

@Override
protected void doCommit(DefaultTransactionStatus status) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
if (status.isDebug()) {
logger.debug("Committing JDBC transaction on Connection [" + con + "]");
}
try {
// 提交事务
con.commit();
}
catch (SQLException ex) {
throw new TransactionSystemException("Could not commit JDBC transaction", ex);
}
}
  • 在此Spring事务的执行完成!!!

其他

  • 为什么在 Spring 事务中不能切换数据源?
    • 在 Spring 的事务管理中所使用的数据库连接会和当前线程所绑定,即使我们设置了另外一个数据源,使用的还是当前的数据源连接。
  • 什么是事务的传播级别?分成哪些传播级别?

    • 事务的传播行为,指的是当前带有事务配置的方法,需要怎么处理事务。
      • 例如:方法可能继续在现有事务中运行,也可能开启一个新事务,并在自己的事务中运行。
      • 有一点需要注意,事务的传播级别,并不是数据库事务规范中的名词,而是 Spring 自身所定义的。通过事务的传播级别,Spring 才知道如何处理事务,是创建一个新事务呢,还是继续使用当前的事务。
    • TransactionDefinition 接口中,定义了三类七种传播级别。代码如下

      • TransactionDefinition.java

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        36
        37
        38
        39
        40
        41
        42
        43
        44
        45
        // TransactionDefinition.java

        // ========== 支持当前事务的情况 ==========

        /**
        * 如果当前存在事务,则使用该事务。
        * 如果当前没有事务,则创建一个新的事务。
        */
        int PROPAGATION_REQUIRED = 0;
        /**
        * 如果当前存在事务,则使用该事务。
        * 如果当前没有事务,则以非事务的方式继续运行。
        */
        int PROPAGATION_SUPPORTS = 1;
        /**
        * 如果当前存在事务,则使用该事务。
        * 如果当前没有事务,则抛出异常。
        */
        int PROPAGATION_MANDATORY = 2;

        // ========== 不支持当前事务的情况 ==========

        /**
        * 创建一个新的事务。
        * 如果当前存在事务,则把当前事务挂起。
        */
        int PROPAGATION_REQUIRES_NEW = 3;
        /**
        * 以非事务方式运行。
        * 如果当前存在事务,则把当前事务挂起。
        */
        int PROPAGATION_NOT_SUPPORTED = 4;
        /**
        * 以非事务方式运行。
        * 如果当前存在事务,则抛出异常。
        */
        int PROPAGATION_NEVER = 5;

        // ========== 其他情况 ==========

        /**
        * 如果当前存在事务,则创建一个事务作为当前事务的嵌套事务来运行。
        * 如果当前没有事务,则等价于 {@link TransactionDefinition#PROPAGATION_REQUIRED}
        */
        int PROPAGATION_NESTED = 6;
      • 分类之后,其实还是比较好记的。当然,绝大数场景,我们只用 PROPAGATION_REQUIRED 传播级别。

        总结

  • 事务说白了就是“绑架”一个数据库连接,更改数据库的自动提交功能,使用手动提交机制。Spring把这一系列操作属性及数据库连接放到了ThreadLocal中,而ThreadLocalkey为“当前线程的值”,可以说一个线程一个连接一个事务`
  • 声明式事务管理建立在AOP之上的,其本质是对方法前后进行拦截,然后在目标方法开始之前创建或者加入一个事务,在执行完目标方法之后根据执行情况提交或者回滚事务
  • 默认情况下Spring中的事务处理只对RuntimeException运行时异常进行回滚,对Exception编译性异常不会进行回滚
  • 通过使用声明式事务,使业务代码和事务管理的逻辑分离,更加清晰。

参考