此文已由作者赵计刚薪授权网易云社区发布。
欢迎访问网易云社区,了解更多网易技术产品运营经验。
首先是一个for循环对传入的url列表进行分类,分类结果如下:
Map<String, List<URL>> result:
{ configurators=[ empty://10.10.10.10/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=configurators&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=25267&side=consumer×tamp=1510225913509 ], routers=[ empty://10.10.10.10/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=routers&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=25267&side=consumer×tamp=1510225913509 ], providers=[ dubbo://10.211.55.5:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.5.7&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=318&revision=2.5.7&side=provider×tamp=1510225244315, dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=25215&side=provider×tamp=1510225334486 ] }
之后执行第二个for循环,对上述的result进行遍历,分别进行保存文件和通知。其中前两个entry没做什么核心事,直接来看providers的entry的通知。代码RegistryDirectory.
notify(List<URL> urls)。这里的urls就是上边的providers的两个value值。
1 public synchronized void notify(List<URL> urls) { 2 List<URL> invokerUrls = new ArrayList<URL>(); 3 List<URL> routerUrls = new ArrayList<URL>(); 4 List<URL> configuratorUrls = new ArrayList<URL>(); 5 for (URL url : urls) { 6 String protocol = url.getProtocol(); 7 String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY); 8 if (Constants.ROUTERS_CATEGORY.equals(category) 9 || Constants.ROUTE_PROTOCOL.equals(protocol)) { 10 routerUrls.add(url); 11 } else if (Constants.CONFIGURATORS_CATEGORY.equals(category) 12 || Constants.OVERRIDE_PROTOCOL.equals(protocol)) { 13 configuratorUrls.add(url); 14 } else if (Constants.PROVIDERS_CATEGORY.equals(category)) { 15 invokerUrls.add(url); 16 } else { 17 logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost()); 18 } 19 } 20 // configurators 21 if (configuratorUrls != null && configuratorUrls.size() > 0) { 22 this.configurators = toConfigurators(configuratorUrls); 23 } 24 // routers 25 if (routerUrls != null && routerUrls.size() > 0) { 26 List<Router> routers = toRouters(routerUrls); 27 if (routers != null) { // null - do nothing 28 setRouters(routers); 29 } 30 } 31 List<Configurator> localConfigurators = this.configurators; // local reference 32 // 合并override参数 33 this.overrideDirectoryUrl = directoryUrl; 34 if (localConfigurators != null && localConfigurators.size() > 0) { 35 for (Configurator configurator : localConfigurators) { 36 this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl); 37 } 38 } 39 // providers 40 refreshInvoker(invokerUrls); 41 }
这里首先将输入的两个provider的url存放在invokerUrls列表中,之后调用refreshInvoker(invokerUrls)。
1 private void refreshInvoker(List<URL> invokerUrls) { 2 ... 3 Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference 4 ... 5 this.cachedInvokerUrls = new HashSet<URL>(); 6 this.cachedInvokerUrls.addAll(invokerUrls);//缓存invokerUrls列表,便于交叉对比 7 ... 8 Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// 将URL列表转成Invoker列表 9 Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // 换方法名映射Invoker列表 10 this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap; 11 this.urlInvokerMap = newUrlInvokerMap; 12 ... 13 }
1 private Map<String, Invoker<T>> toInvokers(List<URL> urls) { 2 Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>(); 3 ... 4 for (URL providerUrl : urls) { 5 String key = url.toFullString(); // URL参数是排序的 6 ... 7 Invoker<T> invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl); 8 9 newUrlInvokerMap.put(key, invoker); 10 } 11 ... 12 return newUrlInvokerMap; 13 }
这里会遍历两个providerUrl:protocol是Protocol$Adaptive实例,依旧是走listener->filter->DubboProtocol,看一下filter部分:
1 public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { 2 if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { 3 return protocol.refer(type, url); 4 } 5 return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER); 6 }
两个常量是:reference.filter和consumer。最后来看DubboProtocol.refer
1 public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException { 2 // create rpc invoker. 3 DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers); 4 invokers.add(invoker); 5 return invoker; 6 }
这里首先执行getClients创建Netty客户端,创建客户端与服务端的长连接,之后封装为DubboInvoker,最后返回。返回之后进行filter链包装该DubboInvoker实例。最后又会使用InvokerDelegete包装带有filter链的DubboInvoker实例。在最后,将该InvokerDelegete实例放置到newUrlInvokerMap缓存中,这就是整个toInvokers(List<URL> urls)的逻辑。最后再将newUrlInvokerMap转换封装到Map<String, List<Invoker<T>>> newMethodInvokerMap缓存中。这就是整个refreshInvoker(List<URL> invokerUrls)的逻辑。执行完成之后,订阅通知就执行完了。
来看一下getClients(url):
1 private ExchangeClient[] getClients(URL url) { 2 //是否共享连接 3 boolean service_share_connect = false; 4 int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0); 5 //如果connections不配置,则共享连接,否则每服务每连接 6 if (connections == 0) { 7 service_share_connect = true; 8 connections = 1; 9 } 10 11 ExchangeClient[] clients = new ExchangeClient[connections]; 12 for (int i = 0; i < clients.length; i++) { 13 if (service_share_connect) { 14 clients[i] = getSharedClient(url); 15 } else { 16 clients[i] = initClient(url); 17 } 18 } 19 return clients; 20 } 21 22 /** 23 * 获取共享连接 24 */ 25 private ExchangeClient getSharedClient(URL url) { 26 String key = url.getAddress(); 27 ReferenceCountExchangeClient client = referenceClientMap.get(key); 28 if (client != null) { 29 if (!client.isClosed()) { 30 client.incrementAndGetCount(); 31 return client; 32 } else { 33 referenceClientMap.remove(key); 34 } 35 } 36 synchronized (key.intern()) { 37 ExchangeClient exchangeClient = initClient(url); 38 client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap); 39 referenceClientMap.put(key, client); 40 ghostClientMap.remove(key); 41 return client; 42 } 43 } 44 45 46 /** 47 * 创建新连接. 48 */ 49 private ExchangeClient initClient(URL url) { 50 51 // client type setting. 52 String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT)); 53 54 String version = url.getParameter(Constants.DUBBO_VERSION_KEY); 55 boolean compatible = (version != null && version.startsWith("1.0.")); 56 url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME); 57 //默认开启heartbeat 58 url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); 59 60 // BIO存在严重性能问题,暂时不允许使用 61 if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) { 62 throw new RpcException("Unsupported client type: " + str + "," + 63 " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " ")); 64 } 65 66 ExchangeClient client; 67 try { 68 //设置连接应该是lazy的 69 if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) { 70 client = new LazyConnectExchangeClient(url, requestHandler); 71 } else { 72 client = Exchangers.connect(url, requestHandler); 73 } 74 } catch (RemotingException e) { 75 throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e); 76 } 77 return client; 78 }
注意:这里由于使用了共享链接,实际上就是在一个消费者机器和一个服务提供者机器之间只建立一条nio长连接,也可以指定连接数,那样就会建立多条连接。
最后执行到HeaderExchanger.connect(URL url, ExchangeHandler handler)
1 public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { 2 return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true); 3 }
执行Transporters.connect:
1 public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException { 2 if (url == null) { 3 throw new IllegalArgumentException("url == null"); 4 } 5 ChannelHandler handler; 6 if (handlers == null || handlers.length == 0) { 7 handler = new ChannelHandlerAdapter(); 8 } else if (handlers.length == 1) { 9 handler = handlers[0]; 10 } else { 11 handler = new ChannelHandlerDispatcher(handlers); 12 } 13 return getTransporter().connect(url, handler); 14 }
执行NettyTransporter.connect:
1 public Client connect(URL url, ChannelHandler listener) throws RemotingException { 2 return new NettyClient(url, listener); 3 }
1 public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException { 2 super(url, wrapChannelHandler(url, handler)); 3 }
这里继续包装handler。和provider一样,6层。之后进行一系列的赋值后,打开netty客户端:
1 protected void doOpen() throws Throwable { 2 NettyHelper.setNettyLoggerFactory(); 3 bootstrap = new ClientBootstrap(channelFactory); 4 // config 5 // @see org.jboss.netty.channel.socket.SocketChannelConfig 6 bootstrap.setOption("keepAlive", true); 7 bootstrap.setOption("tcpNoDelay", true); 8 bootstrap.setOption("connectTimeoutMillis", getTimeout()); 9 final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); 10 bootstrap.setPipelineFactory(new ChannelPipelineFactory() { 11 public ChannelPipeline getPipeline() { 12 NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this); 13 ChannelPipeline pipeline = Channels.pipeline(); 14 pipeline.addLast("decoder", adapter.getDecoder()); 15 pipeline.addLast("encoder", adapter.getEncoder()); 16 pipeline.addLast("handler", nettyHandler); 17 return pipeline; 18 } 19 }); 20 }
之后进行连接netty服务端:
1 protected void doConnect() throws Throwable { 2 long start = System.currentTimeMillis(); 3 ChannelFuture future = bootstrap.connect(getConnectAddress()); 4 try { 5 boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS); 6 7 if (ret && future.isSuccess()) { 8 Channel newChannel = future.getChannel(); 9 newChannel.setInterestOps(Channel.OP_READ_WRITE); 10 try { 11 // 关闭旧的连接 12 Channel oldChannel = NettyClient.this.channel; // copy reference 13 if (oldChannel != null) { 14 try { 15 if (logger.isInfoEnabled()) { 16 logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel); 17 } 18 oldChannel.close(); 19 } finally { 20 NettyChannel.removeChannelIfDisconnected(oldChannel); 21 } 22 } 23 } finally { 24 if (NettyClient.this.isClosed()) { 25 try { 26 if (logger.isInfoEnabled()) { 27 logger.info("Close new netty channel " + newChannel + ", because the client closed."); 28 } 29 newChannel.close(); 30 } finally { 31 NettyClient.this.channel = null; 32 NettyChannel.removeChannelIfDisconnected(newChannel); 33 } 34 } else { 35 NettyClient.this.channel = newChannel; 36 } 37 } 38 } else if (future.getCause() != null) { 39 throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server " 40 + getRemoteAddress() + ", error message is:" + future.getCause().getMessage(), future.getCause()); 41 } else { 42 throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server " 43 + getRemoteAddress() + " client-side timeout " 44 + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client " 45 + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()); 46 } 47 } finally { 48 if (!isConnected()) { 49 future.cancel(); 50 } 51 } 52 }
到此为止NettyClient就创建好了,之后将该client封装为HeaderExchangeClient中。
1 public HeaderExchangeClient(Client client, boolean needHeartbeat) { 2 if (client == null) { 3 throw new IllegalArgumentException("client == null"); 4 } 5 this.client = client; 6 this.channel = new HeaderExchangeChannel(client); 7 String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY); 8 this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0); 9 this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3); 10 if (heartbeatTimeout < heartbeat * 2) { 11 throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2"); 12 } 13 if (needHeartbeat) { 14 startHeatbeatTimer(); 15 } 16 }
启动心跳。
最后将HeaderExchangeClient实例封装为ReferenceCountExchangeClient:
1 public ReferenceCountExchangeClient(ExchangeClient client, ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap) { 2 this.client = client; 3 refenceCount.incrementAndGet(); 4 this.url = client.getUrl(); 5 if (ghostClientMap == null) { 6 throw new IllegalStateException("ghostClientMap can not be null, url: " + url); 7 } 8 this.ghostClientMap = ghostClientMap; 9 }
最后放到缓存Map<String, ReferenceCountExchangeClient> referenceClientMap中。最后将ReferenceCountExchangeClient封装到DubboInvoker中。我们来看此时的DubboInvoker:
-->Map<String, String> attachment: {interface=com.alibaba.dubbo.demo.DemoService} -->ExchangeClient[] clients:[ReferenceCountExchangeClient实例]//如果设置了多条连接,此处有多个client -->Class<T> type: interface com.alibaba.dubbo.demo.DemoService -->Url url: dubbo://10.211.55.5:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-consumer&check=false&dubbo=2.5.7&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=25267®ister.ip=10.10.10.10&remote.timestamp=1510225244315&revision=2.5.7&side=consumer×tamp=1510225913509
更多网易技术、产品、运营经验分享请点击。