阿里开源分布式事务组件 seata :demo 环境搭建以及运行流程简析

案例设计

seata 官方给出了一系列 demo 样例,不过我在用的过程中发现总有这个那个的问题,所以自己维护了一份基于 dubbo 的 demo 在 github 上,适配的 seata 版本是 0.8.0。
案例的设计直接参考官方 quick start给出的案例:


整个案例分为三个服务,分别是存储服务、订单服务和账户服务,这些服务通过 dubbo 进行发布和调用,内部调用逻辑如上面图所示。  

整个 demo 的工程样例如下所示:

undo_log 表

这个案例除了在数据库需要建立业务表以外,还要额外建立一张 undo_log 表,这个表的主要作用是记录事务的前置镜像和后置镜像。

全局事务进行到提交阶段,则删除该表对应的记录,全局事务如果需要回滚,则会利用这个表里记录的镜像数据,恢复数据。

undo_log 表里的数据实际上是“朝生夕死”的,数据不需要在表里存活太久。表结构如下所示:

CREATE TABLE `undo_log` (

                          `id` bigint(20) NOT NULL AUTO_INCREMENT,

                          `branch_id` bigint(20) NOT NULL,

                          `xid` varchar(100) NOT NULL,

                          `context` varchar(128) NOT NULL,

                          `rollback_info` longblob NOT NULL,

                          `log_status` int(11) NOT NULL,

                          `log_created` datetime NOT NULL,

                          `log_modified` datetime NOT NULL,

                          `ext` varchar(100) DEFAULT NULL,

                          PRIMARY KEY (`id`),

                          UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)

) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

服务逻辑

每个服务都对应了一个 starter 类,这个类主要用来在 spring 环境下,将该服务启动,并通过 dubbo 发布出去,以账户服务为例:

/**

 * The type Dubbo account service starter.

 */

public class DubboAccountServiceStarter {

    /**

     * 2. Account service is ready . A buyer register an account: U100001 on my e-commerce platform

     *

     * @param args the input arguments

     */

    public static void main(String[] args) {

        ClassPathXmlApplicationContext accountContext = new ClassPathXmlApplicationContext(new String[]{"spring/dubbo-account-service.xml"});

        accountContext.getBean("service");

        JdbcTemplate accountJdbcTemplate = (JdbcTemplate) accountContext.getBean("jdbcTemplate");

        accountJdbcTemplate.update("delete from account_tbl where user_id = 'U100001'");

        accountJdbcTemplate.update("insert into account_tbl(user_id, money) values ('U100001', 999)");


new ApplicationKeeper(accountContext).keep(); } }


首先通过  

ClassPathXmlApplicationContext

 
读取 dubbo-account-service.xml 这个 spring 配置文件并启动 spring 容器环境,并通过 spring 的 jdbc template 对账户表的数据进行初始化。  

dubbo-account-service.xml 配置文件中进行了各类 bean 的配置,包括 dubbo 与 spring 结合时的标准配置:

    <bean id="accountDataSourceProxy" class="io.seata.rm.datasource.DataSourceProxy">

        <constructor-arg ref="accountDataSource" />

    </bean>


<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate"> <property name="dataSource" ref="accountDataSourceProxy" /> </bean>
<dubbo:application name="dubbo-demo-account-service" /> <dubbo:registry address="zookeeper://localhost:2181" /> <dubbo:protocol name="dubbo" port="20881" /> <dubbo:service interface="io.seata.samples.dubbo.service.AccountService" ref="service" timeout="10000"/>
<bean id="service" class="io.seata.samples.dubbo.service.impl.AccountServiceImpl"> <property name="jdbcTemplate" ref="jdbcTemplate"/> </bean>
<bean class="io.seata.spring.annotation.GlobalTransactionScanner"> <constructor-arg value="dubbo-demo-account-service"/> <constructor-arg value="my_test_tx_group"/> </bean>

