服务远程暴露 - 注册服务到zookeeper(2)

勿忘初心2018-11-18 11:48

此文已由作者赵计刚薪授权网易云社区发布。

欢迎访问网易云社区,了解更多网易技术产品运营经验。


再来看FailbackRegistry:

只做了一件事,启动了一个含有一个名为DubboRegistryFailedRetryTimer的后台线程的ScheduledThreadPool,线程创建5s后开始第一次执行retry(),之后每隔5s执行一次。来看一下retry()

  1     /**
  2      * 将所有注册失败的url(failedRegistered中的url)进行注册,之后从failedRegistered进行移除;
  3      * 将所有反注册失败的url(failedUnregistered中的url)进行反注册,之后从failedUnregistered进行移除;
  4      * 将所有订阅失败的url(failedSubscribed中的url)进行重新订阅,之后从failedSubscribed进行移除;
  5      * 将所有反订阅失败的url(failedUnsubscribed中的url)进行反订阅,之后从failedUnsubscribed进行移除;
  6      * 将所有通知失败的url(failedNotified中的url)进行通知,之后从failedNotified进行移除;
  7      */
  8     protected void retry() {
  9         if (!failedRegistered.isEmpty()) {
 10             Set<URL> failed = new HashSet<URL>(failedRegistered);
 11             if (failed.size() > 0) {
 12                 if (logger.isInfoEnabled()) {
 13                     logger.info("Retry register " + failed);
 14                 }
 15                 try {
 16                     for (URL url : failed) {
 17                         try {
 18                             doRegister(url);
 19                             failedRegistered.remove(url);
 20                         } catch (Throwable t) { // 忽略所有异常,等待下次重试
 21                             logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);
 22                         }
 23                     }
 24                 } catch (Throwable t) { // 忽略所有异常,等待下次重试
 25                     logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);
 26                 }
 27             }
 28         }
 29         if (!failedUnregistered.isEmpty()) {
 30             Set<URL> failed = new HashSet<URL>(failedUnregistered);
 31             if (failed.size() > 0) {
 32                 if (logger.isInfoEnabled()) {
 33                     logger.info("Retry unregister " + failed);
 34                 }
 35                 try {
 36                     for (URL url : failed) {
 37                         try {
 38                             doUnregister(url);
 39                             failedUnregistered.remove(url);
 40                         } catch (Throwable t) { // 忽略所有异常,等待下次重试
 41                             logger.warn("Failed to retry unregister  " + failed + ", waiting for again, cause: " + t.getMessage(), t);
 42                         }
 43                     }
 44                 } catch (Throwable t) { // 忽略所有异常,等待下次重试
 45                     logger.warn("Failed to retry unregister  " + failed + ", waiting for again, cause: " + t.getMessage(), t);
 46                 }
 47             }
 48         }
 49         if (!failedSubscribed.isEmpty()) {
 50             Map<URL, Set<NotifyListener>> failed = new HashMap<URL, Set<NotifyListener>>(failedSubscribed);
 51             for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<URL, Set<NotifyListener>>(failed).entrySet()) {
 52                 if (entry.getValue() == null || entry.getValue().size() == 0) {
 53                     failed.remove(entry.getKey());
 54                 }
 55             }
 56             if (failed.size() > 0) {
 57                 if (logger.isInfoEnabled()) {
 58                     logger.info("Retry subscribe " + failed);
 59                 }
 60                 try {
 61                     for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) {
 62                         URL url = entry.getKey();
 63                         Set<NotifyListener> listeners = entry.getValue();
 64                         for (NotifyListener listener : listeners) {
 65                             try {
 66                                 doSubscribe(url, listener);//listener需要一个一个订阅,每订阅一个,就将该listener从当前的url监听列表中移除
 67                                 listeners.remove(listener);
 68                             } catch (Throwable t) { // 忽略所有异常,等待下次重试
 69                                 logger.warn("Failed to retry subscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
 70                             }
 71                         }
 72                     }
 73                 } catch (Throwable t) { // 忽略所有异常,等待下次重试
 74                     logger.warn("Failed to retry subscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
 75                 }
 76             }
 77         }
 78         if (!failedUnsubscribed.isEmpty()) {
 79             Map<URL, Set<NotifyListener>> failed = new HashMap<URL, Set<NotifyListener>>(failedUnsubscribed);
 80             for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<URL, Set<NotifyListener>>(failed).entrySet()) {
 81                 if (entry.getValue() == null || entry.getValue().size() == 0) {
 82                     failed.remove(entry.getKey());
 83                 }
 84             }
 85             if (failed.size() > 0) {
 86                 if (logger.isInfoEnabled()) {
 87                     logger.info("Retry unsubscribe " + failed);
 88                 }
 89                 try {
 90                     for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) {
 91                         URL url = entry.getKey();
 92                         Set<NotifyListener> listeners = entry.getValue();
 93                         for (NotifyListener listener : listeners) {
 94                             try {
 95                                 doUnsubscribe(url, listener);//listener需要一个一个反订阅,每反订阅一个,就将该listener从当前的url监听列表中移除
 96                                 listeners.remove(listener);
 97                             } catch (Throwable t) { // 忽略所有异常,等待下次重试
 98                                 logger.warn("Failed to retry unsubscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
 99                             }
100                         }
101                     }
102                 } catch (Throwable t) { // 忽略所有异常,等待下次重试
103                     logger.warn("Failed to retry unsubscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
104                 }
105             }
106         }
107         if (!failedNotified.isEmpty()) {
108             Map<URL, Map<NotifyListener, List<URL>>> failed = new HashMap<URL, Map<NotifyListener, List<URL>>>(failedNotified);
109             for (Map.Entry<URL, Map<NotifyListener, List<URL>>> entry : new HashMap<URL, Map<NotifyListener, List<URL>>>(failed).entrySet()) {
110                 if (entry.getValue() == null || entry.getValue().size() == 0) {
111                     failed.remove(entry.getKey());
112                 }
113             }
114             if (failed.size() > 0) {
115                 if (logger.isInfoEnabled()) {
116                     logger.info("Retry notify " + failed);
117                 }
118                 try {
119                     for (Map<NotifyListener, List<URL>> values : failed.values()) {
120                         for (Map.Entry<NotifyListener, List<URL>> entry : values.entrySet()) {
121                             try {
122                                 NotifyListener listener = entry.getKey();
123                                 List<URL> urls = entry.getValue();
124                                 listener.notify(urls);
125                                 values.remove(listener);
126                             } catch (Throwable t) { // 忽略所有异常,等待下次重试
127                                 logger.warn("Failed to retry notify " + failed + ", waiting for again, cause: " + t.getMessage(), t);
128                             }
129                         }
130                     }
131                 } catch (Throwable t) { // 忽略所有异常,等待下次重试
132                     logger.warn("Failed to retry notify " + failed + ", waiting for again, cause: " + t.getMessage(), t);
133                 }
134             }
135         }
136     }

