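This article was published on NetEase Cloud Community with the authorization of its author, 赵计刚.
Visit NetEase Cloud Community to learn more about NetEase's experience in technology, products and operations.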
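Next, FailbackRegistry:
Its constructor does only one thing. It starts a ScheduledThreadPool with a single background thread named DubboRegistryFailedRetryTimer. That thread runs retry() for the first time 5s after it is created, and every 5s after that. Here is retry():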
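The timer setup described above can be sketched with plain JDK classes. This is a minimal sketch, not Dubbo's code: the class name `RetryTimerSketch`, the thread-name suffix and the use of a fixed delay (rather than a fixed rate) between runs are assumptions made for illustration.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a FailbackRegistry-style retry timer (JDK only).
class RetryTimerSketch {
    // The 5s period described in the article.
    static final long DEFAULT_RETRY_PERIOD_MS = 5000;

    // Creates daemon threads whose names start with the given prefix.
    static ThreadFactory namedDaemonFactory(String prefix) {
        AtomicInteger seq = new AtomicInteger(1);
        return r -> {
            Thread t = new Thread(r, prefix + "-thread-" + seq.getAndIncrement());
            t.setDaemon(true); // retries alone should not keep the JVM alive
            return t;
        };
    }

    // A pool with exactly one background thread.
    private final ScheduledExecutorService retryExecutor =
            Executors.newScheduledThreadPool(1, namedDaemonFactory("DubboRegistryFailedRetryTimer"));
    private final ScheduledFuture<?> retryFuture;
    final AtomicInteger retryCount = new AtomicInteger();
    volatile String lastThreadName;

    RetryTimerSketch() {
        this(DEFAULT_RETRY_PERIOD_MS);
    }

    RetryTimerSketch(long retryPeriodMs) {
        // First run after one period, then one period after each run completes.
        retryFuture = retryExecutor.scheduleWithFixedDelay(() -> {
            try {
                retry();
            } catch (Throwable t) {
                // Swallow: an exception escaping here would cancel every later run.
            }
        }, retryPeriodMs, retryPeriodMs, TimeUnit.MILLISECONDS);
    }

    // Placeholder for the real retry logic; records which thread ran it.
    protected void retry() {
        lastThreadName = Thread.currentThread().getName();
        retryCount.incrementAndGet();
    }

    void destroy() {
        retryFuture.cancel(true);
        retryExecutor.shutdownNow();
    }
}
```

Catching `Throwable` inside the scheduled task matters: with `ScheduledExecutorService`, a task that throws is silently cancelled, which would stop all future retries.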
```java
/**
 * Retries registration of every URL in failedRegistered, removing each one from failedRegistered on success;
 * retries unregistration of every URL in failedUnregistered, removing each one from failedUnregistered on success;
 * retries subscription of every URL in failedSubscribed, removing each one from failedSubscribed on success;
 * retries unsubscription of every URL in failedUnsubscribed, removing each one from failedUnsubscribed on success;
 * retries notification of every URL in failedNotified, removing each one from failedNotified on success.
 */
protected void retry() {
    if (!failedRegistered.isEmpty()) {
        Set<URL> failed = new HashSet<URL>(failedRegistered);
        if (failed.size() > 0) {
            if (logger.isInfoEnabled()) {
                logger.info("Retry register " + failed);
            }
            try {
                for (URL url : failed) {
                    try {
                        doRegister(url);
                        failedRegistered.remove(url);
                    } catch (Throwable t) { // ignore all exceptions and wait for the next retry
                        logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);
                    }
                }
            } catch (Throwable t) { // ignore all exceptions and wait for the next retry
                logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);
            }
        }
    }
    if (!failedUnregistered.isEmpty()) {
        Set<URL> failed = new HashSet<URL>(failedUnregistered);
        if (failed.size() > 0) {
            if (logger.isInfoEnabled()) {
                logger.info("Retry unregister " + failed);
            }
            try {
                for (URL url : failed) {
                    try {
                        doUnregister(url);
                        failedUnregistered.remove(url);
                    } catch (Throwable t) { // ignore all exceptions and wait for the next retry
                        logger.warn("Failed to retry unregister " + failed + ", waiting for again, cause: " + t.getMessage(), t);
                    }
                }
            } catch (Throwable t) { // ignore all exceptions and wait for the next retry
                logger.warn("Failed to retry unregister " + failed + ", waiting for again, cause: " + t.getMessage(), t);
            }
        }
    }
    if (!failedSubscribed.isEmpty()) {
        Map<URL, Set<NotifyListener>> failed = new HashMap<URL, Set<NotifyListener>>(failedSubscribed);
        for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<URL, Set<NotifyListener>>(failed).entrySet()) {
            if (entry.getValue() == null || entry.getValue().size() == 0) {
                failed.remove(entry.getKey());
            }
        }
        if (failed.size() > 0) {
            if (logger.isInfoEnabled()) {
                logger.info("Retry subscribe " + failed);
            }
            try {
                for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) {
                    URL url = entry.getKey();
                    Set<NotifyListener> listeners = entry.getValue();
                    for (NotifyListener listener : listeners) {
                        try {
                            // listeners are subscribed one at a time; each successful one is removed from this url's listener set
                            doSubscribe(url, listener);
                            listeners.remove(listener);
                        } catch (Throwable t) { // ignore all exceptions and wait for the next retry
                            logger.warn("Failed to retry subscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
                        }
                    }
                }
            } catch (Throwable t) { // ignore all exceptions and wait for the next retry
                logger.warn("Failed to retry subscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
            }
        }
    }
    if (!failedUnsubscribed.isEmpty()) {
        Map<URL, Set<NotifyListener>> failed = new HashMap<URL, Set<NotifyListener>>(failedUnsubscribed);
        for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<URL, Set<NotifyListener>>(failed).entrySet()) {
            if (entry.getValue() == null || entry.getValue().size() == 0) {
                failed.remove(entry.getKey());
            }
        }
        if (failed.size() > 0) {
            if (logger.isInfoEnabled()) {
                logger.info("Retry unsubscribe " + failed);
            }
            try {
                for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) {
                    URL url = entry.getKey();
                    Set<NotifyListener> listeners = entry.getValue();
                    for (NotifyListener listener : listeners) {
                        try {
                            // listeners are unsubscribed one at a time; each successful one is removed from this url's listener set
                            doUnsubscribe(url, listener);
                            listeners.remove(listener);
                        } catch (Throwable t) { // ignore all exceptions and wait for the next retry
                            logger.warn("Failed to retry unsubscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
                        }
                    }
                }
            } catch (Throwable t) { // ignore all exceptions and wait for the next retry
                logger.warn("Failed to retry unsubscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
            }
        }
    }
    if (!failedNotified.isEmpty()) {
        Map<URL, Map<NotifyListener, List<URL>>> failed = new HashMap<URL, Map<NotifyListener, List<URL>>>(failedNotified);
        for (Map.Entry<URL, Map<NotifyListener, List<URL>>> entry : new HashMap<URL, Map<NotifyListener, List<URL>>>(failed).entrySet()) {
            if (entry.getValue() == null || entry.getValue().size() == 0) {
                failed.remove(entry.getKey());
            }
        }
        if (failed.size() > 0) {
            if (logger.isInfoEnabled()) {
                logger.info("Retry notify " + failed);
            }
            try {
                for (Map<NotifyListener, List<URL>> values : failed.values()) {
                    for (Map.Entry<NotifyListener, List<URL>> entry : values.entrySet()) {
                        try {
                            NotifyListener listener = entry.getKey();
                            List<URL> urls = entry.getValue();
                            listener.notify(urls);
                            values.remove(listener);
                        } catch (Throwable t) { // ignore all exceptions and wait for the next retry
                            logger.warn("Failed to retry notify " + failed + ", waiting for again, cause: " + t.getMessage(), t);
                        }
                    }
                }
            } catch (Throwable t) { // ignore all exceptions and wait for the next retry
                logger.warn("Failed to retry notify " + failed + ", waiting for again, cause: " + t.getMessage(), t);
            }
        }
    }
}
```
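Every branch of retry() follows the same pattern: copy the failure collection, retry each entry, and remove an entry only after it succeeds, so anything that still fails stays queued for the next tick. A minimal sketch of that pattern for the register case follows; the class `SnapshotRetrySketch`, the `Registrar` interface and the use of `String` in place of Dubbo's `URL` are hypothetical simplifications.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the "snapshot, retry, remove on success" loop.
class SnapshotRetrySketch {
    // Concurrent set: register() callers and the retry thread may touch it at the same time.
    final Set<String> failedRegistered = ConcurrentHashMap.newKeySet();

    // Stand-in for doRegister: returns normally on success, throws on failure.
    interface Registrar {
        void doRegister(String url) throws Exception;
    }

    void retry(Registrar registrar) {
        if (failedRegistered.isEmpty()) {
            return;
        }
        // Iterate over a snapshot so concurrent adds/removes cannot disturb the loop.
        Set<String> failed = new HashSet<>(failedRegistered);
        for (String url : failed) {
            try {
                registrar.doRegister(url);
                failedRegistered.remove(url); // only successful entries leave the set
            } catch (Exception e) {
                // keep the url; the next timer tick will try again
            }
        }
    }
}
```

Catching the exception per entry (rather than around the whole loop) is what lets one bad URL fail without blocking retries of the others.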
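Finally, back to our main subject: ZookeeperRegistry
Its constructor first sets the root property to /dubbo, then creates the zk client and starts the session, and finally registers a StateListener that listens for the reconnected event. After a successful reconnect, the URLs that had already been registered and subscribed must be registered and subscribed again, because their ephemeral nodes may have been lost.
Next is the code that creates the zk client and starts the session, which is the core of this part: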
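The reconnect handling can be illustrated with a small sketch. It assumes, as a simplification, that recovery just re-queues every registered URL into the failed set so the retry timer re-registers it; the class `RecoverOnReconnectSketch` and its local copy of the `StateListener` constants are hypothetical and only mirror the names used in this article.

```java
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

// Hypothetical sketch: re-register everything after a zk reconnect.
class RecoverOnReconnectSketch {
    // Local stand-in for Dubbo's StateListener (constant values assumed).
    interface StateListener {
        int DISCONNECTED = 0;
        int CONNECTED = 1;
        int RECONNECTED = 2;

        void stateChanged(int state);
    }

    final Set<String> registered = new CopyOnWriteArraySet<>();
    final Set<String> failedRegistered = new CopyOnWriteArraySet<>();

    // Only RECONNECTED triggers recovery; other states are ignored.
    final StateListener listener = state -> {
        if (state == StateListener.RECONNECTED) {
            recover();
        }
    };

    // The old session's ephemeral nodes may be gone, so queue every registered
    // url for re-registration by the retry timer.
    void recover() {
        failedRegistered.addAll(registered);
    }
}
```

Routing recovery through the failure set reuses the existing retry machinery instead of writing to zk directly from the connection callback.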
ZookeeperTransporter$Adaptive.connect(com.alibaba.dubbo.common.URL registryUrl)
```java
public com.alibaba.dubbo.remoting.zookeeper.ZookeeperClient connect(com.alibaba.dubbo.common.URL arg0) {
    if (arg0 == null)
        throw new IllegalArgumentException("url == null");
    com.alibaba.dubbo.common.URL url = arg0;
    String extName = url.getParameter("client", url.getParameter("transporter", "zkclient")); // curator
    if (extName == null)
        throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter) name from url(" + url.toString() + ") use keys([client, transporter])");
    com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter extension = (com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter.class).getExtension(extName);
    return extension.connect(arg0);
}
```
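The adaptive class only picks an extension name from the URL and delegates to that extension. The sketch below reproduces the lookup order shown above ("client", then "transporter", then the default "zkclient") with a plain map standing in for ExtensionLoader; `AdaptiveSelectionSketch`, `ZkClient` and `EXTENSIONS` are hypothetical names, and the empty-value handling is an assumption about how the parameter lookup behaves.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch of adaptive extension selection by URL parameters.
class AdaptiveSelectionSketch {
    interface ZkClient {
        String name();
    }

    static ZkClient named(String n) {
        return () -> n;
    }

    // Stand-in for ExtensionLoader: extension name -> factory.
    static final Map<String, Supplier<ZkClient>> EXTENSIONS = new HashMap<>();
    static {
        EXTENSIONS.put("zkclient", () -> named("zkclient"));
        EXTENSIONS.put("curator", () -> named("curator"));
    }

    // Returns the parameter value, or the default when it is missing or empty.
    static String param(Map<String, String> params, String key, String defaultValue) {
        String v = params.get(key);
        return (v == null || v.isEmpty()) ? defaultValue : v;
    }

    // Same order as the generated code: "client", then "transporter", then "zkclient".
    static String extensionName(Map<String, String> params) {
        return param(params, "client", param(params, "transporter", "zkclient"));
    }

    static ZkClient connect(Map<String, String> params) {
        String name = extensionName(params);
        Supplier<ZkClient> factory = EXTENSIONS.get(name);
        if (factory == null) {
            throw new IllegalStateException("No ZookeeperTransporter extension named " + name);
        }
        return factory.get();
    }
}
```

For example, a registry URL carrying `client=curator` selects the curator extension even if `transporter` says otherwise.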
The extension obtained here is a CuratorZookeeperTransporter instance (extName resolves to curator in this example).
```java
public class CuratorZookeeperTransporter implements ZookeeperTransporter {
    public ZookeeperClient connect(URL url) {
        return new CuratorZookeeperClient(url);
    }
}
```
new CuratorZookeeperClient(registryUrl)
```java
private final CuratorFramework client;

public CuratorZookeeperClient(URL url) {
    super(url);
    try {
        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
                .connectString(url.getBackupAddress())
                .retryPolicy(new RetryNTimes(Integer.MAX_VALUE, 1000))
                .connectionTimeoutMs(5000);
        String authority = url.getAuthority();
        if (authority != null && authority.length() > 0) {
            builder = builder.authorization("digest", authority.getBytes());
        }
        client = builder.build();
        client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
            public void stateChanged(CuratorFramework client, ConnectionState state) {
                if (state == ConnectionState.LOST) {
                    CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED);
                } else if (state == ConnectionState.CONNECTED) {
                    CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);
                } else if (state == ConnectionState.RECONNECTED) {
                    CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);
                }
            }
        });
        client.start();
    } catch (Exception e) {
        throw new IllegalStateException(e.getMessage(), e);
    }
}
```
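This first runs the constructor of the parent class AbstractZookeeperClient to initialize some fields, then creates the CuratorFramework client, and then adds a ConnectionStateListener that listens for the connection-lost, connected and reconnected events and reacts to each. In practice only the reconnected event is acted on here, and its handler is the StateListener created in the ZookeeperRegistry constructor, which calls recover():
```java
protected void stateChanged(int state) {
    for (StateListener sessionListener : getSessionListeners()) {
        // the only StateListener found here is the one created in the ZookeeperRegistry constructor
        sessionListener.stateChanged(state);
    }
}
```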
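The two steps above (translating a Curator connection state into an int, then fanning it out to every registered listener) can be sketched without a ZooKeeper server. `ConnState` is a hypothetical local enum that copies the names of Curator's `ConnectionState` values, and the int values mirror the StateListener constants; none of this is the library's own code.

```java
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.IntConsumer;

// Hypothetical sketch of state translation plus listener fan-out.
class StateFanOutSketch {
    // Local stand-in for Curator's ConnectionState values.
    enum ConnState { CONNECTED, SUSPENDED, RECONNECTED, LOST, READ_ONLY }

    static final int STATE_DISCONNECTED = 0;
    static final int STATE_CONNECTED = 1;
    static final int STATE_RECONNECTED = 2;

    // Copy-on-write: iteration is safe while listeners are added from other threads.
    final Set<IntConsumer> stateListeners = new CopyOnWriteArraySet<>();

    // Returns -1 for states that are not forwarded.
    static int translate(ConnState s) {
        switch (s) {
            case LOST:
                return STATE_DISCONNECTED;
            case CONNECTED:
                return STATE_CONNECTED;
            case RECONNECTED:
                return STATE_RECONNECTED;
            default:
                return -1; // SUSPENDED and READ_ONLY are ignored, as in the code above
        }
    }

    void onConnectionStateChanged(ConnState s) {
        int state = translate(s);
        if (state < 0) {
            return;
        }
        for (IntConsumer listener : stateListeners) {
            listener.accept(state);
        }
    }
}
```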
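Finally, client.start() starts the client, which then establishes the session. Note that Curator's start() returns without waiting for the connection, so the constructor as shown does not block until the session is established; a caller that needs to wait would use CuratorFramework.blockUntilConnected().
Here is the parent class AbstractZookeeperClient:
```java
private final URL url;
private final Set<StateListener> stateListeners = new CopyOnWriteArraySet<StateListener>();
private final ConcurrentMap<String, ConcurrentMap<ChildListener, TargetChildListener>> childListeners = new ConcurrentHashMap<String, ConcurrentMap<ChildListener, TargetChildListener>>();
private volatile boolean closed = false;

public AbstractZookeeperClient(URL url) {
    this.url = url;
}
```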
Note: at this point a complete ZookeeperRegistry instance has been created. Its properties are as follows:
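In addition, there is a scheduled thread, DubboRegistryFailedRetryTimer, which runs retry() every 5s to retry failed operations.
Finally, this ZookeeperRegistry is cached in the static property Map<String, Registry> REGISTRIES of AbstractRegistryFactory, the parent class of ZookeeperRegistryFactory:
```
Map<String, Registry> REGISTRIES: {
    "zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService" : ZookeeperRegistry instance
}
```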
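Caching registries by registry-URL key means every service exported to the same zk address shares one ZookeeperRegistry. A minimal sketch of that idea follows; it uses `ConcurrentHashMap.computeIfAbsent` to create each registry at most once, which is a swapped-in technique for illustration (the actual factory code may use explicit locking). `RegistryCacheSketch` and its `Registry` interface are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch of a static registry cache keyed by registry URL.
class RegistryCacheSketch {
    interface Registry {
        String address();
    }

    // One Registry per key, shared by all callers.
    private static final Map<String, Registry> REGISTRIES = new ConcurrentHashMap<>();

    // Creates the registry on first use; later calls with the same key get the cached instance.
    static Registry getRegistry(String key, Function<String, Registry> createRegistry) {
        return REGISTRIES.computeIfAbsent(key, createRegistry);
    }

    static int size() {
        return REGISTRIES.size();
    }
}
```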
II. Get the node URL that is actually registered to zk
```java
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
```
```java
/**
 * 1. Get the value of originInvoker's "export" parameter, which is the providerUrl.
 * 2. Remove every parameter of providerUrl whose name starts with ".", then remove the "monitor" parameter.
 */
private URL getRegistedProviderUrl(final Invoker<?> originInvoker) {
    URL providerUrl = getProviderUrl(originInvoker);
    // the address seen by the registry
    final URL registedProviderUrl = providerUrl.removeParameters(getFilteredKeys(providerUrl)).removeParameter(Constants.MONITOR_KEY);
    return registedProviderUrl;
}

/**
 * Gets the providerUrl stored under the key "export" in the Map<String, String> parameters of the invoker's URL.
 */
private URL getProviderUrl(final Invoker<?> origininvoker) {
    String export = origininvoker.getUrl().getParameterAndDecoded(Constants.EXPORT_KEY);
    if (export == null || export.length() == 0) {
        throw new IllegalArgumentException("The registry export url is null! registry: " + origininvoker.getUrl());
    }
    URL providerUrl = URL.valueOf(export);
    return providerUrl;
}

// Collects the URL parameters that should not be exposed (names starting with a dot)
private static String[] getFilteredKeys(URL url) {
    Map<String, String> params = url.getParameters();
    if (params != null && !params.isEmpty()) {
        List<String> filteredKeys = new ArrayList<String>();
        for (Map.Entry<String, String> entry : params.entrySet()) {
            if (entry != null && entry.getKey() != null && entry.getKey().startsWith(Constants.HIDE_KEY_PREFIX)) {
                filteredKeys.add(entry.getKey());
            }
        }
        return filteredKeys.toArray(new String[filteredKeys.size()]);
    } else {
        return new String[]{};
    }
}
```
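The filtering step boils down to "drop hidden keys, then drop monitor" on the provider URL's parameter map. The sketch below applies it to a plain `Map`; the class name is hypothetical, and the values "." and "monitor" for `Constants.HIDE_KEY_PREFIX` and `Constants.MONITOR_KEY` are assumptions based on the comments in the code above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of how the registered parameters are derived.
class RegisteredUrlParamsSketch {
    static final String HIDE_KEY_PREFIX = "."; // assumed value of Constants.HIDE_KEY_PREFIX
    static final String MONITOR_KEY = "monitor"; // assumed value of Constants.MONITOR_KEY

    // Returns a copy of the provider parameters as the registry should see them.
    static Map<String, String> registeredParameters(Map<String, String> providerParams) {
        Map<String, String> result = new LinkedHashMap<>(providerParams);
        result.keySet().removeIf(k -> k.startsWith(HIDE_KEY_PREFIX)); // hidden keys
        result.remove(MONITOR_KEY); // monitor address is not published to the registry
        return result;
    }
}
```

Working on a copy mirrors the immutable style of Dubbo's URL, where `removeParameters` returns a new URL rather than modifying the original.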
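The resulting registedProviderUrl is:
III. Register the service to zk
```java
registry.register(registedProviderUrl); // create the node (i.e. register the service on zk)
```
Here registry is the ZookeeperRegistry. Its register(registedProviderUrl) method is implemented in its parent class FailbackRegistry.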
1 FailbackRegistry.register(registedProviderUrl)
```java
@Override
public void register(URL url) {
    if (destroyed.get()) {
        return;
    }
    super.register(url);
    failedRegistered.remove(url);
    failedUnregistered.remove(url);
    try {
        // send the registration request to the server side
        doRegister(url);
    } catch (Exception e) {
        Throwable t = e;
        // if the startup check (check=true) is enabled, rethrow directly instead of adding the url to failedRegistered
        boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                && url.getParameter(Constants.CHECK_KEY, true)
                && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
        boolean skipFailback = t instanceof SkipFailbackWrapperException;
        if (check || skipFailback) {
            if (skipFailback) {
                t = t.getCause();
            }
            throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
        } else {
            logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
        }
        // record the failed registration in the failure set for periodic retry
        failedRegistered.add(url);
    }
}
```
It first calls register(registedProviderUrl) of the parent class AbstractRegistry, which adds registedProviderUrl to the Set<URL> registered property:
```java
public void register(URL url) {
    if (url == null) {
        throw new IllegalArgumentException("register url == null");
    }
    if (logger.isInfoEnabled()) {
        logger.info("Register: " + url);
    }
    registered.add(url);
}
```
It then removes the url from the failedRegistered and failedUnregistered sets and performs the actual registration, i.e. creates the node via doRegister(url). If an exception is thrown during creation, it is rethrown (wrapped in an IllegalStateException) without adding the url to failedRegistered when all of the following hold: the url's protocol is not consumer, the url has check=true, and the stored registry URL (registryUrl) also has check=true; check defaults to true on both. The exception is also rethrown, again without adding the url to failedRegistered, if it is a SkipFailbackWrapperException, in which case its cause is unwrapped first. Otherwise the url is added to failedRegistered, and the DubboRegistryFailedRetryTimer thread retries doRegister(url) every 5s.
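The throw-or-queue decision described above can be isolated into a small function and exercised directly. This sketch is hypothetical: `RegisterFailureSketch` uses `String` URLs and a `Runnable` in place of doRegister, and it assumes "consumer" as the value of `Constants.CONSUMER_PROTOCOL`.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of FailbackRegistry's failure handling in register().
class RegisterFailureSketch {
    final Set<String> failedRegistered = ConcurrentHashMap.newKeySet();

    // true means "rethrow now"; false means "queue for the retry timer".
    static boolean shouldThrow(boolean registryCheck, boolean urlCheck, String protocol, boolean skipFailback) {
        boolean check = registryCheck && urlCheck && !"consumer".equals(protocol);
        return check || skipFailback;
    }

    // Simplified register(): doRegister signals failure by throwing a RuntimeException.
    void register(String url, String protocol, boolean registryCheck, boolean urlCheck, Runnable doRegister) {
        failedRegistered.remove(url);
        try {
            doRegister.run();
        } catch (RuntimeException e) {
            if (shouldThrow(registryCheck, urlCheck, protocol, false)) {
                throw new IllegalStateException("Failed to register " + url + ", cause: " + e.getMessage(), e);
            }
            failedRegistered.add(url); // picked up later by retry()
        }
    }
}
```

In other words, a provider with default settings fails fast at startup when zk is unreachable, while a consumer's registration is quietly retried in the background.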
Now let's look at the actual doRegister(url).
2 ZookeeperRegistry.doRegister(registedProviderUrl)
```java
protected void doRegister(URL url) {
    try {
        zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
    } catch (Throwable e) {
        throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
```
First, the argument registedProviderUrl is turned into a zk node path:
```java
private String toUrlPath(URL url) {
    return toCategoryPath(url) + Constants.PATH_SEPARATOR + URL.encode(url.toFullString());
}

private String toCategoryPath(URL url) {
    return toServicePath(url) + Constants.PATH_SEPARATOR + url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
}

private String toServicePath(URL url) {
    String name = url.getServiceInterface();
    if (Constants.ANY_VALUE.equals(name)) {
        return toRootPath();
    }
    return toRootDir() + URL.encode(name); // /dubbo/com.alibaba.dubbo.demo.DemoService
}

private String toRootDir() {
    if (root.equals(Constants.PATH_SEPARATOR)) {
        return root;
    }
    return root + Constants.PATH_SEPARATOR; // /dubbo/
}

private String toRootPath() {
    return root;
}
```
This is where the root property of ZookeeperRegistry mentioned above comes into play. The result has the form /dubbo/{interface}/{category}/{URL-encoded full string of registedProviderUrl}, where category defaults to providers, and this is the node that will be created on zk.
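The path construction can be checked with a standalone sketch. It assumes that Dubbo's `URL.encode` behaves like `URLEncoder.encode(value, "UTF-8")`, and it leaves out the `ANY_VALUE` ("*") branch; `ZkPathSketch` is a hypothetical name.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical sketch of toUrlPath: root / interface / category / encoded full URL.
class ZkPathSketch {
    static final String PATH_SEPARATOR = "/";

    static String encode(String s) {
        try {
            return URLEncoder.encode(s, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }

    static String toUrlPath(String root, String serviceInterface, String category, String fullUrl) {
        // A root of "/" is used as-is; otherwise append the separator ("/dubbo" -> "/dubbo/").
        String rootDir = root.equals(PATH_SEPARATOR) ? root : root + PATH_SEPARATOR;
        String servicePath = rootDir + encode(serviceInterface);
        String categoryPath = servicePath + PATH_SEPARATOR + category;
        // Encoding turns every '/' in the URL into %2F, so the whole URL becomes one path segment.
        return categoryPath + PATH_SEPARATOR + encode(fullUrl);
    }
}
```

For example, `toUrlPath("/dubbo", "com.alibaba.dubbo.demo.DemoService", "providers", "dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&side=provider")` yields a path of the same shape as the node shown below.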
The resulting path is:
Finally, zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true)) creates the node. This method is implemented in AbstractZookeeperClient, the parent class of CuratorZookeeperClient:
```java
public void create(String path, boolean ephemeral) {
    int i = path.lastIndexOf('/');
    if (i > 0) {
        create(path.substring(0, i), false);
    }
    if (ephemeral) {
        createEphemeral(path);
    } else {
        createPersistent(path);
    }
}
```
Through recursion, this creates the persistent nodes /dubbo, /dubbo/com.alibaba.dubbo.demo.DemoService and /dubbo/com.alibaba.dubbo.demo.DemoService/providers, and finally the ephemeral node /dubbo/com.alibaba.dubbo.demo.DemoService/providers/dubbo%3A%2F%2F10.10.10.10%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D5148%26side%3Dprovider%26timestamp%3D1507291294629. With Curator alone, the nodes could be created recursively in a single call (given ZooKeeper's rules, only the last child node can be ephemeral; its parents must be persistent, because ephemeral nodes cannot have children). This style is presumably for compatibility with the ZkClient client, which cannot create nodes recursively. Note that when url.getParameter(Constants.DYNAMIC_KEY, true) is true, the final node is ephemeral; otherwise it is persistent.
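The recursion order and the persistent/ephemeral split can be observed without ZooKeeper by recording the calls in memory. `RecursiveCreateSketch` and its `Mode` enum are hypothetical; `putIfAbsent` stands in for the "ignore NodeExistsException" behavior of the create methods that follow.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory sketch of the recursive create(path, ephemeral).
class RecursiveCreateSketch {
    enum Mode { PERSISTENT, EPHEMERAL }

    // path -> mode, kept in creation order (stand-in for the zk tree).
    final Map<String, Mode> nodes = new LinkedHashMap<>();

    void create(String path, boolean ephemeral) {
        int i = path.lastIndexOf('/');
        if (i > 0) {
            // Parents first, always persistent (ephemeral nodes cannot have children).
            create(path.substring(0, i), false);
        }
        // An existing node is left untouched, like swallowing NodeExistsException.
        nodes.putIfAbsent(path, ephemeral ? Mode.EPHEMERAL : Mode.PERSISTENT);
    }
}
```

Because the provider URL in the leaf is URL-encoded, it contains no raw '/', so `lastIndexOf('/')` splits only at the real path separators.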
The nodes themselves are created in CuratorZookeeperClient:
```java
public void createPersistent(String path) {
    try {
        client.create().forPath(path);
    } catch (NodeExistsException e) {
        // the node already exists: nothing to do
    } catch (Exception e) {
        throw new IllegalStateException(e.getMessage(), e);
    }
}

public void createEphemeral(String path) {
    try {
        client.create().withMode(CreateMode.EPHEMERAL).forPath(path);
    } catch (NodeExistsException e) {
        // the node already exists: nothing to do
    } catch (Exception e) {
        throw new IllegalStateException(e.getMessage(), e);
    }
}
```
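At this point, let's check which nodes were created on zk.
Or view them in zkui:
The hidden part is the IP 10.10.10.10.
Now let's look at how the properties of ZookeeperRegistry have changed compared with before registration: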