RocketMQ(一):推拉消费模型客户端实践
现在的的互联网系统中,mq已经必备基础设施了,我们已明显感觉它的必要性与强大。然而,它的本质是啥?存储转发系统罢了!
MQ有很多成熟产品,以RocketMQ作为切入点,成本较低。MQ主要角色为:生产者、消费者、消息服务端。
本文先来看看消费者的实现。现在通用的消费模型中,有推和拉两种模型。各有优劣,一言以避之,推更实时,拉更容易控制。
一、 push模式消费例子
/** * This example shows how to subscribe and consume messages using providing {@link DefaultMQPushConsumer}. */ public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { /* * Instantiate with specified consumer group name. */ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4"); consumer.setNamesrvAddr("localhost:9876"); /* * Specify where to start in case the specified consumer group is a brand new one. */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); /* * Subscribe one more more topics to consume. */ consumer.subscribe("TopicTest", "*"); /* * Register callback to execute on arrival of messages fetched from brokers. */ consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } }
上面的消费操作实践中,主要这么几步:
1. 注册消费者实例到nameserver;
2. consumer.subscribe() 消息topic;
3. 注册消费消息监听回调业务方法;
4. 启动消费;
说了这么多,其实主要就一步:实现消费监听业务回调方法,只关注业务实现。
下面我们来细看一下,这简单的注册消费,背后的逻辑。
1. 消息接收消费
netty 负责消息的io接入
// org.apache.rocketmq.remoting.netty.NettyRemotingClient.NettyClientHandler#channelRead0 class NettyClientHandler extends SimpleChannelInboundHandler { @Override protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { processMessageReceived(ctx, msg); } } // org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processMessageReceived /** * Entry of incoming command processing. * * * Note: * The incoming remoting command may be *
-
*
- An inquiry request from a remote peer component; *
- A response to a previous request issued by this very participant. *
总结一下push模式的消费过程:
1. netty 接收到请求,channelRead InBoundHandler;
2. 提交到回调线程池,由 responseTable 和 opaque 建立联系;
3. 回调线程池处理完成后,提交到消费者线程池, 由 consumeExecutor 和 ConsumeRequest 等待通信;
4. 调用业务消费代码,实现了 MessageListenerConcurrently 的接口;
push的消费模式,前端代码简单,由服务端进行推送数据,能够在消息到达时及时处理,省去了客户端无谓的轮询类操作。但是需要服务端复杂化推送逻辑,否则很难很好的实现MQ的美好特性。
二、 pull拉取模式消费的例子
public class PullConsumer { private static final Map OFFSE_TABLE = new HashMap(); public static void main(String[] args) throws MQClientException { DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.start(); // 消费指定topic的所有队列数据 Set mqs = consumer.fetchSubscribeMessageQueues("TopicTest"); for (MessageQueue mq : mqs) { System.out.printf("Consume from the queue: %s%n", mq); SINGLE_MQ: while (true) { try { // 拉取队列数据 PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); System.out.printf("%s%n", pullResult); putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); switch (pullResult.getPullStatus()) { case FOUND: break; case NO_MATCHED_MSG: break; case NO_NEW_MSG: // 如果没有消费到数据,则跳出本段消费,其他情况继续消息当前队列数据 break SINGLE_MQ; case OFFSET_ILLEGAL: break; default: break; } } catch (Exception e) { e.printStackTrace(); } } } consumer.shutdown(); } private static long getMessageQueueOffset(MessageQueue mq) { Long offset = OFFSE_TABLE.get(mq); if (offset != null) return offset; return 0; } private static void putMessageQueueOffset(MessageQueue mq, long offset) { OFFSE_TABLE.put(mq, offset); } }
上面的操作实践中,主要这么几步:
1. 注册消费实例consumer到nameserver;
2. fetchSubscribeMessageQueues() 查询对应的topic消息队列以便可以消费其下所有数据;
3. consumer.pullBlockIfNotFound() 获取消息队列,消费消息;
4. 自行维护保存消费偏移量,以便为下一次消费提供依据;
5. 循环以上操作;
1. 服务端调用
// org.apache.rocketmq.client.consumer.DefaultMQPullConsumer#pullBlockIfNotFound @Override public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.defaultMQPullConsumerImpl.pullBlockIfNotFound(queueWithNamespace(mq), subExpression, offset, maxNums); } // org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl#pullBlockIfNotFound public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { // 获取订阅信息 SubscriptionData subscriptionData = getSubscriptionData(mq, subExpression); return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis()); } // org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl#pullSyncImpl private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums, boolean block, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { this.isRunning(); if (null == mq) { throw new MQClientException("mq is null", null); } if (offset < 0) { throw new MQClientException("offset < 0", null); } if (maxNums <= 0) { throw new MQClientException("maxNums <= 0", null); } // ... this.subscriptionAutomatically(mq.getTopic()); // ... int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false); long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout; boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType()); // 拉取数据 PullResult pullResult = this.pullAPIWrapper.pullKernelImpl( mq, subscriptionData.getSubString(), subscriptionData.getExpressionType(), isTagType ? 0L : subscriptionData.getSubVersion(), offset, maxNums, sysFlag, 0, this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), timeoutMillis, CommunicationMode.SYNC, null ); // 处理返回数据,解析为一条条消息 this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData); //If namespace is not null , reset Topic without namespace. this.resetTopic(pullResult.getMsgFoundList()); if (!this.consumeMessageHookList.isEmpty()) { ConsumeMessageContext consumeMessageContext = null; consumeMessageContext = new ConsumeMessageContext(); consumeMessageContext.setNamespace(defaultMQPullConsumer.getNamespace()); consumeMessageContext.setConsumerGroup(this.groupName()); consumeMessageContext.setMq(mq); consumeMessageContext.setMsgList(pullResult.getMsgFoundList()); consumeMessageContext.setSuccess(false); this.executeHookBefore(consumeMessageContext); consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString()); consumeMessageContext.setSuccess(true); this.executeHookAfter(consumeMessageContext); } return pullResult; } // org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#pullKernelImpl public PullResult pullKernelImpl( final MessageQueue mq, final String subExpression, final String expressionType, final long subVersion, final long offset, final int maxNums, final int sysFlag, final long commitOffset, final long brokerSuspendMaxTimeMillis, final long timeoutMillis, final CommunicationMode communicationMode, final PullCallback pullCallback ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { // 选取一个 broker 进行获取消息 FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), this.recalculatePullFromWhichNode(mq), false); if (null == findBrokerResult) { this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), this.recalculatePullFromWhichNode(mq), false); } if (findBrokerResult != null) { { // check version if (!ExpressionType.isTagType(expressionType) && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) { throw new MQClientException("The broker[" + mq.getBrokerName() + ", " + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null); } } int sysFlagInner = sysFlag; if (findBrokerResult.isSlave()) { sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner); } // 构造拉取消费请求 PullMessageRequestHeader requestHeader = new PullMessageRequestHeader(); requestHeader.setConsumerGroup(this.consumerGroup); requestHeader.setTopic(mq.getTopic()); requestHeader.setQueueId(mq.getQueueId()); requestHeader.setQueueOffset(offset); requestHeader.setMaxMsgNums(maxNums); requestHeader.setSysFlag(sysFlagInner); requestHeader.setCommitOffset(commitOffset); requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis); requestHeader.setSubscription(subExpression); requestHeader.setSubVersion(subVersion); requestHeader.setExpressionType(expressionType); String brokerAddr = findBrokerResult.getBrokerAddr(); if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) { brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr); } PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage( brokerAddr, requestHeader, timeoutMillis, communicationMode, pullCallback); return pullResult; } throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); } // org.apache.rocketmq.client.impl.MQClientAPIImpl#pullMessage public PullResult pullMessage( final String addr, final PullMessageRequestHeader requestHeader, final long timeoutMillis, final CommunicationMode communicationMode, final PullCallback pullCallback ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader); switch (communicationMode) { case ONEWAY: assert false; return null; case ASYNC: this.pullMessageAsync(addr, request, timeoutMillis, pullCallback); return null; case SYNC: return this.pullMessageSync(addr, request, timeoutMillis); default: assert false; break; } return null; } // org.apache.rocketmq.client.impl.MQClientAPIImpl#pullMessageSync private PullResult pullMessageSync( final String addr, final RemotingCommand request, final long timeoutMillis ) throws RemotingException, InterruptedException, MQBrokerException { RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); assert response != null; return this.processPullResponse(response); } // 解析返回数据 private PullResult processPullResponse( final RemotingCommand response) throws MQBrokerException, RemotingCommandException { PullStatus pullStatus = PullStatus.NO_NEW_MSG; switch (response.getCode()) { case ResponseCode.SUCCESS: pullStatus = PullStatus.FOUND; break; case ResponseCode.PULL_NOT_FOUND: pullStatus = PullStatus.NO_NEW_MSG; break; case ResponseCode.PULL_RETRY_IMMEDIATELY: pullStatus = PullStatus.NO_MATCHED_MSG; break; case ResponseCode.PULL_OFFSET_MOVED: pullStatus = PullStatus.OFFSET_ILLEGAL; break; default: throw new MQBrokerException(response.getCode(), response.getRemark()); } PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class); return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(), responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody()); } // org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#processPullResult public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult, final SubscriptionData subscriptionData) { PullResultExt pullResultExt = (PullResultExt) pullResult; this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId()); if (PullStatus.FOUND == pullResult.getPullStatus()) { ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary()); List msgList = MessageDecoder.decodes(byteBuffer); List msgListFilterAgain = msgList; if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) { msgListFilterAgain = new ArrayList(msgList.size()); for (MessageExt msg : msgList) { if (msg.getTags() != null) { if (subscriptionData.getTagsSet().contains(msg.getTags())) { msgListFilterAgain.add(msg); } } } } if (this.hasHook()) { FilterMessageContext filterMessageContext = new FilterMessageContext(); filterMessageContext.setUnitMode(unitMode); filterMessageContext.setMsgList(msgListFilterAgain); this.executeHook(filterMessageContext); } for (MessageExt msg : msgListFilterAgain) { String traFlag = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (Boolean.parseBoolean(traFlag)) { msg.setTransactionId(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX)); } MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET, Long.toString(pullResult.getMinOffset())); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET, Long.toString(pullResult.getMaxOffset())); } pullResultExt.setMsgFoundList(msgListFilterAgain); } pullResultExt.setMessageBinary(null); return pullResult; }
2. 构造订阅信息
// org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl#getSubscriptionData private SubscriptionData getSubscriptionData(MessageQueue mq, String subExpression) throws MQClientException { if (null == mq) { throw new MQClientException("mq is null", null); } try { return FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), mq.getTopic(), subExpression); } catch (Exception e) { throw new MQClientException("parse subscription error", e); } } // org.apache.rocketmq.common.filter.FilterAPI#buildSubscriptionData public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic, String subString) throws Exception { SubscriptionData subscriptionData = new SubscriptionData(); subscriptionData.setTopic(topic); subscriptionData.setSubString(subString); if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) { subscriptionData.setSubString(SubscriptionData.SUB_ALL); } else { String[] tags = subString.split("\\|\\|"); if (tags.length > 0) { for (String tag : tags) { if (tag.length() > 0) { String trimString = tag.trim(); if (trimString.length() > 0) { subscriptionData.getTagsSet().add(trimString); subscriptionData.getCodeSet().add(trimString.hashCode()); } } } } else { throw new Exception("subString split error"); } } return subscriptionData; }
3. 选择broker
// org.apache.rocketmq.client.impl.factory.MQClientInstance#findBrokerAddressInSubscribe public FindBrokerResult findBrokerAddressInSubscribe( final String brokerName, final long brokerId, final boolean onlyThisBroker ) { String brokerAddr = null; boolean slave = false; boolean found = false; // 根据 brokerName 获取 broker 的列表 HashMap map = this.brokerAddrTable.get(brokerName); if (map != null && !map.isEmpty()) { brokerAddr = map.get(brokerId); // master 的 brokerId=0 slave = brokerId != MixAll.MASTER_ID; found = brokerAddr != null; if (!found && !onlyThisBroker) { Entry entry = map.entrySet().iterator().next(); brokerAddr = entry.getValue(); slave = entry.getKey() != MixAll.MASTER_ID; found = true; } } if (found) { return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr)); } return null; }
拉取的方式消费数据,就是一个个的直接请求,需要自行维护偏移量,负载均衡等。但控制性也好,比如消费慢了,我就慢点拉,消费快了就加快拉取速度。另外,也减少了服务端的复杂度!
三、 消费者在启动消费时干了啥?
不管是推消费模型,还是拉消费模型,都有一个消费者注册的过程,如 consumer.start() ,这里都干了啥?
1. 推模式的消费注册
// 使用的是 DefaultMQPushConsumer 实现 // org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#start /** * This method gets internal infrastructure readily to serve. Instances must call this method after configuration. * * @throws MQClientException if there is any client error. */ @Override public void start() throws MQClientException { setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup)); this.defaultMQPushConsumerImpl.start(); if (null != traceDispatcher) { try { traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel()); } catch (MQClientException e) { log.warn("trace dispatcher start failed ", e); } } } // org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start public synchronized void start() throws MQClientException { switch (this.serviceState) { case CREATE_JUST: log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(), this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode()); this.serviceState = ServiceState.START_FAILED; this.checkConfig(); this.copySubscription(); if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) { this.defaultMQPushConsumer.changeInstanceNameToPID(); } this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook); this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup()); this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel()); this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy()); this.rebalanceImpl.setmQClientFactory(this.mQClientFactory); this.pullAPIWrapper = new PullAPIWrapper( mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode()); this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList); if (this.defaultMQPushConsumer.getOffsetStore() != null) { this.offsetStore = this.defaultMQPushConsumer.getOffsetStore(); } else { switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; case CLUSTERING: this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; default: break; } this.defaultMQPushConsumer.setOffsetStore(this.offsetStore); } this.offsetStore.load(); if (this.getMessageListenerInner() instanceof MessageListenerOrderly) { this.consumeOrderly = true; this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner()); } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) { // 并发消费模式 this.consumeOrderly = false; this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner()); } this.consumeMessageService.start(); boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this); if (!registerOK) { this.serviceState = ServiceState.CREATE_JUST; this.consumeMessageService.shutdown(); throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null); } mQClientFactory.start(); log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup()); this.serviceState = ServiceState.RUNNING; break; case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: throw new MQClientException("The PushConsumer service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); default: break; } this.updateTopicSubscribeInfoWhenSubscriptionChanged(); this.mQClientFactory.checkClientInBroker(); this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); this.mQClientFactory.rebalanceImmediately(); } // 开启各个服务线程 // org.apache.rocketmq.client.impl.factory.MQClientInstance#start public void start() throws MQClientException { synchronized (this) { switch (this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; // If not specified,looking address from name server if (null == this.clientConfig.getNamesrvAddr()) { this.mQClientAPIImpl.fetchNameServerAddr(); } // Start request-response channel this.mQClientAPIImpl.start(); // Start various schedule tasks this.startScheduledTask(); // Start pull service this.pullMessageService.start(); // Start rebalance service this.rebalanceService.start(); // Start push service this.defaultMQProducer.getDefaultMQProducerImpl().start(false); log.info("the client factory [{}] start OK", this.clientId); this.serviceState = ServiceState.RUNNING; break; case RUNNING: break; case SHUTDOWN_ALREADY: break; case START_FAILED: throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null); default: break; } } } // 并发消费实例构造,创建线程池 // org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#ConsumeMessageConcurrentlyService public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl, MessageListenerConcurrently messageListener) { this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl; this.messageListener = messageListener; this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer(); this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup(); this.consumeRequestQueue = new LinkedBlockingQueue(); // 消费线程准备好,后续被提交任务即可 this.consumeExecutor = new ThreadPoolExecutor( this.defaultMQPushConsumer.getConsumeThreadMin(), this.defaultMQPushConsumer.getConsumeThreadMax(), 1000 * 60, TimeUnit.MILLISECONDS, this.consumeRequestQueue, new ThreadFactoryImpl("ConsumeMessageThread_")); this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_")); this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_")); } // 创建消费客户端实例,为后续消息拉取创建环境,比如:创建线程池、连接到远程等等 // org.apache.rocketmq.client.impl.MQClientManager#getAndCreateMQClientInstance public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) { String clientId = clientConfig.buildMQClientId(); MQClientInstance instance = this.factoryTable.get(clientId); if (null == instance) { instance = new MQClientInstance(clientConfig.cloneClientConfig(), this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook); MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance); if (prev != null) { instance = prev; log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId); } else { log.info("Created new MQClientInstance for clientId:[{}]", clientId); } } return instance; } // org.apache.rocketmq.client.impl.factory.MQClientInstance#MQClientInstance public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) { this.clientConfig = clientConfig; this.instanceIndex = instanceIndex; this.nettyClientConfig = new NettyClientConfig(); this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads()); this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS()); this.clientRemotingProcessor = new ClientRemotingProcessor(this); // MQClientAPIImpl 是连接到远程的一个管理实现 this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig); if (this.clientConfig.getNamesrvAddr() != null) { this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr()); log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr()); } this.clientId = clientId; this.mQAdminImpl = new MQAdminImpl(this); this.pullMessageService = new PullMessageService(this); this.rebalanceService = new RebalanceService(this); this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP); this.defaultMQProducer.resetClientConfig(clientConfig); this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService); log.info("Created a new client Instance, InstanceIndex:{}, ClientID:{}, ClientConfig:{}, ClientVersion:{}, SerializerType:{}", this.instanceIndex, this.clientId, this.clientConfig, MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer()); } // org.apache.rocketmq.client.impl.MQClientAPIImpl#MQClientAPIImpl public MQClientAPIImpl(final NettyClientConfig nettyClientConfig, final ClientRemotingProcessor clientRemotingProcessor, RPCHook rpcHook, final ClientConfig clientConfig) { this.clientConfig = clientConfig; topAddressing = new TopAddressing(MixAll.getWSAddr(), clientConfig.getUnitName()); this.remotingClient = new NettyRemotingClient(nettyClientConfig, null); this.clientRemotingProcessor = clientRemotingProcessor; this.remotingClient.registerRPCHook(rpcHook); this.remotingClient.registerProcessor(RequestCode.CHECK_TRANSACTION_STATE, this.clientRemotingProcessor, null); this.remotingClient.registerProcessor(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, this.clientRemotingProcessor, null); this.remotingClient.registerProcessor(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, this.clientRemotingProcessor, null); this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT, this.clientRemotingProcessor, null); this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_RUNNING_INFO, this.clientRemotingProcessor, null); this.remotingClient.registerProcessor(RequestCode.CONSUME_MESSAGE_DIRECTLY, this.clientRemotingProcessor, null); }
2. 拉模式的消费注册
// 使用的是 DefaultMQPullConsumer 实现 // org.apache.rocketmq.client.consumer.DefaultMQPullConsumer#start @Override public void start() throws MQClientException { // 包装consumerGroup 信息 this.setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup)); this.defaultMQPullConsumerImpl.start(); } // org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl#start public synchronized void start() throws MQClientException { switch (this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; // 检测配置信息是否正确 this.checkConfig(); this.copySubscription(); if (this.defaultMQPullConsumer.getMessageModel() == MessageModel.CLUSTERING) { // 将 instanceName 设置为 pid this.defaultMQPullConsumer.changeInstanceNameToPID(); } this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPullConsumer, this.rpcHook); this.rebalanceImpl.setConsumerGroup(this.defaultMQPullConsumer.getConsumerGroup()); this.rebalanceImpl.setMessageModel(this.defaultMQPullConsumer.getMessageModel()); this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPullConsumer.getAllocateMessageQueueStrategy()); this.rebalanceImpl.setmQClientFactory(this.mQClientFactory); this.pullAPIWrapper = new PullAPIWrapper( mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode()); this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList); if (this.defaultMQPullConsumer.getOffsetStore() != null) { this.offsetStore = this.defaultMQPullConsumer.getOffsetStore(); } else { // 根据不一样的消费方式,创建不同的 offsetStore switch (this.defaultMQPullConsumer.getMessageModel()) { case BROADCASTING: this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup()); break; case CLUSTERING: this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup()); break; default: break; } this.defaultMQPullConsumer.setOffsetStore(this.offsetStore); } this.offsetStore.load(); boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPullConsumer.getConsumerGroup(), this); if (!registerOK) { this.serviceState = ServiceState.CREATE_JUST; throw new MQClientException("The consumer group[" + this.defaultMQPullConsumer.getConsumerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null); } // 开启远程连接,线程池等等 mQClientFactory.start(); log.info("the consumer [{}] start OK", this.defaultMQPullConsumer.getConsumerGroup()); this.serviceState = ServiceState.RUNNING; break; case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: throw new MQClientException("The PullConsumer service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); default: break; } }
pull模式与push模式基本模式相同,只是具体实现不同以及push模式存在另外的消费线程池。不再重复。
客户端进行消费消息,基本都是一此连接的管理,复杂度都不高。主要还是MQ服务端功能,值得深入。
欲知后事如何,且听下文分解。