微信公众号:**[中间件兴趣圈]** 作者简介:《RocketMQ技术内幕》作者
Dubbo监控的基本实现原理就是在服务调用时收集服务调用并发度、服务响应时间,然后以一定频率向监控中心汇报统计数据。
MonitorFilter过滤器
过滤器作用 监控过滤器,向监控中心汇报服务调用数据。
使用场景 搭建监控中心监控Dubbo服务调用。
阻断条件 非阻断过滤器。
使用场景
搭建监控中心监控Dubbo服务调用。
MonitorFilter声明
1/**
2 * MonitorFilter. (SPI, Singleton, ThreadSafe)
3 */
4@Activate(group = {Constants.PROVIDER, Constants.CONSUMER})
5public class MonitorFilter implements Filter {
6 // 省略具体代码
7}
注:MonitorFilter会在生产者、消费者两端生效。
getConcurrent方法详解
1// concurrent counter
2 private AtomicInteger getConcurrent(Invoker? invoker, Invocation invocation) {
3 String key = invoker.getInterface().getName() + "." + invocation.getMethodName(); // @1
4 AtomicInteger concurrent = concurrents.get(key);
5 if (concurrent == null) {
6 concurrents.putIfAbsent(key, new AtomicInteger()); // @2
7 concurrent = concurrents.get(key);
8 }
9 return concurrent;
10 }
主要是获取当前调用服务的调用次数计算器。
代码@1:使用的是ConcurrentMap String, AtomicInteger作为缓存容器,其key为:interfaceName + “.” + methodName。
代码@2:如果是第一次调用,则创建AtomicInteger,否则返回原先的计数器。
invoker方法详解
1@Override
2 public Result invoke(Invoker? invoker, Invocation invocation) throws RpcException {
3 if (invoker.getUrl().hasParameter(Constants.MONITOR_KEY)) { // @1
4 RpcContext context = RpcContext.getContext(); // provider must fetch context before invoke() gets called // @2
5 String remoteHost = context.getRemoteHost();
6 long start = System.currentTimeMillis(); // record start timestamp
7 getConcurrent(invoker, invocation).incrementAndGet(); // count up // @3
8 try {
9 Result result = invoker.invoke(invocation); // proceed invocation chain // @4
10 collect(invoker, invocation, result, remoteHost, start, false); // @5
11 return result;
12 } catch (RpcException e) {
13 collect(invoker, invocation, null, remoteHost, start, true); // @6
14 throw e;
15 } finally {
16 getConcurrent(invoker, invocation).decrementAndGet(); // count down // @7
17 }
18 } else {
19 return invoker.invoke(invocation);
20 }
21 }
代码@1:如果url中存在monitor,则设置了监控中心,收集调用信息。
代码@2:获取本次服务调用的上下文环境。
代码@3:服务调用并发次数增加1,(非服务调用总次数,而是当前服务的并发调用)。
代码@4:执行方法之前先记录当前时间,然后调用下一个过滤器,直到真实服务被调用。
代码@5:调用collect方法收集调用信息。
代码@6:如果调用发送RPC异常,则收集错误信息。
代码@7:一次服务调用结束,并发次数减一。
接下来分析一下collect方法。
collect方法详解
1// collect info
2 private void collect(Invoker? invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) { // @1
3 try {
4 // ---- service statistics ---- // @2 start
5 long elapsed = System.currentTimeMillis() - start; // invocation cost
6 int concurrent = getConcurrent(invoker, invocation).get(); // current concurrent count
7 String application = invoker.getUrl().getParameter(Constants.APPLICATION_KEY);
8 String service = invoker.getInterface().getName(); // service name
9 String method = RpcUtils.getMethodName(invocation); // method name
10 String group = invoker.getUrl().getParameter(Constants.GROUP_KEY);
11 String version = invoker.getUrl().getParameter(Constants.VERSION_KEY);
12 URL url = invoker.getUrl().getUrlParameter(Constants.MONITOR_KEY); // @2 end
13 Monitor monitor = monitorFactory.getMonitor(url); // @3
14 if (monitor == null) {
15 return;
16 }
17 int localPort;
18 String remoteKey;
19 String remoteValue;
20 if (Constants.CONSUMER_SIDE.equals(invoker.getUrl().getParameter(Constants.SIDE_KEY))) { // @4
21 // ---- for service consumer ----
22 localPort = 0;
23 remoteKey = MonitorService.PROVIDER;
24 remoteValue = invoker.getUrl().getAddress();
25 } else { // @5
26 // ---- for service provider ----
27 localPort = invoker.getUrl().getPort();
28 remoteKey = MonitorService.CONSUMER;
29 remoteValue = remoteHost;
30 }
31 String input = "", output = "";
32 if (invocation.getAttachment(Constants.INPUT_KEY) != null) { // @6
33 input = invocation.getAttachment(Constants.INPUT_KEY);
34 }
35 if (result != null && result.getAttachment(Constants.OUTPUT_KEY) != null) { // @7
36 output = result.getAttachment(Constants.OUTPUT_KEY);
37 }
38 monitor.collect(new URL(Constants.COUNT_PROTOCOL, // @8
39 NetUtils.getLocalHost(), localPort,
40 service + "/" + method,
41 MonitorService.APPLICATION, application,
42 MonitorService.INTERFACE, service,
43 MonitorService.METHOD, method,
44 remoteKey, remoteValue,
45 error ? MonitorService.FAILURE : MonitorService.SUCCESS, "1",
46 MonitorService.ELAPSED, String.valueOf(elapsed),
47 MonitorService.CONCURRENT, String.valueOf(concurrent),
48 Constants.INPUT_KEY, input,
49 Constants.OUTPUT_KEY, output,
50 Constants.GROUP_KEY, group,
51 Constants.VERSION_KEY, version));
52 } catch (Throwable t) {
53 logger.error("Failed to monitor count service " + invoker.getUrl() + ", cause: " + t.getMessage(), t);
54 }
55 }
代码@1:参数说明。
Invoker ? invoker 服务调用Invoker。
Invocation invocation 本次服务调用信息
Result result 执行结果
String remoteHost 调用者host信息。
long start 服务开始调用时间。
boolean error 是否发生错误。
Invocation invocation
本次服务调用信息
String remoteHost
调用者host信息。
boolean error
是否发生错误。
代码@2:统计基础信息字段说明:
URL url:监控中心url。
concurrent :当前并发度。(当前服务并发调用次数)。
service :服务名。
group :服务所属组。
URL url:监控中心url。
代码@3:根据监控中心获取监控中心实现类,这是监控中心实现扩展点,默认使用com.alibaba.dubbo.monitor.dubbo.DubboMonitor。
代码@4:如果是消费端,由于Monitor在消费端与服务端都会生效:
remoteKey:MonitorService.PROVIDER,表示为服务端。
代码@5:如果为服务端:
localPort 为服务端的服务端口号。
remoteKey MonitorService.CONSUMER,表示远端为服务消费者。
remoteValue 消费者host(ip:port)。
remoteKey
MonitorService.CONSUMER,表示远端为服务消费者。
代码@6:获取本次服务调用请求包的字节数,在服务端解码时会在RpcContext中。
代码@7:获取本次服务调用响应包的字节数,在服务端对响应包编码时会写入,具体代码请参考DubboCountCodec类。
代码@8:调用monitor#collect收集调用信息,Monitor默认实现为DubboMonitor。使用的协议为count://localhost:localPort/service/method?application=applicationName&remoteKey=remoteValue&success|failure=1&elapsed=调用开销&concurrent=并发调用次数&input=入参字节数&output=响应字节数&group=服务所属组&version=版本。
DubboMonitor实现原理
Dubbo中默认的Monitor监控实现类为DubboMonitor:
private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3, new NamedThreadFactory("DubboMonitorSendTimer", true)):定时调度线程池,使用3个线程的线程池,线程名称以DubboMonitorSendTimer。
private final ScheduledFuture ? sendFuture:调度任务future。 private final Invoker MonitorService monitorInvoker:监控调度Invoker,Dubbo中的监控中心会作为服务提供者暴露服务,服务提供者,服务消费者可以通过注册中心订阅服务,通过该Invoker向监控中心汇报调用统计数据,也就是一次上报就是一次Dubbo RPC服务调用,其实现类为DubboInvoker,也就是可以通过该Invoker使用dubbo协议调用远程Monitor服务。
private final MonitorService monitorService:对monitorInvoker的proxy代理,主要是对toString、hashcode、equals无需通过RPC向MonitorServer服务提供者发起调 用。主要是通过AbstractProxyFactory#getProxy创建,默认子类为JavassistProxyFactory,动态代理的InvokerHandler为: com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler#invoke。
private final ScheduledFuture ? sendFuture:调度任务future。
private final Invoker MonitorService monitorInvoker:监控调度Invoker,Dubbo中的监控中心会作为服务提供者暴露服务,服务提供者,服务消费者可以通过注册中心订阅服务,通过该Invoker向监控中心汇报调用统计数据,也就是一次上报就是一次Dubbo RPC服务调用,其实现类为DubboInvoker,也就是可以通过该Invoker使用dubbo协议调用远程Monitor服务。
1public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
2 String methodName = method.getName();
3 Class?[] parameterTypes = method.getParameterTypes();
4 if (method.getDeclaringClass() == Object.class) {
5 return method.invoke(invoker, args);
6 }
7 if ("toString".equals(methodName) && parameterTypes.length == 0) {
8 return invoker.toString();
9 }
10 if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
11 return invoker.hashCode();
12 }
13 if ("equals".equals(methodName) && parameterTypes.length == 1) {
14 return invoker.equals(args[0]);
15 }
16 return invoker.invoke(new RpcInvocation(method, args)).recreate();
17 }
private final ConcurrentMap Statistics, AtomicReference long[] statisticsMap:统计信息Map。
构造函数分析
1public DubboMonitor(InvokerMonitorService monitorInvoker, MonitorService monitorService) {
2 this.monitorInvoker = monitorInvoker;
3 this.monitorService = monitorService;
4 this.monitorInterval = monitorInvoker.getUrl().getPositiveParameter("interval", 60000); // @1
5 // collect timer for collecting statistics data
6 sendFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() { // @2
7 @Override
8 public void run() {
9 // collect data
10 try {
11 send();
12 } catch (Throwable t) {
13 logger.error("Unexpected error occur at send statistic, cause: " + t.getMessage(), t);
14 }
15 }
16 }, monitorInterval, monitorInterval, TimeUnit.MILLISECONDS);
17 }
代码@1,从url参数中获取interval属性,如果为空,默认为60000,代表60S。
代码@2:启动定时调度任务,默认60S的间隔执行send()方法,向监控中心汇报服务调用统计数据。
collect 收集统计信息方法
1public void collect(URL url) {
2 // data to collect from url
3 int success = url.getParameter(MonitorService.SUCCESS, 0);
4 int failure = url.getParameter(MonitorService.FAILURE, 0);
5 int input = url.getParameter(MonitorService.INPUT, 0);
6 int output = url.getParameter(MonitorService.OUTPUT, 0);
7 int elapsed = url.getParameter(MonitorService.ELAPSED, 0);
8 int concurrent = url.getParameter(MonitorService.CONCURRENT, 0);
9 // init atomic reference
10 Statistics statistics = new Statistics(url);
11 AtomicReferencelong[] reference = statisticsMap.get(statistics);
12 if (reference == null) {
13 statisticsMap.putIfAbsent(statistics, new AtomicReferencelong[]());
14 reference = statisticsMap.get(statistics);
15 }
16 // use CompareAndSet to sum
17 long[] current;
18 long[] update = new long[LENGTH];
19 do {
20 current = reference.get();
21 if (current == null) {
22 update[0] = success;
23 update[1] = failure;
24 update[2] = input;
25 update[3] = output;
26 update[4] = elapsed;
27 update[5] = concurrent;
28 update[6] = input;
29 update[7] = output;
30 update[8] = elapsed;
31 update[9] = concurrent;
32 } else {
33 update[0] = current[0] + success;
34 update[1] = current[1] + failure;
35 update[2] = current[2] + input;
36 update[3] = current[3] + output;
37 update[4] = current[4] + elapsed;
38 update[5] = (current[5] + concurrent) / 2;
39 update[6] = current[6] input ? current[6] : input;
40 update[7] = current[7] output ? current[7] : output;
41 update[8] = current[8] elapsed ? current[8] : elapsed;
42 update[9] = current[9] concurrent ? current[9] : concurrent;
43 }
44 } while (!reference.compareAndSet(current, update));
45 }
收集的信息主要是10个字段
update[0] :调用成功的次数
update[1] :调用失败的次数
update[2] :总调用流量(请求包的总大小)。
update[3] :总响应流量(响应包的总大小)。
update[4] :总响应时长(总服务调用开销)。
update[5] :一次收集周期的平均TPS。
update[6] :最大请求包大小。
update[7] :最大响应包大小。
update[8] :最大响应时间。
update[9] :最大TPS。
send方法
通过monitorService,最终通过monitorInvoker去调用RPC服务向监控中心汇报数据。接下来看一下监控中心的具体实现。
Dubbo监控中心实现原理
Dubbo官方提供了简易版本的监控中心,其项目为dubbo-ops:dubbo-monitor-simple。该项目是个spring-boot项目,启动后可以看到后台管理界面。
该项目服务提供者文件如下:
从中可以看出,监控中心服务提供者实现类为SimpleMonitorService,其实现接口为MonitorService。
接下来重点分析SimpleMonitorService监控中心的实现,关注如下两个点:
1、监控数据持久化。
2、监控报表生成逻辑。
Thread writeThread:监控数据持久化线程。
String statisticsDirectory = “statistics”:数据持久化目录,SimpleMonitorService将数据持久化到磁盘文件。该值指定目录名称。
private volatile boolean running = true:持久化数据线程是否处于运行状态。
SimpleMonitorService构造函数
1public SimpleMonitorService() {
2 queue = new LinkedBlockingQueueURL(Integer.parseInt(ConfigUtils.getProperty("dubbo.monitor.queue", "100000"))); // @1
3 writeThread = new Thread(new Runnable() { // @2 start
4 public void run() {
5 while (running) {
6 try {
7 write(); // write statistics
8 } catch (Throwable t) {
9 logger.error("Unexpected error occur at write stat log, cause: " + t.getMessage(), t);
10 try {
11 Thread.sleep(5000); // retry after 5 secs
12 } catch (Throwable t2) {
13 }
14 }
15 }
16 }
17 });
18 writeThread.setDaemon(true);
19 writeThread.setName("DubboMonitorAsyncWriteLogThread");
20 writeThread.start(); // @2 end
21 chartFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
22 public void run() {
23 try {
24 draw(); // draw chart
25 } catch (Throwable t) {
26 logger.error("Unexpected error occur at draw stat chart, cause: " + t.getMessage(), t);
27 }
28 }
29 }, 1, 300, TimeUnit.SECONDS); // @3
30 statisticsDirectory = ConfigUtils.getProperty("dubbo.statistics.directory");
31 chartsDirectory = ConfigUtils.getProperty("dubbo.charts.directory"); // @4
32 }
代码@1:创建有界阻塞队列LinkedBlockingQueue,容量默认为100000个,可通过配置参数dubbo.monitor.queue改变默认值,如果队列中已挤压未被处理,后续监控数据将被默认丢弃。
代码@2:创建持久化监控数据线程,名称为DubboMonitorAsyncWriteLogThread,其使命是从LinkedBlockingQueue中获取监控原始数据,如果队列中没数据则被阻塞,然后写入文件中。
代码@3:开启定时调度任务,已每个5分钟的频率,根据持久化的监控数据,生成饼图。
代码@4:获取数据持久化目录与饼图存放目录。
SimpleMonitorService#write
1private void write() throws Exception {
2 URL statistics = queue.take();
3 if (POISON_PROTOCOL.equals(statistics.getProtocol())) {
4 return;
5 }
6 String timestamp = statistics.getParameter(Constants.TIMESTAMP_KEY);
7 Date now;
8 if (timestamp == null || timestamp.length() == 0) {
9 now = new Date();
10 } else if (timestamp.length() == "yyyyMMddHHmmss".length()) {
11 now = new SimpleDateFormat("yyyyMMddHHmmss").parse(timestamp);
12 } else {
13 now = new Date(Long.parseLong(timestamp));
14 }
15 String day = new SimpleDateFormat("yyyyMMdd").format(now);
16 SimpleDateFormat format = new SimpleDateFormat("HHmm");
17 for (String key : types) {
18 try {
19 String type;
20 String consumer;
21 String provider;
22 if (statistics.hasParameter(PROVIDER)) {
23 type = CONSUMER;
24 consumer = statistics.getHost();
25 provider = statistics.getParameter(PROVIDER);
26 int i = provider.indexOf(':');
27 if (i 0) {
28 provider = provider.substring(0, i);
29 }
30 } else {
31 type = PROVIDER;
32 consumer = statistics.getParameter(CONSUMER);
33 int i = consumer == null ? -1 : consumer.indexOf(':');
34 if (i 0) {
35 consumer = consumer.substring(0, i);
36 }
37 provider = statistics.getHost();
38 }
39 String filename = statisticsDirectory
40 + "/" + day
41 + "/" + statistics.getServiceInterface()
42 + "/" + statistics.getParameter(METHOD)
43 + "/" + consumer
44 + "/" + provider
45 + "/" + type + "." + key;
46 File file = new File(filename);
47 File dir = file.getParentFile();
48 if (dir != null && !dir.exists()) {
49 dir.mkdirs();
50 }
51 FileWriter writer = new FileWriter(file, true);
52 try {
53 writer.write(format.format(now) + " " + statistics.getParameter(key, 0) + "n");
54 writer.flush();
55 } finally {
56 writer.close();
57 }
58 } catch (Throwable t) {
59 logger.error(t.getMessage(), t);
60 }
61 }
62 }
数据存储在物理磁盘上,其文件为为:${dubbo.statistics.directory}/${day}/${interfacename}/${method}/${consumer}/${provider}/[consume|provider]/key。
其中key可取值:
FAILURE 调用失败次数
CONCURRENT TPS
MAX_CONCURRENT 最大TPS
以provider.concurrent为例,说明一下其内容:
其内容组织方式为:时间(时分:采集的值)。
draw
根据持久化的数据,在特定的目录下创建饼图,创建饼图方法createChart,具体使用JFreeChart相关类图,在这里就不细细讲解了,感兴趣的朋友可以百度查询相关用法。
监控中心使用效果一览
应用一览表
这个功能可以描述系统与系统的关联关系。
表格字段说明:
1、Application Name:应用名称
2、Providers:该应用包含的服务提供者信息,点击进去可以查看具体的服务提供者URL。
3、Consumers(1):该应用包含的服务消费者信息,点击进去可以查看具体的服务消费者URL。
4、Depends On:该应用依懒的应用。
5、Used By:该应用被依懒的应用。
服务一览表
表格字段说明:
Service Name:服务名。
Application:服务所属应用名。
Providers:服务提供者信息,点击进去,可以看到详细的服务提供者信息。
Consumers:该服务的消费者信息。
Statistics:表格统计信息
饼图统计信息,主要从两个维度展示:QPS(接口每秒请求数)、平均响应时间(包含最大,最小响应时间)。
Dubbo简易监控中心使用方法
1、安装Dubbo简易监控中心
从github dubbo仓库中下载dubbo-simple-monitor即可。
2、应用程序如何使用Dubbo监控中心
成功安装完监控中心还只是第一步,为了监控中心能收集服务调用信息,需要在Dubbo服务提、Dubbo消费者供者所在的应用的dubbo配置文件中加上如下内容:
dubbo:monitor protocol=”registry” /,表示从注册中心发现监控中心的地址,并将服务调用信息提交到监控中心。
服务提供者默认以一分钟的频率(可配置)调用监控中心的dubbo服务,向监控中心上报服务调用信息。监控中心宕机,并不影响消费者,服务提供者的正常工作。
如果要配置其调用频率,可通过如下配置,默认建议保持一分钟的频率,甚至更小,这个频率设置低点,对整个服务器的压力不会增加更大
1 dubbo:monitor protocol="registry"
2 dubbo:parameter key = "interval" value="60000" !-- 单位为毫秒--
3/ dubbo:monitor
注:Dubbo监控中心,服务提供者、服务消费者都可以单独按需配置。
《RocketMQ技术内幕》已出版上市,目前可在主流购物平台(京东、天猫等)购买,本书从源码角度深度分析了RocketMQ NameServer、消息发送、消息存储、消息消费、消息过滤、主从同步HA、事务消息;在实战篇重点介绍了RocketMQ运维管理界面与当前支持的39个运维命令;并在附录部分罗列了RocketMQ几乎所有的配置参数。本书得到了RocketMQ创始人、阿里巴巴Messaging开源技术负责人、Linux OpenMessaging 主席的高度认可并作序推荐。目前是国内第一本成体系剖析RocketMQ的书籍。
新书7折优惠!7折优惠!7折优惠!
更多文章请关注微信公众号:
推荐关注微信公众号:RocketMQ官方微信公众号
原文始发于微信公众号(中间件兴趣圈):