RocketMQ系列之消息重试+重复消费

本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

原文链接:blog.ouyangsihai.cn >> RocketMQ系列之消息重试+重复消费

前言

上节我们介绍了RMQ的几大模块以及每个模块的作用,前面也介绍了RMQ的订阅消费案例,今天我们来看一个RMQ的消息重试机制和重复消费的问题,了解这2点有助于我们更好更合理的处理消息消费的异常情况。

RocketMQ系列之消息重试+重复消费

为什么会出现消息重试?

因为RMQ的消息都是通过网络传输的,通过网络传输就难免会受网络环境的影响,各种可能存在的情况,可能导致生产者Producer发送消息失败,也可能导致消费者Consumer消费消息失败,因此RMQ的消息重试机制就显得比较重要了,这也是RMQ的一大优势所在,显然消息重试机制分2种。

RocketMQ系列之消息重试+重复消费

生产者Producer端重试

生产端发送消息失败就是指,Producer向MQ发送消息的时候没有发送成功,导致的原因可能有网络传输失败等,下面我们就来看看生产端是怎么处理消息发送失败的:

  • 配置生产者的重试次数:消息重试发送的次数限制
  • 配置生产者发送消息时的超时等待:在指定时间内如果消息没有成功发送到MQ就尝试重新发送

  • 配置生产者发送消息时的超时等待:在指定时间内如果消息没有成功发送到MQ就尝试重新发送

    
    # 如果消息在1秒之内没有发送成功就重试 重试次数上限为5
    producer.setRetryTimesWhenSendFailed(5);
    SendResult sendResult = producer.send(msg,1000);
    

    Producer端代码如下:

    
    public class Producer {
        public static void main(String[] args) throws MQClientException, InterruptedException {
            // 声明一个生产者,需要一个自定义生产者组(后面我们会介绍这个组的概念和作用)
            DefaultMQProducer producer = new DefaultMQProducer("myTestGroup");
            // 设置集群的NameServer地址,多个地址之间以分号分隔
            producer.setNamesrvAddr("139.196.184.3:9876;139.196.51.36:9876");
            // 如果消息发送失败就进行5次重试
            producer.setRetryTimesWhenSendFailed(5);
            // 启动生产者实例
            producer.start();
            // 模拟发送10条消息 到Topic为TopicTest,tag为tagA,消息内容为Hello RocketMQ +i
            for (int i = 0; i  4; i++) {
                try {
                    Message msg = new Message("TopicTest" ,"TagA",("生产第" + i+"条消息").getBytes(RemotingHelper.DEFAULT_CHARSET)
                    );
                    // 调用Produce的send方法发送消息
                    SendResult sendResult = producer.send(msg,1000);
                    System.out.printf("%s%n", sendResult);
                } catch (Exception e) {
                    e.printStackTrace();
                    Thread.sleep(1000);
                }
            }
            // 发送完消息之后调用producer的shutdown()方法关闭producer
            producer.shutdown();
        }
    }
    
    RocketMQ系列之消息重试+重复消费

    消费者Consumer端重试

    消费者消费消息失败就是指,Consumer从MQ取到消息进行消费的过程中,由于某些原因导致消费失败(网络原因,消息逻辑处理异常,消费者直接宕机等等),下面我们就来看下消费端是怎么处理消息失败的:

  • 设置消费最大重试次数:默认是16,当配置的值大于16的时候,第16次之后就会每次重试时间间隔2小时,当配置的值小于等于16时,重试的间隔时间如下图: RocketMQ系列之消息重试+重复消费 从Broker的启动日志也能发现这一点:
    `2018-04-23 19:21:35 INFO main - messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h`
  • 通过返回重试状态码:Consumer提供了2个状态码
    • **CONSUME_SUCCESS**:消息消费成功状态,如果返回该状态,那么对应的这条消息就会从RMQ上被消费完成并移出MQ
    • **RECONSUME_LATER**:消息消费重试状态,如果返回该状态,消费者会在间隔时间内再次尝试消费该消息,每尝试一次之后,该消息对应的reconsumeTimes的值+1,默认第一次失败时为0,不算重试次数
    • RocketMQ系列之消息重试+重复消费

      通过返回重试状态码:Consumer提供了2个状态码

      RECONSUME_LATER:消息消费重试状态,如果返回该状态,消费者会在间隔时间内再次尝试消费该消息,每尝试一次之后,该消息对应的reconsumeTimes的值+1,默认第一次失败时为0,不算重试次数

      
      // 设置最大重试次数,默认是16次
      consumer.setMaxReconsumeTimes(5);
      // 返回重试状态
      return ConsumeConcurrentlyStatus.RECONSUME_LATER;
      
      RocketMQ系列之消息重试+重复消费

      消息消费重试实践

      消费者消费失败又分多种情况,下面我们将一次次来实践一遍看看效果:

      第一种情况:消费者处理消息逻辑时异常

      Consumer端代码如下:

      
      public class Consumer {
          public static void main(String[] args) throws InterruptedException, MQClientException {
              // 声明一个消费者consumer,需要传入一个组
              DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerTest");
              // 设置集群的NameServer地址,多个地址之间以分号分隔
              consumer.setNamesrvAddr("139.196.184.3:9876;139.196.51.36:9876");
              // 设置consumer的消费策略
              consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
              // 集群模式消费,广播消费不会重试
              consumer.setMessageModel(MessageModel.CLUSTERING);
              // 设置最大重试次数,默认是16次
              consumer.setMaxReconsumeTimes(5);
              // 设置consumer所订阅的Topic和Tag,*代表全部的Tag
              consumer.subscribe("TopicTest", "*");
              // Listener,主要进行消息的逻辑处理,监听topic,如果有消息就会立即去消费
              consumer.registerMessageListener(new MessageListenerConcurrently() {
                  @Override
                  public ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) {
                      // 获取第一条消息,进行处理
                      try {
                          MessageExt messageExt = msgs.get(0);
                          String msgBody = new String(messageExt.getBody(),"utf-8");
                          System.out.println(" 接收新的消息:消息内容为:"+msgBody +" == 消息整体为:"+ msgs);
                          // 模拟消息消费失败操作
                          if(StringUtils.equals(msgBody,"生产第2条消息")){
                              int a = 1 / 0;
                          }
                      } catch (Exception e) {
                          e.printStackTrace();
                          System.out.println(e);
                          // 尝试重新消费,直接第三次如果还不成功就放弃消费,进行消息消费失败补偿操作
                          if(msgs.get(0).getReconsumeTimes() == 3){
                              // 返回成功状态码,消息会在mq上被清掉,但是这是一条失败的消息,所以我们需要做失败补偿操作
                              // 补偿策略:记录数据库或者日志,启动一个定时脚本去扫描这些失败的消息,进行相应处理
                              //           或者将失败的消息放入另一个topic,生产者订阅该topic,将失败的消息发送给生产者,生产者重新发送到mq上
                              System.out.println("消息记录日志:"+msgs);
                              return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                          }else {
                              // 重试状态码,重试机制可配置
                              System.out.println("消息消费失败,尝试重试!!!");
                              return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                          }
                      }
                      return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                  }
              });
              // 调用start()方法启动consumer
              consumer.start();
              System.out.printf("Consumer1 启动.%n");
          }
      }
      

      在这里,为了验证之前说的Group组的概念,我开了2个Consumer端,属于同一个组,我们其实可以看到消息是被负载的分给2个Consumer的,我们看下面运行结果:

      RocketMQ系列之消息重试+重复消费

      RocketMQ系列之消息重试+重复消费我们再看下该消息对应的重试次数参数变化:

      RocketMQ系列之消息重试+重复消费

      第二种情况:当同一个组中的消费者Consumer宕机之后,MQ会将消息转发给剩下的其他Consumer处理,包括失败重试的消息也一样会转到其他Consumer中被处理(是不是突然对这个Group组有着莫名的好感)

      我们看下面这个例子:我先启动了2个Consumer,这个重试的消息落在了c1的身上,当重试一次之后,我把c1宕机了,我们来看看c2发生了什么,第2条消息被转发到了c2上,而且重试次数也是在之前c1的基础上操作的;

      RocketMQ系列之消息重试+重复消费 RocketMQ系列之消息重试+重复消费

      第三种情况:当同一个组下的某个Consumer处理的消息超时的时候,MQ消息就会不断尝试处理这条消息,直到发送成功为止(这个是RMQ内部自己做的重试机制),这种情况是不会转发给另一个Consumer处理的:生产者生产一条消息,被c2处理了,c2睡眠60秒,在这60秒内,消息都是一直在c2上进行重试(隐式实现),直到我把c2宕机,你会发现,消息才会被c1处理(上述第二种情况):

      
      MessageExt messageExt = msgs.get(0);
                          String msgBody = new String(messageExt.getBody(), "utf-8");
                          System.out.println(" 接收新的消息:消息内容为:" + msgBody + " == 消息重试次数:" + messageExt.getReconsumeTimes());
                          Thread.sleep(60000);
      
      RocketMQ系列之消息重试+重复消费

      RocketMQ系列之消息重试+重复消费RocketMQ系列之消息重试+重复消费

      RocketMQ系列之消息重试+重复消费

      讲到这里,其实我们的消息重试就差不多讲完了,但是有一点一定要注意

      RocketMQ系列之消息重试+重复消费RocketMQ系列之消息重试+重复消费RocketMQ系列之消息重试+重复消费注意:消费端的消息重试机制一定要在集群消费模式下才有效,广播消费模式下,RMQ是不会进行重试机制的,广播模式下,消息只消费一次,不管你有没有成功!!!

      RocketMQ系列之消息重试+重复消费 RocketMQ系列之消息重试+重复消费

      消息重复消费问题

      之前我们讲过当我们先启动生产者生产消息,后启动消费者消费消息时,当多个消费者就有可能消费到同一条消息,就像2个人去领任务,第一个人先领取了任务1,但是还在处理,任务还没完成,第二个人过来时,也看到了任务1,就也领取了任务1,然后就造成2个人处理了同一个任务,我们可以看下面示例:c1和c2同时处理了第一条消息,很明显这是不合理的

      RocketMQ系列之消息重试+重复消费 RocketMQ系列之消息重试+重复消费

      对于上面的问题,我们就需要相应的处理策略,我总结觉得可以从下面2个方面入手

      保证消费端处理消息的业务逻辑保持幂等性

      如何保证幂等呢,我们主要从以下几个手段考虑:

    • 幂等性可以自己业务逻辑实现,例如不管逻辑代码执行多少次,只要是同一个编号处理,得到的结果都是一样的,例如更新订单状态,只要是同一个订单号,就算重复消费,执行了多个update,最终数据库还是一样的结果;
    • 如果不是update这种操作呢,例如insert一条订单下单成功记录,那么此时我们可以通过设置数据库表某个字段唯一约束,例如订单号,来解决处理结果的幂等;
    • 如果insert的数据不能设置唯一约束呢,那么我们还可以启动一个脚本,定时扫描数据库表,发现如果是同样的数据被生成出来,可以删掉一条,以此来保证重复消费带来的数据重复;
    • 如果不是update这种操作呢,例如insert一条订单下单成功记录,那么此时我们可以通过设置数据库表某个字段唯一约束,例如订单号,来解决处理结果的幂等;

      总之,不管你用什么办法,就是假如消息被重复消费了,那么我们一定要想办法来保证执行结果的幂等。

      保证每条消息都有唯一标识,且每条消息只会被处理一次

      既然上面是假如消息被重复消费了,那么当然还有一个办法就是防止消息被重复消费主要有下面2个手段:

    • 利用一张日志表来记录已经处理成功的消息的ID,如果新到的消息ID已经在日志表中,那么就不再处理这条消息
    • 我们可以给每条消息自定义一个状态字段,当生产消息时默认为未消费状态,当获取到消息时,标为正在消息状态,当消费完时标为已消费状态(这一步可以不做,因为当一个消息被成功消费完时,其实他也就不在RMQ中了,其他消费者也不会获取到这条消息)。然后每次消费者消费消息时,都先对消息这个状态值进行判断,如果是正在消费或者已消费就不做处理,直接获取下一条
    • 我们可以给每条消息自定义一个状态字段,当生产消息时默认为未消费状态,当获取到消息时,标为正在消息状态,当消费完时标为已消费状态(这一步可以不做,因为当一个消息被成功消费完时,其实他也就不在RMQ中了,其他消费者也不会获取到这条消息)。然后每次消费者消费消息时,都先对消息这个状态值进行判断,如果是正在消费或者已消费就不做处理,直接获取下一条

      OK,以上就是我们今天所讲的消息重试和重复消费问题,希望看完,能对您有所帮助,便于你能更好的使用RMQ,同时也为他的天才设计感到敬佩!!!

      RocketMQ系列之消息重试+重复消费

      原文始发于微信公众号(Justin的后端书架):

    本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

    本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

    原文链接:blog.ouyangsihai.cn >> RocketMQ系列之消息重试+重复消费


     上一篇
    RocketMQ系列之顺序消费 RocketMQ系列之顺序消费
    前言 上节我们介绍了RMQ的两大亮点,重试和重复消费的问题,其实重试才能算亮点,重复消费最终还是要由我们自己来解决这个问题,RMQ自身并没有提供很好的机制,至少目前是没有,不知道将来会不会有,OK扯远了,今天呢,我们再来介绍RMQ一个不错的
    下一篇 
    RocketMQ系列之架构浅谈 RocketMQ系列之架构浅谈
    前言 上节我们介绍了一个RMQ的简单入门级别的demo,我们知道了生产者生产消息到MQ,然后消费者订阅消费的过程,但是那只是很简单的讲解了一下,还有很多细节我们没有去细说,今天开始我们将一个个深入的讲解下,首先今天我们先讲讲RMQ的架构设计