最后回到我们的主角:ZookeeperRegistry

首先是为属性设置root=/dubbo,之后创建zk客户端,启动会话,最后创建了一个StateListener监听器,监听重新连接成功事件,重新连接成功后,之前已经完成注册和订阅的url要重新进行注册和订阅(因为临时节点可能已经跪了)。

来看创建zk客户端,启动会话的代码,这是此处最核心的部分:

ZookeeperTransporter$Adaptive.connect(com.alibaba.dubbo.common.URL registryUrl)

 1     public com.alibaba.dubbo.remoting.zookeeper.ZookeeperClient connect(com.alibaba.dubbo.common.URL arg0) {
 2         if (arg0 == null)
 3             throw new IllegalArgumentException("url == null");
 4         com.alibaba.dubbo.common.URL url = arg0;
 5         String extName = url.getParameter("client", url.getParameter("transporter", "zkclient"));//curator
 6         if(extName == null)
 7             throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter) name from url(" + url.toString() + ") use keys([client, transporter])");
 8         com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter extension = (com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter.class).getExtension(extName);
 9         return extension.connect(arg0);
10     }

这里创建的extension是CuratorZookeeperTransporter实例。

1 public class CuratorZookeeperTransporter implements ZookeeperTransporter {
2     public ZookeeperClient connect(URL url) {
3         return new CuratorZookeeperClient(url);
4     }
5 }

