消息队列中间件 RocketMQ 源码分析 —— Message 发送与接收

本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

原文链接:blog.ouyangsihai.cn >> 消息队列中间件 RocketMQ 源码分析 —— Message 发送与接收

原文地址:http://www.yunai.me/RocketMQ/message-send-and-receive/?mp

RocketMQ 带注释源码地址 :https://github.com/YunaiV/incubator-rocketmq
😈本系列每 1-2 周更新一篇,欢迎订阅、关注、收藏 公众号

 

1、概述

2、Producer 发送消息

  • DefaultMQProducerImpl#tryToFindTopicPublishInfo()
  • MQFaultStrategy
  • DefaultMQProducerImpl#sendKernelImpl()
  • MQFaultStrategy
  • LatencyFaultTolerance
  • LatencyFaultToleranceImpl
  • FaultItem
  • DefaultMQProducer#send(Message)
  • DefaultMQProducerImpl#sendDefaultImpl()

3、Broker 接收消息

  • AbstractSendMessageProcessor#msgCheck
  • SendMessageProcessor#sendMessage
  • DefaultMessageStore#putMessage

4、某种结尾

1、概述

  1. Producer 发送消息。主要是同步发送消息源码,涉及到 异步/Oneway发送消息,事务消息会跳过。
  2. Broker 接收消息。(存储消息在《RocketMQ 源码分析 —— Message 存储》解析)
消息队列中间件 RocketMQ 源码分析 —— Message 发送与接收

2、Producer 发送消息

消息队列中间件 RocketMQ 源码分析 —— Message 发送与接收

DefaultMQProducer#send(Message)


  // .... 省略代码

 

  • 说明:发送同步消息, DefaultMQProducer#send(Message) 对  DefaultMQProducerImpl#send(Message)进行封装。

DefaultMQProducerImpl#sendDefaultImpl()


 // .... 省略代码

 

  • 说明 :发送消息。步骤:获取消息路由信息,选择要发送到的消息队列,执行消息发送核心方法,并对发送结果进行封装返回。
  • 第 1 至 7 行:对 sendsendDefaultImpl(...)进行封装。
  • 第 20 行 : invokeID仅仅用于打印日志,无实际的业务用途。
  • 第 25 行 :获取 Topic路由信息, 详细解析见:DefaultMQProducerImpl#tryToFindTopicPublishInfo()
  • 第 30 & 34 行 :计算调用发送消息到成功为止的最大次数,并进行循环。同步或异步发送消息会调用多次,默认配置为3次。
  • 第 36 行 :选择消息要发送到的队列,详细解析见:MQFaultStrategy
  • 第 43 行 :调用发送消息核心方法,详细解析见:DefaultMQProducerImpl#sendKernelImpl()
  • 第 46 行 :更新 Broker可用性信息。在选择发送到的消息队列时,会参考 Broker发送消息的延迟,详细解析见:MQFaultStrategy
  • 第 62 至 68 行:当抛出 RemotingException时,如果进行消息发送失败重试,则可能导致消息发送重复。例如,发送消息超时( RemotingTimeoutException),实际 Broker接收到该消息并处理成功。因此, Consumer在消费时,需要保证幂等性。

