跟我学RocketMQ之消息消费源码解析(1)
本章节重点讲解DefaultMQPushConsumer的代码逻辑。
DefaultMQPushConsumer使用样例
按照惯例还是先看一下DefaultMQPushConsumer的使用样例。
@PostConstruct public void init() { defaultMQPushConsumer = new DefaultMQPushConsumer("ORDER_RESULT_NOTIFY_GROUP"); defaultMQPushConsumer.setNamesrvAddr(nameSrvAddr); // 从头开始消费 defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 消费模式:集群模式 defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING); // 注册监听器 defaultMQPushConsumer.registerMessageListener(messageListener); // 订阅所有消息 try { defaultMQPushConsumer.subscribe("ORDER_RESULT_NOTIFY_TOPIC", "*"); defaultMQPushConsumer.start(); } catch (MQClientException e) { throw new RuntimeException("[订单结果通知消息消费者]--NotifySendConsumer加载异常!", e); } LOGGER.info("[订单结果通知消息消费者]--NotifySendConsumer加载完成!"); }
初始化过程中需要调用registerMessageListener将具体的消费实现Listener注入。
@Component(value = "notifySendListenerImpl") public class NotifySendListenerImpl implements MessageListenerConcurrently {
...省略部分代码... @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { try { for (MessageExt msg : msgs) { // 消息解码 String message = new String(msg.getBody()); // 消费次数 int reconsumeTimes = msg.getReconsumeTimes(); String msgId = msg.getMsgId(); String logSuffix = ",msgId=" + msgId + ",reconsumeTimes=" + reconsumeTimes; LOGGER.info("[通知发送消息消费者]-OrderNotifySendProducer-接收到消息,message={},{}", message, logSuffix); // 请求组装 OrderResultNofityProtocol protocol = new OrderResultNofityProtocol(); protocol.decode(message); // 参数加签,获取用户privatekey String privateKey = protocol.getPrivateKey(); String notifyUrl = protocol.getMerchantNotifyUrl(); String purseId = protocol.getPurseId(); ChargeNotifyRequest chargeNotifyRequest = new ChargeNotifyRequest(); chargeNotifyRequest.setChannel_orderid(protocol.getChannelOrderId()) .setFinish_time(DateUtil.formatDate(new Date(System.currentTimeMillis()))) .setOrder_status(NotifyConstant.NOTIFY_SUCCESS) .setPlat_orderid(protocol.getOrderId()) .setSign(chargeNotifyRequest.sign(privateKey)); LOGGER.info("[通知发送消息消费者]-OrderNotifySendProducer-订单结果通知入参:{},{}", chargeNotifyRequest.toString(), logSuffix); // 通知发送 return sendNotifyByPost(reconsumeTimes, logSuffix, protocol, notifyUrl, purseId, chargeNotifyRequest); } } catch (Exception e) { LOGGER.error("[通知发送消息消费者]消费异常,e={}", LogExceptionWapper.getStackTrace(e)); } return ConsumeConcurrentlyStatus.RECONSUME_LATER; }
上面就是一个较为标准的在spring框架中使用RocektMQ的DefaultMQPushConsumer进行消费的主流程。
接下来我们重点分析一下源码实现。
初始化DefaultMQPushConsumer
首先看一下DefaultMQPushConsumer的初始化过程。
进入DefaultMQPushConsumer.java类,查看构造方法:
public DefaultMQPushConsumer(final String consumerGroup) { this(null, consumerGroup, null, new AllocateMessageQueueAveragely()); }
调用了它的同名构造,采用AllocateMessageQueueAveragely策略(平均散列队列算法
)
public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy) { this.consumerGroup = consumerGroup; this.namespace = namespace; this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook); }
可以看到实际初始化是通过DefaultMQPushConsumerImpl实现的,DefaultMQPushConsumer持有一个defaultMQPushConsumerImpl的引用。
[DefaultMQPushConsumerImpl.java] public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) { // 初始化DefaultMQPushConsumerImpl,将defaultMQPushConsumer的实际引用传入 this.defaultMQPushConsumer = defaultMQPushConsumer; // 传入rpcHook并指向本类的引用 this.rpcHook = rpcHook; }
注册消费监听MessageListener
我们接着看一下注册消费监听器的流程。
消费监听接口MessageListener有两个具体的实现,分别为
MessageListenerConcurrently -- 并行消费监听 MessageListenerOrderly -- 顺序消费监听
本文以MessageListenerConcurrently为主要讲解的对象。
查看MessageListenerConcurrently的注册过程。
@Override public void registerMessageListener( MessageListenerConcurrently messageListener) { // 将实现指向本类引用 this.messageListener = messageListener; // 进行真实注册 this.defaultMQPushConsumerImpl.registerMessageListener(messageListener); }
接着看defaultMQPushConsumerImpl.registerMessageListener
DefaultMQPushConsumerImpl.java public void registerMessageListener(MessageListener messageListener) { this.messageListenerInner = messageListener; }
可以看到DefaultMQPushConsumerImpl将真实的messageListener实现指向它本类的messageListener引用。
订阅topic
接着看一下订阅topic的主流程。
topic订阅主要通过方法subscribe实现,首先看一下DefaultMQPushConsumer的subscribe实现
@Override public void subscribe(String topic, String subExpression) throws MQClientException { this.defaultMQPushConsumerImpl .subscribe(withNamespace(topic), subExpression); }
可以看到是调用了DefaultMQPushConsumerImpl的subscribe方法。
public void subscribe(String topic, String subExpression) throws MQClientException { try { // 构建主题的订阅数据,默认为集群消费 SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), topic, subExpression); // 将topic的订阅数据进行保存 this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); if (this.mQClientFactory != null) { // 如果MQClientInstance不为空,则向所有的broker发送心跳包,加锁 this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); } } catch (Exception e) { throw new MQClientException("subscription exception", e); } }
看一下buildSubscriptionData代码逻辑
[FilterAPI.java] public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic, String subString) throws Exception { // 构造一个SubscriptionData实体,设置topic、表达式(tag) SubscriptionData subscriptionData = new SubscriptionData(); subscriptionData.setTopic(topic); subscriptionData.setSubString(subString); // 如果tag为空或者为"*",统一设置为"*",即订阅所有消息 if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) { subscriptionData.setSubString(SubscriptionData.SUB_ALL); } else { // tag不为空,则先按照‘|’进行分割 String[] tags = subString.split("\\|\\|"); if (tags.length > 0) { // 遍历tag表达式数组 for (String tag : tags) { if (tag.length() > 0) { String trimString = tag.trim(); if (trimString.length() > 0) { // 将每个tag的值设置到tagSet中 subscriptionData.getTagsSet().add(trimString); subscriptionData.getCodeSet().add(trimString.hashCode()); } } } } else { // tag解析异常 throw new Exception("subString split error"); } } return subscriptionData; }
看一下sendHeartbeatToAllBrokerWithLock代码逻辑
[MQClientInstance.java] public void sendHeartbeatToAllBrokerWithLock() { if (this.lockHeartbeat.tryLock()) { try { // 发送心跳包 this.sendHeartbeatToAllBroker(); this.uploadFilterClassSource(); } catch (final Exception e) { log.error("sendHeartbeatToAllBroker exception", e); } finally { this.lockHeartbeat.unlock(); } } else { log.warn("lock heartBeat, but failed."); } }
可以看到,同步发送心跳包给所有的broker,而该过程是通过RemotingClient统一实现的,通过调用RemotingClient.invokeSync实现心跳包的发送,底层是通过Netty实现的。具体细节本文不进行展开。
启动消费客户端
上述初始化流程执行完毕之后,通过start()方法启动消费客户端。
@Override public void start() throws MQClientException { // 设置消费者组 setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup)); // 启动消费客户端 this.defaultMQPushConsumerImpl.start(); // trace处理逻辑 if (null != traceDispatcher) { try { traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel()); } catch (MQClientException e) { log.warn("trace dispatcher start failed ", e); } } }
关于trace的处理逻辑,本文不再展开,感兴趣的同学可以移步 跟我学RocketMQ之消息轨迹实战与源码分析
接着看defaultMQPushConsumerImpl.start()方法逻辑
[DefaultMQPushConsumerImpl.java] public synchronized void start() throws MQClientException { switch (this.serviceState) { case CREATE_JUST: log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(), this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode()); this.serviceState = ServiceState.START_FAILED;
首次启动后,执行配置检查,该方法为前置校验方法,主要进行消费属性校验。
this.checkConfig();
将订阅关系配置信息进行复制
this.copySubscription();
如果当前为集群消费模式,修改实例名为pid
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) { this.defaultMQPushConsumer.changeInstanceNameToPID(); }
创建一个新的MQClientInstance实例,如果已经存在直接使用该存在的MQClientInstance
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
为消费者负载均衡实现rebalanceImpl设置属性
// 设置消费者组 this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup()); // 设置消费模式 this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel()); // 设置队列分配策略 this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy()); // 设置当前的MQClientInstance实例 this.rebalanceImpl.setmQClientFactory(this.mQClientFactory); this.pullAPIWrapper = new PullAPIWrapper( mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode()); // 注册消息过滤钩子 this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
处理offset存储方式
// offsetStore不为空则使用当前的offsetStore方式 if (this.defaultMQPushConsumer.getOffsetStore() != null) { this.offsetStore = this.defaultMQPushConsumer.getOffsetStore(); } else { // 否则根据消费方式选择具体的offsetStore方式存储offset switch (this.defaultMQPushConsumer.getMessageModel()) { // 如果是广播方式,则使用本地存储方式 case BROADCASTING: this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; // 如果是集群方式,则使用远端broker存储方式存储offset case CLUSTERING: this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; default: break; } this.defaultMQPushConsumer.setOffsetStore(this.offsetStore); } // 加载当前的offset this.offsetStore.load();
根据MessageListener的具体实现方式选取具体的消息拉取线程实现。
// 如果是MessageListenerOrderly顺序消费接口实现 // 消息消费服务选择:ConsumeMessageOrderlyService(顺序消息消费服务) if (this.getMessageListenerInner() instanceof MessageListenerOrderly) { this.consumeOrderly = true; this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner()); } // 如果是MessageListenerConcurrently并行消息消费接口实现 // 消息消费服务选择:ConsumeMessageConcurrentlyService(并行消息消费服务) else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) { this.consumeOrderly = false; this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner()); }
选择并初始化完成具体的消息消费服务之后,启动消息消费服务。consumeMessageService主要负责对消息进行消费,它的内部维护了一个线程池。
// 启动消息消费服务 this.consumeMessageService.start();
接着向MQClientInstance注册消费者,并启动MQClientInstance。这里再次强调
一个JVM中所有消费者、生产者持有同一个MQClientInstance,且MQClientInstance只会启动一次
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this); if (!registerOK) { this.serviceState = ServiceState.CREATE_JUST; this.consumeMessageService.shutdown(); throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null); } mQClientFactory.start(); log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup()); this.serviceState = ServiceState.RUNNING; break;
如果MQClientInstance已经启动,或者已经关闭,或者启动失败,重复调用start会报错。这里也能直观的反映出: MQClientInstance的启动只有一次
case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: throw new MQClientException("The PushConsumer service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); default: break; }
启动完成执行后续收尾工作
// 订阅关系改变,更新Nameserver的订阅关系表 this.updateTopicSubscribeInfoWhenSubscriptionChanged(); // 检查客户端状态 this.mQClientFactory.checkClientInBroker(); // 发送心跳包 this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); // 唤醒执行消费者负载均衡 this.mQClientFactory.rebalanceImmediately(); }
copySubscription(),消息重试topic处理逻辑
消费者启动流程较为重要,我们接着对其中的重点方法展开讲解。这部分内容可以暂时跳过,不影响对主流程的把控。
我们研究一下copySubscription方法的实现细节。
[DefaultMQPushConsumerImpl.java] private void copySubscription() throws MQClientException { try { // 首先获取订阅信息 Map sub = this.defaultMQPushConsumer.getSubscription(); if (sub != null) { for (final Map.Entry entry : sub.entrySet()) { final String topic = entry.getKey(); final String subString = entry.getValue(); SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), topic, subString); this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); } } // 为defaultMQPushConsumer设置具体的MessageListener实现 if (null == this.messageListenerInner) { this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener(); }
根据消费类型选择是否进行重试topic订阅
switch (this.defaultMQPushConsumer.getMessageModel()) { // 如果是广播消费模式,则不进行任何处理,即无重试 case BROADCASTING: break; // 如果是集群消费模式,订阅重试主题消息 case CLUSTERING: final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()); SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), retryTopic, SubscriptionData.SUB_ALL); this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData); break; default: break; } } catch (Exception e) { throw new MQClientException("subscription exception", e); } }
如果是集群消费模式,会订阅重试主题消息
获取重试topic,规则为 RETRY_GROUP_TOPIC_PREFIX + consumerGroup
,即: “%RETRY%”+消费组名
;
为重试topic设置订阅关系,订阅所有的消息;
消费者启动的时候会自动订阅该重试主题,并参与该topic的消息队列负载过程。
小结
到此,我们就DefaultMQPushConsumer的初始化、启动、校验以及topic订阅、重试等代码实现
细节进行了较为详细的讲解。
下一章节,我将带领读者对消息消费线程 consumeMessageService
的实现进行分析,我们下篇文章见。
版权声明:
原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。