new CuratorZookeeperClient(registryUrl)

 1     private final CuratorFramework client;
 2 
 3     public CuratorZookeeperClient(URL url) {
 4         super(url);
 5         try {
 6             CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
 7                     .connectString(url.getBackupAddress())
 8                     .retryPolicy(new RetryNTimes(Integer.MAX_VALUE, 1000))
 9                     .connectionTimeoutMs(5000);
10             String authority = url.getAuthority();
11             if (authority != null && authority.length() > 0) {
12                 builder = builder.authorization("digest", authority.getBytes());
13             }
14             client = builder.build();
15             client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
16                 public void stateChanged(CuratorFramework client, ConnectionState state) {
17                     if (state == ConnectionState.LOST) {
18                         CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED);
19                     } else if (state == ConnectionState.CONNECTED) {
20                         CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);
21                     } else if (state == ConnectionState.RECONNECTED) {
22                         CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);
23                     }
24                 }
25             });
26             client.start();
27         } catch (Exception e) {
28             throw new IllegalStateException(e.getMessage(), e);
29         }
30     }

这里首先执行父类AbstractZookeeperClient的构造器来初始化一些参数,之后创建CuratorFramework客户端,然后添加了ConnectionStateListener监听器,监听连接断开/连接成功/重新连接成功事件,之后作出相应的操作(实际上这里只有重新连接成功事件会被处理,而处理器实际上就是ZookeeperRegistry构造器中的那个执行recover()的StateListener),

    protected void stateChanged(int state) {
        for (StateListener sessionListener : getSessionListeners()) {
            sessionListener.stateChanged(state);//此处查找实现类,只有ZookeeperRegistry构造器中的那个StateListener
        }
    }

最后阻塞,直到创建会话完成。

来看一下父类AbstractZookeeperClient

1     private final URL url;
2     private final Set<StateListener> stateListeners = new CopyOnWriteArraySet<StateListener>();
3     private final ConcurrentMap<String, ConcurrentMap<ChildListener, TargetChildListener>> childListeners = new ConcurrentHashMap<String, ConcurrentMap<ChildListener, TargetChildListener>>();
4     private volatile boolean closed = false;
5 
6     public AbstractZookeeperClient(URL url) {
7         this.url = url;
8     }

说明:

  • 设置属性url=registryUrl:zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&client=curator&dubbo=2.0.0&interface=com.alibaba.dubbo.registry.RegistryService&pid=4685&timestamp=1507286468150
  • 创建了一个Set<StateListener> stateListeners,ZookeeperRegistry构造器中的那个执行recover()的StateListener就将会放在这里

 

至此,一个完整的ZookeeperRegistry实例就创建完成了,来看一下属性:

  • ZookeeperClient zkClient = CuratorZookeeperClient实例
    • CuratorFramework client:CuratorFrameworkImpl实例
    • String url:zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&client=curator&dubbo=2.0.0&interface=com.alibaba.dubbo.registry.RegistryService&pid=4685&timestamp=1507286468150
    • Set<StateListener> stateListeners:{ 监听了重连成功事件的执行recover()的StateListener }
  • String root="/dubbo"
  • URL registryUrl = zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&client=curator&dubbo=2.0.0&interface=com.alibaba.dubbo.registry.RegistryService&pid=4685&timestamp=1507286468150
  • Set<URL> registered:0//已经注册的url集合,此处为空
  • ConcurrentMap<URL, Set<NotifyListener>> subscribed:0//已经订阅的<URL, Set<NotifyListener>>
  • ConcurrentMap<URL, Map<String, List<URL>>> notified:0//已经通知的<URL, Map<String, List<URL>>>
  • Set<URL> failedRegistered:0//注册失败的url
  • Set<URL> failedUnregistered:0//反注册失败的url
  • ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed:0//订阅失败的url
  • ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed:0//反订阅失败的url
  • ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified:0//通知失败的url
  • ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners:0

还有一个定时线程:DubboRegistryFailedRetryTimer每隔5s执行一次retry(),进行失败重试。

最后,该ZookeeperRegistry会存储在ZookeeperRegistry的父类的static属性Map<String, Registry> REGISTRIES中:

Map<String, Registry> REGISTRIES:{ "zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService" : ZookeeperRegistry实例 }

 

