前言

  • 在实际项目开发过程中经常会遇到需要事务提交后处理逻辑,比如保存后执行发消息逻辑,或者通知下游,针对此类情况Spring提供了TransactionSynchronizationManager来帮我们实现这个功能

正文

场景使用

1、下面可以可以看到TransactionSynchronizationManager提供了注册同步代码逻辑方法,重写afterCommit即可完成方法逻辑的实现

1
2
3
4
5
6
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
LOGGER.warn("事务后执行逻辑");
}
});

2、既然知道怎么用了,我们也可以举一反三,如果是事务提交前要额外处理某些事呢,我们来看看这个TransactionSynchronization对象的接口定义,可以看到也提供了很多方法定义,从方法名可以看出来如果你想在事务执行周期节点上做一些自己的额外逻辑,只要实现对应方法即可

原理解析

  • 上面是知道怎么用,那具体逻辑是怎么实现的呢,通过对spring源码的通用套路,我们可以猜测其实就是在事务的各个节点中进行埋雷,当程序执行到某个逻辑就会触发想要的逻辑

  • 我们进入到TransactionSynchronizationManager.registerSynchronization,可以看到其实就是内部类立马定义了一个线程本地变量,里面有个Set集合来存放我们的额外逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
new NamedThreadLocal<Set<TransactionSynchronization>>("Transaction synchronizations");


/**
* Register a new transaction synchronization for the current thread.
* Typically called by resource management code.
* <p>Note that synchronizations can implement the
* {@link org.springframework.core.Ordered} interface.
* They will be executed in an order according to their order value (if any).
* @param synchronization the synchronization object to register
* @throws IllegalStateException if transaction synchronization is not active
* @see org.springframework.core.Ordered
*/
public static void registerSynchronization(TransactionSynchronization synchronization)
throws IllegalStateException {

Assert.notNull(synchronization, "TransactionSynchronization must not be null");
if (!isSynchronizationActive()) {
throw new IllegalStateException("Transaction synchronization is not active");
}
synchronizations.get().add(synchronization);
}
  • 对这个synchronizations变量进行方法调用引用,可以看到再包装了一下Collections.unmodifiableList,哈哈防止被别人替换了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* Return an unmodifiable snapshot list of all registered synchronizations
* for the current thread.
* @return unmodifiable List of TransactionSynchronization instances
* @throws IllegalStateException if synchronization is not active
* @see TransactionSynchronization
*/
public static List<TransactionSynchronization> getSynchronizations() throws IllegalStateException {
Set<TransactionSynchronization> synchs = synchronizations.get();
if (synchs == null) {
throw new IllegalStateException("Transaction synchronization is not active");
}
// Return unmodifiable snapshot, to avoid ConcurrentModificationExceptions
// while iterating and invoking synchronization callbacks that in turn
// might register further synchronizations.
if (synchs.isEmpty()) {
return Collections.emptyList();
}
else {
// Sort lazily here, not in registerSynchronization.
List<TransactionSynchronization> sortedSynchs = new ArrayList<TransactionSynchronization>(synchs);
AnnotationAwareOrderComparator.sort(sortedSynchs);
return Collections.unmodifiableList(sortedSynchs);
}
}
  • getSynchronizations() 调用引用可以看到trigger雷的节点
  • org.springframework.transaction.support.TransactionSynchronizationUtils#invokeAfterCommit,逻辑很简单就是拿到雷后依次踩一下
1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* Actually invoke the {@code afterCommit} methods of the
* given Spring TransactionSynchronization objects.
* @param synchronizations List of TransactionSynchronization objects
* @see TransactionSynchronization#afterCommit()
*/
public static void invokeAfterCommit(List<TransactionSynchronization> synchronizations) {
if (synchronizations != null) {
for (TransactionSynchronization synchronization : synchronizations) {
synchronization.afterCommit();
}
}
}
  • 下面是org.springframework.transaction.support.AbstractPlatformTransactionManager#processCommit执行事务的过程,注意trigger方法开头的方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
/**
* Process an actual commit.
* Rollback-only flags have already been checked and applied.
* @param status object representing the transaction
* @throws TransactionException in case of commit failure
*/
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
prepareForCommit(status);
triggerBeforeCommit(status);
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
boolean globalRollbackOnly = false;
if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
globalRollbackOnly = status.isGlobalRollbackOnly();
}
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Releasing transaction savepoint");
}
status.releaseHeldSavepoint();
}
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction commit");
}
doCommit(status);
}
// Throw UnexpectedRollbackException if we have a global rollback-only
// marker but still didn't get a corresponding exception from commit.
if (globalRollbackOnly) {
throw new UnexpectedRollbackException(
"Transaction silently rolled back because it has been marked as rollback-only");
}
}
catch (UnexpectedRollbackException ex) {
// can only be caused by doCommit
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
throw ex;
}
catch (TransactionException ex) {
// can only be caused by doCommit
if (isRollbackOnCommitFailure()) {
doRollbackOnCommitException(status, ex);
}
else {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
}
throw ex;
}
catch (RuntimeException ex) {
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
doRollbackOnCommitException(status, ex);
throw ex;
}
catch (Error err) {
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
doRollbackOnCommitException(status, err);
throw err;
}

