Spring声明式事务源码剖析


事务包含范围较广不仅仅包括数据库事务也包括事务消息。事务是使有限操作满足ACID属性,A:原子性,C:一致性,I:隔离性,D:持久性,严格遵循ACID规则的叫做刚性事务,事务执行的中间状态可以暂时不支持ACID的叫柔性事务。

关于MySQL的MVCC事务隔离级别可以走向我的另一篇博文MySQL事务系统解析

事务基本概念

事务基本属性

  • A(原子性):事务要么全部成功,要么全部失败
  • C(一致性):不管什么时候,数据都是一致的,不会读到中间状态的数据
  • I(隔离性):多个事务之间相互不影响
  • D(持久性):在事务执行过程中,不管怎么样数据都不能丢,会持久化

事务传播行为

行为 不存在嵌套事务 存在嵌套事务
REQUIRED 开启新的事务 融合到外部事务
REQUIRES_NEW 开启新的事务 挂起外部事务
NESTED 开启新的事务 融合到外部事务中,SavePoint机制,外层影响内层, 内层不会影响外层
SUPPORTS 不开启新的事务 融合到外部事务
NOT_SUPPORTED 不开启新的事务 挂起外部事务
NEVER 不开启新的事务 抛出异常
MANDATORY 抛出异常 融合到外部事务

默认方式是REQUIRED,使用方式,要开启事务方法上面注解@Transactional(propagation = Propagation.REQUIRED)

Demo

配置类

@EnableTransactionManagement
@EnableAspectJAutoProxy(exposeProxy = true)
@ComponentScan(basePackages = {"com.tuling"})
public class MainConfig {

    /**
     *  配置数据源
     */
    @Bean
    public DataSource dataSource() {
        DruidDataSource dataSource = new DruidDataSource();
        dataSource.setUsername("root");
        dataSource.setPassword("root");
        dataSource.setUrl("jdbc:mysql://xx.xx.xx.xx:3306/test");
        dataSource.setDriverClassName("com.mysql.jdbc.Driver");
        return dataSource;
    }

    /**
     *  配置JdbcTemplate
     */
    @Bean
    public JdbcTemplate jdbcTemplate(DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }

    /**
     *  配置事务管理器
     */
    @Bean
    public PlatformTransactionManager transactionManager(DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }

}

配置类写好只需在需要开启事务的方法/接口/类上加上注解@Transactional(propagation = Propagation.REQUIRED)即可

这里有个点要注意,如果A方法,B方法在同一个类中,且2个方法都开启了事务,这时候A方法如果调用B方法,B的事务是不会生效的,如果生效需要把B方法重新声明一个类,或者开启AOP配置exposeProxy = true暴露代理到线程然后通过AopContext.currentProxy().methodB();方式调用。

因为spring事务也是用了AOP的方式,动态代理分为Jdk动态代理和Cglib动态代理

jdk动态代理本身就有这么个特性同一个类的方法互相调用不会再生成代理

Cglib动态代理中是通过责任链调用增强代码中显式调用methodA->methodB,这时候的调用是责任链调用而不是代理类调用

spring声明式事务与AOP的关系

如果要实现事务的功能一般想到的都是在要执行方法前开启事务,捕获执行方法异常里面回滚事务,最后执行提交事务。spring如果要实现这个很明显使用AOP就可以了。

我们进到@EnableTransactionManagement注解中可以跟到注册Bean的方法(下面会具体跟代码),发现它注册了InfrastructureAdvisorAutoProxyCreator方法。

我们在看下AOP注册的Bean方法

发现他们之间公用一个基类AbstractAdvisorAutoProxyCreator,而大部分后置处理器都是在这个基类上实现的

所以基本可以判定事务是在AOP基础上实现的。

spring事务源码分析

了解spring事务源码之前可以先看下我之前写的一篇springAOP源码解析 TODO

从@EnableTransactionManagement跟入发现标注了@Import从import的方法跟入,发现实现了ImportSelector接口,这时跟入到selectImports方法

@Override
protected String[] selectImports(AdviceMode adviceMode) {
    switch (adviceMode) {
        // 为我们的容器中导入了二个组件 一个是AutoProxyRegistrar
        //一个是ProxyTransactionManagementConfiguration
        case PROXY:
            return new String[] {AutoProxyRegistrar.class.getName(),
                                 ProxyTransactionManagementConfiguration.class.getName()};
        case ASPECTJ:
            return new String[] {
                TransactionManagementConfigUtils.TRANSACTION_ASPECT_CONFIGURATION_CLASS_NAME};
        default:
            return null;
    }
}

