自己写分布式事务框架之实现事务回滚
上一篇文章中,我们对事务提交部分的框架逻辑及代码实现做了较为详细的讲解;本文中,我们接着分析一下事务回滚阶段的机理及代码实现逻辑。
这里主要看图的下半部分。
当事务下游应用达到最大消费次数,事务回滚被消息持久化之后,ShieldTXC的消息发送线程sendTxcMessageScheduler会扫描到待发送的回滚消息并投递到 [事务回滚队列] 。
第一阶段:实现回滚逻辑
事务上游应用在启动过程中初始化了ShieldTxcConsumerListenerAdapter消费适配器,并通过ShieldTxcRollbackListener实现了回滚逻辑。
@Service public class TxConsumeService implements InitializingBean { @Value("${shield.event.rocketmq.nameSrvAddr}") String nameSerAddr; @Value("${shield.event.rocketmq.topicSource}") String topic; @Override public void afterPropertiesSet() throws Exception { new ShieldTxcConsumerListenerAdapter(nameSerAddr, topic, new ShieldTxcRollbackListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { System.out.println("测试消费【回滚】ShieldTxcRollbackListener"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } })); } }
这段代码在之前的事务提交阶段已经讲解过,事务发起端需要自行实现回滚逻辑,这样才能在异常发生时与事务下游保持数据一致性。更多的细节此处就不再赘述。
第二阶段:回滚拦截
同ShieldTxcCommitListener类似,ShieldTxcRollbackListener也实现了对回滚消费逻辑的拦截,它的声明如下:
【ShieldTxcRollbackListener】
public class ShieldTxcRollbackListener implements MessageListenerConcurrently { private MessageListenerConcurrently txRollbackListener; public ShieldTxcRollbackListener(MessageListenerConcurrently txRollbackListener) { this.txRollbackListener = txRollbackListener; }
ShieldTxcRollbackListener同样实现了MessageListenerConcurrently接口。在构造过程中将外界传入的MessageListenerConcurrently实例的引用指向了内部的MessageListenerConcurrently引用。
ShieldTxcRollbackListener同样是MessageListenerConcurrently实例,通过它的consumeMessage方法代理了外部传入的MessageListenerConcurrently实例,通过加入了切面逻辑对消费过程做了进一步的处理。
我们重点对ShieldTxcCommitListener如何对真实的消费过程进行代理做深入的分析,核心思路同样是在真实的回滚消费逻辑之前加入前置处理,在真实消费逻辑之后加入后置处理。
@Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { // 测试打印消息体 for (MessageExt msg : msgs) { String msgBody = new String(msg.getBody()); String msgId = msg.getMsgId(); LOGGER.debug("[ShieldTxcRollbackListener]Consuming [ROLLBACK] Message start... msgId={},msgBody={}", msgId, msgBody);
获取参数进行日志打印。
ShieldTxcMessage shieldTxcMessage = new ShieldTxcMessage(); shieldTxcMessage.decode(msgBody); ShieldEvent rollbackEvent = new ShieldEvent(); rollbackEvent.convert(shieldTxcMessage); BaseEventService baseEventService = (BaseEventService) SpringApplicationHolder.getBean("baseEventService");
将消息协议进行反序列化,并转换为消息实体,从Spring上下文中获取数据库持久化服务BaseEventService,为后续数据库操作做准备。
try { // 取参数 String bizKey = shieldTxcMessage.getBizKey(); String txType = shieldTxcMessage.getTxType(); String eventStatua = shieldTxcMessage.getEventStatus(); String appId = shieldTxcMessage.getAppId(); String eventType = shieldTxcMessage.getEventType(); // 回滚消息持久化 rollbackEvent.setEventType(eventType) .setTxType(TXType.ROLLBACK.toString()) .setEventStatus(EventStatus.CONSUME_INIT.toString()) .setContent(shieldTxcMessage.getContent()) .setAppId(shieldTxcMessage.getAppId()) .setBizKey(bizKey) .setId(Integer.valueOf(shieldTxcMessage.getId())); // 入库失败回滚 boolean insertResult = baseEventService.insertEventWithId(rollbackEvent); if (!insertResult) { LOGGER.warn("[ShieldTxcRollbackListener] insert RollbackShieldEvent Consume Message failed,msgId={}", msgId); return ConsumeConcurrentlyStatus.RECONSUME_LATER; }
上游执行回滚消息持久化,持久化成功后状态为 CONSUME_INIT (消费初始化),
// 改消费处理中 doUpdateMessageStatusProcessing(baseEventService, rollbackEvent); // 真实消费 return doUpdateAfterRollbackConsumed(baseEventService, this.txRollbackListener.consumeMessage(msgs, context), rollbackEvent);
消息持久化后将消息状态改为消费处理中 [CONSUME_PROCESSING] ,进行真实消费过程,真实消费完成后对消费结果做后置处理。
} catch (Exception e) { // 幂等处理:唯一约束触发则直接进行消费 if (e.getMessage() != null && e.getMessage().indexOf(CommonProperty.MESSAGE_HAS_EXISTED_INDEX) >= 0) { LOGGER.debug("[ShieldTxcRollbackListener::UNIQUE INDEX], message has existed,msgId={}", msgId); return doUpdateAfterRollbackConsumed(baseEventService, this.txRollbackListener.consumeMessage(msgs, context), rollbackEvent); } if (e.getMessage() != null && e.getMessage().indexOf(CommonProperty.MESSAGE_PRIMARY_KEY_DUPLICATE) >= 0) { LOGGER.debug("[ShieldTxcRollbackListener::Duplicate entry for key 'PRIMARY'], message has existed,msgId={}", msgId); return doUpdateAfterRollbackConsumed(baseEventService, this.txRollbackListener.consumeMessage(msgs, context), rollbackEvent); } // 其他异常重试 LOGGER.warn("ShieldTxcRollbackListener Consume Message occurred Exception,msgId={}", msgId, e); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } return null; }
这里的逻辑与ShieldTxcCommitListener的类似,对消费做幂等处理,重复消息不再入库,直接进行消费,上游回滚消费逻辑需要满足幂等性。
我们接着看一下doUpdateAfterRollbackConsumed方法如何进行消费后置处理。
/** * 拦截真实消费结果,根据消费结果更新消息状态 * * @param consumeConcurrentlyStatus * @param baseEventService * @param rollbackEvent * @return */ private ConsumeConcurrentlyStatus doUpdateAfterRollbackConsumed(BaseEventService baseEventService, ConsumeConcurrentlyStatus consumeConcurrentlyStatus, ShieldEvent rollbackEvent) { if (ConsumeConcurrentlyStatus.RECONSUME_LATER == consumeConcurrentlyStatus) { // 消费失败,消费状态仍旧处理中 return consumeConcurrentlyStatus; } if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == consumeConcurrentlyStatus) { // 消费成功,处理中改完成,更新前状态:消费处理中 rollbackEvent.setBeforeUpdateEventStatus(rollbackEvent.getEventStatus()); // 更新后状态:消费处理中 rollbackEvent.setEventStatus(EventStatus.CONSUME_PROCESSED.toString()); boolean updateBefore = baseEventService.updateEventStatusById(rollbackEvent); if (!updateBefore) { // 更新失败,幂等重试.此时必定是系统依赖组件出问题了 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }
拦截真实回滚消费逻辑获取消费结果,如果是消费成功 [CONSUME_SUCCESS] ,更改消息状态为消费完成 [CONSUME_PROCESSED] 。如果是消费失败,则会进行重试。
上游的回滚逻辑通过重试保证成功,如果达到MQ最大重试次数(RocketMQ是16次)会进死信,此时需要人工介入进行补偿操作。
在上线之前只要进行了充分的测试,保证业务无严重bug,确保线上MQ集群、应用集群的高可用,一般通过重试的方式都能够达成最终一致。如果出现大量数据不一致的情况,那么大概率就是应用存在bug或者MQ集群不稳定,此时需要人工介入进行排错。
小结
到这里我们就完成了整个框架原理图与代码实现的分析。
实现告一段落,我们接着看一个简单的应用demo,直观感受一下ShieldTXC分布式事务框架的魅力。
版权声明:
原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。