RocketMQ系列之顺序消费

本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

原文链接:blog.ouyangsihai.cn >> RocketMQ系列之顺序消费

前言

上节我们介绍了RMQ的两大亮点,重试和重复消费的问题,其实重试才能算亮点,重复消费最终还是要由我们自己来解决这个问题,RMQ自身并没有提供很好的机制,至少目前是没有,不知道将来会不会有,OK扯远了,今天呢,我们再来介绍RMQ一个不错的地方,那就是顺序消费,RMQ是可以保证同一个queue中的消息被顺序的消费。

RMQ实现如何实现顺序消费?

生产者Producer在生产消息时将需要顺序消费的消息发送到同一个queue下,每个topic默认是有4个queue所以Producer需要一个队列选择器来进行queue的选择;

消费者Consumer端在进行消息的消费时,消费者注册的消息监听器就不是之前的MessageListenerConcurrently,而是换成MessageListenerOrderly,这样就可以保证消费者只有一个线程去处理该消息;

Producer端如何操作?

生产端保证将消息发送到topic下同一个队列中即可:我们发送了8条消息到坐标为0的队列中:


public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // 声明一个生产者,需要一个自定义生产者组(后面我们会介绍这个组的概念和作用)
        DefaultMQProducer producer = new DefaultMQProducer("myTestGroup");
        // 设置集群的NameServer地址,多个地址之间以分号分隔
        producer.setNamesrvAddr("");
        // 如果消息发送失败就进行5次重试
        producer.setRetryTimesWhenSendFailed(5);
        // 启动生产者实例
        producer.start();

        for (int i = 0; i  8; i++) {
            Message msg = new Message("TopicTest", "order_1", "key" + i, ("order_1" + i).getBytes());
            // 调用Produce的send方法发送消息
            try {
                // 发送消息并构建一个queue选择器,保证消息都进入到同一个队列中
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    // 重写了MessageQueueSelector 的select方法
                    @Override
                    public MessageQueue select(ListMessageQueue list, Message msg, Object arg) {
                        Integer id = (Integer) arg;
                        return list.get(id);
                    }
                }, 0);// 队列的下标
                System.out.println(sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        // 关闭
        producer.shutdown();
    }
}

Consumer端

Consumer注册MessageListenerOrderly监听即可:


public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        // 声明一个消费者consumer,需要传入一个组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerTest");
        // 设置集群的NameServer地址,多个地址之间以分号分隔
        consumer.setNamesrvAddr("");
        // 设置consumer的消费策略
         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 集群模式消费,广播消费不会重试
         consumer.setMessageModel(MessageModel.CLUSTERING);
        // 设置最大重试次数,默认是16次
        //consumer.setMaxReconsumeTimes(5);
        // 设置consumer所订阅的Topic和Tag,*代表全部的Tag
        consumer.subscribe("TopicTest", "*");
        // Listener,主要进行消息的逻辑处理,监听topic,如果有消息就会立即去消费
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(ListMessageExt msgs, ConsumeOrderlyContext consumeOrderlyContext) {
                try {
                    MessageExt messageExt = msgs.get(0);
                    String msgBody = new String(messageExt.getBody(),"utf-8");
                    System.out.println(" 接收新的消息:消息内容为:"+msgBody);
                } catch (Exception e) {
                    e.printStackTrace();
                    System.out.println(e);
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        // 调用start()方法启动consumer
        consumer.start();
        System.out.printf("Consumer1 启动.%n");
    }
}

OK,我们先看下目前MQ上消息情况如下图:

RocketMQ系列之顺序消费

我们依次启动消费者和生产者:

RocketMQ系列之顺序消费 RocketMQ系列之顺序消费

我们在看下控制台消息情况:8条消息出入记录

RocketMQ系列之顺序消费

到这里可能有的小伙伴就会问了,你消息都发送到同一个队列,那如果我发2个队列,会是什么情况呢?我们把生产者改造下:生产者往下标0和3的队列分别发送4条消息:


for (int i = 0; i  4; i++) {
            Message msg = new Message("TopicTest", "order_1", "key" + i, ("order_1" + i).getBytes());
            // 调用Produce的send方法发送消息
            try {
                // 发送消息并构建一个queue选择器,保证消息都进入到同一个队列中
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    // 重写了MessageQueueSelector 的select方法
                    @Override
                    public MessageQueue select(ListMessageQueue list, Message msg, Object arg) {
                        Integer id = (Integer) arg;
                        return list.get(id);
                    }
                }, 0);// 队列的下标
                System.out.println(sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        for (int i = 0; i  4; i++) {
            Message msg = new Message("TopicTest", "order_1", "key2" + i, ("order_2" + i).getBytes());
            // 调用Produce的send方法发送消息
            try {
                // 发送消息并构建一个queue选择器,保证消息都进入到同一个队列中
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    // 重写了MessageQueueSelector 的select方法
                    @Override
                    public MessageQueue select(ListMessageQueue list, Message msg, Object arg) {
                        Integer id = (Integer) arg;
                        return list.get(id);
                    }
                }, 3);// 队列的下标
                System.out.println(sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

我们再来看下消费者端是怎么消息的,是否保持顺序消费?

RocketMQ系列之顺序消费 RocketMQ系列之顺序消费

可能会出现上面2种结果:不管是第一种还是第二种结果,虽然第二种结果整体上不是有序的,但是仔细看每个每列中的消息,发现都是有序的,这也证明是有序消费指的是在同一个queue下而不是topic,针对的是队列;

其实MessageListenerOrderly设计就是不允许你在消费消息时启动多个线程去消费,这是设计上就不允许的;

还有一种情况就是启动多个consumer,同时消费,网上流传的版本是多个consumer会分别处理多个不同queue下的数据,我本地是没有测试出来,我试了N次的结果都是启动多个consumer时,只有一个consumer会去消费掉所有的消息,不知道是不是我使用的是新版本RMQ的原因还是别的原因,按道理来说一个组下的consumer是会负载均衡的去消费的,这点我后面再看看。

这是我执行的结果:我分别向4个queue发送了消息,都只会被一个consumer处理:

RocketMQ系列之顺序消费 RocketMQ系列之顺序消费

好了,关于顺序消费的问题就先到这了,这个问题后面我再去查阅相关资源看看到底是什么原因?今天先到这,感谢你的关注,感谢你的阅读!!!

RocketMQ系列之顺序消费

原文始发于微信公众号(Justin的后端书架):

本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

原文链接:blog.ouyangsihai.cn >> RocketMQ系列之顺序消费


 上一篇
RabbitMQ的简单介绍 RabbitMQ的简单介绍
RabbitMQ是采用Erlang语言开发,基于Advanced Message Queuing Protocol (AMQP)开放标准的开发。 Publisher:数据的发送方,生产者。 Consumer:数据的接收方,消费
下一篇 
RocketMQ系列之消息重试+重复消费 RocketMQ系列之消息重试+重复消费
前言 上节我们介绍了RMQ的几大模块以及每个模块的作用,前面也介绍了RMQ的订阅消费案例,今天我们来看一个RMQ的消息重试机制和重复消费的问题,了解这2点有助于我们更好更合理的处理消息消费的异常情况。 为什么会出现消息重试? 因为