selectImports返回的是一个String[]表示我们要注册的Bean。所以这段代码表示我们的容器需要注册AutoProxyRegistrar,ProxyTransactionManagementConfiguration这2个组件

注意:这里有个switch…case,ASPECTJ分支不要care,这里基本不会走,这里表示的我们不使用动态代理而是使用AspectJ,AspectJ需要新的编译器,基本不会有人把mode设为ASPECTJ

AutoProxyRegistrar类

发现里面的registerBeanDefinitions方法,似乎找到了注册BeanDefinition的方法

@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
   boolean candidateFound = false;
   Set<String> annTypes = importingClassMetadata.getAnnotationTypes();
   for (String annType : annTypes) {
      AnnotationAttributes candidate = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType);
      if (candidate == null) {
         continue;
      }
      Object mode = candidate.get("mode");
      Object proxyTargetClass = candidate.get("proxyTargetClass");
      if (mode != null && proxyTargetClass != null && AdviceMode.class == mode.getClass() &&
            Boolean.class == proxyTargetClass.getClass()) {
         candidateFound = true;
         if (mode == AdviceMode.PROXY) {
            // 注册BeanDefinition
            AopConfigUtils.registerAutoProxyCreatorIfNecessary(registry);
            if ((Boolean) proxyTargetClass) {
               AopConfigUtils.forceAutoProxyCreatorToUseClassProxying(registry);
               return;
            }
         }
      }
   }
}

这个方法其实主要就是调用了registerAutoProxyCreatorIfNecessary,从这继续往里面跟发现registerOrEscalateApcAsRequired方法

这个方法有点熟悉,因为AOP中也是调用的这个方法进行的注册,跟进去看看。

private static BeanDefinition registerOrEscalateApcAsRequired(
            Class<?> cls, BeanDefinitionRegistry registry, @Nullable Object source) {

    Assert.notNull(registry, "BeanDefinitionRegistry must not be null");

    // 这里表明了同时注解事务和AOP,会怎么操作
    if (registry.containsBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME)) {
        // 拿到之前注册的BeanDefinition
        BeanDefinition apcDefinition = registry.getBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME);
        // 之前的BeanDefinition和现在是否一致,不一致说明同时注册了事务和AOP
        if (!cls.getName().equals(apcDefinition.getBeanClassName())) {
            // 这里的索引
            // 0-InfrastructureAdvisorAutoProxyCreator--事务的
            // 1-AspectJAwareAdvisorAutoProxyCreator
            // 2-AnnotationAwareAspectJAutoProxyCreator--AOP的
            // 拿到之前的BeanDefinition的索引
            int currentPriority = findPriorityForClass(apcDefinition.getBeanClassName());
            // 拿到现在的BeanDefinition的索引
            int requiredPriority = findPriorityForClass(cls);
            // 谁索引大谁就是bean定义,这里AOP的bean会覆盖事务的bean
            if (currentPriority < requiredPriority) {
                apcDefinition.setBeanClassName(cls.getName());
            }
        }
        return null;
    }

    RootBeanDefinition beanDefinition = new RootBeanDefinition(cls);
    beanDefinition.setSource(source);
    beanDefinition.getPropertyValues().add("order", Ordered.HIGHEST_PRECEDENCE);
    beanDefinition.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
    // 注册Bean定义name是org.springframework.aop.config.internalAutoProxyCreator
    registry.registerBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME, beanDefinition);
    return beanDefinition;
}

这里的代码解释了AOP注解事务注解同时注册到底听谁的,是同时注册还是某一个成功

同时注册不可能,2个bean的beanName是一样的,所以不可能同时注册,必定hi覆盖,谁覆盖谁呢

AOP优先级大于事务,2者都注册就注册AOP

ProxyTransactionManagementConfiguration类

这个方法注册了几个Bean,定义了Advisor

