RocketMQ 平滑升级到主从切换(实战篇)

本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

原文链接:blog.ouyangsihai.cn >> RocketMQ 平滑升级到主从切换(实战篇)

本文主要介绍如何将 RocketMQ 集群从原先的主从同步升级到主从切换。

本文首先介绍与 DLedger 多副本即 RocketMQ 主从切换相关的核心配置属性,然后尝试搭建一个主从同步集群,最后将原先的 RocketMQ 集群平滑升级到 DLedger 集群的示例,并简单测试一下主从切换功能。

1、RocketMQ 主从切换核心配置参数详解

其主要的配置参数如下所示:

  • enableDLegerCommitLog 是否启用 DLedger,即是否启用 RocketMQ 主从切换,默认值为 false。如果需要开启主从切换,则该值需要设置为 true 。

  • dLegerGroup 节点所属的 raft 组,建议与 brokerName 保持一致,例如 broker-a。

  • dLegerPeers 集群节点信息,示例配置如下:n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913,多个节点用英文冒号隔开,单个条目遵循   legerSlefId-ip:端口,这里的端口用作 dledger 内部通信。

  • dLegerSelfId 当前节点id。取自 legerPeers 中条目的开头,即上述示例中的 n0,并且特别需要强调,只能第一个字符为英文,其他字符需要配置成数字。

  • storePathRootDir DLedger 日志文件的存储根目录,为了能够支持平滑升级,该值与 storePathCommitLog 设置为不同的目录。

  • dLegerGroup
    节点所属的 raft 组,建议与 brokerName 保持一致,例如 broker-a。

    dLegerSelfId
    当前节点id。取自 legerPeers 中条目的开头,即上述示例中的 n0,并且特别需要强调,只能第一个字符为英文,其他字符需要配置成数字。

    2、搭建主从同步环境

    首先先搭建一个传统意义上的主从同步架构,往集群中灌一定量的数据,然后升级到 DLedger 集群。

    在 Linux 服务器上搭建一个 rocketmq 主从同步集群我想不是一件很难的事情,故本文就不会详细介绍按照过程,只贴出相关配置。

    实验环境的部署结构采取 一主一次,其部署图如下:

    下面我就重点贴一下 broker 的配置文件。

    220 上的 broker 配置文件如下:

    
     1brokerClusterName = DefaultCluster
     2brokerName = broker-a
     3brokerId = 0
     4deleteWhen = 04
     5fileReservedTime = 48
     6brokerRole = ASYNC_MASTER
     7flushDiskType = ASYNC_FLUSH
     8brokerIP1=192.168.0.220
     9brokerIP2=192.168.0.220
    10namesrvAddr=192.168.0.221:9876;192.168.0.220:9876
    11storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store
    12storePathCommitLog=/opt/application/rocketmq-all-4.5.2-bin-release/store/commitlog
    13autoCreateTopicEnable=false
    14autoCreateSubscriptionGroup=false
    

    221 上 broker 的配置文件如下:

    
     1brokerClusterName = DefaultCluster
     2brokerName = broker-a
     3brokerId = 1
     4deleteWhen = 04
     5fileReservedTime = 48
     6brokerRole = SLAVE
     7flushDiskType = ASYNC_FLUSH
     8brokerIP1=192.168.0.221
     9brokerIP2=192.168.0.221
    10namesrvAddr=192.168.0.221:9876;192.168.0.220:9876
    11storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store
    12storePathCommitLog=/opt/application/rocketmq-all-4.5.2-bin-release/store/commitlog
    13autoCreateTopicEnable=false
    14autoCreateSubscriptionGroup=false
    

    相关的启动命令如下:

    
    1nohup bin/mqnamesrv  /dev/null  2&1 &
    2nohup bin/mqbroker -c conf/broker.conf  /dev/null  2&1 &
    

    安装后的集群信息如图所示:

    3、主从同步集群升级到DLedger

    3.1 部署架构

    DLedger 集群至少需要3台机器,故搭建 DLedger 还需要再引入一台机器,其部署结构图如下:

    从主从同步集群升级到 DLedger 集群,用户最关心的还是升级后的集群是否能够兼容原先的数据,即原先存储在消息能否能被消息消费者消费端,甚至于能否查询到。

    为了方便后续验证,首先我使用下述程序向 mq 集群中添加了一篇方便查询的消息(设置消息的key)。

    
     1public class Producer {
     2    public static void main(String[] args) throws MQClientException, InterruptedException {
     3        DefaultMQProducer producer = new DefaultMQProducer("producer_dw_test");
     4        producer.setNamesrvAddr("192.168.0.220:9876;192.168.0.221:9876");
     5        producer.start();
     6        for(int i =600000; i  600100; i ++) {
     7            try {
     8                Message msg = new Message("topic_dw_test_by_order_01",null , "m" + i,("Hello RocketMQ" + i ).getBytes(RemotingHelper.DEFAULT_CHARSET));
     9                SendResult sendResult = producer.send(msg);
    10               //System.out.printf("%s%n", sendResult);
    11            } catch (Exception e) {
    12                e.printStackTrace();
    13                Thread.sleep(1000);
    14            }
    15        }
    16        producer.shutdown();
    17        System.out.println("end");
    18    }
    19}
    

    消息的查询结果示例如下:

    3.2 升级步骤

    Step1:将 192.168.0.220 的 rocketmq 拷贝到 192.168.0.222,可以使用如下命令进行操作。在 192.168.0.220 上敲如下命令:

    
    1 scp -r rocketmq-all-4.5.2-bin-release/ root@192.168.0.222:/opt/application/rocketmq-all-4.5.2-bin-release
    

    温馨提示:示例中由于版本是一样,实际过程中,版本需要升级,故需先下载最新的版本,然后将老集群中的 store 目录完整的拷贝到新集群的 store 目录。

    Step2:依次在三台服务器的 broker.conf 配置文件中添加与 dledger 相关的配置属性。

    192.168.0.220 broker配置文件如下:

    
     1brokerClusterName = DefaultCluster
     2brokerId = 0
     3deleteWhen = 04
     4fileReservedTime = 48
     5brokerRole = ASYNC_MASTER
     6flushDiskType = ASYNC_FLUSH
     7brokerIP1=192.168.0.220
     8brokerIP2=192.168.0.220
     9namesrvAddr=192.168.0.221:9876;192.168.0.220:9876
    10storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store
    11storePathCommitLog=/opt/application/rocketmq-all-4.5.2-bin-release/store/commitlog
    12autoCreateTopicEnable=false
    13autoCreateSubscriptionGroup=false
    14# 与 dledger 相关的属性
    15enableDLegerCommitLog=true
    16storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store/dledger_store
    17dLegerGroup=broker-a
    18dLegerPeers=n0-192.168.0.220:40911;n1-192.168.0.221:40911;n2-192.168.0.222:40911
    19dLegerSelfId=n0
    

    192.168.0.221 broker配置文件如下:

    
     1brokerClusterName = DefaultCluster
     2brokerName = broker-a
     3brokerId = 1
     4deleteWhen = 04
     5fileReservedTime = 48
     6brokerRole = SLAVE
     7flushDiskType = ASYNC_FLUSH
     8brokerIP1=192.168.0.221
     9brokerIP2=192.168.0.221
    10namesrvAddr=192.168.0.221:9876;192.168.0.220:9876
    11storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store
    12storePathCommitLog=/opt/application/rocketmq-all-4.5.2-bin-release/store/commitlog
    13autoCreateTopicEnable=false
    14autoCreateSubscriptionGroup=false
    15# 与dledger 相关的配置属性
    16enableDLegerCommitLog=true
    17storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store/dledger_store
    18dLegerGroup=broker-a
    19dLegerPeers=n0-192.168.0.220:40911;n1-192.168.0.221:40911;n2-192.168.0.222:40911
    20dLegerSelfId=n1
    

    192.168.0.222 broker配置文件如下:

    
     1brokerClusterName = DefaultCluster
     2brokerName = broker-a
     3brokerId = 0
     4deleteWhen = 04
     5fileReservedTime = 48
     6brokerRole = ASYNC_MASTER
     7flushDiskType = ASYNC_FLUSH
     8brokerIP1=192.168.0.222
     9brokerIP2=192.168.0.222
    10namesrvAddr=192.168.0.221:9876;192.168.0.220:9876
    11storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store
    12storePathCommitLog=/opt/application/rocketmq-all-4.5.2-bin-release/store/commitlog
    13autoCreateTopicEnable=false
    14autoCreateSubscriptionGroup=false
    15# 与 dledger 相关的配置
    16enableDLegerCommitLog=true
    17storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store/dledger_store
    18dLegerGroup=broker-a
    19dLegerPeers=n0-192.168.0.220:40911;n1-192.168.0.221:40911;n2-192.168.0.222:40911
    20dLegerSelfId=n2
    

    温馨提示:legerSelfId 分别为 n0、n1、n2。在真实的生产环境中,broker配置文件中的 storePathRootDir、storePathCommitLog 尽量使用单独的根目录,这样判断其磁盘使用率时才不会相互影响。

    Step3:将 store/config 下的 所有文件拷贝到 dledger store 的 congfig 目录下。

    
    1cd /opt/application/rocketmq-all-4.5.2-bin-release/store/
    2cp config/* dledger_store/config/
    3
    

    温馨提示:该步骤按照各自按照时配置的目录进行复制即可。

    Step4:依次启动三台 broker。

    
    1nohup bin/mqbroker -c conf/broker.conf  /dev/null  2&1 &
    

    如果启动成功,则在 rocketmq-console 中看到的集群信息如下:

    3.3 验证消息发送与消息查找

    首先我们先验证升级之前的消息是否能查询到,那我们还是查找key 为 m600000 的消息,查找结果如图所示:

    然后我们来测试一下消息发送。测试代码如下:

    
     1public class Producer {
     2    public static void main(String[] args) throws MQClientException, InterruptedException {
     3        DefaultMQProducer producer = new DefaultMQProducer("producer_dw_test");
     4        producer.setNamesrvAddr("192.168.0.220:9876;192.168.0.221:9876");
     5        producer.start();
     6        for(int i =600200; i  600300; i ++) {
     7            try {
     8                Message msg = new Message("topic_dw_test_by_order_01",null , "m" + i,("Hello RocketMQ" + i ).getBytes(RemotingHelper.DEFAULT_CHARSET));
     9                SendResult sendResult = producer.send(msg);
    10                System.out.printf("%s%n", sendResult);
    11            } catch (Exception e) {
    12                e.printStackTrace();
    13                Thread.sleep(1000);
    14            }
    15        }
    16        producer.shutdown();
    17        System.out.println("end");
    18    }
    19}
    

    执行结果如下:

    再去控制台查询一下消息,其结果也表明新的消息也能查询到。

    最后我们再来验证一下主节点宕机,消息发送是否会受影响。

    在消息发送的过程中,去关闭主节点,其截图如下:

    再来看一下集群的状态:

    等待该复制组重新完成主服务器选举后,即可继续处理消息发送。

    温馨提示:由于本示例是一主一从,故在选举期间,消息不可用,但在真实的生产环境上,其部署架构是多主主从,即一个复制组在 leader 选举期间,其他复制组可以接替该复制组完成消息的发送,实现消息服务的高可用。

    与 DLedger 相关的日志,默认存储在 broker_default.log 文件中。

    本文就介绍到这里了,如果觉得文章对您有帮助的话,还希望帮忙点个【在看】,谢谢。

    推荐阅读:源码分析 RocketMQ DLedger 多副本即主从切换系列文章:

    1、

    2、

    3、

    4、

    5、

    7、

    8、

    9、

    更多文章请关注微信公众号:中间件兴趣圈。

    RocketMQ 平滑升级到主从切换(实战篇)

    原文始发于微信公众号(中间件兴趣圈):

    本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

    本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

    原文链接:blog.ouyangsihai.cn >> RocketMQ 平滑升级到主从切换(实战篇)


     上一篇
    源码阅读技巧篇——RocketMQ DLedger 多副本即主从切换专栏回顾 源码阅读技巧篇——RocketMQ DLedger 多副本即主从切换专栏回顾
    RocketMQ DLedger 多副本即主从切换专栏总共包含9篇文章,时间跨度大概为2个月的时间,笔者觉得授人以鱼不如授人以渔,借以这个系列来展示该系列的创作始末,展示笔者阅读源码的技巧。 首先在下决心研读  RocketMQ DLedg
    下一篇 
    RocketMQ 整合 DLedger(多副本)即主从切换实现平滑升级的设计技巧 RocketMQ 整合 DLedger(多副本)即主从切换实现平滑升级的设计技巧
    源码分析 RocketMQ DLedger 多副本即主从切换系列已经进行到第8篇了,前面的章节主要是介绍了基于 raft  协议的选主与日志复制,从本篇开始将开始关注如何将 DLedger 应用到 RocketMQ中。 摘要:详细分析了Ro