点击上方“中间件兴趣圈”,选择“设为星标”
做积极的人,越努力越幸运!
1、背景
当消息积压后,消费端将其代码进行优化后,重启消费端服务器,从rocketmq-console上发现TPS为0。如图所示。
乍一看,第一时间得出应用还未恢复,就开始去查看相关的启动日志,通常查看的是应用服务器的 /home/baseuser/logs/rockemqlogs/rocketmq_client.logs,碰巧又看到如下的错误日志:
RebalanceService - [BUG] ConsumerGroup: consumergroup_1 The consumerId: consumer-client-id-clusterA-192.168.x.x@21932 not in cidAll: [consumer-client-id-clusterA-192.168.x.x@22164]
上面的日志显示在队列负载时候,当前节点竟然不属于 consumergroup_1 消费组的活跃连接,导致一大片的报错:
2019-11-02 19:29:17 WARN NettyClientPublicExecutor_1 - execute the pull request exception
org.apache.rocketmq.client.exception.MQBrokerException: CODE: 25 DESC: the consumer's subscription not latest
For more information, please visit the url, http://rocketmq.apache.org/docs/faq/
at org.apache.rocketmq.client.impl.MQClientAPIImpl.processPullResponse(MQClientAPIImpl.java:639)
at org.apache.rocketmq.client.impl.MQClientAPIImpl.access$200(MQClientAPIImpl.java:156)
at org.apache.rocketmq.client.impl.MQClientAPIImpl$2.operationComplete(MQClientAPIImpl.java:592)
at org.apache.rocketmq.remoting.netty.ResponseFuture.executeInvokeCallback(ResponseFuture.java:51)
at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract$2.run(NettyRemotingAbstract.java:275)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
乍一看确实是 rocketmq 相关的问题,导致上述 消费TPS 为0,经过半个小时的日志分析,发现这是RocketMQ 这是一种正常现象,最终会自动恢复,这里我留一个伏笔,将在我的知识星球中与广大星友讨论,经过日志分析得出 rocketmq 没问题,故后面去查看消息积压,发现消息积压明显在减少,那这就奇了怪了,咋消息积压在快速减少,但为啥消费TPS还是为0呢?
接下来将该问题进行探讨。
温馨提示:在问题分析部分,作者没有直接给出答案,而是一步一步探寻答案,因此会通过追踪源码来寻求答案,如果大家想急于答案,可以跳过问题分析,直接查看本文末尾的问题解答部分。 通过本文的阅读,您将获得如下信息: 1、RocketMQ 消费TPS的收集与计算逻辑。 2、RocketMQ 监控指标的设计思路。 3、RocketMQ 主从同步,消费者从主服务器拉取还是从从服务器拉取的判断逻辑。
2、问题分析
2.1 rocketmq-console 数据获获取逻辑探讨
要解开消费TPS 显示为0的问题,我们首先要来看一下 rocketmq-console 这个页面的展示逻辑,即通过阅读 rocketmq-console的源码来解开其采集逻辑。
得知,【消费者】界面查询各个消费组的基本信息的接口为 /consumer/groupList.query,那接下来,我们首先从源码的角度来分析该接口的实现逻辑。其入口如下:
org.apache.rocketmq.console.controller.ConsumerController#list
@RequestMapping(value = "/groupList.query")
@ResponseBody
public Object list() {
return consumerService.queryGroupList();
}
就是调用消费服务处理类的 queryGroupList 方法,其实现代码如下:
ConsumerServiceImpl#queryGroupList
public ListGroupConsumeInfo queryGroupList() {
SetString consumerGroupSet = Sets.newHashSet();
try {
ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo(); // @1
for (BrokerData brokerData : clusterInfo.getBrokerAddrTable().values()) { // @2
SubscriptionGroupWrapper subscriptionGroupWrapper = mqAdminExt.getAllSubscriptionGroup(brokerData.selectBrokerAddr(), 3000L); // @3
consumerGroupSet.addAll(subscriptionGroupWrapper.getSubscriptionGroupTable().keySet());
}
} catch (Exception err) {
throw Throwables.propagate(err);
}
ListGroupConsumeInfo groupConsumeInfoList = Lists.newArrayList();
for (String consumerGroup : consumerGroupSet) { // @4
groupConsumeInfoList.add(queryGroup(consumerGroup));
}
Collections.sort(groupConsumeInfoList);
return groupConsumeInfoList;
}
代码@1:获取集群的 broker 信息,主要是通过向 NameServer 发送 GET_BROKER_CLUSTER_INFO 请求,NameServer 返回集群包含的所有 broker 信息,包含从节点的信息,返回的格式如下:
"clusterInfo": {
"brokerAddrTable": {
"broker-a": {
"cluster": "DefaultCluster",
"brokerName": "broker-a",
"brokerAddrs": {
"0": "192.168.0.168:10911",
"1": "192.168.0.169:10911"
}
},
"broker-b": {
"cluster": "DefaultCluster",
"brokerName": "broker-b",
"brokerAddrs": {
"0": "192.168.0.170:10911",
"1": "192.168.1.171:10911"
}
}
},
"clusterAddrTable": {
"DefaultCluster": ["broker-a","broker-b"]
}
}
代码@2:遍历集群中的 brokerAddrTable 数据结构,即存储了 broker 的地址信息的 Map 。
代码@3:分别向集群中的主节点(brokerData.selectBrokerAddr()) 获取所有的订阅关系(即消费组的订阅信息)。然后将所有的消费者组名称存入 consumerGroupSet。
代码@4:遍历代码@3收集到的消费组,调用 queryGroup 依次请求消费组的运行时信息,后面接下来详细分析。
接下来将重点分析 queryGroup方法的实现细节。
ConsumerServiceImpl#queryGroup
public GroupConsumeInfo queryGroup(String consumerGroup) {
GroupConsumeInfo groupConsumeInfo = new GroupConsumeInfo();
try {
ConsumeStats consumeStats = null;
try {
consumeStats = mqAdminExt.examineConsumeStats(consumerGroup); // @1
} catch (Exception e) {
logger.warn("examineConsumeStats exception, " + consumerGroup, e);
}
ConsumerConnection consumerConnection = null;
try {
consumerConnection = mqAdminExt.examineConsumerConnectionInfo(consumerGroup);
} catch (Exception e) {
logger.warn("examineConsumerConnectionInfo exception, " + consumerGroup, e);
}
groupConsumeInfo.setGroup(consumerGroup);
if (consumeStats != null) {
groupConsumeInfo.setConsumeTps((int)consumeStats.getConsumeTps()); // @2
groupConsumeInfo.setDiffTotal(consumeStats.computeTotalDiff()); // @3
}
if (consumerConnection != null) {
groupConsumeInfo.setCount(consumerConnection.getConnectionSet().size());
groupConsumeInfo.setMessageModel(consumerConnection.getMessageModel());
groupConsumeInfo.setConsumeType(consumerConnection.getConsumeType());
groupConsumeInfo.setVersion(MQVersion.getVersionDesc(consumerConnection.computeMinVersion()));
}
} catch (Exception e) {
logger.warn("examineConsumeStats or examineConsumerConnectionInfo exception, "
+ consumerGroup, e);
}
return groupConsumeInfo;
}
从上面@1,@2,@3这三处代码可以得知,rocketmq-console 相关界面上的消费TPS主要来自 examineConsumeStats 方法,该方法我就不再继续深入,我们只需找到该方法向 broker 发送的请求编码,然后根据该请求编码找到 broker 的处理逻辑即可,最后跟踪发送的请求编为:RequestCode.GET_CONSUME_STATS。
GET_CONSUME_STATS 命令在 broker 的处理逻辑如下:
AdminBrokerProcessor#getConsumeStats
private RemotingCommand getConsumeStats(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetConsumeStatsRequestHeader requestHeader =
(GetConsumeStatsRequestHeader) request.decodeCommandCustomHeader(GetConsumeStatsRequestHeader.class);
ConsumeStats consumeStats = new ConsumeStats();
SetString topics = new HashSetString();
if (UtilAll.isBlank(requestHeader.getTopic())) {
topics = this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(requestHeader.getConsumerGroup());
} else {
topics.add(requestHeader.getTopic());
}
for (String topic : topics) { // @1
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
if (null == topicConfig) { // @2
log.warn("consumeStats, topic config not exist, {}", topic);
continue;
}
{
SubscriptionData findSubscriptionData =
this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getConsumerGroup(), topic); // @3
if (null == findSubscriptionData //
&& this.brokerController.getConsumerManager().findSubscriptionDataCount(requestHeader.getConsumerGroup()) 0) {
log.warn("consumeStats, the consumer group[{}], topic[{}] not exist", requestHeader.getConsumerGroup(), topic);
continue;
}
}
for (int i = 0; i topicConfig.getReadQueueNums(); i++) { // @4
MessageQueue mq = new MessageQueue();
mq.setTopic(topic);
mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
mq.setQueueId(i);
OffsetWrapper offsetWrapper = new OffsetWrapper();
long brokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
if (brokerOffset 0)
brokerOffset = 0;
long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(//
requestHeader.getConsumerGroup(), //
topic, //
i);
if (consumerOffset 0)
consumerOffset = 0;
offsetWrapper.setBrokerOffset(brokerOffset); // @5
offsetWrapper.setConsumerOffset(consumerOffset); // @6
long timeOffset = consumerOffset - 1;
if (timeOffset = 0) {
long lastTimestamp = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, timeOffset);
if (lastTimestamp 0) {
offsetWrapper.setLastTimestamp(lastTimestamp); // @7
}
}
consumeStats.getOffsetTable().put(mq, offsetWrapper); // @8
}
double consumeTps = this.brokerController.getBrokerStatsManager().tpsGroupGetNums(requestHeader.getConsumerGroup(), topic); // @9
consumeTps += consumeStats.getConsumeTps(); // @10
consumeStats.setConsumeTps(consumeTps);
}
byte[] body = consumeStats.encode();
response.setBody(body);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
该方法比较长,重点关注如下关键点。
代码@1:遍历该消费组订阅的所有主题。消费TPS将是所有主题消费TPS的总和,其他的信息按主题、队列信息单独存放。
代码@2:如果 topic 的元信息不存在,则跳过该主题。
代码@3:如果消费组的订阅信息不存在,则跳过该订阅关系。
代码@4:收集该主题所有的读队列,以messagequeue为键,OffsetWrapper为值存储在 consumeStats.getOffsetTable() ,见代码@8。
代码@5:设置该队列的最新偏移量。
代码@6:设置该消费组对该队列的消费进度,设置为consumeOffset。
代码@7:lastTimestamp 上一次消费的消息的存储时间,实现逻辑为:取消费组对于队列的消息消费进度 -1 的消息,存储在 broker 的时间,如果对应的消息已过期被删除,则在界面上显示的时间就会为1970-01-01 08:00:00。
代码@9:通过 BrokerStatsManager 的 tpsGroupGetNums 方法从统计数据中获取该消费组针对该队列的消费TPS。
代码@10:累积消费TPS,并最终作为该消费组的总TPS。
上面这个方法非常关键,是返回给前段页面核心的数据组装逻辑,以队列、消费组为纬度给出 brokerOffset、consumeOffset、lastTimestamp。然后将数据返回给前段页面进行展示。
接下将聚焦到消费组消费TPS的统计处理,其入口为 tpsGroupGetNums 。
2.2 rocketmq 消费TPS统计实现原理
2.2.1 消费TPS计算逻辑
首先我们还是从 tpsGroupGetNums 方法入手,探究一下 tps 的获取逻辑,然后再探究数据的采集原理(这也是 rocketmq 监控相关)。
BrokerStatsManager#tpsGroupGetNums
public double tpsGroupGetNums(final String group, final String topic) {
final String statsKey = buildStatsKey(topic, group); // @1
return this.statsTable.get(GROUP_GET_NUMS).getStatsDataInMinute(statsKey).getTps(); // @2
}
代码@1:构建统计key,其逻辑为:其键为:topic@consumerGroup,即消息主题@消费组名。
要读懂 代码@2 的代码,先来看一下 rocketmq 监控指标的存储数据结构,如下图所示:
正如上图所示:RocketMQ 使用 HashMap String, StatusItemSet 来存储监控收集的数据,其中Key 为监控指标的类型,例如 topic 发送消息数量、topic 发送消息大小、消费组获取消息个数等信息,每一项使用 StatsItemSet 存储,该存储结构内部又维护一个HashMap:ConcurrentMap,key 代表某一个具体的统计目标,例如记录消费组拉取消息的数量监控指标,那其统计的对象即 topic@consumer_group,最终数据的载体是 StatsItem,使用如下几个关键字段来记录统计信息:
AtomicLong value = new AtomicLong(0) 总数量,统计指标TOPIC_GET_NUMS 指标为例,记录的是消息拉取的总条数,例如一次消息拉取操作获取了32条消息,则该数量增加32。
AtomicLong times = new AtomicLong(0) 改变上述 value 的次数,还是以统计指标TOPIC_GET_NUMS 指标为例,记录的是增加 value 的次数。
LinkedList CallSnapshot csListMinute 一分钟的快照信息,该 List 只会存储6个元素,每10s记录一次调用快照,超过6条,则移除第一条,这个将在下文介绍。
LinkedList CallSnapshot csListHour 一小时的快照信息,该 List 只会存储6个元素,每10分钟记录一次快照,超过6条,则移除第一条。
LinkedList CallSnapshot csListDay 一天的快照新,该List 只会存储24个元素,每1小时记录一次快照,超过24条,则移除第一条。
了解了上述存储结构后,代码@2,最终其实调用的就是 StatsItemSet 的 getStatsDataInMinute 方法。
StatsItemSet#getStatsDataInMinute
public StatsSnapshot getStatsDataInMinute(final String statsKey) {
StatsItem statsItem = this.statsItemTable.get(statsKey);
if (null != statsItem) {
return statsItem.getStatsDataInMinute();
}
return new StatsSnapshot();
}
从代码上最终调用 StatesItem 的 getStatsDataInMinute 方法。
StatesItem#getStatsDataInMinute
public StatsSnapshot getStatsDataInMinute() {
return computeStatsData(this.csListMinute);
}
private static StatsSnapshot computeStatsData(final LinkedListCallSnapshot csList) {
StatsSnapshot statsSnapshot = new StatsSnapshot();
synchronized (csList) {
double tps = 0;
double avgpt = 0;
long sum = 0;
if (!csList.isEmpty()) {
CallSnapshot first = csList.getFirst(); // @1
CallSnapshot last = csList.getLast(); // @2
sum = last.getValue() - first.getValue(); // @3
tps = (sum * 1000.0d) / (last.getTimestamp() - first.getTimestamp()); // @4
long timesDiff = last.getTimes() - first.getTimes();
if (timesDiff 0) { // @5
avgpt = (sum * 1.0d) / timesDiff;
}
}
statsSnapshot.setSum(sum);
statsSnapshot.setTps(tps);
statsSnapshot.setAvgpt(avgpt);
}
return statsSnapshot;
}
代码@1:首先取快照中的第一条消息。
代码@2:取快照列表中的最后一条消息。
代码@3:计算这两个时间点 value 的差值,即这段时间内新增的总数。
代码@4:计算这段时间内的tps,即每秒处理的消息条数。
代码@5:计算 avgpt ,即平均一次操作新增的消息条数(即平均一次操作,value 新增的个数)。
消费组的消费TPS的计算逻辑就介绍到这里了,那还有一个疑问,即 StatsItem 中 csListMinute 中的数据从哪来呢?
2.2.2 如何采集消费TPS原始数据
StatsItem#init
public void init() {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
samplingInSeconds();
} catch (Throwable ignored) {
}
}
}, 0, 10, TimeUnit.SECONDS);
// 省略其他代码
}
原来在创建一个新的StatsItem 的时候,就会启动一个定时任务,每隔 10s 调用 samplingInSeconds 方法进行抽样,那我们简单看一下这个方法:
StatsItem#samplingInSeconds
public void samplingInSeconds() {
synchronized (this.csListMinute) {
this.csListMinute.add(new CallSnapshot(System.currentTimeMillis(), this.times.get(), this.value
.get()));
if (this.csListMinute.size() 7) {
this.csListMinute.removeFirst();
}
}
}
就是将当前StatsItem 中的 value 与 变更次数(time ) 存入封装成 CallSnapshot ,然后存储在快照列表中。这里的关键是times values 这些值在什么情况下会改变呢?
接着往下看,源码在消息拉取的时候,会将本次拉取的信息加入到统计信息中,其入口为:
PullMessageProcessor#processRequest
switch (response.getCode()) {
case ResponseCode.SUCCESS:
this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
getMessageResult.getMessageCount());
this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
getMessageResult.getBufferTotalSize());
this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
// 省略其他代码
}
该方法会最终更新 StatsItem 中的 values ,而 times 是 每调用一次,加1。
理论基础讲解完毕后,接下来我们来回答一下题目中的现象。
3、问题解答
按照上面的讲解,通过 rocketmq-console 发起查看消费组的TPS时,Broker 会根据过去一分钟内采集的快照数据进行计算。快照信息的采集机制是 broker 端会每10s 会记录一下消费组对应的拉取消息数量与拉取次数。
那既然消息延迟(堆积数量在不断减少),说明消费端正在消费,按道理来说,通过上述机制进行计算,TPS 不可能会是0?那又是什么原因呢?
如果TPS为0,可以说明消费端并没有向 broker 拉取消息,因为一旦从 broker 拉取消息,有关 StatsItem 的 拉取消息总数(value) 与 拉取次数(times) 再两次采集国产中肯定不会相等,只要两者有差距,其TPS就不可能为0,那消费组在消费消息,但又不从主节点上拉取消息,这种情况会出现吗?
答案是会的,在 RocketMQ 主从同步架构中,如果需要访问的消息偏移量与当前 commitlog 最大偏移的之间的差距超过了内存的40%,消息消费将由从节点接管,故此时消费的拉取不会去主节点拉取,故上面返回的TPS就会为0。这样就能完美解答了。
经过上面的分析,我相信大家已经非常认可这个原因了,其实我们还有一个重要的论据,大家可以分别去查看 Rocketmq 主从节点 /home/{username}/logs/rocketmqlogs/stats.log,里面会每隔1分钟在日志中打印各个消费组的消费TPS.
从服务器(rocketmq-slave)对应的日志如下:
INFO - [GROUP_GET_NUMS] [t1@c1] Stats In One Minute, SUM: 785717 TPS: 15714.34 AVGPT: 8.14
INFO - [GROUP_GET_NUMS] [t1@c1] Stats In One Minute, SUM: 940522 TPS: 15675.37 AVGPT: 8.06
主服务器(rocketmq-master)对应的日志如下:
INFO - [GROUP_GET_NUMS] [t1@c1] Stats In One Minute, SUM: 0 TPS: 0.00 AVGPT: 0.00
INFO - [GROUP_GET_NUMS] [t1@c1] Stats In One Minute, SUM: 0 TPS: 0.00 AVGPT: 0.00
主服务器上的TPS一定会0吗?不一定,其实也不一定。这里借着这波日志,再来总结一下 RocketMQ 主从同步时的切换逻辑。
1、如果消费端请求的消息物理偏移量与 broker 当前最新的物理偏移量之间的差距查过内存的40%,下一次拉取会往从节点发送(当然前提是slaveReadEnable = true)。
2、当从节点开始接管消息消费时,下一次拉取请求一定会往从节点发送码?答案也是不一定:
关于 RocketMQ 主从同步若干问题答疑,可以参考笔者的另外一篇文章: 。
欢迎加入我的知识星球,探讨 java 高并发、分布式服务架构,一起交流源码,长按如下二维码:
如果喜欢您喜欢这篇文章,点赞 转发是对我最大的鼓励,越努力越幸运,一起加油。
原文始发于微信公众号(中间件兴趣圈):