自己写分布式事务框架之实现事务回滚

上一篇文章中,我们对事务提交部分的框架逻辑及代码实现做了较为详细的讲解;本文中,我们接着分析一下事务回滚阶段的机理及代码实现逻辑。

这里主要看图的下半部分。

当事务下游应用达到最大消费次数,事务回滚被消息持久化之后,ShieldTXC的消息发送线程sendTxcMessageScheduler会扫描到待发送的回滚消息并投递到 [事务回滚队列]

第一阶段:实现回滚逻辑

事务上游应用在启动过程中初始化了ShieldTxcConsumerListenerAdapter消费适配器,并通过ShieldTxcRollbackListener实现了回滚逻辑。

@Service
public class TxConsumeService implements InitializingBean {

    @Value("${shield.event.rocketmq.nameSrvAddr}")
    String nameSerAddr;
    @Value("${shield.event.rocketmq.topicSource}")
    String topic;

    @Override
    public void afterPropertiesSet() throws Exception {
        new ShieldTxcConsumerListenerAdapter(nameSerAddr, topic, new ShieldTxcRollbackListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
                System.out.println("测试消费【回滚】ShieldTxcRollbackListener");
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        }));
    }
}

这段代码在之前的事务提交阶段已经讲解过,事务发起端需要自行实现回滚逻辑,这样才能在异常发生时与事务下游保持数据一致性。更多的细节此处就不再赘述。

第二阶段:回滚拦截

同ShieldTxcCommitListener类似,ShieldTxcRollbackListener也实现了对回滚消费逻辑的拦截,它的声明如下:

【ShieldTxcRollbackListener】

public class ShieldTxcRollbackListener implements MessageListenerConcurrently {

    private MessageListenerConcurrently txRollbackListener;

