RocketMQ Source Code Analysis: The Message Pull Flow

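In "RocketMQ Source Code Analysis: RebalanceService" we answered how a consumer obtains messages after it first starts. Once a PullRequest (a message pull task) has been built, how do the consumer and the broker interact to carry it out? This article analyzes that message pull flow. Two consumer-side services take part in it. RebalanceService load-balances the message queues on the consumer side and builds PullRequests. PullMessageService performs the pulls. We start with PullMessageService.
PullMessageService is started when the consumer, during its own startup, starts its MQClientInstance: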

public void start() throws MQClientException {

    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start request-response channel
                this.mQClientAPIImpl.start();
                // Start various schedule tasks
                this.startScheduledTask();
                // Start pull service
                this.pullMessageService.start();
                // Start rebalance service
                this.rebalanceService.start();
                // Start push service
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}

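PullMessageService extends ServiceThread, so it is essentially a thread: calling this.pullMessageService.start() starts that thread, which then executes run(). run() repeatedly takes a PullRequest from pullRequestQueue, blocking while the queue is empty until a request is put in, and then calls pullMessage to pull messages for it.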

@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            PullRequest pullRequest = this.pullRequestQueue.take();
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }

    log.info(this.getServiceName() + " service end");
}
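To see the take-and-dispatch pattern in isolation, here is a minimal sketch that uses only java.util.concurrent. SimplePullRequest, SimplePullService and PullLoopDemo are hypothetical names, not RocketMQ classes. executeImmediately and executeLater only mimic the roles of executePullRequestImmediately and executePullRequestLater. Re-enqueueing through a scheduled executor is an assumed mechanism chosen for the sketch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-in for PullRequest: a queue name plus the next offset to pull.
final class SimplePullRequest {
    final String queue;
    long nextOffset;
    SimplePullRequest(String queue, long nextOffset) {
        this.queue = queue;
        this.nextOffset = nextOffset;
    }
}

// Minimal take-and-dispatch loop in the spirit of PullMessageService.run(); not RocketMQ code.
final class SimplePullService implements Runnable {
    private final BlockingQueue<SimplePullRequest> pullRequestQueue = new LinkedBlockingQueue<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final Consumer<SimplePullRequest> handler;
    private volatile boolean stopped;

    SimplePullService(Consumer<SimplePullRequest> handler) { this.handler = handler; }

    // Plays the role of executePullRequestImmediately: enqueue now.
    void executeImmediately(SimplePullRequest request) { pullRequestQueue.add(request); }

    // Plays the role of executePullRequestLater: enqueue after delayMs (assumed mechanism).
    void executeLater(SimplePullRequest request, long delayMs) {
        scheduler.schedule(() -> executeImmediately(request), delayMs, TimeUnit.MILLISECONDS);
    }

    void stop() {
        stopped = true;
        scheduler.shutdownNow();
    }

    @Override
    public void run() {
        while (!stopped) {
            try {
                // Blocks while the queue is empty, like pullRequestQueue.take()
                SimplePullRequest request = pullRequestQueue.take();
                handler.accept(request);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (RuntimeException e) {
                // One failing request must not kill the loop
                System.err.println("pull failed: " + e);
            }
        }
    }
}

final class PullLoopDemo {
    private PullLoopDemo() {}

    // Pulls three times starting at offset 0, pretending each pull finds 32 messages.
    static List<Long> pullThreeTimes() {
        List<Long> seenOffsets = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch threePulls = new CountDownLatch(3);
        SimplePullService[] service = new SimplePullService[1];
        service[0] = new SimplePullService(request -> {
            seenOffsets.add(request.nextOffset);
            threePulls.countDown();
            request.nextOffset += 32;
            if (threePulls.getCount() > 0) {
                service[0].executeLater(request, 10); // re-queue after 10 ms
            }
        });
        Thread loop = new Thread(service[0], "pull-loop");
        loop.start();
        service[0].executeImmediately(new SimplePullRequest("queue-0", 0));
        try {
            if (!threePulls.await(2, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            service[0].stop();
            loop.interrupt(); // wakes the loop thread blocked in take()
        }
        return new ArrayList<>(seenOffsets);
    }
}
```

PullLoopDemo.pullThreeTimes() returns the offsets 0, 32 and 64. A single thread drains the queue, so a slow handler would delay every queue's pull. This is one reason the real handler keeps its work cheap by issuing the network call with CommunicationMode.ASYNC, as the pullMessage code further down shows.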

Next comes pullMessage(final PullRequest pullRequest). It looks up the consumer's internal implementation, MQConsumerInner, by consumerGroup. It then casts that object to DefaultMQPushConsumerImpl and calls its pullMessage method. The cast also shows that PullMessageService serves only the PUSH mode.

private void pullMessage(final PullRequest pullRequest) {
    final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
    if (consumer != null) {
        DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
        impl.pullMessage(pullRequest);
    } else {
        log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
    }
}

Next, DefaultMQPushConsumerImpl's pullMessage method:

public void pullMessage(final PullRequest pullRequest) {
    // Get the ProcessQueue from pullRequest; if it has not been dropped, update its lastPullTimestamp to the current time
    final ProcessQueue processQueue = pullRequest.getProcessQueue();
    if (processQueue.isDropped()) {
        log.info("the pull request[{}] is dropped.", pullRequest.toString());
        return;
    }

    pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());

    try {
        /*
         * Check that the consumer's state is OK. If it is not, put pullRequest back into
         * PullMessageService's pull request queue after a 3 s delay and end this pull.
         */
        this.makeSureStateOK();
    } catch (MQClientException e) {
        log.warn("pullMessage exception, consumer state not ok", e);
        this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        return;
    }
    // If the consumer is paused, put pullRequest back into PullMessageService's pull request queue after a 1 s delay
    if (this.isPause()) {
        log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
        return;
    }
    // Pull flow control: cached message count, cached message size and, for concurrent consumption, offset span
    long cachedMessageCount = processQueue.getMsgCount().get();
    long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);

    if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
        if ((queueFlowControlTimes++ % 1000) == 0) {
            log.warn("the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
        }
        return;
    }

    if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
        if ((queueFlowControlTimes++ % 1000) == 0) {
            log.warn("the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
        }
        return;
    }

    if (!this.consumeOrderly) {
        if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
                log.warn("the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}", processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), pullRequest, queueMaxSpanFlowControlTimes);
            }
            return;
        }
    } else {
        if (processQueue.isLocked()) {
            if (!pullRequest.isLockedFirst()) {
                final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
                boolean brokerBusy = offset < pullRequest.getNextOffset();
                log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}", pullRequest, offset, brokerBusy);
                if (brokerBusy) {
                    log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
                        pullRequest, offset);
                }

                pullRequest.setLockedFirst(true);
                pullRequest.setNextOffset(offset);
            }
        } else {
            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
            log.info("pull message later because not locked in broker, {}", pullRequest);
            return;
        }
    }
    /*
     * Look up the subscription data for the topic of pullRequest. If there is none, put pullRequest
     * back into PullMessageService's pull request queue after a 3 s delay and end this pull.
     */
    final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
    if (null == subscriptionData) {
        this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        log.warn("find the consumer's subscription failed, {}", pullRequest);
        return;
    }

    final long beginTimestamp = System.currentTimeMillis();
    // Build the PullCallback; it runs when the broker's response reaches the consumer
    PullCallback pullCallback = new PullCallback() {
        @Override
        public void onSuccess(PullResult pullResult) {
            if (pullResult != null) {
                pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData);

                switch (pullResult.getPullStatus()) {
                    case FOUND:
                        long prevRequestOffset = pullRequest.getNextOffset();
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                        long pullRT = System.currentTimeMillis() - beginTimestamp;
                        DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullRT);

                        long firstMsgOffset = Long.MAX_VALUE;
                        if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        } else {
                            firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());

                            boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                            DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                pullResult.getMsgFoundList(),
                                processQueue,
                                pullRequest.getMessageQueue(),
                                dispatchToConsume);

                            if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                    DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                            } else {
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            }
                        }

                        if (pullResult.getNextBeginOffset() < prevRequestOffset
                            || firstMsgOffset < prevRequestOffset) {
                            log.warn("[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}", pullResult.getNextBeginOffset(), firstMsgOffset, prevRequestOffset);
                        }

                        break;
                    case NO_NEW_MSG:
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                        DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        break;
                    case NO_MATCHED_MSG:
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                        DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        break;
                    case OFFSET_ILLEGAL:
                        log.warn("the pull request offset illegal, {} {}",
                            pullRequest.toString(), pullResult.toString());
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                        pullRequest.getProcessQueue().setDropped(true);
                        DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {

                            @Override
                            public void run() {
                                try {
                                    DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(), pullRequest.getNextOffset(), false);

                                    DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());

                                    DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());

                                    log.warn("fix the pull request offset, {}", pullRequest);
                                } catch (Throwable e) {
                                    log.error("executeTaskLater Exception", e);
                                }
                            }
                        }, 10000);
                        break;
                    default:
                        break;
                }
            }
        }

        @Override
        public void onException(Throwable e) {
            if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                log.warn("execute the pull request exception", e);
            }

            DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        }
    };

    boolean commitOffsetEnable = false;
    long commitOffsetValue = 0L;
    if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
        // Read the consumption offset of pullRequest's MessageQueue from memory
        commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
        if (commitOffsetValue > 0) {
            commitOffsetEnable = true;
        }
    }

    String subExpression = null;
    boolean classFilter = false;
    SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
    if (sd != null) {
        if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
            subExpression = sd.getSubString();
        }

        classFilter = sd.isClassFilterMode();
    }
    // Build the system flag (sysFlag) for this pull
    int sysFlag = PullSysFlag.buildSysFlag(
        commitOffsetEnable, // commitOffset
        true, // suspend
        subExpression != null, // subscription
        classFilter // class filter
    );
    try {
        // Send the pull request to the broker
        this.pullAPIWrapper.pullKernelImpl(
            pullRequest.getMessageQueue(),
            subExpression,
            subscriptionData.getExpressionType(),
            subscriptionData.getSubVersion(),
            pullRequest.getNextOffset(),
            this.defaultMQPushConsumer.getPullBatchSize(),
            sysFlag,
            commitOffsetValue,
            BROKER_SUSPEND_MAX_TIME_MILLIS,
            CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
            CommunicationMode.ASYNC,
            pullCallback
        );
    } catch (Exception e) {
        log.error("pullKernelImpl exception", e);
        this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
    }
}
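The three flow-control checks at the top of pullMessage reduce to a pure decision function. In the sketch below, FlowControlSketch and FlowDecision are hypothetical names. The thresholds are parameters. The convenience overload uses 1000 messages, 100 MiB and a span of 2000, which are assumed to be the 4.x defaults of pullThresholdForQueue, pullThresholdSizeForQueue and consumeConcurrentlyMaxSpan. maxSpan likewise assumes that the span is the difference between the largest and smallest offset cached in the ProcessQueue's msgTreeMap.

```java
import java.util.TreeMap;

// Hypothetical outcome of the flow-control checks in pullMessage.
enum FlowDecision { PULL_NOW, DELAY_CACHED_COUNT, DELAY_CACHED_SIZE, DELAY_OFFSET_SPAN }

final class FlowControlSketch {
    // Assumed 4.x defaults; check DefaultMQPushConsumer in your version.
    static final long ASSUMED_PULL_THRESHOLD_FOR_QUEUE = 1000;      // messages
    static final long ASSUMED_PULL_THRESHOLD_SIZE_MIB = 100;        // MiB
    static final long ASSUMED_CONSUME_CONCURRENTLY_MAX_SPAN = 2000; // offsets

    private FlowControlSketch() {}

    // Span between the highest and lowest cached offset; 0 for an empty cache.
    static long maxSpan(TreeMap<Long, ?> cachedByOffset) {
        if (cachedByOffset.isEmpty()) {
            return 0;
        }
        return cachedByOffset.lastKey() - cachedByOffset.firstKey();
    }

    static FlowDecision decide(long cachedCount, long cachedBytes, long span, boolean consumeOrderly,
                               long countThreshold, long sizeThresholdMiB, long maxSpanThreshold) {
        // Integer division, as in the source: a cache just under the next whole MiB rounds down
        long cachedMiB = cachedBytes / (1024L * 1024L);
        if (cachedCount > countThreshold) {
            return FlowDecision.DELAY_CACHED_COUNT;
        }
        if (cachedMiB > sizeThresholdMiB) {
            return FlowDecision.DELAY_CACHED_SIZE;
        }
        // Only concurrent consumption limits the offset span
        if (!consumeOrderly && span > maxSpanThreshold) {
            return FlowDecision.DELAY_OFFSET_SPAN;
        }
        return FlowDecision.PULL_NOW;
    }

    // Convenience overload using the assumed defaults.
    static FlowDecision decide(long cachedCount, long cachedBytes, long span, boolean consumeOrderly) {
        return decide(cachedCount, cachedBytes, span, consumeOrderly,
            ASSUMED_PULL_THRESHOLD_FOR_QUEUE, ASSUMED_PULL_THRESHOLD_SIZE_MIB,
            ASSUMED_CONSUME_CONCURRENTLY_MAX_SPAN);
    }
}
```

The span check applies only to concurrent consumption. In that mode the committed offset cannot move past the smallest message still being consumed. A wide span therefore means that many messages already consumed could be delivered again after a restart. Orderly consumers skip this check. As the else branch above shows, they instead wait until their ProcessQueue is locked on the broker.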

The pullKernelImpl method looks like this:

public PullResult pullKernelImpl(
    final MessageQueue mq,
    final String subExpression,
    final String expressionType,
    final long subVersion,
    final long offset,
    final int maxNums,
    final int sysFlag,
    final long commitOffset,
    final long brokerSuspendMaxTimeMillis,
    final long timeoutMillis,
    final CommunicationMode communicationMode,
    final PullCallback pullCallback
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    /*
     * Look up the broker address in mQClientFactory by brokerName and brokerId.
     * In RocketMQ several brokers share the same name (a master and its slaves) but have different brokerIds.
     * Each pull result carries a suggestion for the next pull: read from the master or from a slave.
     */
    FindBrokerResult findBrokerResult =
        this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
            this.recalculatePullFromWhichNode(mq), false);
    // If findBrokerResult is null, first refresh the client's topic route info,
    // then call findBrokerAddressInSubscribe again to get the broker address
    if (null == findBrokerResult) {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
        findBrokerResult =
            this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                this.recalculatePullFromWhichNode(mq), false);
    }

    if (findBrokerResult != null) {
        {
            // check version
            if (!ExpressionType.isTagType(expressionType)
                && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
                throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
                    + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
            }
        }
        int sysFlagInner = sysFlag;

        if (findBrokerResult.isSlave()) {
            sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
        }
        // Build the PullMessageRequestHeader
        PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
        requestHeader.setConsumerGroup(this.consumerGroup);
        requestHeader.setTopic(mq.getTopic());
        requestHeader.setQueueId(mq.getQueueId());
        requestHeader.setQueueOffset(offset);
        requestHeader.setMaxMsgNums(maxNums);
        requestHeader.setSysFlag(sysFlagInner);
        requestHeader.setCommitOffset(commitOffset);
        requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
        requestHeader.setSubscription(subExpression);
        requestHeader.setSubVersion(subVersion);
        requestHeader.setExpressionType(expressionType);
        /*
         * If the filter mode is class filter, use the topic and broker address to find the FilterServer
         * registered on the broker and pull messages from it; otherwise pull messages from the broker.
         */
        String brokerAddr = findBrokerResult.getBrokerAddr();
        if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
            brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
        }

        PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
            brokerAddr,
            requestHeader,
            timeoutMillis,
            communicationMode,
            pullCallback);

        return pullResult;
    }

    throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
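The sysFlag passed down to the broker is a small bit set. The sketch below rebuilds it by hand to show why a pull routed to a slave loses its commit-offset bit while the other bits survive. PullSysFlagSketch is hypothetical. Its bit positions are assumed to match RocketMQ 4.x PullSysFlag, so check your version before relying on them.

```java
// Hypothetical re-implementation of the pull sysFlag bits; not the RocketMQ class itself.
final class PullSysFlagSketch {
    // Bit positions assumed to follow RocketMQ 4.x PullSysFlag; verify in your version.
    static final int FLAG_COMMIT_OFFSET = 0x1;
    static final int FLAG_SUSPEND = 0x1 << 1;
    static final int FLAG_SUBSCRIPTION = 0x1 << 2;
    static final int FLAG_CLASS_FILTER = 0x1 << 3;

    private PullSysFlagSketch() {}

    // Same argument order as the buildSysFlag call in pullMessage
    static int build(boolean commitOffset, boolean suspend, boolean subscription, boolean classFilter) {
        int flag = 0;
        if (commitOffset) flag |= FLAG_COMMIT_OFFSET;
        if (suspend) flag |= FLAG_SUSPEND;
        if (subscription) flag |= FLAG_SUBSCRIPTION;
        if (classFilter) flag |= FLAG_CLASS_FILTER;
        return flag;
    }

    // What pullKernelImpl does when the chosen node is a slave: drop only the commit-offset bit
    static int clearCommitOffset(int flag) {
        return flag & ~FLAG_COMMIT_OFFSET;
    }

    static boolean has(int flag, int bit) {
        return (flag & bit) != 0;
    }
}
```

Take a clustering consumer that has an in-memory offset and posts its subscription expression. It sends build(true, true, true, false), which is 7. If the pull goes to a slave, clearCommitOffset turns that into 6, so the slave is never asked to commit an offset.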

Look inside public PullResult pullMessage(final String addr, final PullMessageRequestHeader requestHeader, final long timeoutMillis, final CommunicationMode communicationMode, final PullCallback pullCallback). There you can see that the request the client sends to the broker has type RequestCode.PULL_MESSAGE. Searching the code shows that the broker handles this request type in the processRequest method of PullMessageProcessor, which is registered as follows:

/**
* PullMessageProcessor
*/

this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);

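Let's now look at how the broker handles a pull request sent by the client:
1. Build the response that will be returned to the consumer, and decode the request sent to the broker.
2. Check whether the broker is readable. If it is not, set the response code to ResponseCode.NO_PERMISSION and return the response to the consumer.
3. Look up the consumer group's subscription configuration on the broker. If it does not exist, set the response code to ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST. If its consumeEnable property is false, set the code to ResponseCode.NO_PERMISSION. In both cases return the response to the consumer.
4. Read from the request the system flag (sysFlag) that the consumer set for this pull.
5. Look up the topic's configuration on the broker. If it is missing, set the response code to ResponseCode.TOPIC_NOT_EXIST and return the response to the consumer.
6. Check whether the topic is readable. If it is not, set the response code to ResponseCode.NO_PERMISSION and return the response to the consumer.
7. Check that the queueId of the MessageQueue to pull from is valid. If it is not, set the response code to ResponseCode.SYSTEM_ERROR and return the response to the consumer.
8. Build the subscription data from the topic and the filter expression. If the expression type is not TAG, also build the filter data consumerFilterData.
9. Build the message filter messageFilter.
10. Look up messages. The lookup uses several fields from requestHeader: the consumer group, the topic name, the MessageQueue's queueId, the logical offset in the ConsumeQueue to start from and the maximum number of messages. It also applies the message filter. getMessage also computes nextBeginOffset, the start offset of the next pull task.
11. If getMessageResult is not null, set nextBeginOffset, minOffset and maxOffset in the response.
12. Set the brokerId suggested for the next pull. If getMessageResult suggests pulling from a slave, suggest a slave; otherwise suggest the master. Roughly, a slave is suggested when the consumer has fallen so far behind that the data it needs is probably no longer in the master's memory.
13. Set the response code according to the status of getMessageResult, as follows:

getMessageResult status | ResponseCode
FOUND | SUCCESS
MESSAGE_WAS_REMOVING | PULL_RETRY_IMMEDIATELY
NO_MATCHED_LOGIC_QUEUE, NO_MESSAGE_IN_QUEUE, OFFSET_OVERFLOW_BADLY, OFFSET_TOO_SMALL | PULL_OFFSET_MOVED
NO_MATCHED_MESSAGE | PULL_RETRY_IMMEDIATELY
OFFSET_FOUND_NULL, OFFSET_OVERFLOW_ONE | PULL_NOT_FOUND

14. If the current broker is not a slave and the request's sysFlag carries the commit-offset flag, the broker updates the consumer group's consumption offset with the commitOffset carried in the request.
15. Return the response to the consumer.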
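Step 13 can be read as a total function from store status to response code. The sketch below encodes the table above using simplified stand-in enums. GetMessageStatusSketch and BrokerResponseCodeSketch are hypothetical names; in RocketMQ the response codes are int constants in ResponseCode. The real processor may consult further request fields for some statuses, and this sketch ignores them.

```java
// Simplified stand-in for the store-side lookup status.
enum GetMessageStatusSketch {
    FOUND, MESSAGE_WAS_REMOVING, NO_MATCHED_LOGIC_QUEUE, NO_MESSAGE_IN_QUEUE,
    OFFSET_OVERFLOW_BADLY, OFFSET_TOO_SMALL, NO_MATCHED_MESSAGE, OFFSET_FOUND_NULL, OFFSET_OVERFLOW_ONE
}

// Simplified stand-in for the response codes used in step 13.
enum BrokerResponseCodeSketch { SUCCESS, PULL_RETRY_IMMEDIATELY, PULL_OFFSET_MOVED, PULL_NOT_FOUND }

final class BrokerStatusMapping {
    private BrokerStatusMapping() {}

    // Encodes the getMessageResult status -> ResponseCode table row by row
    static BrokerResponseCodeSketch toResponseCode(GetMessageStatusSketch status) {
        switch (status) {
            case FOUND:
                return BrokerResponseCodeSketch.SUCCESS;
            case MESSAGE_WAS_REMOVING:
            case NO_MATCHED_MESSAGE:
                return BrokerResponseCodeSketch.PULL_RETRY_IMMEDIATELY;
            case NO_MATCHED_LOGIC_QUEUE:
            case NO_MESSAGE_IN_QUEUE:
            case OFFSET_OVERFLOW_BADLY:
            case OFFSET_TOO_SMALL:
                return BrokerResponseCodeSketch.PULL_OFFSET_MOVED;
            case OFFSET_FOUND_NULL:
            case OFFSET_OVERFLOW_ONE:
                return BrokerResponseCodeSketch.PULL_NOT_FOUND;
            default:
                throw new IllegalArgumentException("unexpected status: " + status);
        }
    }
}
```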
When the broker's response reaches the consumer, the client invokes onSuccess or onException on the PullCallback created in pullMessage(final PullRequest pullRequest). The code that invokes the PullCallback is:

this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
    @Override
    public void operationComplete(ResponseFuture responseFuture) {
        RemotingCommand response = responseFuture.getResponseCommand();
        if (response != null) {
            try {
                PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);
                assert pullResult != null;
                pullCallback.onSuccess(pullResult);
            } catch (Exception e) {
                pullCallback.onException(e);
            }
        } else {
            if (!responseFuture.isSendRequestOK()) {
                pullCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));
            } else if (responseFuture.isTimeout()) {
                pullCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request, responseFuture.getCause()));
            } else {
                pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause()));
            }
        }
    }
});

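Next, how does the consumer handle the response returned by the broker?
1. The response is converted into a PullResult by processPullResponse, which maps the response code to a pull status and builds the PullResult object:

response code | pull status
SUCCESS | FOUND
PULL_NOT_FOUND | NO_NEW_MSG
PULL_RETRY_IMMEDIATELY | NO_MATCHED_MSG
PULL_OFFSET_MOVED | OFFSET_ILLEGAL

2. For FOUND, update the next pull offset from pullResult. If msgFoundList in pullResult is empty, put the PullRequest back into PullMessageService's pullRequestQueue immediately.
3. Otherwise, put the pulled messages into processQueue and submit them to the ConsumeMessageService for consumption. ConsumeMessageService has two implementations, ConsumeMessageConcurrentlyService and ConsumeMessageOrderlyService.
4. If pullInterval is greater than 0, put pullRequest back into PullMessageService's pullRequestQueue after pullInterval milliseconds; otherwise put it back immediately. This keeps the pull loop running continuously.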
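The consumer side applies the same kind of mapping in reverse. The callback's reaction to each PullStatus then decides when the PullRequest goes back into pullRequestQueue. In this sketch, WireCodeSketch, PullStatusSketch and ConsumerPullReaction are hypothetical names. The sketch follows the onSuccess code shown earlier:
- a return value of 0 means "re-queue immediately";
- a positive value is the delay in milliseconds;
- NOT_REQUEUED stands for OFFSET_ILLEGAL, where the ProcessQueue is dropped and the offset is fixed by a task scheduled 10 s later.

For an unknown code the sketch throws IllegalStateException. In the source, an exception thrown while processing the response reaches PullCallback.onException, which re-queues the request after pullTimeDelayMillsWhenException.

```java
// Simplified stand-ins for the wire response code and the client-side pull status.
enum WireCodeSketch { SUCCESS, PULL_NOT_FOUND, PULL_RETRY_IMMEDIATELY, PULL_OFFSET_MOVED, SYSTEM_ERROR }

enum PullStatusSketch { FOUND, NO_NEW_MSG, NO_MATCHED_MSG, OFFSET_ILLEGAL }

final class ConsumerPullReaction {
    // Marker for "the PullRequest is not put back into pullRequestQueue"
    static final long NOT_REQUEUED = -1L;

    private ConsumerPullReaction() {}

    // Response code -> pull status, as in the table above
    static PullStatusSketch toPullStatus(WireCodeSketch code) {
        switch (code) {
            case SUCCESS: return PullStatusSketch.FOUND;
            case PULL_NOT_FOUND: return PullStatusSketch.NO_NEW_MSG;
            case PULL_RETRY_IMMEDIATELY: return PullStatusSketch.NO_MATCHED_MSG;
            case PULL_OFFSET_MOVED: return PullStatusSketch.OFFSET_ILLEGAL;
            default: throw new IllegalStateException("unexpected response code: " + code);
        }
    }

    // Delay before the PullRequest is re-queued, mirroring the onSuccess branches
    static long requeueDelayMs(PullStatusSketch status, boolean msgFoundListEmpty, long pullInterval) {
        switch (status) {
            case FOUND:
                if (msgFoundListEmpty) {
                    return 0L;
                }
                return pullInterval > 0 ? pullInterval : 0L;
            case NO_NEW_MSG:
            case NO_MATCHED_MSG:
                return 0L;
            case OFFSET_ILLEGAL:
                return NOT_REQUEUED;
            default:
                throw new IllegalStateException("unexpected status: " + status);
        }
    }
}
```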
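To sum up, the message pull flow consists of three overall steps:
1. The consumer takes a PullRequest, builds a PULL_MESSAGE request from it, and sends that request to the broker.
2. The broker looks up the messages for the request and returns them to the consumer.
3. The consumer consumes the returned messages.
How the consumer actually consumes the messages it has pulled will be analyzed in the next article.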