此文已由作者赵计刚授权网易云社区发布。
欢迎访问网易云社区,了解更多网易技术产品运营经验。
监控总体图:
红色:监控中心 - dubbo-simple-monitor
黄色:provider
蓝色:consumer
统计总体流程:
注意:
一、dubbo-monitor使用
在配置文件中添加:
1 <dubbo:monitor address="10.211.55.5:9090" />
即开启了monitor监控,并且指定了监控中心服务器为“10.211.55.5:9090”。
9090端口是Prometheus的默认端口,dubbo提供的监控中心比较简陋,我们后续会使用Prometheus作为监控中心来存储监控数据。
二、服务端加载monitor配置
doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs)中:
1 if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) { 2 ... 3 if (registryURLs != null && registryURLs.size() > 0) { 4 for (URL registryURL : registryURLs) { 5 ... 6 URL monitorUrl = loadMonitor(registryURL); 7 if (monitorUrl != null) { 8 url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString()); 9 } 10 ... 11 } 12 } else { 13 ... 14 } 15 }
其中loadMonitor(URL registryURL)方法主要用于创建MonitorConfig对象(如果monitor配置在dubbo.properties中的话),并且设置属性,之后设置到数据总线Url中。
1 protected URL loadMonitor(URL registryURL) { 2 if (monitor == null) { 3 String monitorAddress = ConfigUtils.getProperty("dubbo.monitor.address"); 4 String monitorProtocol = ConfigUtils.getProperty("dubbo.monitor.protocol"); 5 if ((monitorAddress == null || monitorAddress.length() == 0) && (monitorProtocol == null || monitorProtocol.length() == 0)) { 6 return null; 7 } 8 9 monitor = new MonitorConfig(); 10 if (monitorAddress != null && monitorAddress.length() > 0) { 11 monitor.setAddress(monitorAddress); 12 } 13 if (monitorProtocol != null && monitorProtocol.length() > 0) { 14 monitor.setProtocol(monitorProtocol); 15 } 16 } 17 appendProperties(monitor); 18 ... 19 }
三、消费端加载monitor配置
createProxy(Map<String, String> map)中:
1 List<URL> us = loadRegistries(false); 2 if (us != null && us.size() > 0) { 3 for (URL u : us) { 4 URL monitorUrl = loadMonitor(u); 5 if (monitorUrl != null) { 6 map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString())); 7 } 8 ... 9 } 10 }
四、MonitorFilter收集监控数据
consumer端在发起调用之前会先走filter链;provider端在接收到请求时也是先走filter链,然后才进行真正的业务逻辑处理。默认情况下,在consumer和provider的filter链中都会有MonitorFilter。
1 /** 2 * MonitorFilter. (SPI, Singleton, ThreadSafe) Activated for both the provider and the consumer group; when the invoker URL carries the monitor parameter it times each invocation, tracks per-method concurrency and hands the statistics to a Monitor. 3 */ 4 @Activate(group = {Constants.PROVIDER, Constants.CONSUMER}) 5 public class MonitorFilter implements Filter { 6 7 private static final Logger logger = LoggerFactory.getLogger(MonitorFilter.class); 8 9 // key: interfaceName.methodName, value: current number of concurrent invocations 10 private final ConcurrentMap<String, AtomicInteger> concurrents = new ConcurrentHashMap<String, AtomicInteger>(); 11 12 private MonitorFactory monitorFactory;// MonitorFactory$Adaptive 13 14 public void setMonitorFactory(MonitorFactory monitorFactory) { 15 this.monitorFactory = monitorFactory; 16 } 17 18 // intercepting invocation: when monitoring is enabled, wraps the downstream call with timing, concurrency counting and statistics collection; otherwise delegates directly 19 public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { 20 if (invoker.getUrl().hasParameter(Constants.MONITOR_KEY)) {// monitoring is enabled 21 RpcContext context = RpcContext.getContext(); // provider must fetch context before invoke() gets called 22 String remoteHost = context.getRemoteHost(); 23 long start = System.currentTimeMillis(); // record start timestamp 24 getConcurrent(invoker, invocation).incrementAndGet(); // concurrency count +1 25 try { 26 Result result = invoker.invoke(invocation); // proceed invocation chain 27 collect(invoker, invocation, result, remoteHost, start, false);// collect statistics for a completed call 28 return result; 29 } catch (RpcException e) { 30 collect(invoker, invocation, null, remoteHost, start, true);// collect statistics when an RpcException is thrown, then rethrow 31 throw e; 32 } finally { 33 getConcurrent(invoker, invocation).decrementAndGet(); // concurrency count -1 34 } 35 } else { 36 return invoker.invoke(invocation); 37 } 38 } 39 40 // collect info: builds a statistics URL for this call and passes it to the Monitor; any Throwable raised here is logged and not propagated 41 private void collect(Invoker<?> invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) { 42 try { 43 // ---- service statistics ---- 44 long elapsed = System.currentTimeMillis() - start; // time spent on this invocation, in milliseconds 45 int concurrent = getConcurrent(invoker, invocation).get(); // current concurrent count 46 String application = invoker.getUrl().getParameter(Constants.APPLICATION_KEY); 47 String service = 
invoker.getInterface().getName(); // service name 48 String method = RpcUtils.getMethodName(invocation); // method name 49 URL url = invoker.getUrl().getUrlParameter(Constants.MONITOR_KEY); 50 Monitor monitor = monitorFactory.getMonitor(url);// obtain the Monitor implementation for the monitor URL (per the article: DubboMonitor by default) 51 if (monitor == null) { 52 return; 53 } 54 int localPort; 55 String remoteKey; 56 String remoteValue; 57 if (Constants.CONSUMER_SIDE.equals(invoker.getUrl().getParameter(Constants.SIDE_KEY))) { 58 // ---- for service consumer ---- 59 localPort = 0; 60 remoteKey = MonitorService.PROVIDER; 61 remoteValue = invoker.getUrl().getAddress(); 62 } else { 63 // ---- for service provider ---- 64 localPort = invoker.getUrl().getPort(); 65 remoteKey = MonitorService.CONSUMER; 66 remoteValue = remoteHost; 67 } 68 String input = "", output = ""; 69 if (invocation.getAttachment(Constants.INPUT_KEY) != null) { 70 input = invocation.getAttachment(Constants.INPUT_KEY); 71 } 72 if (result != null && result.getAttachment(Constants.OUTPUT_KEY) != null) { 73 output = result.getAttachment(Constants.OUTPUT_KEY); 74 } 75 monitor.collect(new URL(Constants.COUNT_PROTOCOL, 76 NetUtils.getLocalHost(), localPort, 77 service + "/" + method, 78 MonitorService.APPLICATION, application, 79 MonitorService.INTERFACE, service, 80 MonitorService.METHOD, method, 81 remoteKey, remoteValue, 82 error ? MonitorService.FAILURE : MonitorService.SUCCESS, "1",// one success or one failure 83 MonitorService.ELAPSED, String.valueOf(elapsed),// time consumed by the call 84 MonitorService.CONCURRENT, String.valueOf(concurrent),// concurrency count 85 Constants.INPUT_KEY, input, 86 Constants.OUTPUT_KEY, output)); 87 } catch (Throwable t) { 88 logger.error("Failed to monitor count service " + invoker.getUrl() + ", cause: " + t.getMessage(), t); 89 } 90 } 91 92 // concurrent counter 93 private AtomicInteger getConcurrent(Invoker<?> invoker, Invocation invocation) { 94 String key = invoker.getInterface().getName() + "." 
+ invocation.getMethodName(); 95 AtomicInteger concurrent = concurrents.get(key); 96 if (concurrent == null) { 97 concurrents.putIfAbsent(key, new AtomicInteger()); 98 concurrent = concurrents.get(key); 99 } 100 return concurrent; 101 } 102 103 }
调用之前,记录调用开始时间、并发数,之后进行调用,最后进行统计数据收集:
获取Monitor的源码后续再说。这里获取到的是DubboMonitor实例。
更多网易技术、产品、运营经验分享请点击。