public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {

    /**
     * 定义了关于事务的切面信息
     * org.springframework.transaction.config.internalTransactionAdvisor
     */
    @Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor() {
        BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor();
        // 定义了获取事务的解析器,可以理解成这里是为了找到标注了@Transactional的方法
         advisor.setTransactionAttributeSource(transactionAttributeSource());
         // 定义了切面
        advisor.setAdvice(transactionInterceptor());
        if (this.enableTx != null) {
            advisor.setOrder(this.enableTx.<Integer>getNumber("order"));
        }
        return advisor;
    }

    /**
     * 事务属性源对象:用于获取事务属性对象,可以理解成这里是为了找到标注了@Transactional的方法
     */
    @Bean
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public TransactionAttributeSource transactionAttributeSource() {
        return new AnnotationTransactionAttributeSource();
    }

    /**
     * 用户拦截事务方法执行的
     */
    @Bean
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public TransactionInterceptor transactionInterceptor() {
        TransactionInterceptor interceptor = new TransactionInterceptor();
        interceptor.setTransactionAttributeSource(transactionAttributeSource());
        if (this.txManager != null) {
            interceptor.setTransactionManager(this.txManager);
        }
        return interceptor;
    }

}

走到这里我们大致就清楚应该怎么看事务的源码了,首先我们应该还是要和看AOP的方式去看事务源码,首先会解析切面(实例化之前的后置处理器执行)->创建代理(初始化之后的后置处理器执行,循环依赖创建早期对象)->调用(代理调用)

这里我们看InfrastructureAdvisorAutoProxyCreator还是AnnotationAwareAspectJAutoProxyCreator都可以

和AOP一样我们看AnnotationAwareAspectJAutoProxyCreator方法,AOP看过的都不在看了

切面解析

我们直接进到postProcessBeforeInstantiation->shouldSkip(子类)->findCandidateAdvisors(子类)->findCandidateAdvisors(父类)->findAdvisorBeans

findAdvisorBeans方法

public List<Advisor> findAdvisorBeans() {

   // 缓存cachedAdvisorBeanNames 是用来保存我们的Advisor全类名会在第一个单实例bean的中会去把这个advisor名称解析出来
   String[] advisorNames = this.cachedAdvisorBeanNames;
   if (advisorNames == null) {
      // 去我们的容器中获取到实现了Advisor接口的实现类,这里其实之前我们注册了一个实现了Advisor接口的实现类,就是在 ProxyTransactionManagementConfiguration类中BeanFactoryTransactionAttributeSourceAdvisor这个Bean,bean名字是org.springframework.transaction.config.internalTransactionAdvisor
      advisorNames = BeanFactoryUtils.beanNamesForTypeIncludingAncestors(
            this.beanFactory, Advisor.class, true, false);
      this.cachedAdvisorBeanNames = advisorNames;
   }
   //若在容器中没有找到,直接返回一个空的集合
   if (advisorNames.length == 0) {
      return new ArrayList<>();
   }

   List<Advisor> advisors = new ArrayList<>();
   //ioc容器中找到了我们配置的BeanFactoryTransactionAttributeSourceAdvisor
   for (String name : advisorNames) {
      //判断他是不是一个合适的 是我们想要的
      if (isEligibleBean(name)) {
         //BeanFactoryTransactionAttributeSourceAdvisor是不是正在创建的bean
         if (this.beanFactory.isCurrentlyInCreation(name)) {
            if (logger.isDebugEnabled()) {
               logger.debug("Skipping currently created advisor '" + name + "'");
            }
         }
         //不是的话
         else {
            try {
               //显示的调用getBean方法方法创建我们的BeanFactoryTransactionAttributeSourceAdvisor返回
               advisors.add(this.beanFactory.getBean(name, Advisor.class));
            }
            catch (BeanCreationException ex) {
               Throwable rootCause = ex.getMostSpecificCause();
               if (rootCause instanceof BeanCurrentlyInCreationException) {
                  BeanCreationException bce = (BeanCreationException) rootCause;
                  String bceBeanName = bce.getBeanName();
                  if (bceBeanName != null && this.beanFactory.isCurrentlyInCreation(bceBeanName)) {
                     if (logger.isDebugEnabled()) {
                        logger.debug("Skipping advisor '" + name +
                              "' with dependency on currently created bean: " + ex.getMessage());
                     }
                     // Ignore: indicates a reference back to the bean we're trying to advise.
                     // We want to find advisors other than the currently created bean itself.
                     continue;
                  }
               }
               throw ex;
            }
         }
      }
   }
   return advisors;
}

创建代理

