原文地址:http://www.yunai.me/RocketMQ/message-store/
(建议使用原文地址阅读:1、阅读体验;2、代码排版混乱因而省略;)
`RocketMQ` **带注释源码**地址 :https://github.com/YunaiV/incubator-rocketmq **😈本系列每 1-2 周更新一篇,欢迎订阅、关注、收藏 公众号**
- MappedFile#落盘
- FlushRealTimeService
- CommitRealTimeService
- GroupCommitService
- CommitLog#putMessage(...)
- MappedFileQueue#getLastMappedFile(...)
- MappedFile#appendMessage(...)
- DefaultAppendMessageCallback#doAppend(...)
- FlushCommitLogService
结尾
1、概述
本文接《RocketMQ 源码分析 —— Message 发送与接收》;
主要解析
CommitLog
存储消息部分。
2、CommitLog 结构
CommitLog
、
MappedFileQueue
、
MappedFile
的关系如下:
CommitLog
:
MappedFileQueue
:
MappedFile
= 1 : 1 : N。
反应到系统文件如下:
Yunai-MacdeMacBook-Pro-2:commitlog yunai$ pwd/Users/yunai/store/commitlog Yunai-MacdeMacBook-Pro-2:commitlog yunai$ ls -ltotal 10485760 -rw-r--r-- 1 yunai staff 1073741824 4 21 16:27 00000000000000000000 -rw-r--r-- 1 yunai staff 1073741824 4 21 16:29 00000000001073741824 -rw-r--r-- 1 yunai staff 1073741824 4 21 16:32 00000000002147483648 -rw-r--r-- 1 yunai staff 1073741824 4 21 16:33 00000000003221225472 -rw-r--r-- 1 yunai staff 1073741824 4 21 16:32 00000000004294967296
CommitLog
、
MappedFileQueue
、
MappedFile
的定义如下:
- 每个 `MappedFile` 统一文件大小。
- 文件命名方式:fileName[n] = fileName[n - 1] + mappedFileSize。在 `CommitLog` 里默认为 1GB。
- MESSAGE :消息。
- BLANK :文件不足以存储消息时的空白占位。
CommitLog
:针对
MappedFileQueue
的封装使用。
CommitLog
目前存储在
MappedFile
有两种内容类型:
BLANK :文件不足以存储消息时的空白占位。
CommitLog
存储在
MappedFile
的结构:
MESSAGE[1] | MESSAGE[2] | ... | MESSAGE[n - 1] | MESSAGE[n] | BLANK |
---|
MESSAGE
在
CommitLog
存储结构:
BLANK
在
CommitLog
存储结构:
3、CommitLog 存储消息
CommitLog#putMessage(…)
fileName[n] = fileName[n - 1] + n * mappedFileSize fileName[0] = startOffset - (startOffset % this.mappedFileSize) 目前 `CommitLog` 的 `startOffset` 为 0。
此处有个**疑问**,为什么需要 `(startOffset % this.mappedFileSize)`。例如: | startOffset | mappedFileSize | createOffset | | --- | :-- | :-- | | 5 | 1 | 5 | | 5 | 2 | 4 | | 5 | 3 | 3 | | 5 | 4 | 4 | | 5 | 5 | 0 |
_如果有知道的同学,麻烦提示下。😈_*解答:fileName[0] = startOffset - (startOffset % this.mappedFileSize) 计算出来的是,以 `this.mappedFileSize` 为每个文件大小时,`startOffset` 所在文件的开始`offset`*
第 14 行 :计算文件名。从此处我们可 以得知,`MappedFile`的文件命名规则:
MappedFile#appendMessage(…)
// 省略代码
DefaultAppendMessageCallback#doAppend(…)
// 省略代码
考虑到写入性能,满足
flushLeastPages * OS_PAGE_SIZE
才进行
flush
。
考虑到写入性能,满足
commitLeastPages * OS_PAGE_SIZE
才进行
commit
。
FlushRealTimeService
CommitRealTimeService
消息插入成功时,异步刷盘时使用。
和
FlushRealTimeService
类似,性能更好。
// 省略代码
GroupCommitService
消息插入成功时,同步刷盘时使用。
- 第 43 行 :考虑到有可能每次循环的消息写入的消息,可能分布在**两个** `MappedFile`(写第N个消息时,`MappedFile` 已满,创建了一个新的),所以需要有循环2次。
- 第 51 行 :唤醒等待写入请求线程,通过 `CountDownLatch` 实现
第 61 至 66 行 :直接刷盘。此处是由于发送的消息的
isWaitStoreMsgOK
未设置成
TRUE
,导致未走批量提交。
第 73 至 80 行 :每 10ms 执行一次批量提交。当然,如果
wakeup()
时,则会立即进行一次批量提交。当
Broker
设置成同步落盘 && 消息
isWaitStoreMsgOK=true
,消息需要略大于 10ms 才能发送成功。当然,性能相对异步落盘较差,可靠性更高,需要我们在实际使用时去取舍。
结尾
写的第二篇与RocketMQ源码相关的博文,看到有阅读、点赞、收藏甚至订阅,很受鼓舞。
《Message存储》比起《Message发送&接收》从难度上说是更大的,当然也是更有趣的,如果存在理解错误或者表达不清晰,还请大家多多包含。如果可以的话,还请麻烦添加 QQ:7685413 进行指出,避免自己的理解错误,给大家造成困扰。
推荐《Kafka设计解析(六)- Kafka高性能架构之道》,作者站在的高度比我高的多的多,嗯,按照李小璐的说法:高一个喜马拉雅山。😈认真啃读《Linux内核设计与实现(原书第3版)》,day day up。
再次感谢大家的阅读、点赞、收藏。
下一篇:《RocketMQ 源码分析 —— Message 拉取与消费》 起航!