RocketMQ源码之消息轨迹

一般生产环境都不会自动创建Topic,可以让运维同事帮忙提前创建好消息轨迹topic。
消息轨迹主要就是创建一个Topic来存放我们的消息轨迹数据,其它数据存储方式是跟原来的一致,所以我们关注此次核心的地方即可。
如果我们引进消息轨迹这个特性,那么是否需要将我们的Broker集群都开启消息轨迹呢?
如果我们没指定Broker开启消息轨迹又会怎么样呢?
从上面的源码我们知道,如果没有找到对应Topic的Broker信息,那么我们会往全部Broker发送轨迹消息数据。
但是,这样就会增加全部Broker的压力,因为消息轨迹数据跟业务消息数据是没有什么强关联的,一般也只是辅助我们查看消息的发送情况等。
所以,为了降低我们业务处理的Broker压力,官方建议我们,升级使用消息轨迹时,可以增加一台Broker机器,而且只有它开启消息轨迹功能traceTopicEnable即可,这样只有这台新机器会存放轨迹消息,而不会原有业务Broker造成太大的压力。
四、消费消息轨迹流程
消费者在消息轨迹的初始化和启动流程完全跟生产者消息轨迹的流程一模一样,在此就不重复记录了。
我们都知道,消费消息有两种方式:pull和push,那么相应的处理也是有处理钩子的地方,这里也暂不去探究了,只要知道有两个地方会触发钩子的执行,有兴趣的可以去自己去看下吧。
但是,在具体的钩子Hook实现是不一样的,那么现在我们看下消费端是如何做的呢?

public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {
private TraceDispatcher localDispatcher;
public ConsumeMessageTraceHookImpl(TraceDispatcher localDispatcher) {
this.localDispatcher = localDispatcher;
}
@Override
public String hookName() {
return "ConsumeMessageTraceHook";
}

// 消息拉在客户端,并且在消费业务消息之前所处理
@Override
public void consumeMessageBefore(ConsumeMessageContext context) {
// 消息列表为空则不处理
if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
return;
}
TraceContext traceContext = new TraceContext();
context.setMqTraceContext(traceContext);
traceContext.setTraceType(TraceType.SubBefore);// 轨迹类型:SubBefore
traceContext.setGroupName(NamespaceUtil.withoutNamespace(context.getConsumerGroup()));//
List beans = new ArrayList();
for (MessageExt msg : context.getMsgList()) {
if (msg == null) {
continue;
}
String regionId = msg.getProperty(MessageConst.PROPERTY_MSG_REGION);
String traceOn = msg.getProperty(MessageConst.PROPERTY_TRACE_SWITCH);
// 从消息属性中获取是否禁用轨迹消息开关,如果设置false,则不处理
if (traceOn != null && traceOn.equals("false")) {
continue;
}
TraceBean traceBean = new TraceBean();
traceBean.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic()));//
traceBean.setMsgId(msg.getMsgId());//
traceBean.setTags(msg.getTags());//
traceBean.setKeys(msg.getKeys());//
traceBean.setStoreTime(msg.getStoreTimestamp());//
traceBean.setBodyLength(msg.getStoreSize());//
traceBean.setRetryTimes(msg.getReconsumeTimes());//
traceContext.setRegionId(regionId);//
beans.add(traceBean);
}
if (beans.size() > 0) {
traceContext.setTraceBeans(beans);
traceContext.setTimeStamp(System.currentTimeMillis());
localDispatcher.append(traceContext);
}
}

// 业务消息处理完之后进行处理
@Override
public void consumeMessageAfter(ConsumeMessageContext context) {
// 消息列表为空则不处理
if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
return;
}
TraceContext subBeforeContext = (TraceContext) context.getMqTraceContext();
if (subBeforeContext.getTraceBeans() == null || subBeforeContext.getTraceBeans().size() < 1) {
// subbefore为空或其TraceBean列表为空,则不处理
return;
}
TraceContext subAfterContext = new TraceContext();
subAfterContext.setTraceType(TraceType.SubAfter);// 轨迹类型为:SubAfter
subAfterContext.setRegionId(subBeforeContext.getRegionId());//
subAfterContext.setGroupName(NamespaceUtil.withoutNamespace(subBeforeContext.getGroupName()));//
subAfterContext.setRequestId(subBeforeContext.getRequestId());//
subAfterContext.setSuccess(context.isSuccess());//

// 计算耗时:(当前时间-subBefore时间戳)/ 消息列表大小,所以这个值也只是平均值,不是绝对的
int costTime = (int) ((System.currentTimeMillis() - subBeforeContext.getTimeStamp()) / context.getMsgList().size());
subAfterContext.setCostTime(costTime);//
subAfterContext.setTraceBeans(subBeforeContext.getTraceBeans());
String contextType = context.getProps().get(MixAll.CONSUME_CONTEXT_TYPE);
if (contextType != null) {
subAfterContext.setContextCode(ConsumeReturnType.valueOf(contextType).ordinal());
}
localDispatcher.append(subAfterContext);
}
}


费端消费轨迹流程大致跟生产者消息轨迹流程基本一致,不同的是消费端包含SubBefore和SubAfter两种轨迹类型,这是因为消息到消费端还需要等待业务逻辑进行处理,这样处理完成之后,就可以计算消费耗时了。

下图是关于消息轨迹的核心图,来自github上zongtanghu在issue上提供的。


五、总结
RocketMQ消息轨迹实现起来并不是十分复杂,通过源码我们也了解了大致的实现原理和流程处理。
1)RocketMQ消息轨迹是基于Hook钩子机制实现。
2)消息轨迹类型包含:Pub、SubBefore、SubAfter。
3)使用一台Broker来启用消息轨迹存储,减少其它业务Broker消息处理压力。

参考资料

源码分析RocketMQ消息轨迹

https://blog.csdn.net/prestigeding/article/details/98376981

RocketMQ消息轨迹-设计篇

https://blog.csdn.net/prestigeding/article/details/95922489