二  获取真正要注册到zk的节点url

1 final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
 1     /**
 2      * 1 获取originInvoker的export参数值:就是providerUrl
 3      * 2 去除providerUrl中所有参数名是"."开头的,然后去除参数monitor
 4      */
 5     private URL getRegistedProviderUrl(final Invoker<?> originInvoker) {
 6         URL providerUrl = getProviderUrl(originInvoker);
 7         //注册中心看到的地址
 8         final URL registedProviderUrl = providerUrl.removeParameters(getFilteredKeys(providerUrl)).removeParameter(Constants.MONITOR_KEY);
 9         return registedProviderUrl;
10     }
11 
12     /**
13      * 从invoker的URL中的Map<String, String> parameters中获取key为export的地址providerUrl:
14      */
15     private URL getProviderUrl(final Invoker<?> origininvoker) {
16         String export = origininvoker.getUrl().getParameterAndDecoded(Constants.EXPORT_KEY);
17         if (export == null || export.length() == 0) {
18             throw new IllegalArgumentException("The registry export url is null! registry: " + origininvoker.getUrl());
19         }
20         URL providerUrl = URL.valueOf(export);
21         return providerUrl;
22     }
23 
24     //过滤URL中不需要输出的参数(以点号开头的)
25     private static String[] getFilteredKeys(URL url) {
26         Map<String, String> params = url.getParameters();
27         if (params != null && !params.isEmpty()) {
28             List<String> filteredKeys = new ArrayList<String>();
29             for (Map.Entry<String, String> entry : params.entrySet()) {
30                 if (entry != null && entry.getKey() != null && entry.getKey().startsWith(Constants.HIDE_KEY_PREFIX)) {
31                     filteredKeys.add(entry.getKey());
32                 }
33             }
34             return filteredKeys.toArray(new String[filteredKeys.size()]);
35         } else {
36             return new String[]{};
37         }
38     }

最后得到的registedProviderUrl是:

  • dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=4758&side=provider&timestamp=1507289961588

 

三  注册服务到zk

registry.register(registedProviderUrl);//创建节点(即注册服务到zk上)

这里的registry是ZookeeperRegistry。register(registedProviderUrl)方法在ZookeeperRegistry的父类FailbackRegistry中实现。

1  FailbackRegistry.register(registedProviderUrl)

 1     @Override
 2     public void register(URL url) {
 3         if (destroyed.get()){
 4             return;
 5         }
 6         super.register(url);
 7         failedRegistered.remove(url);
 8         failedUnregistered.remove(url);
 9         try {
10             // 向服务器端发送注册请求
11             doRegister(url);
12         } catch (Exception e) {
13             Throwable t = e;
14             // 如果开启了启动时检测check=true,则直接抛出异常,不会加入到failedRegistered中
15             boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
16                     && url.getParameter(Constants.CHECK_KEY, true)
17                     && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
18             boolean skipFailback = t instanceof SkipFailbackWrapperException;
19             if (check || skipFailback) {
20                 if (skipFailback) {
21                     t = t.getCause();
22                 }
23                 throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
24             } else {
25                 logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
26             }
27             // 将失败的注册请求记录到失败列表,定时重试
28             failedRegistered.add(url);
29         }
30     }

首先调用父类AbstractRegistry的register(registedProviderUrl)将当前的registeredProviderUrl放到Set<URL> registered属性中,如下:

1     public void register(URL url) {
2         if (url == null) {
3             throw new IllegalArgumentException("register url == null");
4         }
5         if (logger.isInfoEnabled()) {
6             logger.info("Register: " + url);
7         }
8         registered.add(url);
9     }

之后,从failedRegistered和failedUnregistered两个url集合中删除该url。然后执行真正的服务注册(创建节点,doRegister(url)),如果在创建过程中抛出异常,如果url的协议不是consumer并且开启了check=true的属性并且当前存储的URL registryUrl也有check=true的话,那么直接抛出异常,不会将该url加入到failedRegistered集合;当然抛出的异常如果是SkipFailbackWrapperException,那么也会直接抛出异常,不会将该url加入到failedRegistered集合。否则,会将该url加入到failedRegistered集合,然后DubboRegistryFailedRetryTimer线程会每隔5s执行一次doRegister(url)。

 

