RocketMQ系列之入门

本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

原文链接:blog.ouyangsihai.cn >> RocketMQ系列之入门

前言

这几天把服务器上环境重新装了下,由于项目也比较忙,所以更新比较慢,上节我们把RMQ的多Master集群搭建起来了,我们今天就来看看如何向这个集群生产消息以及消费消息。

RocketMQ系列之入门

集群搭建回顾

回顾上节的内容,我总结下以下几步:

第一:最新版RMQ4.2.0要求最低JDK8版本

第二:修改虚拟机的host,配置nameserver域名

第三:安装RMQ

第四:新建RMQ的消息存储文件

第五:修改RMQ的日志输出

第六:配置Broker配置文件

第七:根据虚拟机实际情况修改runserver.sh和runbroker.sh启动参数

第八:启动nameserver再启动broker

第九:生成控制台

OK,我们再来看下集群结果:

RocketMQ系列之入门 RocketMQ系列之入门 RocketMQ系列之入门 RocketMQ系列之入门 RocketMQ系列之入门

JAVA操作RMQ

第一步:POM引入相关jar

根据GitHub上的quickstart,我直接将我们将来需要用到的jar都导入进来了(你其实只需要导入):


parent
    groupIdorg.apache.rocketmq/groupId
    artifactIdrocketmq-all/artifactId
    version4.2.0/version
  /parent

dependency
      groupId${project.groupId}/groupId
      artifactIdrocketmq-client/artifactId
    /dependency
    dependency
      groupId${project.groupId}/groupId
      artifactIdrocketmq-srvutil/artifactId
    /dependency
    dependency
      groupIdch.qos.logback/groupId
      artifactIdlogback-classic/artifactId
    /dependency
    dependency
      groupIdorg.javassist/groupId
      artifactIdjavassist/artifactId
    /dependency
    dependency
      groupIdio.openmessaging/groupId
      artifactIdopenmessaging-api/artifactId
    /dependency
    dependency
      groupIdorg.apache.rocketmq/groupId
      artifactIdrocketmq-openmessaging/artifactId
      version4.2.0/version

第二步:生产者Producer