// Trigger afterCommit callbacks, with an exception thrown there
// propagated to callers but the transaction still considered as committed.
try {
// 事务执行后
triggerAfterCommit(status);
}
finally {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}

}
finally {
cleanupAfterCompletion(status);
}
}

异常场景

在实际项目中还会遇到这样一种场景就是事务提交后触发额外逻辑,但这个额外逻辑又开启了一个新事务,新事务里面再次registerSynchronization会无效

  • 我们来看看里面的代码逻辑,里面有这样一个判断status.isNewSynchronization(),里面有个逻辑是就是判断一个boolean值,是否是新的同步

org.springframework.transaction.support.AbstractPlatformTransactionManager#triggerAfterCommit

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private final boolean newSynchronization;

/**
* Return if a new transaction synchronization has been opened
* for this transaction.
*/
public boolean isNewSynchronization() {
return this.newSynchronization;
}

/**
* Trigger {@code afterCommit} callbacks.
* @param status object representing the transaction
*/
private void triggerAfterCommit(DefaultTransactionStatus status) {
if (status.isNewSynchronization()) {
if (status.isDebug()) {
logger.trace("Triggering afterCommit synchronization");
}
TransactionSynchronizationUtils.triggerAfterCommit();
}
}
  • 继续跟踪这个变量的取值,这里第一个事务进来的时候synchronizations.get()是为空的,那么newSynchronization则为true,当第二个事务进来的时候synchronizations.get()是不为空的,所以这里isNewSynchronization则为false,所以就不会执行trigger方法了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* Create a TransactionStatus instance for the given arguments.
*/
protected DefaultTransactionStatus newTransactionStatus(
TransactionDefinition definition, Object transaction, boolean newTransaction,
boolean newSynchronization, boolean debug, Object suspendedResources) {

boolean actualNewSynchronization = newSynchronization &&
!TransactionSynchronizationManager.isSynchronizationActive();

return new DefaultTransactionStatus(
transaction, newTransaction, actualNewSynchronization,
definition.isReadOnly(), debug, suspendedResources);
}

/**
* Return if transaction synchronization is active for the current thread.
* Can be called before register to avoid unnecessary instance creation.
* @see #registerSynchronization
*/
public static boolean isSynchronizationActive() {
return (synchronizations.get() != null);
}

问题总结

  • 虽然synchronizations集合会累加同一个线程不同事务的回调对象,但是当第一个事务未回调完成前,这里synchronizations集合还是没被清空的,开启第二个事务时status.isNewSynchronization()都是false

解决方案

  • 将当前的回调方法都添加到本地线程集合中,后续回调均按此集合顺序执行回调逻辑,TransactionCommitExecutor为单例Bean,这样不论注册多少次,均为一个实例对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/**
* 事务提交后逻辑
* 1、关于在第一个事务的afterCommit回调方法中开启新事务并再次registerSynchronization无效原因
* 2、old:
* TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
* @Override
* public void afterCommit() {
* LOGGER.warn("doInsert.afterCommit回调完成");
* }
* });
* @author songshuiyang
* @date 2022/12/16 11:08
*/
@Component
public class TransactionCommitExecutor extends TransactionSynchronizationAdapter implements Executor {

private static final Logger LOGGER = LoggerFactory.getLogger(TransactionCommitExecutor.class);

private static final ThreadLocal<List<Runnable>> THREAD_LOCAL_RUNNABLE = new ThreadLocal<List<Runnable>>();

@Override
public void execute(Runnable runnable) {
if (!TransactionSynchronizationManager.isSynchronizationActive()) {
runnable.run();
return;
}
List<Runnable> threadRunnables = THREAD_LOCAL_RUNNABLE.get();
if (threadRunnables == null) {
threadRunnables = new ArrayList<Runnable>();
THREAD_LOCAL_RUNNABLE.set(threadRunnables);
TransactionSynchronizationManager.registerSynchronization(this);
}
threadRunnables.add(runnable);
}

@Override
public void afterCommit() {
List<Runnable> threadRunnables = THREAD_LOCAL_RUNNABLE.get();
for (int i = 0; i < threadRunnables.size(); i++) {
Runnable runnable = threadRunnables.get(i);
try {
runnable.run();
} catch (RuntimeException e) {
LOGGER.error("AfterCommit to execute runnable fail " + runnable, e);
}
}
}

@Override
public void afterCompletion(int status) {
THREAD_LOCAL_RUNNABLE.remove();
}


}