消息队列中间件 RocketMQ 源码分析 —— Message 存储

本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

原文链接:blog.ouyangsihai.cn >> 消息队列中间件 RocketMQ 源码分析 —— Message 存储

原文地址:http://www.yunai.me/RocketMQ/message-store/
(建议使用原文地址阅读:1、阅读体验;2、代码排版混乱因而省略;)

`RocketMQ` **带注释源码**地址 :https://github.com/YunaiV/incubator-rocketmq  **😈本系列每 1-2 周更新一篇,欢迎订阅、关注、收藏 公众号**

  • 1、概述
  • 2、CommitLog 结构
  • 3、CommitLog 存储消息
    • MappedFile#落盘
    • FlushRealTimeService
    • CommitRealTimeService
    • GroupCommitService
    • CommitLog#putMessage(...)
    • MappedFileQueue#getLastMappedFile(...)
    • MappedFile#appendMessage(...)
    • DefaultAppendMessageCallback#doAppend(...)
    • FlushCommitLogService
    • 结尾

      1、概述

      本文接《RocketMQ 源码分析 —— Message 发送与接收》;
      主要解析  CommitLog 存储消息部分。

      2、CommitLog 结构

      CommitLog MappedFileQueue MappedFile 的关系如下:

      消息队列中间件 RocketMQ 源码分析 —— Message 存储

      CommitLog :  MappedFileQueue :  MappedFile = 1 : 1 : N。

      反应到系统文件如下:

      
      Yunai-MacdeMacBook-Pro-2:commitlog yunai$ pwd/Users/yunai/store/commitlog Yunai-MacdeMacBook-Pro-2:commitlog yunai$ ls -ltotal 10485760 -rw-r--r--  1 yunai  staff  1073741824  4 21 16:27 00000000000000000000 -rw-r--r--  1 yunai  staff  1073741824  4 21 16:29 00000000001073741824 -rw-r--r--  1 yunai  staff  1073741824  4 21 16:32 00000000002147483648 -rw-r--r--  1 yunai  staff  1073741824  4 21 16:33 00000000003221225472 -rw-r--r--  1 yunai  staff  1073741824  4 21 16:32 00000000004294967296
      

      CommitLog MappedFileQueue MappedFile 的定义如下:

    • `MappedFile` :00000000000000000000、00000000001073741824、00000000002147483648等文件。
    • `MappedFileQueue` :`MappedFile` 所在的文件夹,对 `MappedFile` 进行封装成文件队列,对上层提供可无限使用的文件容量。
      • 每个 `MappedFile` 统一文件大小。
      • 文件命名方式:fileName[n] = fileName[n - 1] + mappedFileSize。在 `CommitLog` 里默认为 1GB。
      • CommitLog :针对  MappedFileQueue 的封装使用。

        CommitLog 目前存储在  MappedFile 有两种内容类型:

      • MESSAGE :消息。
      • BLANK :文件不足以存储消息时的空白占位。
      • BLANK :文件不足以存储消息时的空白占位。

        CommitLog 存储在  MappedFile的结构:

        MESSAGE[1] MESSAGE[2] ... MESSAGE[n - 1] MESSAGE[n] BLANK

        MESSAGE 在  CommitLog 存储结构:

        第几位字段说明数据类型字节数|------

        BLANK 在  CommitLog 存储结构:

        第几位字段说明数据类型字节数|------

        3、CommitLog 存储消息

        消息队列中间件 RocketMQ 源码分析 —— Message 存储

        CommitLog#putMessage(…)

        
         fileName[n] = fileName[n - 1] + n * mappedFileSize  fileName[0] = startOffset - (startOffset % this.mappedFileSize)  目前 `CommitLog` 的 `startOffset` 为 0。 
        此处有个**疑问**,为什么需要 `(startOffset % this.mappedFileSize)`。例如:  | startOffset  | mappedFileSize | createOffset | | --- | :-- | :-- | | 5 | 1 | 5 | | 5 | 2 | 4 | | 5 | 3 | 3  | | 5 | 4 | 4 | | 5 |  5 | 0 |  
        _如果有知道的同学,麻烦提示下。😈_*解答:fileName[0] = startOffset - (startOffset % this.mappedFileSize) 计算出来的是,以 `this.mappedFileSize` 为每个文件大小时,`startOffset` 所在文件的开始`offset`*
        
      • 说明 :获取最后一个 `MappedFile`,若不存在或文件已满,则进行创建。
      • 第 5 至 11 行 :计算当文件不存在或已满时,新创建文件的 `createOffset`。
      • 第 14 行 :计算文件名。从此处我们可 以得知,`MappedFile`的文件命名规则:

      • 第 30 至 35 行 :设置 `MappedFile`是否是第一个创建的文件。该标识用于 `ConsumeQueue` 对应的 `MappedFile` ,详见 `ConsumeQueue#fillPreBlank`。
      • MappedFile#appendMessage(…)

        
         // 省略代码
        
      • 说明 :**插入消息**到 `MappedFile`,并返回插入结果。
      • 第 8 行 :获取需要写入的字节缓冲区。为什么会有 `writeBuffer != null` 的判断后,使用不同的字节缓冲区,见:FlushCommitLogService。
      • 第 9 至 11 行 :设置写入 `position`,执行写入,更新 `wrotePosition`(当前写入位置,下次开始写入开始位置)。
      • DefaultAppendMessageCallback#doAppend(…)

        
        // 省略代码 
        
      • 第 51 至 61 行 :获取队列位置(offset)。
      • 第 78 至 95 行 :计算消息总长度。
      • 第 98 至 112 行 :当文件剩余空间不足时,写入 `BLANK` 占位,返回结果。
      • 第 114 至 161 行 :写入 `MESSAGE` 。
      • 第 173 行 :更新队列位置(offset)。
      • 线程服务场景插入消息性能|------ 消息队列中间件 RocketMQ 源码分析 —— Message 存储

        考虑到写入性能,满足  flushLeastPages * OS_PAGE_SIZE 才进行  flush

        考虑到写入性能,满足  commitLeastPages * OS_PAGE_SIZE 才进行  commit

        FlushRealTimeService

      • 说明:实时 `flush `线程服务,调用 `MappedFile#flush` 相关逻辑。
      • 第 23 至 29 行 :每 `flushPhysicQueueThoroughInterval` 周期,执行一次 `flush` 。因为不是每次循环到都能满足 `flushCommitLogLeastPages` 大小,因此,需要一定周期进行一次强制 `flush` 。当然,不能每次循环都去执行强制 `flush`,这样性能较差。
      • 第 33 行 至 37 行 :根据 `flushCommitLogTimed` 参数,可以选择每次循环是**固定周期**还是**等待唤醒**。默认配置是后者,所以,每次插入消息完成,会去调用 `commitLogService.wakeup()` 。
      • 第 45 行 :调用 `MappedFile` 进行 `flush`。
      • 第 61 至 65 行 :`Broker` 关闭时,强制 `flush`,避免有未刷盘的数据。
      • CommitRealTimeService

        消息插入成功时,异步刷盘时使用。
        和  FlushRealTimeService 类似,性能更好。

        
        // 省略代码
        

        GroupCommitService

        消息插入成功时,同步刷盘时使用。

      • 说明:批量写入线程服务。
      • 第 16 至 25 行 :添加写入请求。方法设置了 `sync` 的原因:`this.requestsWrite` 会和 `this.requestsRead` 不断交换,无法保证稳定的同步。
      • 第 27 至 34 行 :读写队列交换。
      • 第 38 至 60 行 :循环写入队列,进行 `flush`。
        • 第 43 行 :考虑到有可能每次循环的消息写入的消息,可能分布在**两个** `MappedFile`(写第N个消息时,`MappedFile` 已满,创建了一个新的),所以需要有循环2次。
        • 第 51 行 :唤醒等待写入请求线程,通过 `CountDownLatch` 实现
        • 第 61 至 66 行 :直接刷盘。此处是由于发送的消息的  isWaitStoreMsgOK 未设置成  TRUE ,导致未走批量提交。

          第 73 至 80 行 :每 10ms 执行一次批量提交。当然,如果  wakeup() 时,则会立即进行一次批量提交。当  Broker 设置成同步落盘 && 消息  isWaitStoreMsgOK=true,消息需要略大于 10ms 才能发送成功。当然,性能相对异步落盘较差,可靠性更高,需要我们在实际使用时去取舍。

          结尾

          写的第二篇与RocketMQ源码相关的博文,看到有阅读、点赞、收藏甚至订阅,很受鼓舞。

          《Message存储》比起《Message发送&接收》从难度上说是更大的,当然也是更有趣的,如果存在理解错误或者表达不清晰,还请大家多多包含。如果可以的话,还请麻烦添加 QQ:7685413 进行指出,避免自己的理解错误,给大家造成困扰。

          推荐《Kafka设计解析(六)- Kafka高性能架构之道》,作者站在的高度比我高的多的多,嗯,按照李小璐的说法:高一个喜马拉雅山。😈认真啃读《Linux内核设计与实现(原书第3版)》,day day up。

          再次感谢大家的阅读、点赞、收藏。

          下一篇:《RocketMQ 源码分析 —— Message 拉取与消费》 起航!

          消息队列中间件 RocketMQ 源码分析 —— Message 存储
  • 本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

    本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

    原文链接:blog.ouyangsihai.cn >> 消息队列中间件 RocketMQ 源码分析 —— Message 存储


     上一篇
    数据库中间件 MyCAT 源码解析 —— 分片结果合并(一) 数据库中间件 MyCAT 源码解析 —— 分片结果合并(一)
    1. 概述相信很多同学看过 MySQL 各种优化的文章,里面 99% 会提到:单表数据量大了,需要进行分片(水平拆分 or 垂直拆分)。分片之后,业务上必然面临的场景:跨分片的数据合并。今天我们就一起来瞅瞅 MyCAT 是如何实现分片结果合
    2021-04-05
    下一篇 
    RabbitMQ的安装 RabbitMQ的安装
    一、安装erlang环境官网下载:http://www.erlang.org/downloads 这个文件其实不是gz格式的,使用file otp_src_20.1.tar.gz可以查看它的真实数据格式 解压 tar -xvf otp_