这份配置里主要有两个需要引起注意的关键点

  1. jdbcTemplate 这个 bean 所依赖的数据源 bean,是一个类名为 io.seata.rm.datasource.DataSourceProxy 的数据源类,通过它的名字可以很明显地看出这是一个代理模式的应用,因为 seata 为完成全局事务的逻辑,需要在普通的 sql 操作前后添加一些逻辑,比如说 sql 执行前对 sql 进行语法解析,生成前置镜像,sql 执行后生成后置镜像,通过代理的方式,可以方便地对 connection,statement 等进行代理包装,在调用的时候进行拦截,加入自己的逻辑。
  2. 配置文件中还有一个 io.seata.spring.annotation.GlobalTransactionScanner 类型的 bean,这个 bean 是支撑 seata 能在 spring 环境中通过注解的方式来划定事务边界的基础。在 spring 容器启动时,会扫描
    @GlobalTransactional
     
    注解是否存在,这个注解标识了全局事务的开始和结束,也就是我们常说的“事务的边界”

业务逻辑

业务逻辑的具体详情在
BusinessServiceImpl
 
类中可以看到:

    @Override

    @GlobalTransactional(timeoutMills = 300000, name = "dubbo-demo-tx")

    public void purchase(String userId, String commodityCode, int orderCount) {

        LOGGER.info("purchase begin ... xid: " + RootContext.getXID());

        storageService.deduct(commodityCode, orderCount);

        orderService.create(userId, commodityCode, orderCount);

        // throw new RuntimeException("xxx");

    }

先调用存储服务,减少库存,然后调用订单服务,新建订单。这两个动作属于一个整体的事务,任何一个动作失败,都需要撤销所有的操作。

这个方法也有两个需要注意的点:

  1. 该方法上声明了 @GlobalTransactional(timeoutMills = 300000, name = “dubbo-demo-tx”) 这样的注解,用于让上文提到的 GlobalTransactionScanner 扫描的时候发现这是一个全局事务。
  2. 方法的最后有一行代码抛出了 RuntimeException,这主要是为了模仿全局事务的失败,并让 seata 走全局事务回滚逻辑。

事务扫描与边界定义

上文提到的 GlobalTransactionScanner 类,会在 spring 容器启动的时候,也被初始化。

在它的 afterPropertiesSet 方法被调用时,会触发 seata client 的初始化

    @Override

    public void afterPropertiesSet() {

        if (disableGlobalTransaction) {

            if (LOGGER.isInfoEnabled()) {

                LOGGER.info("Global transaction is disabled.");

            }

            return;

        }

        initClient();


}

关于 seata client 的初始化的细节,可以看看我写的另外一篇文章
《阿里开源分布式事务组件 seata :
seata client 通信层解析》

 


初始化客户端做的事情主要是建立与 seata server 的连接,并注册 TM 和 RM。接下来,在 wrapIfNecessary 方法里,实现对注解的扫描,并对添加了注解的方法添加 interceptor。  

这篇文章里我们暂时不讨论 TCC 模式,只讨论 AT 模式,也暂不讨论全局事务锁 GlobalLock 的实现,先忽略这些有关的逻辑,只关注事务处理逻辑。

                    Class serviceInterface = SpringProxyUtils.findTargetClass(bean);

                    Class[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);

                    if (!existsAnnotation(new Class[] {serviceInterface})

                        && !existsAnnotation(interfacesIfJdk)) {

                        return bean;

                    }

                    if (interceptor == null) {

                        interceptor = new GlobalTransactionalInterceptor(failureHandlerHook);

                    }

在这里,interceptor 的实现是 GlobalTransactionalInterceptor,也就是说,以上文的案例为例子,当 BusinessServiceImpl 的 purchase 方法被调用的时候,实际上这个方法会被拦截器拦截,执行拦截器里的逻辑:

    @Override

    public Object invoke(final MethodInvocation methodInvocation) throws Throwable {

        Class targetClass = (methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null);

        Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);

        final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);