DefaultMQProducerImpl#tryToFindTopicPublishInfo()


 // .... 省略代码

 

  • 说明 :获得 Topic发布信息。优先从缓存 topicPublishInfoTable,其次从 Namesrv中获得。
  • 第 3 行 :从缓存 topicPublishInfoTable中获得 Topic发布信息。
  • 第 5 至 9 行 :从  Namesrv 中获得 Topic发布信息。
  • 第 13 至 17 行 :当从  Namesrv 无法获取时,使用  {@link DefaultMQProducer#createTopicKey} 对应的 Topic发布信息。目的是当  Broker 开启自动创建 Topic开关时, Broker 接收到消息后自动创建Topic,详细解析见《RocketMQ 源码分析 —— Topic》。

MQFaultStrategy

消息队列中间件 RocketMQ 源码分析 —— Message 发送与接收

MQFaultStrategy


 // .... 省略代码

 

  • 说明 : Producer消息发送容错策略。默认情况下容错策略关闭,即 sendLatencyFaultEnable=false
  • 第 30 至 62 行 :容错策略选择消息队列逻辑。优先获取可用队列,其次选择一个broker获取队列,最差返回任意broker的一个队列。
  • 第 64 行 :未开启容错策略选择消息队列逻辑。
  • 第 74 至 79 行 :更新延迟容错信息。当  Producer 发送消息时间过长,则逻辑认为N秒内不可用。按照 latencyMax notAvailableDuration的配置,对应如下:
Producer发送消息消耗时长Broker不可用时长|------

LatencyFaultTolerance


  // .... 省略代码

 

  • 说明 :延迟故障容错接口

LatencyFaultToleranceImpl


  // .... 省略代码

 

  • 说明 :延迟故障容错实现。维护每个对象的信息。

FaultItem


 // .... 省略代码

 

  • 说明 :对象故障信息。维护对象的名字、延迟、开始可用的时间。

DefaultMQProducerImpl#sendKernelImpl()


 // .... 省略代码

 

  • 说明 :发送消息核心方法。该方法真正发起网络请求,发送消息给  Broker
  • 第 21 行 :生产消息编号,详细解析见《RocketMQ 源码分析 —— Message 基础》。
  • 第 64 至 121 行 :构建发送消息请求 SendMessageRequestHeader
  • 第 107 至 117 行 :执行  MQClientInstance#sendMessage(...) 发起网络请求。

3、Broker 接收消息

消息队列中间件 RocketMQ 源码分析 —— Message 发送与接收

SendMessageProcessor#sendMessage


 // .... 省略代码

 

  • #processRequest() 说明 :处理消息请求。
  • #sendMessage() 说明 :发送消息,并返回发送消息结果。
  • 第 51 至 55 行 :消息配置(Topic配置)校验,详细解析见:AbstractSendMessageProcessor#msgCheck()。
  • 第 60 至 64 行 :消息队列编号小于0时, Broker 可以设置随机选择一个消息队列。
  • 第 72 至 103 行 :对RETRY类型的消息处理。如果超过最大消费次数,则topic修改成”%DLQ%” + 分组名, 即加 死信队 (Dead Letter Queue),详细解析见:《RocketMQ 源码分析 —— Topic》。
  • 第 105 至 118 行 :创建 MessageExtBrokerInner
  • 第 132 :存储消息,详细解析见:DefaultMessageStore#putMessage()。
  • 第 133 至 183 行 :处理消息发送结果,设置响应结果和提示。
  • 第 186 至 214 行 :发送成功,响应。这里 doResponse(ctx, request, response)进行响应,最后 return null,原因是:响应给  Producer 可能发生异常, #doResponse(ctx, request, response)捕捉了该异常并输出日志。这样做的话,我们进行排查  Broker 接收消息成功后响应是否存在异常会方便很多。

AbstractSendMessageProcessor#msgCheck


 // .... 省略代码

 

  • 说明:校验消息是否正确,主要是Topic配置方面,例如: Broker 是否有写入权限,topic配置是否存在,队列编号是否正确。
  • 第 11 至 18 行 :检查Topic是否可以被发送。目前是  {@link MixAll.DEFAULT_TOPIC} 不被允许发送。
  • 第 20 至 51 行 :当找不到Topic配置,则进行创建。当然,创建会存在不成功的情况,例如说: defaultTopic 的Topic配置不存在,又或者是 存在但是不允许继承,详细解析见《RocketMQ 源码分析 —— Topic》。

DefaultMessageStore#putMessage


  // .... 省略代码
  • 说明:存储消息封装,最终存储需要  CommitLog 实现。
  • 第 7 至 27 行 :校验  Broker 是否可以写入。
  • 第 29 至 39 行 :消息格式与大小校验。
  • 第 47 行 :调用  CommitLong 进行存储,详细逻辑见:《RocketMQ 源码分析 —— Message 存储》

4、某种结尾

感谢阅读、收藏、点赞本文的工程师同学。

阅读源码是件令自己很愉悦的事情,编写源码解析是让自己脑细胞死伤无数的过程,痛并快乐着。

如果有内容写的存在错误,或是不清晰的地方,见笑了,🙂。欢迎加 QQ:7685413 我们一起探讨,共进步。

再次感谢阅读、收藏、点赞本文的工程师同学。

 

本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

原文链接:blog.ouyangsihai.cn >> 消息队列中间件 RocketMQ 源码分析 —— Message 发送与接收


 上一篇
RabbitMQ的安装 RabbitMQ的安装
一、安装erlang环境官网下载:http://www.erlang.org/downloads 这个文件其实不是gz格式的,使用file otp_src_20.1.tar.gz可以查看它的真实数据格式 解压 tar -xvf otp_
下一篇 
RabbitMQ的简单介绍 RabbitMQ的简单介绍
RabbitMQ是采用Erlang语言开发,基于Advanced Message Queuing Protocol (AMQP)开放标准的开发。 Publisher:数据的发送方,生产者。 Consumer:数据的接收方,消费