public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // 声明一个生产者,需要一个自定义生产者组(后面我们会介绍这个组的概念和作用)
        DefaultMQProducer producer = new DefaultMQProducer("myTestGroup");
        // 设置集群的NameServer地址,多个地址之间以分号分隔
        producer.setNamesrvAddr("139.196.184.3:9876;139.196.51.36:9876");
        // 启动生产者实例
        producer.start();
        // 模拟发送100条消息 到Topic为TopicTest,tag为tagA,消息内容为Hello RocketMQ +i
        for (int i = 0; i  100; i++) {
            try {
                Message msg = new Message("TopicTest" ,"TagA",("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
                );
                // 调用Produce的send方法发送消息
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        // 发送完消息之后调用producer的shutdown()方法关闭producer
        producer.shutdown();
    }
}

第三步:消费者Consumer


public class Consumer {
    public static void main(String[] args) throws Exception {
        // 声明一个消费者consumer,需要传入一个组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerTest");
        // 设置集群的NameServer地址,多个地址之间以分号分隔
        consumer.setNamesrvAddr("139.196.184.3:9876;139.196.51.36:9876");
        // 设置consumer的消费策略
        // CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,即跳过历史消息
        // CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
        // CONSUME_FROM_TIMESTAMP 从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 设置consumer所订阅的Topic和Tag,*代表全部的Tag
        consumer.subscribe("TopicTest", "*");
        // Listener,主要进行消息的逻辑处理,监听topic,如果有消息就会立即去消费
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                // 返回消费状态 有2种状态 CONSUME_SUCCESS 消费成功 RECONSUME_LATER 消费失败,需要稍后重新消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 调用start()方法启动consumer
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

OK,现在我们启动消费者,注意一定是要先启动消费者,后启动生产者,这点很好理解,消费者得先订阅,生产者再生产消息,订阅之后就能消费消息,当然如果你非要先启动生产者也行,但是这会有一些问题,会导致有可能消息被重复消费(这点后面我会详细讲解),主要原因是RMQ是通过消费返回消息的状态来判断一条消息是否被消费,如果你先启动生产者,就会造成可能第一个consumer1拿到消息A,还没返回状态时,consumer2也拿到了消息A,就会造成重复消费,所以,尽量避免有消息堆积造成消息重复消费,这些我们后面会详细讲,今天主要讲入门,如何玩成一个生产消费的过程。下面我们来看看这个demo的结果:

producer端(细心的小伙伴会发现生产者怎么是4个一组4个一组的发的,这里先卖个关子RocketMQ系列之入门):

RocketMQ系列之入门

consumer端:consumer端是一直启动的,监听这它所订阅的主题,一旦有消息就会去处理

RocketMQ系列之入门

我们再来看看控制台:我是生产了2次100,就就是200条消息,也看到这200条102条落到broker-a上98条落到broker-b上,从这里也证实了生产者是4个消息一组发送的,所以才会出现102+98的结果:

RocketMQ系列之入门

这里就是我们的消费者订阅的主题:可以点详情进去看的,这个控制台其实还是不错的

RocketMQ系列之入门

结束

OK,RMQ的入门就介绍到这里了,上面卖的关子我还是忍不住想告诉你,其实这个是我们在配置broker时可以指定的,这里我把Broker配置文件详细的贴一份给小伙伴们吧,小伙伴们做下相应的修改就能使用了:


brokerClusterName=rocketmq-cluster-justin

brokerName=broker-a

brokerId=0

namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876

#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数

defaultTopicQueueNums=4

#是否允许 roker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true

#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true

#Broker对外服务的监听端口
listenPort=10911

#删除文件时间点,默认凌晨4点
deleteWhen=04

#文件保留时间,默认48 小时
fileReservedTime=120

#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824

#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000

#destroyMapedFileIntervalForcibly=120000

#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88

#存储路径

storePathRootDir=/usr/local/rocketmq/store

#commitLog存储路径

storePathCommitLog=/usr/local/rocketmq/store/commitlog

#消费队列存储路径存储路径

storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue

#消息索引存储路径

storePathIndex=/usr/local/rocketmq/store/index

#checkpoint文件存储路径

storeCheckpoint=/usr/local/rocketmq/store/checkpoint

#abort文件存储路径

abortFile=/usr/local/rocketmq/store/abort

#限制的消息大小

maxMessageSize=65536

#flushCommitLogLeastPages=4

#flushConsumeQueueLeastPages=2

#flushCommitLogThoroughInterval=10000

#flushConsumeQueueThoroughInterval=60000

#Broker的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=ASYNC_MASTER

#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH

#checkTransactionMessageEnable=false

相关文章:

RocketMQ系列之入门

原文始发于微信公众号(Justin的后端书架):

本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

原文链接:blog.ouyangsihai.cn >> RocketMQ系列之入门


  转载请注明: 好好学java RocketMQ系列之入门

 上一篇
RocketMQ系列之架构浅谈 RocketMQ系列之架构浅谈
前言 上节我们介绍了一个RMQ的简单入门级别的demo,我们知道了生产者生产消息到MQ,然后消费者订阅消费的过程,但是那只是很简单的讲解了一下,还有很多细节我们没有去细说,今天开始我们将一个个深入的讲解下,首先今天我们先讲讲RMQ的架构设计
下一篇 
RocketMQ系列之初识MQ RocketMQ系列之初识MQ
前言 消息中间件是目前互联网项目应用特别广泛的一个中间件,主要用于处理异步操作,系统之间的应用解耦,以及并发下的流量削峰等等应用场景,今天开我们就进入消息中间件MQ的讲解,主流的MQ有很多,我们只需要掌握其中一种就可以了,基本都大同小异,O