源码版本号:版本号:4.9.4
RocketMQ消息拉取由PullMessageService
与RebalanceService
共同协作完成
PullMessageService
负责消息拉取,每一次拉取消息都是从它的pullRequestQueue
队列里面拉取任务,
而往这个队列放任务是由RebalanceService
触发的。
在 MQClientInstance#start
方法中的
第242行会调用 RebalanceService#start
方法
// MQClientInstance 的第242行
this.rebalanceService.start();
RebalanceService
继承了ServiceThread
,ServiceThread
实现了Runnable
接口,
RebalanceService
是MQClientInstance
中的属性并跟随MQClientInstance
启动。
查看它的run方法,默认情况每隔20秒执行一次
public class RebalanceService extends ServiceThread {// 找到36行@Overridepublic void run() {log.info(this.getServiceName() + " service started");while (!this.isStopped()) {this.waitForRunning(waitInterval);// 调用的是`MQClientInstance#doRebalance`方法this.mqClientFactory.doRebalance();}log.info(this.getServiceName() + " service end");}
}
查看MQClientInstance#doRebalance
方法
public class MQClientInstance {// 找到954行public void doRebalance() {// 遍历MQClientInstance实例中的消费者for (Map.Entry entry : this.consumerTable.entrySet()) {MQConsumerInner impl = entry.getValue();if (impl != null) {try {// 调用DefaultMQPushConsumerImpl#doRebalance方法impl.doRebalance();} catch (Throwable e) {log.error("doRebalance exception", e);}}}}
}
查看DefaultMQPushConsumerImpl#doRebalance
方法
public class DefaultMQPushConsumerImpl implements MQConsumerInner {// 找到1006行@Overridepublic void doRebalance() {if (!this.pause) {/*** 这里的rebalanceImpl是RebalancePushImpl, 继承了RebalanceImpl* 调用 RebalanceImpl#doRebalance方法*/this.rebalanceImpl.doRebalance(this.isConsumeOrderly());}}
}
这里的rebalanceImpl
是RebalancePushImpl
,继承了RebalanceImpl
查看RebalanceImpl#doRebalance
方法
public abstract class RebalanceImpl {/*** 217行*/public void doRebalance(final boolean isOrder) {// 拿到消费者订阅的topic信息Map subTable = this.getSubscriptionInner();if (subTable != null) {for (final Map.Entry entry : subTable.entrySet()) {final String topic = entry.getKey();try {// 通过topic进行负载this.rebalanceByTopic(topic, isOrder);} catch (Throwable e) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {log.warn("rebalanceByTopic Exception", e);}}}}// 省略}
}
通过topic进行负载,分为集群模式与广播模式,广播模式比较简单,这里主要分析集群模式。
下面提到的定时任务都是MQClientInstance
启动的时候调用MQClientInstance#startScheduledTask
方法,
开启一堆的定时任务。
public abstract class RebalanceImpl {// 239行private void rebalanceByTopic(final String topic, final boolean isOrder) {switch (messageModel) {case BROADCASTING: {// 省略......这里是广播模式break;}case CLUSTERING: {// 258行/*** 1.通过topic找到对应的队列信息* 有定时任务每隔30s会从broker拉取最新的topic信息* 具体代码在MQClientInstance#updateTopicRouteInfoFromNameServer()方法里面*/Set mqSet = this.topicSubscribeInfoTable.get(topic);/*** 2.从broker拿到所有订阅了这个topic的clientId, 生产者和消费者在启动的时候生成的* 有定时任务每隔30s会把消费者订阅了哪些实例通过心跳机制告诉broker* 具体代码在MQClientInstance.sendHeartbeatToAllBrokerWithLock*/List cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);/*** 省略......* 找到271行*/if (mqSet != null && cidAll != null) {List mqAll = new ArrayList();mqAll.addAll(mqSet);// 对clientId和队列排序Collections.sort(mqAll);Collections.sort(cidAll);/*** 3.拿到分配策略, 有多种分配策略, 后面再分析如何分配的*/AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;List allocateResult = null;try {/*** 通过分配策略计算出当前消费者应该消费哪些队列* 会保证一个队列只能被该消费组内的一个实例消费*/allocateResult = strategy.allocate(this.consumerGroup,this.mQClientFactory.getClientId(),mqAll,cidAll);} catch (Throwable e) {log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),e);return;}Set allocateResultSet = new HashSet();if (allocateResult != null) {allocateResultSet.addAll(allocateResult);}// 4.这里面会将分配到的队列封装成PullRequest放到PullMessageService的pullRequestQueue队列里面boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);if (changed) {/*** 省略......如果分配的结果跟上次不一样, 会做一些事情*/}}break;}default:break;}}
}
第一步:从主题订阅信息缓存表中获取主题的队列信息。
因为有定时任务每隔30s会从broker拉取最新的topic信息,
具体代码在MQClientInstance.sendHeartbeatToAllBrokerWithLock
。
所以每个消费者都有每个topic最近的队列信息。
第二步:发送请求从broker中获取该消费组内当前所有的消费者clientId。
因为有定时任务每隔30s会把消费者订阅了哪些实例通过心跳机制告诉broker,
具体代码在MQClientInstance.sendHeartbeatToAllBrokerWithLock
,
所以broker保存了所有消费该topic的所有的clientId。
第三步:首先对cidAll, mqAll排序,这个很重要,同一个消费组内看到的视图保持一致,
确保同一个消费队列不会被多个消费者分配。 RocketMQ消息队列分配算法接口。
第四步:队列负载策略。
RocketMQ默认提供5种分配算法。以AllocateMessageQueueAveragely
为例:
如果现在有8个消息消费队列q1, q2, q3, q4, q5, q6, q7, q8,
有3个消费者c1, c2, c3,那么根据该负载算法,消息队列分配如下:
c1: q1, q2, q3
c2:q4, q5, q6
c3:q7, q8
第五步:调用RebalanceImpl#updateProcessQueueTableInRebalance
方法
public abstract class RebalanceImpl { // 找到329行private boolean updateProcessQueueTableInRebalance(final String topic, final Set mqSet,final boolean isOrder) {boolean changed = false;/*** processQueueTable保存了每个队列对应的ProcessQueue* 从broker拉取回来的消息会先存放在ProcessQueue里面*/Iterator> it = this.processQueueTable.entrySet().iterator();/*** 省略......* 主要是对processQueueTable中的数据与mqSet做对比* 如果processQueueTable中的MessageQueue不在mqSet中, * 说明分配算法没有将这个队列再次分配给自己, 则不能消费这个队列了, 做一些处理*//*** 找到367行, 开始生成PullRequest*/List pullRequestList = new ArrayList();for (MessageQueue mq : mqSet) {/*** 如果 processQueueTable 中不存在 这个MessageQueue* 说明是第一次分配到这个队列, 则需要构造PullRequest*/if (!this.processQueueTable.containsKey(mq)) {if (isOrder && !this.lock(mq)) {log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);continue;}/*** 清掉这个队列的在本地内存的消费进度* RemoteBrokerOffsetStore.removeOffset(mq);)*/this.removeDirtyOffset(mq);ProcessQueue pq = new ProcessQueue();long nextOffset = -1L;try {/*** 从broker获取这个队列的消费进度*/nextOffset = this.computePullFromWhereWithException(mq);} catch (Exception e) {log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);continue;}if (nextOffset >= 0) {// 将MessageQueue对应的ProcessQueue保存到processQueueTable中ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);if (pre != null) {log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);} else {log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);PullRequest pullRequest = new PullRequest();pullRequest.setConsumerGroup(consumerGroup);pullRequest.setNextOffset(nextOffset);pullRequest.setMessageQueue(mq);pullRequest.setProcessQueue(pq);pullRequestList.add(pullRequest);changed = true;}} else {log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);}}}/*** 调用RebalancePushImpl#dispatchPullRequest方法* 将构造好的PullRequest放到PullMessageService的pullRequestQueue队列中*/this.dispatchPullRequest(pullRequestList);return changed;}
}
具体的做法是,先将分配到的消息队列集合mqSet
与processQueueTable
做一个过滤比对。
上图中processQueueTable标注的红色部分,表示与分配到的消息队列集合mqSet互不包含。
将这些队列设置Dropped属性为true,然后查看这些队列是否可以移除出processQueueTable缓存变量,
这里具体执行removeUnnecessaryMessageQueue()方法,即每隔1s 查看是否可以获取当前消费处理队列的锁,拿到的话返回true。
如果等待1s后,仍然拿不到当前消费处理队列的锁则返回false。如果返回true,则从processQueueTable缓存变量中移除对应的Entry;
上图中processQueueTable的绿色部分,表示与分配到的消息队列集合mqSet的交集。
判断该ProcessQueue是否已经过期了,在Pull模式的不用管,如果是Push模式的,设置Dropped属性为true,
并且调用removeUnnecessaryMessageQueue()方法,像上面一样尝试移除Entry;
最后,为过滤后的消息队列集合(mqSet)中的每个MessageQueue
创建一个ProcessQueue
对象并存入
RebalanceImpl
的processQueueTable
队列中(其中调用RebalanceImpl
实例的computePullFromWhere(MessageQueue mq)
方法
获取该MessageQueue
对象的下一个进度消费值offset
,随后填充至接下来要创建的pullRequest
对象属性中),
并创建拉取请求对象PullRequest
添加到拉取列表pullRequestList
中,最后执行dispatchPullRequest()
方法,
将拉取消息的请求对象PullRequest
依次放入PullMessageService
服务线程的阻塞队列pullRequestQueue
中,
待该服务线程取出后向Broker端发起Pull消息的请求。