RocketMQ-消费队列负载
创始人
2025-05-28 03:05:30
0

源码版本号:版本号:4.9.4

RocketMQ消息拉取由PullMessageServiceRebalanceService共同协作完成

PullMessageService负责消息拉取,每一次拉取消息都是从它的pullRequestQueue队列里面拉取任务,
而往这个队列放任务是由RebalanceService触发的。

MQClientInstance#start 方法中的
第242行会调用 RebalanceService#start 方法

// MQClientInstance 的第242行
this.rebalanceService.start();

RebalanceService继承了ServiceThreadServiceThread实现了Runnable接口,
RebalanceServiceMQClientInstance中的属性并跟随MQClientInstance启动。

查看它的run方法,默认情况每隔20秒执行一次

public class RebalanceService extends ServiceThread {// 找到36行@Overridepublic void run() {log.info(this.getServiceName() + " service started");while (!this.isStopped()) {this.waitForRunning(waitInterval);// 调用的是`MQClientInstance#doRebalance`方法this.mqClientFactory.doRebalance();}log.info(this.getServiceName() + " service end");}
}

查看MQClientInstance#doRebalance方法

public class MQClientInstance {// 找到954行public void doRebalance() {// 遍历MQClientInstance实例中的消费者for (Map.Entry entry : this.consumerTable.entrySet()) {MQConsumerInner impl = entry.getValue();if (impl != null) {try {// 调用DefaultMQPushConsumerImpl#doRebalance方法impl.doRebalance();} catch (Throwable e) {log.error("doRebalance exception", e);}}}}
}

查看DefaultMQPushConsumerImpl#doRebalance方法

public class DefaultMQPushConsumerImpl implements MQConsumerInner {// 找到1006行@Overridepublic void doRebalance() {if (!this.pause) {/*** 这里的rebalanceImpl是RebalancePushImpl, 继承了RebalanceImpl* 调用 RebalanceImpl#doRebalance方法*/this.rebalanceImpl.doRebalance(this.isConsumeOrderly());}}
}

这里的rebalanceImplRebalancePushImpl,继承了RebalanceImpl

查看RebalanceImpl#doRebalance方法

