前言
这几天把服务器上环境重新装了下,由于项目也比较忙,所以更新比较慢,上节我们把RMQ的多Master集群搭建起来了,我们今天就来看看如何向这个集群生产消息以及消费消息。
集群搭建回顾
回顾上节的内容,我总结下以下几步:
第一:最新版RMQ4.2.0要求最低JDK8版本
第二:修改虚拟机的host,配置nameserver域名
第三:安装RMQ
第四:新建RMQ的消息存储文件
第五:修改RMQ的日志输出
第六:配置Broker配置文件
第七:根据虚拟机实际情况修改runserver.sh和runbroker.sh启动参数
第八:启动nameserver再启动broker
第九:生成控制台
OK,我们再来看下集群结果:
JAVA操作RMQ
第一步:POM引入相关jar
根据GitHub上的quickstart,我直接将我们将来需要用到的jar都导入进来了(你其实只需要导入):
parent
groupIdorg.apache.rocketmq/groupId
artifactIdrocketmq-all/artifactId
version4.2.0/version
/parent
dependency
groupId${project.groupId}/groupId
artifactIdrocketmq-client/artifactId
/dependency
dependency
groupId${project.groupId}/groupId
artifactIdrocketmq-srvutil/artifactId
/dependency
dependency
groupIdch.qos.logback/groupId
artifactIdlogback-classic/artifactId
/dependency
dependency
groupIdorg.javassist/groupId
artifactIdjavassist/artifactId
/dependency
dependency
groupIdio.openmessaging/groupId
artifactIdopenmessaging-api/artifactId
/dependency
dependency
groupIdorg.apache.rocketmq/groupId
artifactIdrocketmq-openmessaging/artifactId
version4.2.0/version
第二步:生产者Producer
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
// 声明一个生产者,需要一个自定义生产者组(后面我们会介绍这个组的概念和作用)
DefaultMQProducer producer = new DefaultMQProducer("myTestGroup");
// 设置集群的NameServer地址,多个地址之间以分号分隔
producer.setNamesrvAddr("139.196.184.3:9876;139.196.51.36:9876");
// 启动生产者实例
producer.start();
// 模拟发送100条消息 到Topic为TopicTest,tag为tagA,消息内容为Hello RocketMQ +i
for (int i = 0; i 100; i++) {
try {
Message msg = new Message("TopicTest" ,"TagA",("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 调用Produce的send方法发送消息
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
// 发送完消息之后调用producer的shutdown()方法关闭producer
producer.shutdown();
}
}
第三步:消费者Consumer
public class Consumer {
public static void main(String[] args) throws Exception {
// 声明一个消费者consumer,需要传入一个组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerTest");
// 设置集群的NameServer地址,多个地址之间以分号分隔
consumer.setNamesrvAddr("139.196.184.3:9876;139.196.51.36:9876");
// 设置consumer的消费策略
// CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,即跳过历史消息
// CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
// CONSUME_FROM_TIMESTAMP 从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 设置consumer所订阅的Topic和Tag,*代表全部的Tag
consumer.subscribe("TopicTest", "*");
// Listener,主要进行消息的逻辑处理,监听topic,如果有消息就会立即去消费
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 返回消费状态 有2种状态 CONSUME_SUCCESS 消费成功 RECONSUME_LATER 消费失败,需要稍后重新消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 调用start()方法启动consumer
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
OK,现在我们启动消费者,注意一定是要先启动消费者,后启动生产者,这点很好理解,消费者得先订阅,生产者再生产消息,订阅之后就能消费消息,当然如果你非要先启动生产者也行,但是这会有一些问题,会导致有可能消息被重复消费(这点后面我会详细讲解),主要原因是RMQ是通过消费返回消息的状态来判断一条消息是否被消费,如果你先启动生产者,就会造成可能第一个consumer1拿到消息A,还没返回状态时,consumer2也拿到了消息A,就会造成重复消费,所以,尽量避免有消息堆积造成消息重复消费,这些我们后面会详细讲,今天主要讲入门,如何玩成一个生产消费的过程。下面我们来看看这个demo的结果:
producer端(细心的小伙伴会发现生产者怎么是4个一组4个一组的发的,这里先卖个关子):
consumer端:consumer端是一直启动的,监听这它所订阅的主题,一旦有消息就会去处理
我们再来看看控制台:我是生产了2次100,就就是200条消息,也看到这200条102条落到broker-a上98条落到broker-b上,从这里也证实了生产者是4个消息一组发送的,所以才会出现102+98的结果:
这里就是我们的消费者订阅的主题:可以点详情进去看的,这个控制台其实还是不错的
结束
OK,RMQ的入门就介绍到这里了,上面卖的关子我还是忍不住想告诉你,其实这个是我们在配置broker时可以指定的,这里我把Broker配置文件详细的贴一份给小伙伴们吧,小伙伴们做下相应的修改就能使用了:
brokerClusterName=rocketmq-cluster-justin
brokerName=broker-a
brokerId=0
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 roker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨4点
deleteWhen=04
#文件保留时间,默认48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
相关文章:
原文始发于微信公众号(Justin的后端书架):