踩坑记——rocketmq-console 消费TPS为0,但消息积压数却在降低是个什么“鬼”

本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

原文链接:blog.ouyangsihai.cn >> 踩坑记——rocketmq-console 消费TPS为0,但消息积压数却在降低是个什么“鬼”

点击上方“中间件兴趣圈”,选择“设为星标”

做积极的人,越努力越幸运!

1、背景

当消息积压后,消费端将其代码进行优化后,重启消费端服务器,从rocketmq-console上发现TPS为0。如图所示。

乍一看,第一时间得出应用还未恢复,就开始去查看相关的启动日志,通常查看的是应用服务器的 /home/baseuser/logs/rockemqlogs/rocketmq_client.logs,碰巧又看到如下的错误日志:


RebalanceService - [BUG] ConsumerGroup: consumergroup_1 The consumerId: consumer-client-id-clusterA-192.168.x.x@21932 not in cidAll: [consumer-client-id-clusterA-192.168.x.x@22164]

上面的日志显示在队列负载时候,当前节点竟然不属于 consumergroup_1 消费组的活跃连接,导致一大片的报错:


2019-11-02 19:29:17 WARN NettyClientPublicExecutor_1 - execute the pull request exception
org.apache.rocketmq.client.exception.MQBrokerException: CODE: 25  DESC: the consumer's subscription not latest
For more information, please visit the url, http://rocketmq.apache.org/docs/faq/
    at org.apache.rocketmq.client.impl.MQClientAPIImpl.processPullResponse(MQClientAPIImpl.java:639)
    at org.apache.rocketmq.client.impl.MQClientAPIImpl.access$200(MQClientAPIImpl.java:156)
    at org.apache.rocketmq.client.impl.MQClientAPIImpl$2.operationComplete(MQClientAPIImpl.java:592)
    at org.apache.rocketmq.remoting.netty.ResponseFuture.executeInvokeCallback(ResponseFuture.java:51)
    at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract$2.run(NettyRemotingAbstract.java:275)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

乍一看确实是 rocketmq 相关的问题,导致上述 消费TPS 为0,经过半个小时的日志分析,发现这是RocketMQ 这是一种正常现象,最终会自动恢复,这里我留一个伏笔,将在我的知识星球中与广大星友讨论,经过日志分析得出 rocketmq 没问题,故后面去查看消息积压,发现消息积压明显在减少,那这就奇了怪了,咋消息积压在快速减少,但为啥消费TPS还是为0呢?

接下来将该问题进行探讨。

温馨提示:在问题分析部分,作者没有直接给出答案,而是一步一步探寻答案,因此会通过追踪源码来寻求答案,如果大家想急于答案,可以跳过问题分析,直接查看本文末尾的问题解答部分。 通过本文的阅读,您将获得如下信息: 1、RocketMQ 消费TPS的收集与计算逻辑。 2、RocketMQ 监控指标的设计思路。 3、RocketMQ 主从同步,消费者从主服务器拉取还是从从服务器拉取的判断逻辑。

2、问题分析

2.1 rocketmq-console 数据获获取逻辑探讨

要解开消费TPS 显示为0的问题,我们首先要来看一下 rocketmq-console 这个页面的展示逻辑,即通过阅读 rocketmq-console的源码来解开其采集逻辑。

得知,【消费者】界面查询各个消费组的基本信息的接口为 /consumer/groupList.query,那接下来,我们首先从源码的角度来分析该接口的实现逻辑。其入口如下:


org.apache.rocketmq.console.controller.ConsumerController#list
@RequestMapping(value = "/groupList.query")
@ResponseBody
public Object list() {
    return consumerService.queryGroupList();
}
就是调用消费服务处理类的 queryGroupList 方法,其实现代码如下:
ConsumerServiceImpl#queryGroupList
public ListGroupConsumeInfo queryGroupList() {
    SetString consumerGroupSet = Sets.newHashSet();
    try {
        ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();  // @1
        for (BrokerData brokerData : clusterInfo.getBrokerAddrTable().values()) {   // @2
            SubscriptionGroupWrapper subscriptionGroupWrapper = mqAdminExt.getAllSubscriptionGroup(brokerData.selectBrokerAddr(), 3000L);  // @3
            consumerGroupSet.addAll(subscriptionGroupWrapper.getSubscriptionGroupTable().keySet());                                                                 
        }
    } catch (Exception err) {
        throw Throwables.propagate(err);
    }
    ListGroupConsumeInfo groupConsumeInfoList = Lists.newArrayList();
    for (String consumerGroup : consumerGroupSet) {                                                // @4
        groupConsumeInfoList.add(queryGroup(consumerGroup));                              
    }
    Collections.sort(groupConsumeInfoList);
    return groupConsumeInfoList;
}

