作者:京東零售 范錫軍
1、引言
spring的spring-tx模塊提供了對(duì)事務(wù)管理支持,使用spring事務(wù)可以讓我們從復(fù)雜的事務(wù)處理中得到解脫,無(wú)需要去處理獲得連接、關(guān)閉連接、事務(wù)提交和回滾等這些操作。
spring事務(wù)有編程式事務(wù)和聲明式事務(wù)兩種實(shí)現(xiàn)方式。編程式事務(wù)是通過(guò)編寫代碼來(lái)管理事務(wù)的提交、回滾、以及事務(wù)的邊界。這意味著開(kāi)發(fā)者需要在代碼中顯式地調(diào)用事務(wù)的開(kāi)始、提交和回滾。聲明式事務(wù)是通過(guò)配置來(lái)管理事務(wù),您可以使用注解或XML配置來(lái)定義事務(wù)的邊界和屬性,而無(wú)需顯式編寫事務(wù)管理的代碼。
下面我們逐步分析spring源代碼,理解spring事務(wù)的實(shí)現(xiàn)原理。
2、編程式事務(wù)
2.1 使用示例
// transactionManager是某一個(gè)具體的PlatformTransactionManager實(shí)現(xiàn)類的對(duì)象 private PlatformTransactionManager transactionManager; // 定義事務(wù)屬性 DefaultTransactionDefinition def = new DefaultTransactionDefinition(); // 獲取事務(wù) TransactionStatus status = transactionManager.getTransaction(def); try { // 執(zhí)行數(shù)據(jù)庫(kù)操作 // ... // 提交事務(wù) transactionManager.commit(status); } catch (Exception ex) { // 回滾事務(wù) transactionManager.rollback(status); }
在使用編程式事務(wù)處理的過(guò)程中,利用 DefaultTransactionDefinition 對(duì)象來(lái)持有事務(wù)處理屬性。同時(shí),在創(chuàng)建事務(wù)的過(guò)程中得到一個(gè) TransactionStatus 對(duì)象,然后通過(guò)直接調(diào)用 transactionManager 對(duì)象 的 commit() 和 rollback()方法 來(lái)完成事務(wù)處理。
2.2 PlatformTransactionManager核心接口
?
PlatformTransactionManager是Spring事務(wù)管理的核心接口,通過(guò) PlatformTransactionManager 接口設(shè)計(jì)了一系列與事務(wù)處理息息相關(guān)的接口方法,如 getTransaction()、commit()、rollback() 這些和事務(wù)處理相關(guān)的統(tǒng)一接口。對(duì)于這些接口的實(shí)現(xiàn),很大一部分是由 AbstractTransactionManager 抽象類來(lái)完成的。
AbstractPlatformManager 封裝了 Spring 事務(wù)處理中通用的處理部分,比如事務(wù)的創(chuàng)建、提交、回滾,事務(wù)狀態(tài)和信息的處理,與線程的綁定等,有了這些通用處理的支持,對(duì)于具體的事務(wù)管理器而言,它們只需要處理和具體數(shù)據(jù)源相關(guān)的組件設(shè)置就可以了,比如在DataSourceTransactionManager中,就只需要配置好和DataSource事務(wù)處理相關(guān)的接口以及相關(guān)的設(shè)置。
2.3 事務(wù)的創(chuàng)建
PlatformTransactionManager的getTransaction()方法,封裝了底層事務(wù)的創(chuàng)建,并生成一個(gè) TransactionStatus對(duì)象。AbstractPlatformTransactionManager提供了創(chuàng)建事務(wù)的模板,這個(gè)模板會(huì)被具體的事務(wù)處理器所使用。從下面的代碼中可以看到,AbstractPlatformTransactionManager會(huì)根據(jù)事務(wù)屬性配置和當(dāng)前進(jìn)程綁定的事務(wù)信息,對(duì)事務(wù)是否需要?jiǎng)?chuàng)建,怎樣創(chuàng)建 進(jìn)行一些通用的處理,然后把事務(wù)創(chuàng)建的底層工作交給具體的事務(wù)處理器完成,如:DataSourceTransactionManager、HibernateTransactionManager。
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException { TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults()); Object transaction = doGetTransaction(); boolean debugEnabled = logger.isDebugEnabled(); if (isExistingTransaction(transaction)) { return handleExistingTransaction(def, transaction, debugEnabled); } if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout()); } if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { throw new IllegalTransactionStateException( "No existing transaction found for transaction marked with propagation 'mandatory'"); } else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { SuspendedResourcesHolder suspendedResources = suspend(null); if (debugEnabled) { logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def); } try { return startTransaction(def, transaction, false, debugEnabled, suspendedResources); } catch (RuntimeException | Error ex) { resume(null, suspendedResources); throw ex; } } else { if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) { logger.warn("Custom isolation level specified but no actual transaction initiated; " + "isolation level will effectively be ignored: " + def); } boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null); } } private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction, boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) { boolean newSynchronization = this.getTransactionSynchronization() != SYNCHRONIZATION_NEVER; DefaultTransactionStatus status = this.newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); this.doBegin(transaction, definition); this.prepareSynchronization(status, definition); return status; }
事務(wù)創(chuàng)建的結(jié)果是生成一個(gè)TransactionStatus對(duì)象,通過(guò)這個(gè)對(duì)象來(lái)保存事務(wù)處理需要的基本信息,TransactionStatus的創(chuàng)建過(guò)程如下:
protected DefaultTransactionStatus newTransactionStatus(TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction, boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) { boolean actualNewSynchronization = newSynchronization && !TransactionSynchronizationManager.isSynchronizationActive(); return new DefaultTransactionStatus(transaction, newTransaction, actualNewSynchronization, definition.isReadOnly(), debug, suspendedResources); }
以上是創(chuàng)建一個(gè)全新事務(wù)的過(guò)程,還有另一種情況是:在創(chuàng)建當(dāng)前事務(wù)時(shí),線程中已經(jīng)有事務(wù)存在了。這種情況會(huì)涉及事務(wù)傳播行為的處理。spring中七種事務(wù)傳播行為如下:
事務(wù)傳播行為類型 | 說(shuō)明 |
PROPAGATION_REQUIRED | 如果當(dāng)前沒(méi)有事務(wù),就新建一個(gè)事務(wù),如果已經(jīng)存在一個(gè)事務(wù)中,加入到這個(gè)事務(wù)中。這是最常見(jiàn)的選擇。 |
PROPAGATION_SUPPORTS | 支持當(dāng)前事務(wù),如果當(dāng)前沒(méi)有事務(wù),就以非事務(wù)方式執(zhí)行。 |
PROPAGATION_MANDATORY | 使用當(dāng)前的事務(wù),如果當(dāng)前沒(méi)有事務(wù),就拋出異常。 |
PROPAGATION_REQUIRES_NEW | 新建事務(wù),如果當(dāng)前存在事務(wù),把當(dāng)前事務(wù)掛起。 |
PROPAGATION_NOT_SUPPORTED | 以非事務(wù)方式執(zhí)行操作,如果當(dāng)前存在事務(wù),就把當(dāng)前事務(wù)掛起。 |
PROPAGATION_NEVER | 以非事務(wù)方式執(zhí)行,如果當(dāng)前存在事務(wù),則拋出異常。 |
PROPAGATION_NESTED | 如果當(dāng)前存在事務(wù),則在嵌套事務(wù)內(nèi)執(zhí)行。如果當(dāng)前沒(méi)有事務(wù),則執(zhí)行與PROPAGATION_REQUIRED類似的操作。 |
如果檢測(cè)到已存在事務(wù),handleExistingTransaction()方法將根據(jù)不同的事務(wù)傳播行為類型執(zhí)行相應(yīng)邏輯。
PROPAGATION_NEVER
即當(dāng)前方法需要在非事務(wù)的環(huán)境下執(zhí)行,如果有事務(wù)存在,那么拋出異常。
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) { throw new IllegalTransactionStateException( "Existing transaction found for transaction marked with propagation 'never'"); }
PROPAGATION_NOT_SUPPORTED
與前者的區(qū)別在于,如果有事務(wù)存在,那么將事務(wù)掛起,而不是拋出異常。
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) { Object suspendedResources = suspend(transaction); boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus( definition, null, false, newSynchronization, debugEnabled, suspendedResources); }
PROPAGATION_REQUIRES_NEW
新建事務(wù),如果當(dāng)前存在事務(wù),把當(dāng)前事務(wù)掛起。
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) { SuspendedResourcesHolder suspendedResources = suspend(transaction); boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); doBegin(transaction, definition); prepareSynchronization(status, definition); return status; }
PROPAGATION_NESTED
開(kāi)始一個(gè) "嵌套的" 事務(wù), 它是已經(jīng)存在事務(wù)的一個(gè)真正的子事務(wù). 嵌套事務(wù)開(kāi)始執(zhí)行時(shí), 它將取得一個(gè) savepoint. 如果這個(gè)嵌套事務(wù)失敗, 我們將回滾到此 savepoint. 嵌套事務(wù)是外部事務(wù)的一部分, 只有外部事務(wù)結(jié)束后它才會(huì)被提交。
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { if (useSavepointForNestedTransaction()) { DefaultTransactionStatus status = newTransactionStatus( definition, transaction, false, false, true, debugEnabled, null); this.transactionExecutionListeners.forEach(listener -> listener.beforeBegin(status)); try { status.createAndHoldSavepoint(); } catch (RuntimeException | Error ex) { this.transactionExecutionListeners.forEach(listener -> listener.afterBegin(status, ex)); throw ex; } this.transactionExecutionListeners.forEach(listener -> listener.afterBegin(status, null)); return status; } else { return startTransaction(definition, transaction, true, debugEnabled, null); } }
2.4 事務(wù)掛起
事務(wù)掛起在AbstractTransactionManager.suspend()中處理,該方法內(nèi)部將調(diào)用具體事務(wù)管理器的doSuspend()方法。以DataSourceTransactionManager為例,將ConnectionHolder設(shè)為null,因?yàn)橐粋€(gè)ConnectionHolder對(duì)象就代表了一個(gè)數(shù)據(jù)庫(kù)連接,將ConnectionHolder設(shè)為null就意味著我們下次要使用連接時(shí),將重新從連接池獲取。
protected Object doSuspend(Object transaction) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; txObject.setConnectionHolder(null); return TransactionSynchronizationManager.unbindResource(obtainDataSource()); }
unbindResource()方法最終會(huì)調(diào)用TransactionSynchronizationManager.doUnbindResource()方法,該方法將移除當(dāng)前線程與事務(wù)對(duì)象的綁定。
private static Object doUnbindResource(Object actualKey) { Map map = resources.get(); if (map == null) { return null; } Object value = map.remove(actualKey); if (map.isEmpty()) { resources.remove(); } if (value instanceof ResourceHolder resourceHolder && resourceHolder.isVoid()) { value = null; } return value; }
而被掛起的事務(wù)的各種狀態(tài)最終會(huì)保存在TransactionStatus對(duì)象中。
2.5 事務(wù)提交&回滾
主要是對(duì)jdbc的封裝、源碼邏輯較清晰,不展開(kāi)細(xì)說(shuō)。
?
3、聲明式事務(wù)
其底層建立在 AOP 的基礎(chǔ)之上,對(duì)方法前后進(jìn)行攔截,然后在目標(biāo)方法開(kāi)始之前創(chuàng)建或者加入一個(gè)事務(wù),在執(zhí)行完目標(biāo)方法之后根據(jù)執(zhí)行情況提交或者回滾事務(wù)。通過(guò)聲明式事物,無(wú)需在業(yè)務(wù)邏輯代碼中摻雜事務(wù)管理的代碼,只需在配置文件中做相關(guān)的事務(wù)規(guī)則聲明(或通過(guò)等價(jià)的基于標(biāo)注的方式),便可以將事務(wù)規(guī)則應(yīng)用到業(yè)務(wù)邏輯中。
3.1 使用示例
配置:
/bean?> /bean?>
代碼:
@Transactional public void addOrder() { // 執(zhí)行數(shù)據(jù)庫(kù)操作 }
3.2 自定義標(biāo)簽解析
先從配置文件開(kāi)始入手,找到處理annotation-driven標(biāo)簽的類TxNamespaceHandler。TxNamespaceHandler實(shí)現(xiàn)了NamespaceHandler接口,定義了如何解析和處理自定義XML標(biāo)簽。
@Override public void init() { registerBeanDefinitionParser("advice", new TxAdviceBeanDefinitionParser()); registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser()); registerBeanDefinitionParser("jta-transaction-manager", new JtaTransactionManagerBeanDefinitionParser()); }
AnnotationDrivenBeanDefinitionParser里的parse()方法,對(duì)XML標(biāo)簽annotation-driven進(jìn)行解析。
@Override public BeanDefinition parse(Element element, ParserContext parserContext) { registerTransactionalEventListenerFactory(parserContext); String mode = element.getAttribute("mode"); if ("aspectj".equals(mode)) { // mode="aspectj" registerTransactionAspect(element, parserContext); if (ClassUtils.isPresent("jakarta.transaction.Transactional", getClass().getClassLoader())) { registerJtaTransactionAspect(element, parserContext); } } else { // mode="proxy" AopAutoProxyConfigurer.configureAutoProxyCreator(element, parserContext); } return null; }
以默認(rèn)mode配置為例,執(zhí)行configureAutoProxyCreator()方法,將在Spring容器中注冊(cè)了3個(gè)bean:
BeanFactoryTransactionAttributeSourceAdvisor、TransactionInterceptor、AnnotationTransactionAttributeSource。同時(shí)會(huì)將TransactionInterceptor的BeanName傳入到Advisor中,然后將AnnotationTransactionAttributeSource這個(gè)Bean注入到Advisor中。之后動(dòng)態(tài)代理的時(shí)候會(huì)使用這個(gè)Advisor去尋找每個(gè)Bean是否需要?jiǎng)討B(tài)代理。
// Create the TransactionAttributeSourceAdvisor definition. RootBeanDefinition advisorDef = new RootBeanDefinition(BeanFactoryTransactionAttributeSourceAdvisor.class); advisorDef.setSource(eleSource); advisorDef.setRole(BeanDefinition.ROLE_INFRASTRUCTURE); advisorDef.getPropertyValues().add("transactionAttributeSource", new RuntimeBeanReference(sourceName)); advisorDef.getPropertyValues().add("adviceBeanName", interceptorName); if (element.hasAttribute("order")) { advisorDef.getPropertyValues().add("order", element.getAttribute("order")); } parserContext.getRegistry().registerBeanDefinition(txAdvisorBeanName, advisorDef); CompositeComponentDefinition compositeDef = new CompositeComponentDefinition(element.getTagName(), eleSource); compositeDef.addNestedComponent(new BeanComponentDefinition(sourceDef, sourceName)); compositeDef.addNestedComponent(new BeanComponentDefinition(interceptorDef, interceptorName)); compositeDef.addNestedComponent(new BeanComponentDefinition(advisorDef, txAdvisorBeanName)); parserContext.registerComponent(compositeDef);
3.3 Advisor
回顧AOP用法,Advisor可用于定義一個(gè)切面,它包含切點(diǎn)(Pointcut)和通知(Advice),用于在特定的連接點(diǎn)上執(zhí)行特定的操作。spring事務(wù)實(shí)現(xiàn)了一個(gè)Advisor: BeanFactoryTransactionAttributeSourceAdvisor。
public class BeanFactoryTransactionAttributeSourceAdvisor extends AbstractBeanFactoryPointcutAdvisor { private final TransactionAttributeSourcePointcut pointcut = new TransactionAttributeSourcePointcut(); public void setTransactionAttributeSource(TransactionAttributeSource transactionAttributeSource) { this.pointcut.setTransactionAttributeSource(transactionAttributeSource); } public void setClassFilter(ClassFilter classFilter) { this.pointcut.setClassFilter(classFilter); } @Override public Pointcut getPointcut() { return this.pointcut; } }
BeanFactoryTransactionAttributeSourceAdvisor其實(shí)是一個(gè)PointcutAdvisor,是否匹配到切入點(diǎn)取決于Pointcut。Pointcut的核心在于其ClassFilter和MethodMatcher。
ClassFilter:
TransactionAttributeSourcePointcut內(nèi)部私有類 TransactionAttributeSourceClassFilter,實(shí)現(xiàn)了Spring框架中的ClassFilter接口。在matches方法中,它首先檢查傳入的類clazz 否為TransactionalProxy、TransactionManager或PersistenceExceptionTranslator的子類,如果不是,則獲取當(dāng)前的 TransactionAttributeSource 并檢查其是否允許該類作為候選類。
private class TransactionAttributeSourceClassFilter implements ClassFilter { @Override public boolean matches(Class??> clazz) { if (TransactionalProxy.class.isAssignableFrom(clazz) || TransactionManager.class.isAssignableFrom(clazz) || PersistenceExceptionTranslator.class.isAssignableFrom(clazz)) { return false; } return (transactionAttributeSource == null || transactionAttributeSource.isCandidateClass(clazz)); } }
MethodMatcher:
TransactionAttributeSourcePointcut.matches:
@Override public boolean matches(Method method, Class??> targetClass) { return (this.transactionAttributeSource == null || this.transactionAttributeSource.getTransactionAttribute(method, targetClass) != null); }
getTransactionAttribute()方法最終會(huì)調(diào)用至AbstractFallbackTransactionAttributeSource.computeTransactionAttribute()方法,該方法將先去方法上查找是否有相應(yīng)的事務(wù)注解(比如@Transactional),如果沒(méi)有,那么再去類上查找。
protected TransactionAttribute computeTransactionAttribute(Method method, @Nullable Class??> targetClass) { // Don't allow non-public methods, as configured. if (allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) { return null; } // The method may be on an interface, but we need attributes from the target class. // If the target class is null, the method will be unchanged. Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass); // First try is the method in the target class. TransactionAttribute txAttr = findTransactionAttribute(specificMethod); if (txAttr != null) { return txAttr; } // Second try is the transaction attribute on the target class. txAttr = findTransactionAttribute(specificMethod.getDeclaringClass()); if (txAttr != null && ClassUtils.isUserLevelMethod(method)) { return txAttr; } if (specificMethod != method) { // Fallback is to look at the original method. txAttr = findTransactionAttribute(method); if (txAttr != null) { return txAttr; } // Last fallback is the class of the original method. txAttr = findTransactionAttribute(method.getDeclaringClass()); if (txAttr != null && ClassUtils.isUserLevelMethod(method)) { return txAttr; } } return null; }
3.4 TransactionInterceptor
TransactionInterceptor是spring事務(wù)提供的AOP攔截器,實(shí)現(xiàn)了AOP Alliance的MethodInterceptor接口,是一種通知(advice)。其可以用于在方法調(diào)用前后進(jìn)行事務(wù)管理。
@Override @Nullable public Object invoke(MethodInvocation invocation) throws Throwable { // Work out the target class: may be {@code null}. // The TransactionAttributeSource should be passed the target class // as well as the method, which may be from an interface. Class??> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); // Adapt to TransactionAspectSupport's invokeWithinTransaction... return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() { @Override @Nullable public Object proceedWithInvocation() throws Throwable { return invocation.proceed(); } @Override public Object getTarget() { return invocation.getThis(); } @Override public Object[] getArguments() { return invocation.getArguments(); } }); }
invokeWithinTransaction()方法會(huì)根據(jù)目標(biāo)方法上的事務(wù)配置,來(lái)決定是開(kāi)啟新事務(wù)、加入已有事務(wù),還是直接執(zhí)行邏輯(如果沒(méi)有事務(wù))。其代碼簡(jiǎn)化如下(僅保留PlatformTransactionManager部分):
protected Object invokeWithinTransaction(Method method, @Nullable Class??> targetClass, final InvocationCallback invocation) { // If the transaction attribute is null, the method is non-transactional. final TransactionAttribute txAttr = getTransactionAttributeSource() .getTransactionAttribute(method, targetClass); final PlatformTransactionManager tm = determineTransactionManager(txAttr); final String joinpointIdentification = methodIdentification(method, targetClass); if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) { // Standard transaction demarcation with getTransaction and commit/rollback calls. TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification); ObjectretVal = null; try { // This is an around advice: Invoke the next interceptor in the chain. // This will normally result in a target object being invoked. retVal = invocation.proceedWithInvocation(); } catch (Throwableex) { // target invocation exception completeTransactionAfterThrowing(txInfo, ex); throwex; } finally { cleanupTransactionInfo(txInfo); } commitTransactionAfterReturning(txInfo); returnretVal; } } 審核編輯 黃宇
-
XML
+關(guān)注
關(guān)注
0文章
188瀏覽量
33084 -
代碼
+關(guān)注
關(guān)注
30文章
4788瀏覽量
68612 -
spring
+關(guān)注
關(guān)注
0文章
340瀏覽量
14343
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
評(píng)論