大部分跟AOP差不多主要差别就在匹配那部分代码,我们直接来到匹配的代码

我们直接进到postProcessAfterInitialization->wrapIfNecessary->getAdvicesAndAdvisorsForBean->findEligibleAdvisors->findAdvisorsThatCanApply-> AopUtils.findAdvisorsThatCanApply->canApply

public static boolean canApply(Advisor advisor, Class<?> targetClass, boolean hasIntroductions) {
   if (advisor instanceof IntroductionAdvisor) {
      return ((IntroductionAdvisor) advisor).getClassFilter().matches(targetClass);
   }
   //判断我们事务的增强器BeanFactoryTransactionAttributeSourceAdvisor是否实现了PointcutAdvisor
   else if (advisor instanceof PointcutAdvisor) {
      //转为PointcutAdvisor类型
      PointcutAdvisor pca = (PointcutAdvisor) advisor;
      //找到真正能用的增强器
      return canApply(pca.getPointcut(), targetClass, hasIntroductions);
   }
   else {
      // It doesn't have a pointcut so we assume it applies.
      return true;
   }
}

canApply(pca.getPointcut(), targetClass, hasIntroductions)

// 这里的pointcut已经跟AOP不一样了,是BeanFactoryTransactionAttributeSourceAdvisor中的pointCut是TransactionAttributeSourcePointcut
public static boolean canApply(Pointcut pc, Class<?> targetClass, boolean hasIntroductions) {
   Assert.notNull(pc, "Pointcut must not be null");
   // 分析可知这里走的是StaticMethodMatcherPointcut,是TransactionAttributeSourcePointcut的父类
   // pc.getClassFilter() = ClassFilter.TRUE = TrueClassFilter.INSTANCE
   // 又TrueClassFilter.matches 始终为 true
   // 所以这个判断基本作废
   if (!pc.getClassFilter().matches(targetClass)) {
      return false;
   }
 
   //如果pc.getMethodMatcher()返回TrueMethodMatcher则匹配所有方法
   MethodMatcher methodMatcher = pc.getMethodMatcher();
   if (methodMatcher == MethodMatcher.TRUE) {
      // No need to iterate the methods if we're matching any method anyway...
      return true;
   }

   //事务不用管这里 只有AspectJExpressionPointCut才会实现这个接口
   IntroductionAwareMethodMatcher introductionAwareMethodMatcher = null;
   if (methodMatcher instanceof IntroductionAwareMethodMatcher) {
      introductionAwareMethodMatcher = (IntroductionAwareMethodMatcher) methodMatcher;
   }

   //创建一个集合用于保存targetClass 的class对象
   Set<Class<?>> classes = new LinkedHashSet<>();
   //判断当前class是不是代理的class对象
   if (!Proxy.isProxyClass(targetClass)) {
      //加入到集合中去
      classes.add(ClassUtils.getUserClass(targetClass));
   }
   //获取到targetClass所实现的接口的class对象,然后加入到集合中
   classes.addAll(ClassUtils.getAllInterfacesForClassAsSet(targetClass));

   //循环所有的class对象
   for (Class<?> clazz : classes) {
      //通过class获取到所有的方法
      Method[] methods = ReflectionUtils.getAllDeclaredMethods(clazz);
      //循环我们的方法
      for (Method method : methods) {
         //通过methodMatcher.matches来匹配我们的方法 introductionAwareMethodMatcher肯定是空走methodMatcher.matches(method, targetClass)方法,TransactionAttributeSourcePointcut.matches方法
         if (introductionAwareMethodMatcher != null ?
               // 通过切点表达式进行匹配 AspectJ方式
               introductionAwareMethodMatcher.matches(method, targetClass, hasIntroductions) :
               // 通过方法匹配器进行匹配 内置aop接口方式
               methodMatcher.matches(method, targetClass)) {
            // 只要有1个方法匹配上了就创建代理
            return true;
         }
      }
   }

   return false;
}

TransactionAttributeSourcePointcut.matches

@Override
public boolean matches(Method method, @Nullable Class<?> targetClass) {
   if (targetClass != null && TransactionalProxy.class.isAssignableFrom(targetClass)) {
      return false;
   }
   // @EnableTransactionManagement注解导入的ProxyTransactionManagementConfiguration配置类中的TransactionAttributeSource对象这里是AnnotationTransactionAttributeSource
   TransactionAttributeSource tas = getTransactionAttributeSource();
   // 通过getTransactionAttribute看是否有@Transactional注解
   return (tas == null || tas.getTransactionAttribute(method, targetClass) != null);
}