public abstract class RebalanceImpl {/*** 217行*/public void doRebalance(final boolean isOrder) {// 拿到消费者订阅的topic信息Map subTable = this.getSubscriptionInner();if (subTable != null) {for (final Map.Entry entry : subTable.entrySet()) {final String topic = entry.getKey();try {// 通过topic进行负载this.rebalanceByTopic(topic, isOrder);} catch (Throwable e) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {log.warn("rebalanceByTopic Exception", e);}}}}// 省略}
}

通过topic进行负载,分为集群模式与广播模式,广播模式比较简单,这里主要分析集群模式。

下面提到的定时任务都是MQClientInstance启动的时候调用MQClientInstance#startScheduledTask方法,
开启一堆的定时任务。

public abstract class RebalanceImpl {// 239行private void rebalanceByTopic(final String topic, final boolean isOrder) {switch (messageModel) {case BROADCASTING: {// 省略......这里是广播模式break;}case CLUSTERING: {// 258行/*** 1.通过topic找到对应的队列信息* 有定时任务每隔30s会从broker拉取最新的topic信息* 具体代码在MQClientInstance#updateTopicRouteInfoFromNameServer()方法里面*/Set mqSet = this.topicSubscribeInfoTable.get(topic);/*** 2.从broker拿到所有订阅了这个topic的clientId, 生产者和消费者在启动的时候生成的* 有定时任务每隔30s会把消费者订阅了哪些实例通过心跳机制告诉broker* 具体代码在MQClientInstance.sendHeartbeatToAllBrokerWithLock*/List cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);/*** 省略......* 找到271行*/if (mqSet != null && cidAll != null) {List mqAll = new ArrayList();mqAll.addAll(mqSet);// 对clientId和队列排序Collections.sort(mqAll);Collections.sort(cidAll);/*** 3.拿到分配策略, 有多种分配策略, 后面再分析如何分配的*/AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;List allocateResult = null;try {/*** 通过分配策略计算出当前消费者应该消费哪些队列* 会保证一个队列只能被该消费组内的一个实例消费*/allocateResult = strategy.allocate(this.consumerGroup,this.mQClientFactory.getClientId(),mqAll,cidAll);} catch (Throwable e) {log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),e);return;}Set allocateResultSet = new HashSet();if (allocateResult != null) {allocateResultSet.addAll(allocateResult);}// 4.这里面会将分配到的队列封装成PullRequest放到PullMessageService的pullRequestQueue队列里面boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);if (changed) {/*** 省略......如果分配的结果跟上次不一样, 会做一些事情*/}}break;}default:break;}}
}

第一步:从主题订阅信息缓存表中获取主题的队列信息。
因为有定时任务每隔30s会从broker拉取最新的topic信息,
具体代码在MQClientInstance.sendHeartbeatToAllBrokerWithLock
所以每个消费者都有每个topic最近的队列信息。

第二步:发送请求从broker中获取该消费组内当前所有的消费者clientId。
因为有定时任务每隔30s会把消费者订阅了哪些实例通过心跳机制告诉broker,
具体代码在MQClientInstance.sendHeartbeatToAllBrokerWithLock
所以broker保存了所有消费该topic的所有的clientId。

第三步:首先对cidAll, mqAll排序,这个很重要,同一个消费组内看到的视图保持一致,
确保同一个消费队列不会被多个消费者分配。 RocketMQ消息队列分配算法接口。

第四步:队列负载策略。
RocketMQ默认提供5种分配算法。以AllocateMessageQueueAveragely为例:
如果现在有8个消息消费队列q1, q2, q3, q4, q5, q6, q7, q8,
有3个消费者c1, c2, c3,那么根据该负载算法,消息队列分配如下:

c1: q1, q2, q3
c2:q4, q5, q6
c3:q7, q8

第五步:调用RebalanceImpl#updateProcessQueueTableInRebalance方法

public abstract class RebalanceImpl { // 找到329行private boolean updateProcessQueueTableInRebalance(final String topic, final Set mqSet,final boolean isOrder) {boolean changed = false;/*** processQueueTable保存了每个队列对应的ProcessQueue* 从broker拉取回来的消息会先存放在ProcessQueue里面*/Iterator> it = this.processQueueTable.entrySet().iterator();/*** 省略......* 主要是对processQueueTable中的数据与mqSet做对比* 如果processQueueTable中的MessageQueue不在mqSet中, * 说明分配算法没有将这个队列再次分配给自己, 则不能消费这个队列了, 做一些处理*//*** 找到367行, 开始生成PullRequest*/List pullRequestList = new ArrayList();for (MessageQueue mq : mqSet) {/*** 如果 processQueueTable 中不存在 这个MessageQueue* 说明是第一次分配到这个队列, 则需要构造PullRequest*/if (!this.processQueueTable.containsKey(mq)) {if (isOrder && !this.lock(mq)) {log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);continue;}/*** 清掉这个队列的在本地内存的消费进度* RemoteBrokerOffsetStore.removeOffset(mq);)*/this.removeDirtyOffset(mq);ProcessQueue pq = new ProcessQueue();long nextOffset = -1L;try {/*** 从broker获取这个队列的消费进度*/nextOffset = this.computePullFromWhereWithException(mq);} catch (Exception e) {log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);continue;}if (nextOffset >= 0) {// 将MessageQueue对应的ProcessQueue保存到processQueueTable中ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);if (pre != null) {log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);} else {log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);PullRequest pullRequest = new PullRequest();pullRequest.setConsumerGroup(consumerGroup);pullRequest.setNextOffset(nextOffset);pullRequest.setMessageQueue(mq);pullRequest.setProcessQueue(pq);pullRequestList.add(pullRequest);changed = true;}} else {log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);}}}/*** 调用RebalancePushImpl#dispatchPullRequest方法* 将构造好的PullRequest放到PullMessageService的pullRequestQueue队列中*/this.dispatchPullRequest(pullRequestList);return changed;}
}

具体的做法是,先将分配到的消息队列集合mqSetprocessQueueTable做一个过滤比对。

上图中processQueueTable标注的红色部分,表示与分配到的消息队列集合mqSet互不包含。
将这些队列设置Dropped属性为true,然后查看这些队列是否可以移除出processQueueTable缓存变量,
这里具体执行removeUnnecessaryMessageQueue()方法,即每隔1s 查看是否可以获取当前消费处理队列的锁,拿到的话返回true。
如果等待1s后,仍然拿不到当前消费处理队列的锁则返回false。如果返回true,则从processQueueTable缓存变量中移除对应的Entry;

上图中processQueueTable的绿色部分,表示与分配到的消息队列集合mqSet的交集。
判断该ProcessQueue是否已经过期了,在Pull模式的不用管,如果是Push模式的,设置Dropped属性为true,
并且调用removeUnnecessaryMessageQueue()方法,像上面一样尝试移除Entry;

最后,为过滤后的消息队列集合(mqSet)中的每个MessageQueue创建一个ProcessQueue对象并存入
RebalanceImplprocessQueueTable队列中(其中调用RebalanceImpl实例的computePullFromWhere(MessageQueue mq)方法
获取该MessageQueue对象的下一个进度消费值offset,随后填充至接下来要创建的pullRequest对象属性中),
并创建拉取请求对象PullRequest添加到拉取列表pullRequestList中,最后执行dispatchPullRequest()方法,
将拉取消息的请求对象PullRequest依次放入PullMessageService服务线程的阻塞队列pullRequestQueue中,
待该服务线程取出后向Broker端发起Pull消息的请求。

相关内容

热门资讯

glibc: daemon 文章目录 参考说明需要注意的问题调用者直接退出exit_group(0)在daemon之前创建的子线...
Ubuntu系统搭建 一、创建环境常见问题1.1 windows11下打开虚拟机蓝屏问题参考这篇文章,控制面...
性能优化搞得好,Tomcat少... Tomcat基本使用 什么是Web服务器 web服务器的定义 其实并没有标准定义,...
2023还有人不知道kuber... 文章目录Kubernetes(K8s)一、Openstack&VM1、**认识虚拟化****1.1*...
笨鸟学数据结构(绪论) 数据结构的定义按某种逻辑关系组织起来的一批数据,按一定的映象方式把它存放在计算机的存储...
不使用cocoapods-ar... 不使用cocoapods-art插件情况下与jfrog协作原理下载索引创建git仓库或者更新git仓...
微机原理 || push p... 考试真的考了push和pop ,那个加减到底是什么? PUSH 源   ...
使用Spring Boot和C... 原理 Spring Boot是一个基于Spring框架的快速开发应用程序的框架,其提供...
python数据类型常见操作 目录 一、python常见的赋值方式 1.交互性赋值方式 2.连续性赋值方式 3.单独赋值方式 二...
系统架构:经典三层架构 引言 经典三层架构是分层架构中最原始最典型的分层模式,其他分层架构都是其变种或扩展&#...
c++ 流 stream Text Stream: 有解析(parse)和格式化&#...
【springboot】web... 5、视图解析与模板引擎 视图解析:SpringBoot默认不支持 JSP,...
【Java注释】如何自定义注解... 一,如何自定义注解 1.1 在编译时进行格式检查(JDK内置的三个基本注解) ...
Redis案例实战_微信抢红包 目录需求分析架构设计编码实现拓展 需求分析 首先想到发红包的流程 1.发红包 2.抢红包 3.记录红...
关于朋友的思考 关于朋友的思考 朋友就是你高兴时想见的人,烦恼时想找的人,得到对方帮助...
Cobalt Strike--... 获取凭证和哈希      要dump哈希,通过 [beacon] → Access →...
第二章 运算方法和运算器 引入:1. 运算器的运算功能 计算机能够进行的运算包括:算术运算和逻辑运...
DevData Talks 直... 📊本期分享 本期 DevData Talks 邀请到了微众银行研发效能负责人余伟老师...
postgresql基本操作与... postgresql基本操作与基本对象 postgresql是一个C/S架构的大型软件࿰...
【洛谷 P1028】[NOIP... [NOIP2001 普及组] 数的计算 题目描述 给出自然数 nnn,要求按如下方式构...