我们来看真正doRegister(url)。

2  ZookeeperRegistry.doRegister(registedProviderUrl)

1     protected void doRegister(URL url) {
2         try {
3             zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
4         } catch (Throwable e) {
5             throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
6         }
7     }

首先是对入参registedProviderUrl进行一顿处理,

 1     private String toUrlPath(URL url) {
 2         return toCategoryPath(url) + Constants.PATH_SEPARATOR + URL.encode(url.toFullString());
 3     }
 4 
 5     private String toCategoryPath(URL url) {
 6         return toServicePath(url) + Constants.PATH_SEPARATOR + url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
 7     }
 8 
 9     private String toServicePath(URL url) {
10         String name = url.getServiceInterface();
11         if (Constants.ANY_VALUE.equals(name)) {
12             return toRootPath();
13         }
14         return toRootDir() + URL.encode(name);// /dubbo/com.alibaba.dubbo.demo.DemoService
15     }
16 
17     private String toRootDir() {
18         if (root.equals(Constants.PATH_SEPARATOR)) {
19             return root;
20         }
21         return root + Constants.PATH_SEPARATOR;// /dubbo/
22     }
23 
24     private String toRootPath() {
25         return root;
26     }

这里就体现了上边的ZookeeperRegistry的root属性的作用。最终实际上得到的是:/dubbo/interface/category/encode过的export,该节点也将是创建在zk上的节点。

  • /dubbo是根节点
  • /interface是服务接口
  • /category是providers/consumers/routers/configurators等

最终得到的url是:

  • /dubbo/com.alibaba.dubbo.demo.DemoService/providers/dubbo%3A%2F%2F10.10.10.10%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D5148%26side%3Dprovider%26timestamp%3D1507291294629
  • 解码后:/dubbo/com.alibaba.dubbo.demo.DemoService/providers/dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=5148&side=provider&timestamp=1507291294629

 

最后执行zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true))来创建节点,该方法由CuratorZookeeperClient的父类AbstractZookeeperClient来执行:

 1     public void create(String path, boolean ephemeral) {
 2         int i = path.lastIndexOf('/');
 3         if (i > 0) {
 4             create(path.substring(0, i), false);
 5         }
 6         if (ephemeral) {
 7             createEphemeral(path);
 8         } else {
 9             createPersistent(path);
10         }
11     }

这里实际上是通过递归分别创建持久化的/dubbo,/dubbo/com.alibaba.dubbo.demo.DemoService以及/dubbo/com.alibaba.dubbo.demo.DemoService/providers节点;最后创建临时节点/dubbo/com.alibaba.dubbo.demo.DemoService/providers/dubbo%3A%2F%2F10.10.10.10%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D5148%26side%3Dprovider%26timestamp%3D1507291294629,而实际上,如果使用了curator的话,可以直接使用递归创建节点即可(结合zk的特性,只有最后一个字节点可以是临时节点,父节点一定是持久化节点),这里这样的写法应该是兼容不能递归创建节点的Zkclient客户端。值得注意的是,url.getParameter(Constants.DYNAMIC_KEY, true)为true则最终创建的节点是临时节点,否则是持久化节点。

创建节点的操作是在CuratorZookeeperClient中进行的。

 1     public void createPersistent(String path) {
 2         try {
 3             client.create().forPath(path);
 4         } catch (NodeExistsException e) {
 5         } catch (Exception e) {
 6             throw new IllegalStateException(e.getMessage(), e);
 7         }
 8     }
 9 
10     public void createEphemeral(String path) {
11         try {
12             client.create().withMode(CreateMode.EPHEMERAL).forPath(path);
13         } catch (NodeExistsException e) {
14         } catch (Exception e) {
15             throw new IllegalStateException(e.getMessage(), e);
16         }
17     }

 

到此为止,我们去zk上看一下节点的创建情况。

或者从zkui上看一下:


隐藏掉的是ip:10.10.10.10。

 

到目前为止,我们再来看看ZookeeperRegistry的属性变化。相较于注册前:

  • Set<URL> registered:[ dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=5214&side=provider&timestamp=1507293238549 ]


免费体验云安全(易盾)内容安全、验证码等服务

更多网易技术、产品、运营经验分享请点击