AnnotationTransactionAttributeSource.getTransactionAttribute方法

@Override
    @Nullable
    public TransactionAttribute getTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
        //判断method所在的class 是不是Object类型
        if (method.getDeclaringClass() == Object.class) {
            return null;
        }

        //构建我们的缓存key
        Object cacheKey = getCacheKey(method, targetClass);
        //先去我们的缓存中获取
        TransactionAttribute cached = this.attributeCache.get(cacheKey);
        //缓存中不为空
        if (cached != null) {
            //判断缓存中的对象是不是空事务属性的对象
            if (cached == NULL_TRANSACTION_ATTRIBUTE) {
                return null;
            }
            //不是的话 就进行返回
            else {
                return cached;
            }
        }
        else {
            //我们需要查找我们的事务注解 --解析这块重要代码--
            TransactionAttribute txAttr = computeTransactionAttribute(method, targetClass);
            // 若解析出来的事务注解属性为空
            if (txAttr == null) {
                //往缓存中存放空事务注解属性
                this.attributeCache.put(cacheKey, NULL_TRANSACTION_ATTRIBUTE);
            }
            else {
                //我们执行方法的描述符 全类名+方法名
                String methodIdentification = ClassUtils.getQualifiedMethodName(method, targetClass);
                //把方法描述设置到事务属性上去
                if (txAttr instanceof DefaultTransactionAttribute) {
                    ((DefaultTransactionAttribute) txAttr).setDescriptor(methodIdentification);
                }
                if (logger.isDebugEnabled()) {
                    logger.debug("Adding transactional method '" + methodIdentification + "' with attribute: " + txAttr);
                }
                //加入到缓存
                this.attributeCache.put(cacheKey, txAttr);
            }
            return txAttr;
        }
    }

computeTransactionAttribute方法

protected TransactionAttribute computeTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
   //判断我们的事务方法上的修饰符是不是public的
   if (allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) {
      return null;
   }

   // The method may be on an interface, but we need attributes from the target class.
   // If the target class is null, the method will be unchanged.
   Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass);

   //第一步,我们先去目标class的方法上去找我们的事务注解 这里面其实也找了接口方法,父类方法
   TransactionAttribute txAttr = findTransactionAttribute(specificMethod);
   if (txAttr != null) {
      return txAttr;
   }

   //第二步:去我们实现类上找事务注解 这里面其实也找了接口类和父类
   txAttr = findTransactionAttribute(specificMethod.getDeclaringClass());
   if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
      return txAttr;
   }
   // 具体方法不是当前的方法说明 当前方法是接口方法 ,其实这里是走不到的接口相关的在上面就已经解析了,后面spring好像优化掉了,上面的逻辑不会再查接口方法接口类,我这是spring4暂时是这样的
   if (specificMethod != method) {
      //去我们的实现类的接口上的方法去找事务注解
      txAttr = findTransactionAttribute(method);
      if (txAttr != null) {
         return txAttr;
      }
      //去我们的实现类的接口上去找事务注解
      txAttr = findTransactionAttribute(method.getDeclaringClass());
      if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
         return txAttr;
      }
   }

   return null;
}

从上面逻辑看大致是实现类方法->实现类->接口方法->接口类,其实不是这样的findTransactionAttribute这个方法里面会把接口类,接口方法也给查了,所以大致匹配逻辑是实现类方法->接口方法->实现类->接口类

代理调用

这里我们依然是看Jdk动态代理

来到JdkDynamicAopProxy.invoke->invocation.proceed->ReflectiveMethodInvocation.proceed->TransactionInterceptor.invoke(TransactionInterceptor是事务注解注入的配置类中的Advisor中的增强)->invokeWithinTransaction

invokeWithinTransaction

protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
            final InvocationCallback invocation) throws Throwable {

    // 获取我们的事务属源对象 在配置类中添加的
    TransactionAttributeSource tas = getTransactionAttributeSource();
    // 获取解析后的事务属性信息
    final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
    // 获取我们配置的事务管理器对象
    final PlatformTransactionManager tm = determineTransactionManager(txAttr);
    // 从txAttr属性对象中获取出标注了@Transactionl的方法描述符
    final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

    //处理声明式事务
    if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
        //有没有必要创建事务
        TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);

        Object retVal;
        try {
            //调用钩子函数进行回调目标方法
            retVal = invocation.proceedWithInvocation();
        }
        catch (Throwable ex) {
            //抛出异常进行回滚处理
            completeTransactionAfterThrowing(txInfo, ex);
            throw ex;
        }
        finally {
            //清空我们的线程变量中transactionInfo的值
            cleanupTransactionInfo(txInfo);
        }
        //提交事务
        commitTransactionAfterReturning(txInfo);
        return retVal;
    }
    // 编程式事务:(回调偏向)
    else {
        final ThrowableHolder throwableHolder = new ThrowableHolder();

        // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
        try {
            Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, status -> {
                TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
                try {
                    return invocation.proceedWithInvocation();
                }
                catch (Throwable ex) {
                    if (txAttr.rollbackOn(ex)) {
                        // A RuntimeException: will lead to a rollback.
                        if (ex instanceof RuntimeException) {
                            throw (RuntimeException) ex;
                        }
                        else {
                            throw new ThrowableHolderException(ex);
                        }
                    }
                    else {
                        // A normal return value: will lead to a commit.
                        throwableHolder.throwable = ex;
                        return null;
                    }
                }
                finally {
                    cleanupTransactionInfo(txInfo);
                }
            });

            // Check result state: It might indicate a Throwable to rethrow.
            if (throwableHolder.throwable != null) {
                throw throwableHolder.throwable;
            }
            return result;
        }
        catch (ThrowableHolderException ex) {
            throw ex.getCause();
        }
        catch (TransactionSystemException ex2) {
            if (throwableHolder.throwable != null) {
                logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
                ex2.initApplicationException(throwableHolder.throwable);
            }
            throw ex2;
        }
        catch (Throwable ex2) {
            if (throwableHolder.throwable != null) {
                logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
            }
            throw ex2;
        }
    }
}

createTransactionIfNecessary方法(创建事务就在这)

protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
      @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {

   // 如果还没有定义名字,把连接点的ID定义成事务的名称
   if (txAttr != null && txAttr.getName() == null) {
      txAttr = new DelegatingTransactionAttribute(txAttr) {
         @Override
         public String getName() {
            return joinpointIdentification;
         }
      };
   }

   TransactionStatus status = null;
   if (txAttr != null) {
      if (tm != null) {
         //获取一个事务状态,这里的tm就是事务管理器是在配置类中定义的DataSourceTransactionManager
         status = tm.getTransaction(txAttr);
      }
      else {
         if (logger.isDebugEnabled()) {
            logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
                  "] because no transaction manager has been configured");
         }
      }
   }
   //把事务状态和事务属性等信息封装成一个TransactionInfo对象
   return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}

DataSourceTransactionManager.getTransaction(事务传播行为就在这处理的)

@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
    //尝试获取一个事务对象
    Object transaction = doGetTransaction();

    // Cache debug flag to avoid repeated checks.
    boolean debugEnabled = logger.isDebugEnabled();

    //判断从上一个方法传递进来的事务属性是不是为空
    if (definition == null) {

        definition = new DefaultTransactionDefinition();
    }

    //判断是不是已经存在了事务对象,这里判断的是存不存在connectionHolder这个属性,存在这个属性表示事务开启,挂起suspend还是开启事务doBegin都会对这个属性进行操作(事务嵌套)
    if (isExistingTransaction(transaction)) {
        //处理存在的事务
        return handleExistingTransaction(definition, transaction, debugEnabled);
    }

    //检查事务设置的超时时间
    if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
        throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
    }

    //若当前的事务属性 PROPAGATION_MANDATORY 表示必须运行在事务中,若当前没有事务就抛出异常
    //由于isExistingTransaction(transaction)说明当前是不存在事务的,那么就会抛出异常
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
        throw new IllegalTransactionStateException(
            "No existing transaction found for transaction marked with propagation 'mandatory'");
    }
    /**
     * PROPAGATION_REQUIRED 开启事务,存在事务嵌套融合事务
     * PROPAGATION_REQUIRES_NEW 开启事务,存在事务嵌套挂起事务
     * PROPAGATION_NESTED: PROPAGATION_NESTED 开启事务
     * 表示如果当前正有一个事务在运行中,则该方法应该运行在 一个嵌套的事务中,
     * 被嵌套的事务可以独立于封装事务进行提交或者回滚(保存点)
     * 其实走到这里,表明是没有事务嵌套情况,所以判断里面只要开启事务即可
     */
    else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
             definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
             definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        /**
         * 走到这里了,经过了上面的isExistingTransaction(transaction) 判断当前是不存在事务的
         * 所有再这里是挂起当前事务传递一个null进去
         */
        SuspendedResourcesHolder suspendedResources = suspend(null);
        if (debugEnabled) {
            logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
        }
        try {
            // 意思是可以进行同步
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            // 构造事务状态对象,newTransaction=true代表是一个新事务
            DefaultTransactionStatus status = newTransactionStatus(
                definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            // 开启一个新的事务
            doBegin(transaction, definition);
            // 把当前的事务信息绑定到线程变量去
            prepareSynchronization(status, definition);
            return status;
        }
        catch (RuntimeException | Error ex) {
            resume(null, suspendedResources);
            throw ex;
        }
    }
    else { 
        //创建一个空的事务
        // Create "empty" transaction: no actual transaction, but potentially synchronization.
        if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
            logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                        "isolation level will effectively be ignored: " + definition);
        }
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
    }
}

