dubbo-monitor计数监控(1)

叁叁肆2018-11-18 11:12

此文已由作者赵计刚薪授权网易云社区发布。

欢迎访问网易云社区,了解更多网易技术产品运营经验。


监控总体图:


红色:监控中心 -  dubbo-simple-monitor

黄色:provider

蓝色:consumer

统计总体流程:

  • MonitorFilter向DubboMonitor发送数据
  • DubboMonitor将数据进行聚合后(默认聚合1min中的统计数据)暂存到ConcurrentMap<Statistics, AtomicReference<long[]>> statisticsMap,然后使用一个含有3个线程(线程名字:DubboMonitorSendTimer)的线程池每隔1min钟,调用SimpleMonitorService遍历发送statisticsMap中的统计数据,每发送完毕一个,就重置当前的Statistics的AtomicReference<long[]>
  • SimpleMonitorService将这些聚合数据塞入BlockingQueue<URL> queue中(队列大写为100000)
  • SimpleMonitorService使用一个后台线程(线程名为:DubboMonitorAsyncWriteLogThread)将queue中的数据写入文件(该线程以死循环的形式来写)
  • SimpleMonitorService还会使用一个含有1个线程(线程名字:DubboMonitorTimer)的线程池每隔5min钟,将文件中的统计数据画成图表

注意:

  • SimpleMonitorService理解为一个服务提供者;而provider和consumer都是一个服务消费者,所以二者的DubboMonitor中的MonitorService实例都是一个代理实例。
  • dubbo-monitor计数监控不支持异步调用下的数据监控

 

一、dubbo-monitor使用

在配置文件中添加:

1 <dubbo:monitor address="10.211.55.5:9090" />

即开启了monitor监控,并且指定了监控中心服务器为“10.211.55.5:9090”。

9090端口是Prometheus的默认端口,dubbo提供的监控中心比较简陋,我们后续会使用Prometheus作为监控中心来存储监控数据。

 

二、服务端加载monitor配置

doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs)中:

 1             if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
 2                 ...
 3                 if (registryURLs != null && registryURLs.size() > 0) {
 4                     for (URL registryURL : registryURLs) {
 5                         ...
 6                         URL monitorUrl = loadMonitor(registryURL);
 7                         if (monitorUrl != null) {
 8                             url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
 9                         }
10                        ...
11                     }
12                 } else {
13                     ...
14                 }
15             }

 其中loadMonitor(URL registryURL)方法主要用于创建MonitorConfig对象(如果monitor配置在dubbo.properties中的话),并且设置属性,之后设置到数据总线Url中。

 1     protected URL loadMonitor(URL registryURL) {
 2         if (monitor == null) {
 3             String monitorAddress = ConfigUtils.getProperty("dubbo.monitor.address");
 4             String monitorProtocol = ConfigUtils.getProperty("dubbo.monitor.protocol");
 5             if ((monitorAddress == null || monitorAddress.length() == 0) && (monitorProtocol == null || monitorProtocol.length() == 0)) {
 6                 return null;
 7             }
 8 
 9             monitor = new MonitorConfig();
10             if (monitorAddress != null && monitorAddress.length() > 0) {
11                 monitor.setAddress(monitorAddress);
12             }
13             if (monitorProtocol != null && monitorProtocol.length() > 0) {
14                 monitor.setProtocol(monitorProtocol);
15             }
16         }
17         appendProperties(monitor);
18         ...
19     }

 

三、消费端加载monitor配置

createProxy(Map<String, String> map)中:

 1                 List<URL> us = loadRegistries(false);
 2                 if (us != null && us.size() > 0) {
 3                     for (URL u : us) {
 4                         URL monitorUrl = loadMonitor(u);
 5                         if (monitorUrl != null) {
 6                             map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
 7                         }
 8                         ...
 9                     }
10                 }

 

四、MonitorFilter收集监控数据

