前言
上节我们介绍了RMQ的两大亮点,重试和重复消费的问题,其实重试才能算亮点,重复消费最终还是要由我们自己来解决这个问题,RMQ自身并没有提供很好的机制,至少目前是没有,不知道将来会不会有,OK扯远了,今天呢,我们再来介绍RMQ一个不错的地方,那就是顺序消费,RMQ是可以保证同一个queue中的消息被顺序的消费。
RMQ实现如何实现顺序消费?
生产者Producer在生产消息时将需要顺序消费的消息发送到同一个queue下,每个topic默认是有4个queue所以Producer需要一个队列选择器来进行queue的选择;
消费者Consumer端在进行消息的消费时,消费者注册的消息监听器就不是之前的MessageListenerConcurrently,而是换成MessageListenerOrderly,这样就可以保证消费者只有一个线程去处理该消息;
Producer端如何操作?
生产端保证将消息发送到topic下同一个队列中即可:我们发送了8条消息到坐标为0的队列中:
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
// 声明一个生产者,需要一个自定义生产者组(后面我们会介绍这个组的概念和作用)
DefaultMQProducer producer = new DefaultMQProducer("myTestGroup");
// 设置集群的NameServer地址,多个地址之间以分号分隔
producer.setNamesrvAddr("");
// 如果消息发送失败就进行5次重试
producer.setRetryTimesWhenSendFailed(5);
// 启动生产者实例
producer.start();
for (int i = 0; i 8; i++) {
Message msg = new Message("TopicTest", "order_1", "key" + i, ("order_1" + i).getBytes());
// 调用Produce的send方法发送消息
try {
// 发送消息并构建一个queue选择器,保证消息都进入到同一个队列中
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
// 重写了MessageQueueSelector 的select方法
@Override
public MessageQueue select(ListMessageQueue list, Message msg, Object arg) {
Integer id = (Integer) arg;
return list.get(id);
}
}, 0);// 队列的下标
System.out.println(sendResult);
} catch (Exception e) {
e.printStackTrace();
}
}
// 关闭
producer.shutdown();
}
}
Consumer端
Consumer注册MessageListenerOrderly监听即可:
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// 声明一个消费者consumer,需要传入一个组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerTest");
// 设置集群的NameServer地址,多个地址之间以分号分隔
consumer.setNamesrvAddr("");
// 设置consumer的消费策略
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 集群模式消费,广播消费不会重试
consumer.setMessageModel(MessageModel.CLUSTERING);
// 设置最大重试次数,默认是16次
//consumer.setMaxReconsumeTimes(5);
// 设置consumer所订阅的Topic和Tag,*代表全部的Tag
consumer.subscribe("TopicTest", "*");
// Listener,主要进行消息的逻辑处理,监听topic,如果有消息就会立即去消费
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(ListMessageExt msgs, ConsumeOrderlyContext consumeOrderlyContext) {
try {
MessageExt messageExt = msgs.get(0);
String msgBody = new String(messageExt.getBody(),"utf-8");
System.out.println(" 接收新的消息:消息内容为:"+msgBody);
} catch (Exception e) {
e.printStackTrace();
System.out.println(e);
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
// 调用start()方法启动consumer
consumer.start();
System.out.printf("Consumer1 启动.%n");
}
}
OK,我们先看下目前MQ上消息情况如下图:
我们依次启动消费者和生产者:
我们在看下控制台消息情况:8条消息出入记录
到这里可能有的小伙伴就会问了,你消息都发送到同一个队列,那如果我发2个队列,会是什么情况呢?我们把生产者改造下:生产者往下标0和3的队列分别发送4条消息:
for (int i = 0; i 4; i++) {
Message msg = new Message("TopicTest", "order_1", "key" + i, ("order_1" + i).getBytes());
// 调用Produce的send方法发送消息
try {
// 发送消息并构建一个queue选择器,保证消息都进入到同一个队列中
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
// 重写了MessageQueueSelector 的select方法
@Override
public MessageQueue select(ListMessageQueue list, Message msg, Object arg) {
Integer id = (Integer) arg;
return list.get(id);
}
}, 0);// 队列的下标
System.out.println(sendResult);
} catch (Exception e) {
e.printStackTrace();
}
}
for (int i = 0; i 4; i++) {
Message msg = new Message("TopicTest", "order_1", "key2" + i, ("order_2" + i).getBytes());
// 调用Produce的send方法发送消息
try {
// 发送消息并构建一个queue选择器,保证消息都进入到同一个队列中
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
// 重写了MessageQueueSelector 的select方法
@Override
public MessageQueue select(ListMessageQueue list, Message msg, Object arg) {
Integer id = (Integer) arg;
return list.get(id);
}
}, 3);// 队列的下标
System.out.println(sendResult);
} catch (Exception e) {
e.printStackTrace();
}
}
我们再来看下消费者端是怎么消息的,是否保持顺序消费?
可能会出现上面2种结果:不管是第一种还是第二种结果,虽然第二种结果整体上不是有序的,但是仔细看每个每列中的消息,发现都是有序的,这也证明是有序消费指的是在同一个queue下而不是topic,针对的是队列;
其实MessageListenerOrderly设计就是不允许你在消费消息时启动多个线程去消费,这是设计上就不允许的;
还有一种情况就是启动多个consumer,同时消费,网上流传的版本是多个consumer会分别处理多个不同queue下的数据,我本地是没有测试出来,我试了N次的结果都是启动多个consumer时,只有一个consumer会去消费掉所有的消息,不知道是不是我使用的是新版本RMQ的原因还是别的原因,按道理来说一个组下的consumer是会负载均衡的去消费的,这点我后面再看看。
这是我执行的结果:我分别向4个queue发送了消息,都只会被一个consumer处理:
好了,关于顺序消费的问题就先到这了,这个问题后面我再去查阅相关资源看看到底是什么原因?今天先到这,感谢你的关注,感谢你的阅读!!!
原文始发于微信公众号(Justin的后端书架):