RocketMQ-消费队列负载
创始人
2025-05-28 03:05:30
0

源码版本号:版本号:4.9.4

RocketMQ消息拉取由PullMessageServiceRebalanceService共同协作完成

PullMessageService负责消息拉取,每一次拉取消息都是从它的pullRequestQueue队列里面拉取任务,
而往这个队列放任务是由RebalanceService触发的。

MQClientInstance#start 方法中的
第242行会调用 RebalanceService#start 方法

// MQClientInstance 的第242行
this.rebalanceService.start();

RebalanceService继承了ServiceThreadServiceThread实现了Runnable接口,
RebalanceServiceMQClientInstance中的属性并跟随MQClientInstance启动。

查看它的run方法,默认情况每隔20秒执行一次

public class RebalanceService extends ServiceThread {// 找到36行@Overridepublic void run() {log.info(this.getServiceName() + " service started");while (!this.isStopped()) {this.waitForRunning(waitInterval);// 调用的是`MQClientInstance#doRebalance`方法this.mqClientFactory.doRebalance();}log.info(this.getServiceName() + " service end");}
}

查看MQClientInstance#doRebalance方法

public class MQClientInstance {// 找到954行public void doRebalance() {// 遍历MQClientInstance实例中的消费者for (Map.Entry entry : this.consumerTable.entrySet()) {MQConsumerInner impl = entry.getValue();if (impl != null) {try {// 调用DefaultMQPushConsumerImpl#doRebalance方法impl.doRebalance();} catch (Throwable e) {log.error("doRebalance exception", e);}}}}
}

查看DefaultMQPushConsumerImpl#doRebalance方法

public class DefaultMQPushConsumerImpl implements MQConsumerInner {// 找到1006行@Overridepublic void doRebalance() {if (!this.pause) {/*** 这里的rebalanceImpl是RebalancePushImpl, 继承了RebalanceImpl* 调用 RebalanceImpl#doRebalance方法*/this.rebalanceImpl.doRebalance(this.isConsumeOrderly());}}
}

这里的rebalanceImplRebalancePushImpl,继承了RebalanceImpl

查看RebalanceImpl#doRebalance方法