handleExistingTransaction方法(处理嵌套事务)

private TransactionStatus handleExistingTransaction(
            TransactionDefinition definition, Object transaction, boolean debugEnabled)
            throws TransactionException {

        // NEVER 存在外部事务 抛出异常
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
            throw new IllegalTransactionStateException(
                    "Existing transaction found for transaction marked with propagation 'never'");
        }

        // NOT_SUPPORTED 存在外部事务 挂起外部事务
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
            if (debugEnabled) {
                logger.debug("Suspending current transaction");
            }
            // 挂起存在的事务
            Object suspendedResources = suspend(transaction);
            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
            // 创建一个新的非事务状态(保存了上一个存在事务状态的属性)
            return prepareTransactionStatus(
                    definition, null, false, newSynchronization, debugEnabled, suspendedResources);
        }

        // REQUIRES_NEW 存在外部事务 挂起外部事务,创建新的事务
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
            if (debugEnabled) {
                logger.debug("Suspending current transaction, creating new transaction with name [" +
                        definition.getName() + "]");
            }
            // 挂起已经存在的事务
            SuspendedResourcesHolder suspendedResources = suspend(transaction);
            try {
                // 是否需要新开启同步
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                // 创建一个新的事务状态(包含了挂起的事务的属性)
                DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                // 开启新的事务
                doBegin(transaction, definition);
                // 把新的事务状态设置到当前的线程变量中去
                prepareSynchronization(status, definition);
                return status;
            }
            catch (RuntimeException | Error beginEx) {
                resumeAfterBeginException(transaction, suspendedResources, beginEx);
                throw beginEx;
            }
        }

        // NESTED 存在外部事务 融合到外部事务中
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
            if (!isNestedTransactionAllowed()) {
                throw new NestedTransactionNotSupportedException(
                        "Transaction manager does not allow nested transactions by default - " +
                        "specify 'nestedTransactionAllowed' property with value 'true'");
            }
            if (debugEnabled) {
                logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
            }
            // 是否支持保存点:非JTA事务走这个分支。
             // AbstractPlatformTransactionManager默认是true,JtaTransactionManager复写了该方法false,DataSourceTransactionManager没有复写,还是true,
            if (useSavepointForNestedTransaction()) {
                // 开启一个新的事务
                DefaultTransactionStatus status =
                        prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
                // 为事务设置一个回退点
                //savepoint可以在一组事务中,设置一个回滚点,点以上的不受影响,点以下的回滚.(外层影响内层,内层不会影响外层)
                status.createAndHoldSavepoint();
                return status;
            }
            else {
                 // JTA事务走这个分支,创建新事务
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, null);
                doBegin(transaction, definition);
                prepareSynchronization(status, definition);
                return status;
            }
        }

        // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
        // 省略代码
        boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
        return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
    }

不管是在处理顶层事务还是嵌套事务,只要是开启事务就会调用doBegin方法,挂起事务就会执行suspend方法

doBegin方法

