(点击上方公众号,可快速关注)
来源:谢晞鸣 ,
fdx321.github.io/2017/08/21/【RocketMQ源码学习】4-消息发送/
fdx321.github.io/2017/08/21/【RocketMQ源码学习】4-消息发送/
1. Client端,三种发送方式
RocketMQ 支持常见的三种发送方式,
SYNC
producer.send(msg)
同步的发送方式,会等待发送结果后才返回。可以用 send(msg, timeout) 的方式指定等待时间,如果不指定,就是默认的 3000ms. 这个timeout 最终会被设置到 ResponseFuture 里,再发送完消息后,用 countDownLatch 去 await timeout的时间,如果过期,就会抛出异常。
ASYNC
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf(“%-10d OK %s %n”, index, sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf(“%-10d Exception %s %n”, index, e);
e.printStackTrace();
}
});
@Override
System.out.printf(“%-10d OK %s %n”, index, sendResult.getMsgId());
@Override
System.out.printf(“%-10d Exception %s %n”, index, e);
}
异步的发送方式,发送完后,立刻返回。Client 在拿到 Broker 的响应结果后,会回调指定的 callback. 这个 API 也可以指定 Timeout,不指定也是默认的 3000ms.
ONEWAY
producer.sendOneway(msg);
比较简单,发出去后,什么都不管直接返回。
对于每种方式,Producer 还提供了可以指定 MessageQueue, MessageQueueSelector的API,这属于稍微高端一点的玩法,一般用它提供的默认的策略选择 MessageQueue 就可以了。
2. Client端发送过程
下面以 SYNC 方式为例,看下整个消息的发送过程,其他方式略有差异,总体流程类似。
1. 根据 Topic 找到指定的 TopicPublishInfo
先去本地 map 找,如果没有,就去 Namesrv fetch, 如果 Namesrv 里也没有,则用默认的 Topic 再去 fetch TopicRouteData. 对用用默认 Topic 的这种情况,Client 拿到数据后,会去构建 TopicPublishInfo, 然后用当前的 Topic 作为 key 放到本地 map 里。Broker 在接收到消息的时候,会去更新它本地的配置,然后在 registerBroker 的时候会去更新 namesrv 中的 TopicRouteData 信息,这样 Namesrv 中就会有这样一份配置了。当然,也可以事先在 Namesrv 增加该配置,很多公司内部都有这样定制的平台来管理MQ的接入配置。
public class TopicPublishInfo {
private boolean orderTopic = false;
private boolean haveTopicRouterInfo = false;
private ListMessageQueue messageQueueList = new ArrayListMessageQueue();
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
private TopicRouteData topicRouteData;
}
public class TopicRouteData {
private String orderTopicConf;
private ListQueueData queueDatas;
private ListBrokerData brokerDatas;
private HashMapString/* brokerAddr /, ListString/ Filter Server */ filterServerTable;
}
private boolean orderTopic = false;
private ListMessageQueue messageQueueList = new ArrayListMessageQueue();
private TopicRouteData topicRouteData;
private String orderTopicConf;
private ListBrokerData brokerDatas;
}
QueueData 定义了这个 read 和 write 的 queue的数量,Client 在拿到 TopicRouteData 后,会根据这里配的数量去构建响应数目的messageQueue,即 messageQueueList. brokerDatas 保存了各个 broker 的相关信息。
2. 从 messageQueueList 中选择一个 MessageQueue
如果没有 enable latencyFaultTolerance,就用递增取模的方式选择。如果 enable 了,在递增取模的基础上,再过滤掉 not available 的。这里所谓的 latencyFaultTolerance, 是指对之前失败的,按一定的时间做退避:
long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
举个例子,如果上次请求的 latency 超过 550L ms, 就退避 3000L ms;超过 1000L,就退避 60000L.
以上就是 Producer 到 Broker 的简单的负载均衡。
3. 发送消息
到这一步,我们已经拿到了这些关键数据:
有了这些数据,就可以构建 RequestHeader 了,大部分字段意思都很明显(当然,前提是对RocketMQ的源码有所熟悉),个别字段见注释。
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
//系统Flag, 用于判断走什么逻辑。标识是否压缩,事务的不同TYPE(prepare/rollback/commit/not transaction) 等
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
//消息Flag, 最终会落地
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
//TODO,暂不知道这个字段是干嘛用的
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
//系统Flag, 用于判断走什么逻辑。标识是否压缩,事务的不同TYPE(prepare/rollback/commit/not transaction) 等
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
最后用这些 header 字段,以及 message body 构建 RemotingCommand,通过 remoting 模块发给 broker.
4. 处理结果
3. Broker端,消息的处理和落地
如图,Broker 有很多 Processor 用来处理不同类型的请求,有些 Processor 会共用一个 Processor 线程池。对于消息发送,Broker 的 remoting 模块在接收到请求后,根据request code,最终会交给 SendMessageProcessor 来处理。SendMessageProcessor 会依次做以下处理:
Store 收到消息后,会先做一些校验,然后交给 commitLog 去 put,然后做些统计并返回。Store 存储消息的过程比较复杂,后面会单独分析。
4. 其他
1. 顺序消息
很多应用并不关注消息顺序,而且消息没有顺序并不代表消息内容没有顺序,合理的系统设计可以避免顺序问题。MQ 要保证消息顺序必然会损失性能、增加系统实现复杂度。具体的分析可以看 分布式开放消息系统(RocketMQ)的原理与实践。
http://www.jianshu.com/p/453c6e7ff81c
在 RocketMQ 里, 在发送消息的时候可以自己定义 MessageQueueSelector,对于同一个订单ID(或其他ID)的不同消息,可以让它走同一个 MessageQueue,这样就可以按顺序发给同一个 Broker 了。
2. Batch Message
Producer 的 API 还支持一次发多个消息。
ListMessage messages = new ArrayList();
messages.add(new Message(topic, “Tag”, “OrderID001”, “Hello world 0”.getBytes()));
messages.add(new Message(topic, “Tag”, “OrderID002”, “Hello world 1”.getBytes()));
producer.send(messages);
messages.add(new Message(topic, “Tag”, “OrderID001”, “Hello world 0”.getBytes()));
Client 模块会将 Message List 封装成 MessageBatch,且会标记 requestHeader 的 batch 标志位为 true. Broker 在接收到消息后就可以根据这个标志位去做不同的处理。
5. Reference
系列
- RocketMQ 源码学习 1 : 整体结构
- RocketMQ 源码学习 2 : Namesrv
- RocketMQ 源码学习 3 : Remoting 模块