代码@1:获取集群的 broker 信息,主要是通过向 NameServer 发送 GET_BROKER_CLUSTER_INFO 请求,NameServer 返回集群包含的所有 broker 信息,包含从节点的信息,返回的格式如下:


"clusterInfo": {
    "brokerAddrTable": {
       "broker-a": {
           "cluster": "DefaultCluster",
            "brokerName": "broker-a",
            "brokerAddrs": {
                "0": "192.168.0.168:10911",
                "1": "192.168.0.169:10911"
            }
        },
        "broker-b": {
           "cluster": "DefaultCluster",
            "brokerName": "broker-b",
            "brokerAddrs": {
                "0": "192.168.0.170:10911",
                "1": "192.168.1.171:10911"
            }
        }
    },
    "clusterAddrTable": {
        "DefaultCluster": ["broker-a","broker-b"]
    }
}

代码@2:遍历集群中的 brokerAddrTable 数据结构,即存储了 broker 的地址信息的 Map 。

代码@3:分别向集群中的主节点(brokerData.selectBrokerAddr()) 获取所有的订阅关系(即消费组的订阅信息)。然后将所有的消费者组名称存入 consumerGroupSet。

代码@4:遍历代码@3收集到的消费组,调用 queryGroup 依次请求消费组的运行时信息,后面接下来详细分析。

接下来将重点分析 queryGroup方法的实现细节。

ConsumerServiceImpl#queryGroup


public GroupConsumeInfo queryGroup(String consumerGroup) {
    GroupConsumeInfo groupConsumeInfo = new GroupConsumeInfo();
    try {
        ConsumeStats consumeStats = null;
        try {
            consumeStats = mqAdminExt.examineConsumeStats(consumerGroup);  // @1
        } catch (Exception e) {
            logger.warn("examineConsumeStats exception, " + consumerGroup, e);
        }
        ConsumerConnection consumerConnection = null;
        try {
            consumerConnection = mqAdminExt.examineConsumerConnectionInfo(consumerGroup); 
        } catch (Exception e) {
            logger.warn("examineConsumerConnectionInfo exception, " + consumerGroup, e);
        }
        groupConsumeInfo.setGroup(consumerGroup);

        if (consumeStats != null) {
            groupConsumeInfo.setConsumeTps((int)consumeStats.getConsumeTps());    // @2
            groupConsumeInfo.setDiffTotal(consumeStats.computeTotalDiff());                   // @3
        }

        if (consumerConnection != null) {
            groupConsumeInfo.setCount(consumerConnection.getConnectionSet().size());
            groupConsumeInfo.setMessageModel(consumerConnection.getMessageModel());
            groupConsumeInfo.setConsumeType(consumerConnection.getConsumeType());
            groupConsumeInfo.setVersion(MQVersion.getVersionDesc(consumerConnection.computeMinVersion()));
        }
    } catch (Exception e) {
        logger.warn("examineConsumeStats or examineConsumerConnectionInfo exception, "
                + consumerGroup, e);
    }
    return groupConsumeInfo;
}

从上面@1,@2,@3这三处代码可以得知,rocketmq-console 相关界面上的消费TPS主要来自 examineConsumeStats 方法,该方法我就不再继续深入,我们只需找到该方法向 broker 发送的请求编码,然后根据该请求编码找到 broker 的处理逻辑即可,最后跟踪发送的请求编为:RequestCode.GET_CONSUME_STATS。

GET_CONSUME_STATS 命令在 broker 的处理逻辑如下:

AdminBrokerProcessor#getConsumeStats