@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
    //强制转化事务对象
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    Connection con = null;

    try {
        //判断事务对象没有数据库连接持有器
        if (!txObject.hasConnectionHolder() ||
            txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
            //通过数据源获取一个数据库连接对象
            Connection newCon = obtainDataSource().getConnection();
            if (logger.isDebugEnabled()) {
                logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
            }
            //把我们的数据库连接包装成一个ConnectionHolder对象 然后设置到我们的txObject对象中去
            txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
        }

        //标记当前的连接是一个同步事务
        txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
        con = txObject.getConnectionHolder().getConnection();

        // 设置isReadOnly、隔离级别(null表示默认的事务隔离级别是数据库的事务隔离级别)
        Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
        txObject.setPreviousIsolationLevel(previousIsolationLevel);

        //setAutoCommit 默认为true,即每条SQL语句在各自的一个事务中执行。
        if (con.getAutoCommit()) {
            txObject.setMustRestoreAutoCommit(true);
            if (logger.isDebugEnabled()) {
                logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
            }
            con.setAutoCommit(false); // 开启事务
        }

        //事务为只读事务即设置为只读事务否则跳过
        prepareTransactionalConnection(con, definition);
        //设置事务激活
        txObject.getConnectionHolder().setTransactionActive(true);

        //设置事务超时时间
        int timeout = determineTimeout(definition);
        if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
            txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
        }

        // 绑定我们的数据源和连接到我们的同步管理器上,把数据源作为key,数据库连接作为value,设置到线程变量中
        if (txObject.isNewConnectionHolder()) {
            // 在doGetTransaction中会获取
            TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
        }
    }

    catch (Throwable ex) {
        if (txObject.isNewConnectionHolder()) {
            //释放数据库连接
            DataSourceUtils.releaseConnection(con, obtainDataSource());
            txObject.setConnectionHolder(null, false);
        }
        throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
    }
}

suspend方法

这里执行的是挂起事务,主要操作是将原有事务对象放到暂存资源持有器SuspendedResourcesHolder,原有事务对象值全部置为默认值,重新开启事务会放入新的事务对象,在当前事务commit或rollback的时候在吧这个事务对象重新放回事务同步管理器TransactionSynchronizationManager

protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
    // 嵌套事务 已经激活,在doBegin后激活
    if (TransactionSynchronizationManager.isSynchronizationActive()) {
        List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
        try {
            Object suspendedResources = null;
            if (transaction != null) {
                //connectionHolder置为空,解绑我们的数据源和同步管理器
                suspendedResources = doSuspend(transaction);
            }
            //获取已存在的事务的名称
            String name = TransactionSynchronizationManager.getCurrentTransactionName();
            //清空线程变量的
            TransactionSynchronizationManager.setCurrentTransactionName(null);
            //获取出只读事务的名称
            boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
            //清空线程变量的
            TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
            //获取已存在事务的隔离级别
            Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
            //清空隔离级别
            TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
            //获取激活标志
            boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
            //清空标记Actual-外层事务
            TransactionSynchronizationManager.setActualTransactionActive(false);
            //把上诉从线程变量中获取出来的存在事务属性封装为挂起的事务属性返回出去
            return new SuspendedResourcesHolder(
                suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
        }
        catch (RuntimeException | Error ex) {
            // doSuspend failed - original transaction is still active...
            doResumeSynchronization(suspendedSynchronizations);
            throw ex;
        }
    }
    else if (transaction != null) {
        // Transaction active but no synchronization active.
        Object suspendedResources = doSuspend(transaction);
        return new SuspendedResourcesHolder(suspendedResources);
    }
    else {
        // Neither transaction nor synchronization active.
        return null;
    }
}

文章作者: dm
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 dm !
评论
 上一篇
SpringBoot启动原理 SpringBoot启动原理
前言SpringBoot 的启动区别于传统的Spring需要搭建tomcat等相关的容器,SpringBoot默认是通过内置Tomcat启动,只需简单的运行java -jar xxx即可简单的启动一个SpringBoot工程。这里我们深入S
2022-04-17
下一篇 
Spring AOP原理解析 Spring AOP原理解析
Spring AOP 具体应用就是面向切面编程。AOP就是在我们原先的目标类上面进行一定程度的封装,在方法执行前、方法执行后、方法抛出异常,方法返回后调用某个方法。Spring AOP是基于动态代理实现的,默认使用JDK动态代理(可以强制使
2022-03-10
  目录