consumer端在发起调用之前会先走filter链;provider端在接收到请求时也是先走filter链,然后才进行真正的业务逻辑处理。默认情况下,在consumer和provider的filter链中都会有Monitorfilter。

  1 /**
  2  * MonitorFilter. (SPI, Singleton, ThreadSafe)
  3  */
  4 @Activate(group = {Constants.PROVIDER, Constants.CONSUMER})
  5 public class MonitorFilter implements Filter {
  6 
  7     private static final Logger logger = LoggerFactory.getLogger(MonitorFilter.class);
  8 
  9     // key: 接口名.方法名 value: 当前的并发数
 10     private final ConcurrentMap<String, AtomicInteger> concurrents = new ConcurrentHashMap<String, AtomicInteger>();
 11 
 12     private MonitorFactory monitorFactory;// MonitorFactory$Adaptive
 13 
 14     public void setMonitorFactory(MonitorFactory monitorFactory) {
 15         this.monitorFactory = monitorFactory;
 16     }
 17 
 18     // intercepting invocation
 19     public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
 20         if (invoker.getUrl().hasParameter(Constants.MONITOR_KEY)) {// 开启了monitor监控
 21             RpcContext context = RpcContext.getContext(); // provider must fetch context before invoke() gets called
 22             String remoteHost = context.getRemoteHost();
 23             long start = System.currentTimeMillis(); // record start timestamp
 24             getConcurrent(invoker, invocation).incrementAndGet(); // 并发数+1
 25             try {
 26                 Result result = invoker.invoke(invocation); // proceed invocation chain
 27                 collect(invoker, invocation, result, remoteHost, start, false);// 收集统计数据
 28                 return result;
 29             } catch (RpcException e) {
 30                 collect(invoker, invocation, null, remoteHost, start, true);// 发生异常时收集统计数据
 31                 throw e;
 32             } finally {
 33                 getConcurrent(invoker, invocation).decrementAndGet(); // 并发数-1
 34             }
 35         } else {
 36             return invoker.invoke(invocation);
 37         }
 38     }
 39 
 40     // collect info
 41     private void collect(Invoker<?> invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) {
 42         try {
 43             // ---- service statistics ----
 44             long elapsed = System.currentTimeMillis() - start; // 此次调用花费的时间
 45             int concurrent = getConcurrent(invoker, invocation).get(); // current concurrent count
 46             String application = invoker.getUrl().getParameter(Constants.APPLICATION_KEY);
 47             String service = invoker.getInterface().getName(); // service name
 48             String method = RpcUtils.getMethodName(invocation); // method name
 49             URL url = invoker.getUrl().getUrlParameter(Constants.MONITOR_KEY);
 50             Monitor monitor = monitorFactory.getMonitor(url);//根据monitorUrl获取Monitor实现(默认使用DubboMonitor)
 51             if (monitor == null) {
 52                 return;
 53             }
 54             int localPort;
 55             String remoteKey;
 56             String remoteValue;
 57             if (Constants.CONSUMER_SIDE.equals(invoker.getUrl().getParameter(Constants.SIDE_KEY))) {
 58                 // ---- for service consumer ----
 59                 localPort = 0;
 60                 remoteKey = MonitorService.PROVIDER;
 61                 remoteValue = invoker.getUrl().getAddress();
 62             } else {
 63                 // ---- for service provider ----
 64                 localPort = invoker.getUrl().getPort();
 65                 remoteKey = MonitorService.CONSUMER;
 66                 remoteValue = remoteHost;
 67             }
 68             String input = "", output = "";
 69             if (invocation.getAttachment(Constants.INPUT_KEY) != null) {
 70                 input = invocation.getAttachment(Constants.INPUT_KEY);
 71             }
 72             if (result != null && result.getAttachment(Constants.OUTPUT_KEY) != null) {
 73                 output = result.getAttachment(Constants.OUTPUT_KEY);
 74             }
 75             monitor.collect(new URL(Constants.COUNT_PROTOCOL,
 76                     NetUtils.getLocalHost(), localPort,
 77                     service + "/" + method,
 78                     MonitorService.APPLICATION, application,
 79                     MonitorService.INTERFACE, service,
 80                     MonitorService.METHOD, method,
 81                     remoteKey, remoteValue,
 82                     error ? MonitorService.FAILURE : MonitorService.SUCCESS, "1",// 成功失败数
 83                     MonitorService.ELAPSED, String.valueOf(elapsed),// 调用消耗的时间
 84                     MonitorService.CONCURRENT, String.valueOf(concurrent),// 并发数
 85                     Constants.INPUT_KEY, input,
 86                     Constants.OUTPUT_KEY, output));
 87         } catch (Throwable t) {
 88             logger.error("Failed to monitor count service " + invoker.getUrl() + ", cause: " + t.getMessage(), t);
 89         }
 90     }
 91 
 92     // concurrent counter
 93     private AtomicInteger getConcurrent(Invoker<?> invoker, Invocation invocation) {
 94         String key = invoker.getInterface().getName() + "." + invocation.getMethodName();
 95         AtomicInteger concurrent = concurrents.get(key);
 96         if (concurrent == null) {
 97             concurrents.putIfAbsent(key, new AtomicInteger());
 98             concurrent = concurrents.get(key);
 99         }
100         return concurrent;
101     }
102 
103 }

调用之前,记录调用开始时间、并发数,之后进行调用,最后进行统计数据收集:

  • 获取计算各种统计数据(调用消耗时间、调用成功/错误数等)
  • 使用MonitorFactory获取Monitor
  • 将统计数据构造成url
  • 使用Monitor收集这些统计数据

获取Monitor的源码后续再说。这里获取到的是DubboMonitor实例。


免费体验云安全(易盾)内容安全、验证码等服务

更多网易技术、产品、运营经验分享请点击