final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class); final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class); if (globalTransactionalAnnotation != null) { return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation); } else if (globalLockAnnotation != null) { return handleGlobalLock(methodInvocation); } else { return methodInvocation.proceed(); } }
private Object handleGlobalTransaction(final MethodInvocation methodInvocation, final GlobalTransactional globalTrxAnno) throws Throwable { try { return transactionalTemplate.execute(new TransactionalExecutor() { @Override public Object execute() throws Throwable { return methodInvocation.proceed(); }
public String name() { String name = globalTrxAnno.name(); if (!StringUtils.isNullOrEmpty(name)) { return name; } return formatMethod(methodInvocation.getMethod()); }
@Override public TransactionInfo getTransactionInfo() { TransactionInfo transactionInfo = new TransactionInfo(); transactionInfo.setTimeOut(globalTrxAnno.timeoutMills()); transactionInfo.setName(name()); Set rollbackRules = new LinkedHashSet(); for (Class rbRule : globalTrxAnno.rollbackFor()) { rollbackRules.add(new RollbackRule(rbRule)); } for (String rbRule : globalTrxAnno.rollbackForClassName()) { rollbackRules.add(new RollbackRule(rbRule)); } for (Class rbRule : globalTrxAnno.noRollbackFor()) { rollbackRules.add(new NoRollbackRule(rbRule)); } for (String rbRule : globalTrxAnno.noRollbackForClassName()) { rollbackRules.add(new NoRollbackRule(rbRule)); } transactionInfo.setRollbackRules(rollbackRules); return transactionInfo; } }); } catch (TransactionalExecutor.ExecutionException e) { TransactionalExecutor.Code code = e.getCode(); switch (code) { case RollbackDone: throw e.getOriginalException(); case BeginFailure: failureHandler.onBeginFailure(e.getTransaction(), e.getCause()); throw e.getCause(); case CommitFailure: failureHandler.onCommitFailure(e.getTransaction(), e.getCause()); throw e.getCause(); case RollbackFailure: failureHandler.onRollbackFailure(e.getTransaction(), e.getCause()); throw e.getCause(); default: throw new ShouldNeverHappenException("Unknown TransactionalExecutor.Code: " + code); } } }

在执行 handleGlobalTransaction 方法时,实际上采用模板模式,委托给了 TransactionalTemplate 类去执行标准的事务处理流程。如下所示:

   /**

     * Execute object.

     *

     * @param business the business

     * @return the object

     * @throws TransactionalExecutor.ExecutionException the execution exception

     */

    public Object execute(TransactionalExecutor business) throws Throwable {

        // 1. get or create a transaction

        GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();


// 1.1 get transactionInfo TransactionInfo txInfo = business.getTransactionInfo(); if (txInfo == null) { throw new ShouldNeverHappenException("transactionInfo does not exist"); } try {
// 2. begin transaction beginTransaction(txInfo, tx);
Object rs = null; try {
// Do Your Business rs = business.execute();
} catch (Throwable ex) {
// 3.the needed business exception to rollback. completeTransactionAfterThrowing(txInfo,tx,ex); throw ex; }
// 4. everything is fine, commit. commitTransaction(tx);
return rs; } finally { //5. clear triggerAfterCompletion(); cleanUp(); } }


事务处理逻辑实际上是一种模板,将事务相关的处理逻辑放在 try 块里,发现异常后执行回滚,正常执行则执行提交。  


在这里有个需要注意的地方是,seata 不把提交这个动作放在 try 块里,因为在 seata 里,全局事务的提交实际上是可以异步执行的。  

因为全局事务如果进行到提交这一阶段,那么意味着各个分支事务已经执行过本地提交,全局事务的提交阶段仅仅是删除 undo_log 里的记录,这个记录删除或者不删除,实际上不会改变全局事务已经正常完成的事实。所以它可以用程序异步去做,或者以人工介入的方式去做,所以 seata 认为,全局事务提交失败,不需要执行回滚流程。