本文沿着的思路,从如下3个方面对其源码进行解读:
消息轨迹格式
发送消息轨迹流程
首先我们来看一下在消息发送端如何启用消息轨迹,示例代码如下:
1public class TraceProducer {
2 public static void main(String[] args) throws MQClientException, InterruptedException {
3 DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true); // @1
4 producer.setNamesrvAddr("127.0.0.1:9876");
5 producer.start();
6 for (int i = 0; i 10; i++)
7 try {
8 {
9 Message msg = new Message("TopicTest",
10 "TagA",
11 "OrderID188",
12 "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
13 SendResult sendResult = producer.send(msg);
14 System.out.printf("%s%n", sendResult);
15 }
16
17 } catch (Exception e) {
18 e.printStackTrace();
19 }
20 producer.shutdown();
21 }
22}
从上述代码可以看出其关键点是在创建DefaultMQProducer时指定开启消息轨迹跟踪。我们不妨浏览一下DefaultMQProducer与启用消息轨迹相关的构造函数:
1public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace)
2public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic)
3
参数如下:
String producerGroup 生产者所属组名。
boolean enableMsgTrace 是否开启跟踪消息轨迹,默认为false。
String customizedTraceTopic 如果开启消息轨迹跟踪,用来存储消息轨迹数据所属的主题名称,默认为:RMQ_SYS_TRACE_TOPIC。
boolean enableMsgTrace
是否开启跟踪消息轨迹,默认为false。
1.1 DefaultMQProducer构造函数
1public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,final String customizedTraceTopic) { // @1
2 this.producerGroup = producerGroup;
3 defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
4 //if client open the message trace feature
5 if (enableMsgTrace) { // @2
6 try {
7 AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook);
8 dispatcher.setHostProducer(this.getDefaultMQProducerImpl());
9 traceDispatcher = dispatcher;
10 this.getDefaultMQProducerImpl().registerSendMessageHook(
11 new SendMessageTraceHookImpl(traceDispatcher)); // @3
12 } catch (Throwable e) {
13 log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
14 }
15 }
16}
代码@1:首先介绍一下其局部变量。
String producerGroup 生产者所属组。
RPCHook rpcHook 生产者发送钩子函数。
boolean enableMsgTrace 是否开启消息轨迹跟踪。
String customizedTraceTopic 定制用于存储消息轨迹的数据。
RPCHook rpcHook
生产者发送钩子函数。
String customizedTraceTopic
定制用于存储消息轨迹的数据。
代码@2:用来构建AsyncTraceDispatcher,看其名:异步转发消息轨迹数据,稍后重点关注。
代码@3:构建SendMessageTraceHookImpl对象,并使用AsyncTraceDispatcher用来异步转发。
1.2 SendMessageTraceHookImpl钩子函数
1、SendMessageTraceHookImpl类图
SendMessageHook 消息发送钩子函数,用于在消息发送之前、发送之后执行一定的业务逻辑,是记录消息轨迹的最佳扩展点。
TraceDispatcher
int queueSize 异步转发,队列长度,默认为2048,当前版本不能修改。
int batchSize 批量消息条数,消息轨迹一次消息发送请求包含的数据条数,默认为100,当前版本不能修改。
int maxMsgSize 消息轨迹一次发送的最大消息大小,默认为128K,当前版本不能修改。
DefaultMQProducer traceProducer 用来发送消息轨迹的消息发送者。
ThreadPoolExecutor traceExecuter 线程池,用来异步执行消息发送。
AtomicLong discardCount 记录丢弃的消息个数。
Thread worker woker线程,主要负责从追加队列中获取一批待发送的消息轨迹数据,提交到线程池中执行。
ArrayBlockingQueue TraceContext traceContextQueue 消息轨迹TraceContext队列,用来存放待发送到服务端的消息。
ArrayBlockingQueue Runnable appenderQueue 线程池内部队列,默认长度1024。
DefaultMQPushConsumerImpl hostConsumer 消费者信息,记录消息消费时的轨迹信息。
String traceTopicName 用于跟踪消息轨迹的topic名称。
int batchSize
批量消息条数,消息轨迹一次消息发送请求包含的数据条数,默认为100,当前版本不能修改。
DefaultMQProducer traceProducer
用来发送消息轨迹的消息发送者。
AtomicLong discardCount
记录丢弃的消息个数。
ArrayBlockingQueue TraceContext traceContextQueue
消息轨迹TraceContext队列,用来存放待发送到服务端的消息。
DefaultMQPushConsumerImpl hostConsumer
消费者信息,记录消息消费时的轨迹信息。
2、 SendMessageTraceHookImpl
sendMessageBefore
1public void sendMessageBefore(SendMessageContext context) {
2 //if it is message trace data,then it doesn't recorded
3 if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) { // @1
4 return;
5 }
6 //build the context content of TuxeTraceContext
7 TraceContext tuxeContext = new TraceContext();
8 tuxeContext.setTraceBeans(new ArrayListTraceBean(1));
9 context.setMqTraceContext(tuxeContext);
10 tuxeContext.setTraceType(TraceType.Pub);
11 tuxeContext.setGroupName(context.getProducerGroup()); // @2
12 //build the data bean object of message trace
13 TraceBean traceBean = new TraceBean(); // @3
14 traceBean.setTopic(context.getMessage().getTopic());
15 traceBean.setTags(context.getMessage().getTags());
16 traceBean.setKeys(context.getMessage().getKeys());
17 traceBean.setStoreHost(context.getBrokerAddr());
18 traceBean.setBodyLength(context.getMessage().getBody().length);
19 traceBean.setMsgType(context.getMsgType());
20 tuxeContext.getTraceBeans().add(traceBean);
21}
代码@1:如果topic主题为消息轨迹的Topic,直接返回。
代码@2:在消息发送上下文中,设置用来跟踪消息轨迹的上下环境,里面主要包含一个TraceBean集合、追踪类型(TraceType.Pub)与生产者所属的组。
代码@3:构建一条跟踪消息,用TraceBean来表示,记录原消息的topic、tags、keys、发送到broker地址、消息体长度等消息。
从上文看出,sendMessageBefore主要的用途就是在消息发送的时候,先准备一部分消息跟踪日志,存储在发送上下文环境中,此时并不会发送消息轨迹数据。
sendMessageAfter
1public void sendMessageAfter(SendMessageContext context) {
2 //if it is message trace data,then it doesn't recorded
3 if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName()) // @1
4 || context.getMqTraceContext() == null) {
5 return;
6 }
7 if (context.getSendResult() == null) {
8 return;
9 }
10
11 if (context.getSendResult().getRegionId() == null
12 || !context.getSendResult().isTraceOn()) {
13 // if switch is false,skip it
14 return;
15 }
16
17 TraceContext tuxeContext = (TraceContext) context.getMqTraceContext();
18 TraceBean traceBean = tuxeContext.getTraceBeans().get(0); // @2
19 int costTime = (int) ((System.currentTimeMillis() - tuxeContext.getTimeStamp()) / tuxeContext.getTraceBeans().size()); // @3
20 tuxeContext.setCostTime(costTime); // @4
21 if (context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)) {
22 tuxeContext.setSuccess(true);
23 } else {
24 tuxeContext.setSuccess(false);
25 }
26 tuxeContext.setRegionId(context.getSendResult().getRegionId());
27 traceBean.setMsgId(context.getSendResult().getMsgId());
28 traceBean.setOffsetMsgId(context.getSendResult().getOffsetMsgId());
29 traceBean.setStoreTime(tuxeContext.getTimeStamp() + costTime / 2);
30 localDispatcher.append(tuxeContext); // @5
31}
代码@1:如果topic主题为消息轨迹的Topic,直接返回。
代码@2:从MqTraceContext中获取跟踪的TraceBean,虽然设计成List结构体,但在消息发送场景,这里的数据永远只有一条,即使是批量发送也不例外。
代码@3:获取消息发送耗时。
代码@4:设置costTime(耗时)、success(是否发送成功)、regionId(发送到broker所在的分区)、msgId(消息ID,全局唯一)、offsetMsgId(消息物理偏移量,如果是批量消息,则是最后一条消息的物理偏移量)、storeTime,这里使用的是(客户端发送时间 + 二分之一的耗时)来表示消息的存储时间,这里是一个估值。
代码@5:将需要跟踪的信息通过TraceDispatcher转发到Broker服务器。其代码如下:
1public boolean append(final Object ctx) {
2 boolean result = traceContextQueue.offer((TraceContext) ctx);
3 if (!result) {
4 log.info("buffer full" + discardCount.incrementAndGet() + " ,context is " + ctx);
5 }
6 return result;
7}
这里一个非常关键的点是offer方法的使用,当队列无法容纳新的元素时会立即返回false,并不会阻塞。
接下来将目光转向TraceDispatcher的实现。
1.3 TraceDispatcher实现原理
TraceDispatcher,用于客户端消息轨迹数据转发到Broker,其默认实现类:AsyncTraceDispatcher。
TraceDispatcher构造函数
1public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) throws MQClientException {
2 // queueSize is greater than or equal to the n power of 2 of value
3 this.queueSize = 2048;
4 this.batchSize = 100;
5 this.maxMsgSize = 128000;
6 this.discardCount = new AtomicLong(0L);
7 this.traceContextQueue = new ArrayBlockingQueueTraceContext(1024);
8 this.appenderQueue = new ArrayBlockingQueueRunnable(queueSize);
9 if (!UtilAll.isBlank(traceTopicName)) {
10 this.traceTopicName = traceTopicName;
11 } else {
12 this.traceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC;
13 } // @1
14 this.traceExecuter = new ThreadPoolExecutor(// :
15 10, //
16 20, //
17 1000 * 60, //
18 TimeUnit.MILLISECONDS, //
19 this.appenderQueue, //
20 new ThreadFactoryImpl("MQTraceSendThread_"));
21 traceProducer = getAndCreateTraceProducer(rpcHook); // @2
22}
代码@1:初始化核心属性,该版本这些值都是“固化”的,用户无法修改。
queueSize 队列长度,默认为2048,异步线程池能够积压的消息轨迹数量。
batchSize 一次向Broker批量发送的消息条数,默认为100.
maxMsgSize 向Broker汇报消息轨迹时,消息体的总大小不能超过该值,默认为128k。
discardCount 整个运行过程中,丢弃的消息轨迹数据,这里要说明一点的是,如果消息TPS发送过大,异步转发线程处理不过来时,会主动丢弃消息轨迹数据。
traceContextQueue traceContext积压队列,客户端(消息发送、消息消费者)在收到处理结果后,将消息轨迹提交到噶队列中,则会立即返回。
appenderQueue 提交到Broker线程池中队列。
traceTopicName 用于接收消息轨迹的Topic,默认为RMQ_SYS_TRANS_HALF_TOPIC。
traceExecuter 用于发送到Broker服务的异步线程池,核心线程数默认为10,最大线程池为20,队列堆积长度2048,线程名称:MQTraceSendThread_。、
traceProducer 发送消息轨迹的Producer。
batchSize
一次向Broker批量发送的消息条数,默认为100.
discardCount
整个运行过程中,丢弃的消息轨迹数据,这里要说明一点的是,如果消息TPS发送过大,异步转发线程处理不过来时,会主动丢弃消息轨迹数据。
appenderQueue
提交到Broker线程池中队列。
traceExecuter
用于发送到Broker服务的异步线程池,核心线程数默认为10,最大线程池为20,队列堆积长度2048,线程名称:MQTraceSendThread_。、
代码@2:调用getAndCreateTraceProducer方法创建用于发送消息轨迹的Producer(消息发送者),下面详细介绍一下其实现。
getAndCreateTraceProducer详解
1private DefaultMQProducer getAndCreateTraceProducer(RPCHook rpcHook) {
2 DefaultMQProducer traceProducerInstance = this.traceProducer;
3 if (traceProducerInstance == null) { //@1
4 traceProducerInstance = new DefaultMQProducer(rpcHook);
5 traceProducerInstance.setProducerGroup(TraceConstants.GROUP_NAME);
6 traceProducerInstance.setSendMsgTimeout(5000);
7 traceProducerInstance.setVipChannelEnabled(false);
8 // The max size of message is 128K
9 traceProducerInstance.setMaxMessageSize(maxMsgSize - 10 * 1000);
10 }
11 return traceProducerInstance;
12 }
代码@1:如果还未建立发送者,则创建用于发送消息轨迹的消息发送者,其GroupName为:_INNER_TRACE_PRODUCER,消息发送超时时间5s,最大允许发送消息大小118K。
start
1public void start(String nameSrvAddr) throws MQClientException {
2 if (isStarted.compareAndSet(false, true)) { // @1
3 traceProducer.setNamesrvAddr(nameSrvAddr);
4 traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
5 traceProducer.start();
6 }
7 this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId); // @2
8 this.worker.setDaemon(true);
9 this.worker.start();
10 this.registerShutDownHook();
11}
开始启动,其调用的时机为启动DefaultMQProducer时,如果启用跟踪消息轨迹,则调用之。
代码@1:如果用于发送消息轨迹的发送者没有启动,则设置nameserver地址,并启动着。
代码@2:启动一个线程,用于执行AsyncRunnable任务,接下来将重点介绍。
AsyncRunnable
1class AsyncRunnable implements Runnable {
2 private boolean stopped;
3 public void run() {
4 while (!stopped) {
5 ListTraceContext contexts = new ArrayListTraceContext(batchSize); // @1
6 for (int i = 0; i batchSize; i++) {
7 TraceContext context = null;
8 try {
9 //get trace data element from blocking Queue — traceContextQueue
10 context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS); // @2
11 } catch (InterruptedException e) {
12 }
13 if (context != null) {
14 contexts.add(context);
15 } else {
16 break;
17 }
18 }
19 if (contexts.size() 0) { :
20 AsyncAppenderRequest request = new AsyncAppenderRequest(contexts); // @3
21 traceExecuter.submit(request);
22 } else if (AsyncTraceDispatcher.this.stopped) {
23 this.stopped = true;
24 }
25 }
26 }
27}
代码@1:构建待提交消息跟踪Bean,每次最多发送batchSize,默认为100条。
代码@2:从traceContextQueue中取出一个待提交的TraceContext,设置超时时间为5s,即如何该队列中没有待提交的TraceContext,则最多等待5s。
代码@3:向线程池中提交任务AsyncAppenderRequest。
AsyncAppenderRequest#sendTraceData
1public void sendTraceData(ListTraceContext contextList) {
2 MapString, ListTraceTransferBean transBeanMap = new HashMapString, ListTraceTransferBean();
3 for (TraceContext context : contextList) { //@1
4 if (context.getTraceBeans().isEmpty()) {
5 continue;
6 }
7 // Topic value corresponding to original message entity content
8 String topic = context.getTraceBeans().get(0).getTopic(); // @2
9 // Use original message entity's topic as key
10 String key = topic;
11 ListTraceTransferBean transBeanList = transBeanMap.get(key);
12 if (transBeanList == null) {
13 transBeanList = new ArrayListTraceTransferBean();
14 transBeanMap.put(key, transBeanList);
15 }
16 TraceTransferBean traceData = TraceDataEncoder.encoderFromContextBean(context); // @3
17 transBeanList.add(traceData);
18 }
19 for (Map.EntryString, ListTraceTransferBean entry : transBeanMap.entrySet()) { // @4
20 flushData(entry.getValue());
21 }
22}
代码@1:遍历收集的消息轨迹数据。
代码@2:获取存储消息轨迹的Topic。
代码@3:对TraceContext进行编码,这里是消息轨迹的传输数据,稍后对其详细看一下,了解其上传的格式。
代码@4:将编码后的数据发送到Broker服务器。
TraceDataEncoder#encoderFromContextBean
根据消息轨迹跟踪类型,其格式会有一些不一样,下面分别来介绍其合适。
PUB(消息发送)
1case Pub: {
2 TraceBean bean = ctx.getTraceBeans().get(0);
3 //append the content of context and traceBean to transferBean's TransData
4 sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)//
5 .append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)//
6 .append(ctx.getRegionId()).append(TraceConstants.CONTENT_SPLITOR)//
7 .append(ctx.getGroupName()).append(TraceConstants.CONTENT_SPLITOR)//
8 .append(bean.getTopic()).append(TraceConstants.CONTENT_SPLITOR)//
9 .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
10 .append(bean.getTags()).append(TraceConstants.CONTENT_SPLITOR)//
11 .append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)//
12 .append(bean.getStoreHost()).append(TraceConstants.CONTENT_SPLITOR)//
13 .append(bean.getBodyLength()).append(TraceConstants.CONTENT_SPLITOR)//
14 .append(ctx.getCostTime()).append(TraceConstants.CONTENT_SPLITOR)//
15 .append(bean.getMsgType().ordinal()).append(TraceConstants.CONTENT_SPLITOR)//
16 .append(bean.getOffsetMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
17 .append(ctx.isSuccess()).append(TraceConstants.FIELD_SPLITOR);
18}
消息轨迹数据的协议使用字符串拼接,字段的分隔符号为1,整个数据以2结尾,感觉这个设计还是有点“不可思议”,为什么不直接使用json协议呢?
SubBefore(消息消费之前)
1for (TraceBean bean : ctx.getTraceBeans()) {
2 sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)//
3 .append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)//
4 .append(ctx.getRegionId()).append(TraceConstants.CONTENT_SPLITOR)//
5 .append(ctx.getGroupName()).append(TraceConstants.CONTENT_SPLITOR)//
6 .append(ctx.getRequestId()).append(TraceConstants.CONTENT_SPLITOR)//
7 .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
8 .append(bean.getRetryTimes()).append(TraceConstants.CONTENT_SPLITOR)//
9 .append(bean.getKeys()).append(TraceConstants.FIELD_SPLITOR);//
10 }
11}
轨迹就是按照上述顺序拼接而成,各个字段使用1分隔,每一条记录使用2结尾。
SubAfter(消息消费后)
1case SubAfter: {
2 for (TraceBean bean : ctx.getTraceBeans()) {
3 sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)//
4 .append(ctx.getRequestId()).append(TraceConstants.CONTENT_SPLITOR)//
5 .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
6 .append(ctx.getCostTime()).append(TraceConstants.CONTENT_SPLITOR)//
7 .append(ctx.isSuccess()).append(TraceConstants.CONTENT_SPLITOR)//
8 .append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)//
9 .append(ctx.getContextCode()).append(TraceConstants.FIELD_SPLITOR);
10 }
11 }
12}
格式编码一样,就不重复多说。
经过上面的源码跟踪,消息发送端的消息轨迹跟踪流程、消息轨迹数据编码协议就清晰了,接下来我们使用一张序列图来结束本部分的讲解。
其实行文至此,只关注了消息发送的消息轨迹跟踪,消息消费的轨迹跟踪又是如何呢?其实现原理其实是一样的,就是在消息消费前后执行特定的钩子函数,其实现类为ConsumeMessageTraceHookImpl,由于其实现与消息发送的思路类似,故就不详细介绍了。
消息轨迹数据如何存储
其实从上面的分析,我们已经得知,RocketMQ的消息轨迹数据存储在到Broker上,那消息轨迹的主题名如何指定?其路由信息又怎么分配才好呢?是每台Broker上都创建还是只在其中某台上创建呢?RocketMQ支持系统默认与自定义消息轨迹的主题。
2.1 使用系统默认的主题名称
RocketMQ默认的消息轨迹主题为:RMQ_SYS_TRACE_TOPIC,那该Topic需要手工创建吗?其路由信息呢?
1{
2 if (this.brokerController.getBrokerConfig().isTraceTopicEnable()) { // @1
3 String topic = this.brokerController.getBrokerConfig().getMsgTraceTopicName();
4 TopicConfig topicConfig = new TopicConfig(topic);
5 this.systemTopicList.add(topic);
6 topicConfig.setReadQueueNums(1); // @2
7 topicConfig.setWriteQueueNums(1);
8 this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
9 }
10}
上述代码出自TopicConfigManager的构造函数,在Broker启动的时候会创建topicConfigManager对象,用来管理topic的路由信息。
代码@1:如果Broker开启了消息轨迹跟踪(traceTopicEnable=true)时,会自动创建默认消息轨迹的topic路由信息,注意其读写队列数为1。
2.2 用户自定义消息轨迹主题
在创建消息发送者、消息消费者时,可以显示的指定消息轨迹的Topic,例如:
1public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,final String customizedTraceTopic)
2
3public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
4 AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic)
5
通过customizedTraceTopic来指定消息轨迹Topic。
温馨提示:通常在生产环境上,不会开启自动创建主题,故需要RocketMQ运维管理人员提前创建好Topic。
好了,本文就介绍到这里了,详细介绍了RocktMQ消息轨迹的实现原理,下一篇,我们将进入到多副本的学习中。
各位读者朋友们,如果本文对您有一定的帮助,麻烦点下在看,您的鼓励是我持续创作的最大动力。更多精彩文章,请关注微信公众号:
原文始发于微信公众号(中间件兴趣圈):