    public ShieldTxcRollbackListener(MessageListenerConcurrently txRollbackListener) {
        this.txRollbackListener = txRollbackListener;
    }

ShieldTxcRollbackListener同样实现了MessageListenerConcurrently接口。在构造过程中将外界传入的MessageListenerConcurrently实例的引用指向了内部的MessageListenerConcurrently引用。

ShieldTxcRollbackListener同样是MessageListenerConcurrently实例,通过它的consumeMessage方法代理了外部传入的MessageListenerConcurrently实例,通过加入了切面逻辑对消费过程做了进一步的处理。

我们重点对ShieldTxcCommitListener如何对真实的消费过程进行代理做深入的分析,核心思路同样是在真实的回滚消费逻辑之前加入前置处理,在真实消费逻辑之后加入后置处理。

@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
    // 测试打印消息体
    for (MessageExt msg : msgs) {

        String msgBody = new String(msg.getBody());
        String msgId = msg.getMsgId();
        LOGGER.debug("[ShieldTxcRollbackListener]Consuming [ROLLBACK] Message start... msgId={},msgBody={}", msgId, msgBody);

获取参数进行日志打印。

ShieldTxcMessage shieldTxcMessage = new ShieldTxcMessage();
shieldTxcMessage.decode(msgBody);

ShieldEvent rollbackEvent = new ShieldEvent();
rollbackEvent.convert(shieldTxcMessage);

BaseEventService baseEventService =
        (BaseEventService) SpringApplicationHolder.getBean("baseEventService");

将消息协议进行反序列化,并转换为消息实体,从Spring上下文中获取数据库持久化服务BaseEventService,为后续数据库操作做准备。

try {
    // 取参数
    String bizKey = shieldTxcMessage.getBizKey();
    String txType = shieldTxcMessage.getTxType();
    String eventStatua = shieldTxcMessage.getEventStatus();
    String appId = shieldTxcMessage.getAppId();
    String eventType = shieldTxcMessage.getEventType();

    // 回滚消息持久化
    rollbackEvent.setEventType(eventType)
            .setTxType(TXType.ROLLBACK.toString())
            .setEventStatus(EventStatus.CONSUME_INIT.toString())
            .setContent(shieldTxcMessage.getContent())
            .setAppId(shieldTxcMessage.getAppId())
            .setBizKey(bizKey)
            .setId(Integer.valueOf(shieldTxcMessage.getId()));

    // 入库失败回滚
    boolean insertResult = baseEventService.insertEventWithId(rollbackEvent);
    if (!insertResult) {
        LOGGER.warn("[ShieldTxcRollbackListener] insert RollbackShieldEvent Consume Message failed,msgId={}", msgId);
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }

上游执行回滚消息持久化,持久化成功后状态为 CONSUME_INIT (消费初始化),

// 改消费处理中
doUpdateMessageStatusProcessing(baseEventService, rollbackEvent);
// 真实消费
return doUpdateAfterRollbackConsumed(baseEventService, this.txRollbackListener.consumeMessage(msgs, context), rollbackEvent);

消息持久化后将消息状态改为消费处理中 [CONSUME_PROCESSING] ,进行真实消费过程,真实消费完成后对消费结果做后置处理。

} catch (Exception e) {
            // 幂等处理:唯一约束触发则直接进行消费
            if (e.getMessage() != null && e.getMessage().indexOf(CommonProperty.MESSAGE_HAS_EXISTED_INDEX) >= 0) {
                LOGGER.debug("[ShieldTxcRollbackListener::UNIQUE INDEX], message has existed,msgId={}", msgId);
                return doUpdateAfterRollbackConsumed(baseEventService, this.txRollbackListener.consumeMessage(msgs, context), rollbackEvent);
            }
            if (e.getMessage() != null && e.getMessage().indexOf(CommonProperty.MESSAGE_PRIMARY_KEY_DUPLICATE) >= 0) {
                LOGGER.debug("[ShieldTxcRollbackListener::Duplicate entry for key 'PRIMARY'], message has existed,msgId={}", msgId);
                return doUpdateAfterRollbackConsumed(baseEventService, this.txRollbackListener.consumeMessage(msgs, context), rollbackEvent);
            }
            // 其他异常重试
            LOGGER.warn("ShieldTxcRollbackListener Consume Message occurred Exception,msgId={}", msgId, e);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
    return null;
}

这里的逻辑与ShieldTxcCommitListener的类似,对消费做幂等处理,重复消息不再入库,直接进行消费,上游回滚消费逻辑需要满足幂等性。

我们接着看一下doUpdateAfterRollbackConsumed方法如何进行消费后置处理。

/**
 * 拦截真实消费结果,根据消费结果更新消息状态
 *
 * @param consumeConcurrentlyStatus
 * @param baseEventService
 * @param rollbackEvent
 * @return
 */
private ConsumeConcurrentlyStatus doUpdateAfterRollbackConsumed(BaseEventService baseEventService,
                                                                ConsumeConcurrentlyStatus consumeConcurrentlyStatus,
                                                                ShieldEvent rollbackEvent) {
    if (ConsumeConcurrentlyStatus.RECONSUME_LATER == consumeConcurrentlyStatus) {
        // 消费失败,消费状态仍旧处理中
        return consumeConcurrentlyStatus;
    }
    if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == consumeConcurrentlyStatus) {
        // 消费成功,处理中改完成,更新前状态:消费处理中
        rollbackEvent.setBeforeUpdateEventStatus(rollbackEvent.getEventStatus());
        // 更新后状态:消费处理中
        rollbackEvent.setEventStatus(EventStatus.CONSUME_PROCESSED.toString());
        boolean updateBefore = baseEventService.updateEventStatusById(rollbackEvent);
        if (!updateBefore) {
            // 更新失败,幂等重试.此时必定是系统依赖组件出问题了
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

拦截真实回滚消费逻辑获取消费结果,如果是消费成功 [CONSUME_SUCCESS] ,更改消息状态为消费完成 [CONSUME_PROCESSED] 。如果是消费失败,则会进行重试。

上游的回滚逻辑通过重试保证成功,如果达到MQ最大重试次数(RocketMQ是16次)会进死信,此时需要人工介入进行补偿操作。

在上线之前只要进行了充分的测试,保证业务无严重bug,确保线上MQ集群、应用集群的高可用,一般通过重试的方式都能够达成最终一致。如果出现大量数据不一致的情况,那么大概率就是应用存在bug或者MQ集群不稳定,此时需要人工介入进行排错。

小结

到这里我们就完成了整个框架原理图与代码实现的分析。

实现告一段落,我们接着看一个简单的应用demo,直观感受一下ShieldTXC分布式事务框架的魅力。

版权声明:

原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。