private RemotingCommand getConsumeStats(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final GetConsumeStatsRequestHeader requestHeader =
            (GetConsumeStatsRequestHeader) request.decodeCommandCustomHeader(GetConsumeStatsRequestHeader.class);
        ConsumeStats consumeStats = new ConsumeStats();
        SetString topics = new HashSetString();
        if (UtilAll.isBlank(requestHeader.getTopic())) {
            topics = this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(requestHeader.getConsumerGroup());
        } else {
            topics.add(requestHeader.getTopic());
        }
        for (String topic : topics) {   // @1
            TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
            if (null == topicConfig) {  // @2
                log.warn("consumeStats, topic config not exist, {}", topic);
                continue;
            }
            {                                
                SubscriptionData findSubscriptionData =
                    this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getConsumerGroup(), topic);   // @3
                if (null == findSubscriptionData //
                    && this.brokerController.getConsumerManager().findSubscriptionDataCount(requestHeader.getConsumerGroup())  0) {
                    log.warn("consumeStats, the consumer group[{}], topic[{}] not exist", requestHeader.getConsumerGroup(), topic);
                    continue;
                }
            }
            for (int i = 0; i  topicConfig.getReadQueueNums(); i++) {   // @4
                MessageQueue mq = new MessageQueue();
                mq.setTopic(topic);
                mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
                mq.setQueueId(i);
                OffsetWrapper offsetWrapper = new OffsetWrapper();
                long brokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
                if (brokerOffset  0)
                    brokerOffset = 0;
                long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(//
                    requestHeader.getConsumerGroup(), //
                    topic, //
                    i);
                if (consumerOffset  0)
                    consumerOffset = 0;
                offsetWrapper.setBrokerOffset(brokerOffset);                                   // @5
                offsetWrapper.setConsumerOffset(consumerOffset);                       // @6
                long timeOffset = consumerOffset - 1;
                if (timeOffset = 0) {
                    long lastTimestamp = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, timeOffset);
                    if (lastTimestamp  0) {
                        offsetWrapper.setLastTimestamp(lastTimestamp);                 // @7
                    }
                }
                consumeStats.getOffsetTable().put(mq, offsetWrapper);     // @8
            }
            double consumeTps = this.brokerController.getBrokerStatsManager().tpsGroupGetNums(requestHeader.getConsumerGroup(), topic); // @9
            consumeTps += consumeStats.getConsumeTps(); // @10
            consumeStats.setConsumeTps(consumeTps);
        }
        byte[] body = consumeStats.encode();
        response.setBody(body);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
}

该方法比较长,重点关注如下关键点。

代码@1:遍历该消费组订阅的所有主题。消费TPS将是所有主题消费TPS的总和,其他的信息按主题、队列信息单独存放。

代码@2:如果 topic 的元信息不存在,则跳过该主题。

代码@3:如果消费组的订阅信息不存在,则跳过该订阅关系。

代码@4:收集该主题所有的读队列,以messagequeue为键,OffsetWrapper为值存储在 consumeStats.getOffsetTable() ,见代码@8。

代码@5:设置该队列的最新偏移量。

代码@6:设置该消费组对该队列的消费进度,设置为consumeOffset。

代码@7:lastTimestamp 上一次消费的消息的存储时间,实现逻辑为:取消费组对于队列的消息消费进度 -1 的消息,存储在 broker 的时间,如果对应的消息已过期被删除,则在界面上显示的时间就会为1970-01-01 08:00:00。

代码@9:通过 BrokerStatsManager 的 tpsGroupGetNums 方法从统计数据中获取该消费组针对该队列的消费TPS。

代码@10:累积消费TPS,并最终作为该消费组的总TPS。

上面这个方法非常关键,是返回给前段页面核心的数据组装逻辑,以队列、消费组为纬度给出 brokerOffset、consumeOffset、lastTimestamp。然后将数据返回给前段页面进行展示。

接下将聚焦到消费组消费TPS的统计处理,其入口为 tpsGroupGetNums

2.2 rocketmq 消费TPS统计实现原理

2.2.1 消费TPS计算逻辑

首先我们还是从 tpsGroupGetNums 方法入手,探究一下 tps 的获取逻辑,然后再探究数据的采集原理(这也是 rocketmq 监控相关)。

BrokerStatsManager#tpsGroupGetNums


public double tpsGroupGetNums(final String group, final String topic) {
    final String statsKey = buildStatsKey(topic, group); // @1
    return this.statsTable.get(GROUP_GET_NUMS).getStatsDataInMinute(statsKey).getTps(); // @2
}

代码@1:构建统计key,其逻辑为:其键为:topic@consumerGroup,即消息主题@消费组名。

要读懂 代码@2 的代码,先来看一下 rocketmq 监控指标的存储数据结构,如下图所示:

正如上图所示:RocketMQ  使用 HashMap String, StatusItemSet 来存储监控收集的数据,其中Key 为监控指标的类型,例如 topic 发送消息数量、topic 发送消息大小、消费组获取消息个数等信息,每一项使用 StatsItemSet 存储,该存储结构内部又维护一个HashMap:ConcurrentMap,key 代表某一个具体的统计目标,例如记录消费组拉取消息的数量监控指标,那其统计的对象即 topic@consumer_group,最终数据的载体是 StatsItem,使用如下几个关键字段来记录统计信息:

  • AtomicLong value = new AtomicLong(0) 总数量,统计指标TOPIC_GET_NUMS 指标为例,记录的是消息拉取的总条数,例如一次消息拉取操作获取了32条消息,则该数量增加32。

  • AtomicLong times = new AtomicLong(0) 改变上述 value 的次数,还是以统计指标TOPIC_GET_NUMS 指标为例,记录的是增加 value 的次数。

  • LinkedList CallSnapshot csListMinute 一分钟的快照信息,该 List 只会存储6个元素,每10s记录一次调用快照,超过6条,则移除第一条,这个将在下文介绍。

  • LinkedList CallSnapshot csListHour 一小时的快照信息,该 List 只会存储6个元素,每10分钟记录一次快照,超过6条,则移除第一条。

  • LinkedList CallSnapshot csListDay 一天的快照新,该List 只会存储24个元素,每1小时记录一次快照,超过24条,则移除第一条。

  • 了解了上述存储结构后,代码@2,最终其实调用的就是 StatsItemSet 的 getStatsDataInMinute 方法。

    StatsItemSet#getStatsDataInMinute

    
    public StatsSnapshot getStatsDataInMinute(final String statsKey) {
        StatsItem statsItem = this.statsItemTable.get(statsKey);
        if (null != statsItem) {
            return statsItem.getStatsDataInMinute();
        }
        return new StatsSnapshot();
    }
    

    从代码上最终调用 StatesItem 的 getStatsDataInMinute 方法。

    StatesItem#getStatsDataInMinute

    
    public StatsSnapshot getStatsDataInMinute() {
        return computeStatsData(this.csListMinute);
    }
    private static StatsSnapshot computeStatsData(final LinkedListCallSnapshot csList) {
        StatsSnapshot statsSnapshot = new StatsSnapshot();
        synchronized (csList) {
            double tps = 0;
            double avgpt = 0;
            long sum = 0;
            if (!csList.isEmpty()) {
                CallSnapshot first = csList.getFirst();   // @1
                CallSnapshot last = csList.getLast();    // @2
                sum = last.getValue() - first.getValue();  // @3
                tps = (sum * 1000.0d) / (last.getTimestamp() - first.getTimestamp());   // @4
                long timesDiff = last.getTimes() - first.getTimes();
                if (timesDiff  0) {                                                                                   // @5
                    avgpt = (sum * 1.0d) / timesDiff;
                }
            }
    
            statsSnapshot.setSum(sum);
            statsSnapshot.setTps(tps);
            statsSnapshot.setAvgpt(avgpt);                                                          
        }
        return statsSnapshot;
    }
    

    代码@1:首先取快照中的第一条消息。

    代码@2:取快照列表中的最后一条消息。

    代码@3:计算这两个时间点 value 的差值,即这段时间内新增的总数。

    代码@4:计算这段时间内的tps,即每秒处理的消息条数。

    代码@5:计算 avgpt ,即平均一次操作新增的消息条数(即平均一次操作,value 新增的个数)。

    消费组的消费TPS的计算逻辑就介绍到这里了,那还有一个疑问,即 StatsItem 中 csListMinute 中的数据从哪来呢?

    2.2.2 如何采集消费TPS原始数据

    StatsItem#init

    
    public void init() {
      this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        samplingInSeconds();
                    } catch (Throwable ignored) {
                    }
                }
            }, 0, 10, TimeUnit.SECONDS);
       // 省略其他代码
    }
    

    原来在创建一个新的StatsItem 的时候,就会启动一个定时任务,每隔 10s 调用 samplingInSeconds 方法进行抽样,那我们简单看一下这个方法:

    StatsItem#samplingInSeconds

    
    public void samplingInSeconds() {
        synchronized (this.csListMinute) {
            this.csListMinute.add(new CallSnapshot(System.currentTimeMillis(), this.times.get(), this.value
                    .get()));
            if (this.csListMinute.size()  7) {
                this.csListMinute.removeFirst();
            }
        }
    }
    

    就是将当前StatsItem 中的 value 与 变更次数(time ) 存入封装成 CallSnapshot ,然后存储在快照列表中。这里的关键是times values 这些值在什么情况下会改变呢?

    接着往下看,源码在消息拉取的时候,会将本次拉取的信息加入到统计信息中,其入口为:

    PullMessageProcessor#processRequest

    
    switch (response.getCode()) {
        case ResponseCode.SUCCESS:
            this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                            getMessageResult.getMessageCount());
            this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                            getMessageResult.getBufferTotalSize());
            this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
    
        // 省略其他代码
    }
    

    该方法会最终更新 StatsItem 中的 values ,而 times 是 每调用一次,加1。

    理论基础讲解完毕后,接下来我们来回答一下题目中的现象。

    3、问题解答

    按照上面的讲解,通过 rocketmq-console 发起查看消费组的TPS时,Broker 会根据过去一分钟内采集的快照数据进行计算。快照信息的采集机制是 broker 端会每10s 会记录一下消费组对应的拉取消息数量与拉取次数。

    那既然消息延迟(堆积数量在不断减少),说明消费端正在消费,按道理来说,通过上述机制进行计算,TPS 不可能会是0?那又是什么原因呢?

    如果TPS为0,可以说明消费端并没有向 broker 拉取消息,因为一旦从 broker 拉取消息,有关 StatsItem 的 拉取消息总数(value) 与 拉取次数(times) 再两次采集国产中肯定不会相等,只要两者有差距,其TPS就不可能为0,那消费组在消费消息,但又不从主节点上拉取消息,这种情况会出现吗?

    答案是会的,在 RocketMQ 主从同步架构中,如果需要访问的消息偏移量与当前 commitlog 最大偏移的之间的差距超过了内存的40%,消息消费将由从节点接管,故此时消费的拉取不会去主节点拉取,故上面返回的TPS就会为0。这样就能完美解答了。

    经过上面的分析,我相信大家已经非常认可这个原因了,其实我们还有一个重要的论据,大家可以分别去查看 Rocketmq 主从节点 /home/{username}/logs/rocketmqlogs/stats.log,里面会每隔1分钟在日志中打印各个消费组的消费TPS.

    从服务器(rocketmq-slave)对应的日志如下:

    
    INFO - [GROUP_GET_NUMS] [t1@c1] Stats In One Minute, SUM: 785717 TPS: 15714.34 AVGPT: 8.14
    INFO - [GROUP_GET_NUMS] [t1@c1] Stats In One Minute, SUM: 940522 TPS: 15675.37 AVGPT: 8.06
    

    主服务器(rocketmq-master)对应的日志如下:

    
    INFO - [GROUP_GET_NUMS] [t1@c1] Stats In One Minute, SUM: 0 TPS: 0.00 AVGPT: 0.00
    INFO - [GROUP_GET_NUMS] [t1@c1] Stats In One Minute, SUM: 0 TPS: 0.00 AVGPT: 0.00
    

    主服务器上的TPS一定会0吗?不一定,其实也不一定。这里借着这波日志,再来总结一下 RocketMQ 主从同步时的切换逻辑。

    1、如果消费端请求的消息物理偏移量与 broker 当前最新的物理偏移量之间的差距查过内存的40%,下一次拉取会往从节点发送(当然前提是slaveReadEnable = true)。

    2、当从节点开始接管消息消费时,下一次拉取请求一定会往从节点发送码?答案也是不一定:

  • 如果待拉取的消息偏移量与从节点最新的物理偏移量之间的差距超过内存的30%,下一次拉取请求还是会发往从节点。
  • 如果待拉取的消息偏移量与从节点最新的物理偏移量之际的差距少于内存的30%,下一次拉取请求将发送到主节点。
  • 关于 RocketMQ 主从同步若干问题答疑,可以参考笔者的另外一篇文章: 。

    欢迎加入我的知识星球,探讨 java 高并发、分布式服务架构,一起交流源码,长按如下二维码:

    踩坑记:rocketmq-console 消费TPS为0,但消息积压数却在降低是个什么“鬼” 踩坑记:rocketmq-console 消费TPS为0,但消息积压数却在降低是个什么“鬼” 踩坑记:rocketmq-console 消费TPS为0,但消息积压数却在降低是个什么“鬼”

    如果喜欢您喜欢这篇文章,点赞 转发是对我最大的鼓励,越努力越幸运,一起加油。

    原文始发于微信公众号(中间件兴趣圈):

    本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

    本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

    原文链接:blog.ouyangsihai.cn >> 踩坑记——rocketmq-console 消费TPS为0,但消息积压数却在降低是个什么“鬼”


     上一篇
    源码分析Kafka 消息拉取流程(文末两张流程图) 源码分析Kafka 消息拉取流程(文末两张流程图)
    点击上方“中间件兴趣圈”,选择“设为星标” 做积极的人,越努力越幸运! 本节重点讨论 Kafka 的消息拉起流程。 温馨提示:本文源码分析部分比较长,基本点出了Kafka消息拉取相关的核心要点,如果对源码不感兴趣的话,可以直接跳到文末的
    2021-04-05
    下一篇 
    初始 Kafka Consumer 消费者 初始 Kafka Consumer 消费者
    温馨提示:整个 Kafka 专栏基于 kafka-2.2.1 版本。 1、KafkaConsumer 概述根据 KafkaConsumer 类上的注释上来看 KafkaConsumer 具有如下特征: 在 Kafka 中 KafkaC
    2021-04-05