public abstract class RebalanceImpl {/*** 217行*/public void doRebalance(final boolean isOrder) {// 拿到消费者订阅的topic信息Map subTable = this.getSubscriptionInner();if (subTable != null) {for (final Map.Entry entry : subTable.entrySet()) {final String topic = entry.getKey();try {// 通过topic进行负载this.rebalanceByTopic(topic, isOrder);} catch (Throwable e) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {log.warn("rebalanceByTopic Exception", e);}}}}// 省略}
}

通过topic进行负载,分为集群模式与广播模式,广播模式比较简单,这里主要分析集群模式。

下面提到的定时任务都是MQClientInstance启动的时候调用MQClientInstance#startScheduledTask方法,
开启一堆的定时任务。

public abstract class RebalanceImpl {// 239行private void rebalanceByTopic(final String topic, final boolean isOrder) {switch (messageModel) {case BROADCASTING: {// 省略......这里是广播模式break;}case CLUSTERING: {// 258行/*** 1.通过topic找到对应的队列信息* 有定时任务每隔30s会从broker拉取最新的topic信息* 具体代码在MQClientInstance#updateTopicRouteInfoFromNameServer()方法里面*/Set mqSet = this.topicSubscribeInfoTable.get(topic);/*** 2.从broker拿到所有订阅了这个topic的clientId, 生产者和消费者在启动的时候生成的* 有定时任务每隔30s会把消费者订阅了哪些实例通过心跳机制告诉broker* 具体代码在MQClientInstance.sendHeartbeatToAllBrokerWithLock*/List cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);/*** 省略......* 找到271行*/if (mqSet != null && cidAll != null) {List mqAll = new ArrayList();mqAll.addAll(mqSet);// 对clientId和队列排序Collections.sort(mqAll);Collections.sort(cidAll);/*** 3.拿到分配策略, 有多种分配策略, 后面再分析如何分配的*/AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;List allocateResult = null;try {/*** 通过分配策略计算出当前消费者应该消费哪些队列* 会保证一个队列只能被该消费组内的一个实例消费*/allocateResult = strategy.allocate(this.consumerGroup,this.mQClientFactory.getClientId(),mqAll,cidAll);} catch (Throwable e) {log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),e);return;}Set allocateResultSet = new HashSet();if (allocateResult != null) {allocateResultSet.addAll(allocateResult);}// 4.这里面会将分配到的队列封装成PullRequest放到PullMessageService的pullRequestQueue队列里面boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);if (changed) {/*** 省略......如果分配的结果跟上次不一样, 会做一些事情*/}}break;}default:break;}}
}

第一步:从主题订阅信息缓存表中获取主题的队列信息。
因为有定时任务每隔30s会从broker拉取最新的topic信息,
具体代码在MQClientInstance.sendHeartbeatToAllBrokerWithLock
所以每个消费者都有每个topic最近的队列信息。

第二步:发送请求从broker中获取该消费组内当前所有的消费者clientId。
因为有定时任务每隔30s会把消费者订阅了哪些实例通过心跳机制告诉broker,
具体代码在MQClientInstance.sendHeartbeatToAllBrokerWithLock
所以broker保存了所有消费该topic的所有的clientId。

第三步:首先对cidAll, mqAll排序,这个很重要,同一个消费组内看到的视图保持一致,
确保同一个消费队列不会被多个消费者分配。 RocketMQ消息队列分配算法接口。

第四步:队列负载策略。
RocketMQ默认提供5种分配算法。以AllocateMessageQueueAveragely为例:
如果现在有8个消息消费队列q1, q2, q3, q4, q5, q6, q7, q8,
有3个消费者c1, c2, c3,那么根据该负载算法,消息队列分配如下:

c1: q1, q2, q3
c2:q4, q5, q6
c3:q7, q8

第五步:调用RebalanceImpl#updateProcessQueueTableInRebalance方法

public abstract class RebalanceImpl { // 找到329行private boolean updateProcessQueueTableInRebalance(final String topic, final Set mqSet,final boolean isOrder) {boolean changed = false;/*** processQueueTable保存了每个队列对应的ProcessQueue* 从broker拉取回来的消息会先存放在ProcessQueue里面*/Iterator> it = this.processQueueTable.entrySet().iterator();/*** 省略......* 主要是对processQueueTable中的数据与mqSet做对比* 如果processQueueTable中的MessageQueue不在mqSet中, * 说明分配算法没有将这个队列再次分配给自己, 则不能消费这个队列了, 做一些处理*//*** 找到367行, 开始生成PullRequest*/List pullRequestList = new ArrayList();for (MessageQueue mq : mqSet) {/*** 如果 processQueueTable 中不存在 这个MessageQueue* 说明是第一次分配到这个队列, 则需要构造PullRequest*/if (!this.processQueueTable.containsKey(mq)) {if (isOrder && !this.lock(mq)) {log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);continue;}/*** 清掉这个队列的在本地内存的消费进度* RemoteBrokerOffsetStore.removeOffset(mq);)*/this.removeDirtyOffset(mq);ProcessQueue pq = new ProcessQueue();long nextOffset = -1L;try {/*** 从broker获取这个队列的消费进度*/nextOffset = this.computePullFromWhereWithException(mq);} catch (Exception e) {log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);continue;}if (nextOffset >= 0) {// 将MessageQueue对应的ProcessQueue保存到processQueueTable中ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);if (pre != null) {log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);} else {log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);PullRequest pullRequest = new PullRequest();pullRequest.setConsumerGroup(consumerGroup);pullRequest.setNextOffset(nextOffset);pullRequest.setMessageQueue(mq);pullRequest.setProcessQueue(pq);pullRequestList.add(pullRequest);changed = true;}} else {log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);}}}/*** 调用RebalancePushImpl#dispatchPullRequest方法* 将构造好的PullRequest放到PullMessageService的pullRequestQueue队列中*/this.dispatchPullRequest(pullRequestList);return changed;}
}

具体的做法是,先将分配到的消息队列集合mqSetprocessQueueTable做一个过滤比对。

上图中processQueueTable标注的红色部分,表示与分配到的消息队列集合mqSet互不包含。
将这些队列设置Dropped属性为true,然后查看这些队列是否可以移除出processQueueTable缓存变量,
这里具体执行removeUnnecessaryMessageQueue()方法,即每隔1s 查看是否可以获取当前消费处理队列的锁,拿到的话返回true。
如果等待1s后,仍然拿不到当前消费处理队列的锁则返回false。如果返回true,则从processQueueTable缓存变量中移除对应的Entry;

上图中processQueueTable的绿色部分,表示与分配到的消息队列集合mqSet的交集。
判断该ProcessQueue是否已经过期了,在Pull模式的不用管,如果是Push模式的,设置Dropped属性为true,
并且调用removeUnnecessaryMessageQueue()方法,像上面一样尝试移除Entry;

最后,为过滤后的消息队列集合(mqSet)中的每个MessageQueue创建一个ProcessQueue对象并存入
RebalanceImplprocessQueueTable队列中(其中调用RebalanceImpl实例的computePullFromWhere(MessageQueue mq)方法
获取该MessageQueue对象的下一个进度消费值offset,随后填充至接下来要创建的pullRequest对象属性中),
并创建拉取请求对象PullRequest添加到拉取列表pullRequestList中,最后执行dispatchPullRequest()方法,
将拉取消息的请求对象PullRequest依次放入PullMessageService服务线程的阻塞队列pullRequestQueue中,
待该服务线程取出后向Broker端发起Pull消息的请求。

相关内容

热门资讯

独家丨人形机器人最大融资背后,... 人形机器人,迎来一针暴力强心剂。投中网获悉,银河通用已完成新一轮融资,规模超过3亿美元(约合超21亿...
行业发展引擎切换,房企竞逐“好... 本文来源:时代周报 作者:唐洛当下,房地产市场的关键词已经从“高速发展”转向“高质量发展”,“好房子...
越来越多的中国人,为什么要戴呼... 前不久,“黄磊戴呼吸机睡觉”话题冲上热搜。黄磊戴呼吸机睡觉 来源:《向往的生活》原来,黄磊长期受打鼾...
芯联集成赵奇:构筑三条增长曲线... 中经记者 孙汝祥 夏欣 北京报道“感恩科创成长层,为‘硬科技’企业铺平创新发展之路。”芯联集成(68...
两位总裁?百济神州宣布全球研发... 百济神州高管迎来变动。12月18日晚间,百济神州有限公司(百济神州,ONC.US;6160.HK;6...
近3年七成主动权益基金正收益,... 文/每日财报 楚风临近年末,公募基金年度业绩排名即将出炉。今年来,A股市场走强,上证指数一度突破4...
锚定高质量发展 践行金融为民初... 12月18日,中银三星人寿凭借在业务发展、战略践行、社会责任等多维度的卓越表现,荣膺“2025金柿奖...
沐曦上市,葛卫东日赚近200亿... 据节点财经获悉,投资大佬葛卫东布局GPU企业沐曦股份,日赚近200亿元,而他投资的另一家企业五一视界...
中国这座城市楼市反攻!机构:明... 2025 年本港楼市成功 “反攻”,住宅物业交投节节上升,楼价亦企稳回升,四年来首见 “量额价” 齐...