原文地址:http://www.yunai.me/RocketMQ/message-send-and-receive/?mp
RocketMQ
带注释源码地址 :https://github.com/YunaiV/incubator-rocketmq
😈本系列每 1-2 周更新一篇,欢迎订阅、关注、收藏 公众号
1、概述
2、Producer 发送消息
- DefaultMQProducerImpl#tryToFindTopicPublishInfo()
- MQFaultStrategy
- DefaultMQProducerImpl#sendKernelImpl()
- MQFaultStrategy
- LatencyFaultTolerance
- LatencyFaultToleranceImpl
- FaultItem
- DefaultMQProducer#send(Message)
- DefaultMQProducerImpl#sendDefaultImpl()
3、Broker 接收消息
- AbstractSendMessageProcessor#msgCheck
- SendMessageProcessor#sendMessage
- DefaultMessageStore#putMessage
4、某种结尾
1、概述
Producer
发送消息。主要是同步发送消息源码,涉及到 异步/Oneway发送消息,事务消息会跳过。Broker
接收消息。(存储消息在《RocketMQ 源码分析 —— Message 存储》解析)
2、Producer 发送消息
DefaultMQProducer#send(Message)
// .... 省略代码
- 说明:发送同步消息,
DefaultMQProducer#send(Message)
对DefaultMQProducerImpl#send(Message)
进行封装。
DefaultMQProducerImpl#sendDefaultImpl()
// .... 省略代码
- 说明 :发送消息。步骤:获取消息路由信息,选择要发送到的消息队列,执行消息发送核心方法,并对发送结果进行封装返回。
- 第 1 至 7 行:对
sendsendDefaultImpl(...)
进行封装。 - 第 20 行 :
invokeID
仅仅用于打印日志,无实际的业务用途。 - 第 25 行 :获取 Topic路由信息, 详细解析见:DefaultMQProducerImpl#tryToFindTopicPublishInfo()
- 第 30 & 34 行 :计算调用发送消息到成功为止的最大次数,并进行循环。同步或异步发送消息会调用多次,默认配置为3次。
- 第 36 行 :选择消息要发送到的队列,详细解析见:MQFaultStrategy
- 第 43 行 :调用发送消息核心方法,详细解析见:DefaultMQProducerImpl#sendKernelImpl()
- 第 46 行 :更新
Broker
可用性信息。在选择发送到的消息队列时,会参考Broker
发送消息的延迟,详细解析见:MQFaultStrategy - 第 62 至 68 行:当抛出
RemotingException
时,如果进行消息发送失败重试,则可能导致消息发送重复。例如,发送消息超时(RemotingTimeoutException
),实际Broker
接收到该消息并处理成功。因此,Consumer
在消费时,需要保证幂等性。
DefaultMQProducerImpl#tryToFindTopicPublishInfo()
// .... 省略代码
- 说明 :获得 Topic发布信息。优先从缓存
topicPublishInfoTable
,其次从Namesrv
中获得。 - 第 3 行 :从缓存
topicPublishInfoTable
中获得 Topic发布信息。 - 第 5 至 9 行 :从
Namesrv
中获得 Topic发布信息。 - 第 13 至 17 行 :当从
Namesrv
无法获取时,使用{@link DefaultMQProducer#createTopicKey}
对应的 Topic发布信息。目的是当Broker
开启自动创建 Topic开关时,Broker
接收到消息后自动创建Topic,详细解析见《RocketMQ 源码分析 —— Topic》。
MQFaultStrategy
MQFaultStrategy
// .... 省略代码
- 说明 :
Producer
消息发送容错策略。默认情况下容错策略关闭,即sendLatencyFaultEnable=false
。 - 第 30 至 62 行 :容错策略选择消息队列逻辑。优先获取可用队列,其次选择一个broker获取队列,最差返回任意broker的一个队列。
- 第 64 行 :未开启容错策略选择消息队列逻辑。
- 第 74 至 79 行 :更新延迟容错信息。当
Producer
发送消息时间过长,则逻辑认为N秒内不可用。按照latencyMax
,notAvailableDuration
的配置,对应如下:
LatencyFaultTolerance
// .... 省略代码
- 说明 :延迟故障容错接口
LatencyFaultToleranceImpl
// .... 省略代码
- 说明 :延迟故障容错实现。维护每个对象的信息。
FaultItem
// .... 省略代码
- 说明 :对象故障信息。维护对象的名字、延迟、开始可用的时间。
DefaultMQProducerImpl#sendKernelImpl()
// .... 省略代码
- 说明 :发送消息核心方法。该方法真正发起网络请求,发送消息给
Broker
。 - 第 21 行 :生产消息编号,详细解析见《RocketMQ 源码分析 —— Message 基础》。
- 第 64 至 121 行 :构建发送消息请求
SendMessageRequestHeader
。 - 第 107 至 117 行 :执行
MQClientInstance#sendMessage(...)
发起网络请求。
3、Broker 接收消息
SendMessageProcessor#sendMessage
// .... 省略代码
#processRequest()
说明 :处理消息请求。#sendMessage()
说明 :发送消息,并返回发送消息结果。- 第 51 至 55 行 :消息配置(Topic配置)校验,详细解析见:AbstractSendMessageProcessor#msgCheck()。
- 第 60 至 64 行 :消息队列编号小于0时,
Broker
可以设置随机选择一个消息队列。 - 第 72 至 103 行 :对RETRY类型的消息处理。如果超过最大消费次数,则topic修改成”%DLQ%” + 分组名, 即加 死信队 (Dead Letter Queue),详细解析见:《RocketMQ 源码分析 —— Topic》。
- 第 105 至 118 行 :创建
MessageExtBrokerInner
。 - 第 132 :存储消息,详细解析见:DefaultMessageStore#putMessage()。
- 第 133 至 183 行 :处理消息发送结果,设置响应结果和提示。
- 第 186 至 214 行 :发送成功,响应。这里
doResponse(ctx, request, response)
进行响应,最后return null
,原因是:响应给Producer
可能发生异常,#doResponse(ctx, request, response)
捕捉了该异常并输出日志。这样做的话,我们进行排查Broker
接收消息成功后响应是否存在异常会方便很多。
AbstractSendMessageProcessor#msgCheck
// .... 省略代码
- 说明:校验消息是否正确,主要是Topic配置方面,例如:
Broker
是否有写入权限,topic配置是否存在,队列编号是否正确。 - 第 11 至 18 行 :检查Topic是否可以被发送。目前是
{@link MixAll.DEFAULT_TOPIC}
不被允许发送。 - 第 20 至 51 行 :当找不到Topic配置,则进行创建。当然,创建会存在不成功的情况,例如说:
defaultTopic
的Topic配置不存在,又或者是 存在但是不允许继承,详细解析见《RocketMQ 源码分析 —— Topic》。
DefaultMessageStore#putMessage
// .... 省略代码
- 说明:存储消息封装,最终存储需要
CommitLog
实现。 - 第 7 至 27 行 :校验
Broker
是否可以写入。 - 第 29 至 39 行 :消息格式与大小校验。
- 第 47 行 :调用
CommitLong
进行存储,详细逻辑见:《RocketMQ 源码分析 —— Message 存储》
4、某种结尾
感谢阅读、收藏、点赞本文的工程师同学。
阅读源码是件令自己很愉悦的事情,编写源码解析是让自己脑细胞死伤无数的过程,痛并快乐着。
如果有内容写的存在错误,或是不清晰的地方,见笑了,🙂。欢迎加 QQ:7685413 我们一起探讨,共进步。
再次感谢阅读、收藏、点赞本文的工程师同学。