Source Code Walkthrough: Building the Client (1)

叁叁肆 2018-11-18 11:29

This article is published on the NetEase Cloud Community with the authorization of its author, 赵计刚 (Zhao Jigang).



Preparation:

First, start two providers:

  • dubbo://10.211.55.5:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.5.7&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=318&revision=2.5.7&side=provider&timestamp=1510225244315
  • dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=25215&side=provider&timestamp=1510225334486

[Figure: inheritance and implementation diagram of ReferenceBean]

When DemoService demoService = (DemoService) context.getBean("demoService") runs, ReferenceBean is a FactoryBean, so the bean is obtained through FactoryBean.getObject.

Here is the core code of ReferenceBean:

    public Object getObject() throws Exception {
        return get();
    }

    public synchronized T get() {
        if (destroyed) {
            throw new IllegalStateException("Already destroyed!");
        }
        if (ref == null) {
            init();
        }
        return ref;
    }

    private void init() {
        ...
        ref = createProxy(map);
    }

    private T createProxy(Map<String, String> map) {
        ...
        if (urls.size() == 1) {
            invoker = refprotocol.refer(interfaceClass, urls.get(0));
        } 
        ...
        // create the service proxy
        return (T) proxyFactory.getProxy(invoker);
    }

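The two key lines above (highlighted in red in the original) are the call to refprotocol.refer(interfaceClass, urls.get(0)) and the call to proxyFactory.getProxy(invoker).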
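The getObject()/get() pair above is a lazy, synchronized initialization: the proxy is built on the first request and reused afterwards. Below is a minimal sketch of that pattern without Spring or Dubbo on the classpath. LazyReference and its Supplier argument are hypothetical names introduced only for illustration; the Supplier stands in for init()/createProxy(map).

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical stand-in for ReferenceBean: builds its proxy once, on the first get(). */
final class LazyReference<T> {
    private final Supplier<T> proxyCreator; // plays the role of init()/createProxy(map)
    private T ref;
    private boolean destroyed;

    LazyReference(Supplier<T> proxyCreator) {
        this.proxyCreator = proxyCreator;
    }

    /** Mirrors FactoryBean.getObject(): simply delegates to get(). */
    T getObject() {
        return get();
    }

    synchronized T get() {
        if (destroyed) {
            throw new IllegalStateException("Already destroyed!");
        }
        if (ref == null) {
            ref = proxyCreator.get(); // only runs on the first call
        }
        return ref;
    }

    synchronized void destroy() {
        destroyed = true;
    }

    public static void main(String[] args) {
        AtomicInteger creations = new AtomicInteger();
        LazyReference<String> bean =
                new LazyReference<>(() -> "proxy#" + creations.incrementAndGet());
        System.out.println(bean.getObject());
        System.out.println(bean.getObject());
        System.out.println("creations = " + creations.get());
    }
}
```

Both calls return the same object and the creator runs only once, which is why repeated context.getBean("demoService") calls do not rebuild the proxy.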

 

I. Use Protocol to convert interfaceClass into an Invoker

1 invoker = refprotocol.refer(interfaceClass, urls.get(0))

Here refprotocol is an instance of Protocol$Adaptive.

public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {
    ...
    public com.alibaba.dubbo.rpc.Invoker refer(Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg1 == null)
            throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg1;
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if(extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.refer(arg0, arg1);
    }
    ...
}
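Stripped of the generated boilerplate, the adaptive class above is a lookup keyed by the URL's protocol, with "dubbo" as the fallback. The following is a minimal sketch of that dispatch, not Dubbo's ExtensionLoader: the Protocol interface here is a reduced hypothetical version, the EXTENSIONS map is a stand-in for extension loading, and URLs are plain strings.

```java
import java.util.HashMap;
import java.util.Map;

final class AdaptiveDispatchSketch {
    /** Hypothetical, much-reduced Protocol: refer() just reports who handled the URL. */
    interface Protocol {
        String refer(String url);
    }

    // Stand-in for ExtensionLoader: extension name -> implementation.
    private static final Map<String, Protocol> EXTENSIONS = new HashMap<>();

    static {
        EXTENSIONS.put("registry", url -> "RegistryProtocol handled " + url);
        EXTENSIONS.put("dubbo", url -> "DubboProtocol handled " + url);
    }

    /** Returns the text before "://", or null when the URL carries no protocol. */
    static String protocolOf(String url) {
        int idx = url.indexOf("://");
        return idx > 0 ? url.substring(0, idx) : null;
    }

    /** Same decision as the adaptive class: use the URL's protocol, else "dubbo". */
    static String refer(String url) {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        String protocol = protocolOf(url);
        String extName = protocol == null ? "dubbo" : protocol;
        Protocol extension = EXTENSIONS.get(extName);
        if (extension == null) {
            throw new IllegalStateException("No such extension: " + extName);
        }
        return extension.refer(url);
    }

    public static void main(String[] args) {
        System.out.println(refer("registry://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService"));
        System.out.println(refer("10.211.55.5:20880/com.alibaba.dubbo.demo.DemoService"));
    }
}
```

The same shape applies to the other $Adaptive classes in this walkthrough: only the interface and the URL key used for the lookup change.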

Here extName = "registry". The call then passes through ProtocolListenerWrapper.refer -> ProtocolFilterWrapper.refer -> RegistryProtocol.refer; for the registry protocol the first two steps do nothing. Here is the core code of RegistryProtocol.refer:

    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
        Registry registry = registryFactory.getRegistry(url);
        ...
        return doRefer(cluster, registry, type, url);
    }

Parameters:

  • url:registry://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.0&pid=25267&refer=application=demo-consumer&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=25267&register.ip=10.10.10.10&side=consumer&timestamp=1510225913509&registry=zookeeper&timestamp=1510225984358
  • type: interface com.alibaba.dubbo.demo.DemoService

After the first statement runs, the protocol has been replaced and the url is now:

zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.0&pid=25267&refer=application=demo-consumer&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=25267&register.ip=10.10.10.10&side=consumer&timestamp=1510225913509&timestamp=1510225984358
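The before and after URLs differ in two places: the protocol changes from registry to zookeeper, and the registry=zookeeper parameter disappears. Here is a minimal sketch of that transformation on a simplified URL model. RegistryUrlSketch and toConcreteRegistry are hypothetical names, the model ignores the path and the nested refer parameter, and the "dubbo" fallback is an assumption about the value of Constants.DEFAULT_REGISTRY.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical, much-simplified URL model: protocol, address and ordered parameters only. */
final class RegistryUrlSketch {
    final String protocol;
    final String address;
    final Map<String, String> parameters;

    RegistryUrlSketch(String protocol, String address, Map<String, String> parameters) {
        this.protocol = protocol;
        this.address = address;
        this.parameters = new LinkedHashMap<>(parameters);
    }

    /** Uses the "registry" parameter as the new protocol ("dubbo" if absent) and removes it. */
    RegistryUrlSketch toConcreteRegistry() {
        Map<String, String> rest = new LinkedHashMap<>(parameters);
        String registry = rest.remove("registry");
        return new RegistryUrlSketch(registry == null ? "dubbo" : registry, address, rest);
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder(protocol).append("://").append(address);
        char separator = '?';
        for (Map.Entry<String, String> e : parameters.entrySet()) {
            sb.append(separator).append(e.getKey()).append('=').append(e.getValue());
            separator = '&';
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("application", "demo-consumer");
        params.put("registry", "zookeeper");
        RegistryUrlSketch before = new RegistryUrlSketch("registry", "10.211.55.5:2181", params);
        System.out.println(before);
        System.out.println(before.toConcreteRegistry());
    }
}
```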

Next, the Registry is obtained. Here registryFactory is an instance of RegistryFactory$Adaptive.

public class RegistryFactory$Adaptive implements com.alibaba.dubbo.registry.RegistryFactory {
    public com.alibaba.dubbo.registry.Registry getRegistry(com.alibaba.dubbo.common.URL arg0) {
        if (arg0 == null)
            throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg0;
        String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );//zookeeper
        if(extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.registry.RegistryFactory) name from url(" + url.toString() + ") use keys([protocol])");
        com.alibaba.dubbo.registry.RegistryFactory extension = (com.alibaba.dubbo.registry.RegistryFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.registry.RegistryFactory.class).getExtension(extName);
        return extension.getRegistry(arg0);
    }
}

Here extName is zookeeper. Next, AbstractRegistryFactory.getRegistry, defined in the parent class of ZookeeperRegistryFactory, runs:

    public Registry getRegistry(URL url) {
        url = url.setPath(RegistryService.class.getName())
                .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
                .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
        String key = url.toServiceString();
        // lock the registry lookup to guarantee a single registry instance per key
        LOCK.lock();
        try {
            Registry registry = REGISTRIES.get(key);
            if (registry != null) {
                return registry;
            }
            registry = createRegistry(url);
            if (registry == null) {
                throw new IllegalStateException("Can not create registry " + url);
            }
            REGISTRIES.put(key, registry);
            return registry;
        } finally {
            // release the lock
            LOCK.unlock();
        }
    }

The processed url is:

zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.0&interface=com.alibaba.dubbo.registry.RegistryService&pid=25267&timestamp=1510225984358

Then ZookeeperRegistryFactory.createRegistry(URL url) is called:

    public Registry createRegistry(URL url) {
        return new ZookeeperRegistry(url, zookeeperTransporter);
    }

Here zookeeperTransporter is an instance of ZookeeperTransporter$Adaptive.

    public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
        super(url);
        if (url.isAnyHost()) {
            throw new IllegalStateException("registry address == null");
        }
        String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
        if (!group.startsWith(Constants.PATH_SEPARATOR)) {
            group = Constants.PATH_SEPARATOR + group;
        }
        this.root = group;
        zkClient = zookeeperTransporter.connect(url);
        zkClient.addStateListener(new StateListener() {
            public void stateChanged(int state) {
                if (state == RECONNECTED) {
                    try {
                        recover();
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
        });
    }

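The super(url) call runs the constructors of ZookeeperRegistry's parent class FailbackRegistry (which starts the failure handler for failed registrations, unregistrations, subscriptions, unsubscriptions and notifications) and of AbstractRegistry (which writes information to a properties file and sends the corresponding notifications; since the url has no subscribers yet, nothing much happens here).

Then the ZkClient client is obtained, and finally a reconnection listener is added.

Now zookeeperTransporter.connect(url) runs. In this adaptive class extName is "zkClient" (in the provider part we used curator). ZkclientZookeeperTransporter.connect then executes: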

    public ZookeeperClient connect(URL url) {
        return new ZkclientZookeeperClient(url);
    }

    public ZkclientZookeeperClient(URL url) {
        super(url);
        client = new ZkClientWrapper(url.getBackupAddress(), 30000);
        client.addListener(new IZkStateListener() {
            public void handleStateChanged(KeeperState state) throws Exception {
                ZkclientZookeeperClient.this.state = state;
                if (state == KeeperState.Disconnected) {
                    stateChanged(StateListener.DISCONNECTED);
                } else if (state == KeeperState.SyncConnected) {
                    stateChanged(StateListener.CONNECTED);
                }
            }

            public void handleNewSession() throws Exception {
                stateChanged(StateListener.RECONNECTED);
            }
        });
        client.start();
    }

Here client is a ZkClientWrapper instance. Here are its constructor and ZkClientWrapper.start():

    private ListenableFutureTask<ZkClient> listenableFutureTask;

    public ZkClientWrapper(final String serverAddr, long timeout) {
        this.timeout = timeout;
        listenableFutureTask = ListenableFutureTask.create(new Callable<ZkClient>() {
            @Override
            public ZkClient call() throws Exception {
                return new ZkClient(serverAddr, Integer.MAX_VALUE);
            }
        });
    }

    public void start() {
        if (!started) {
            Thread connectThread = new Thread(listenableFutureTask);
            connectThread.setName("DubboZkclientConnector");
            connectThread.setDaemon(true);
            connectThread.start();
            try {
                client = listenableFutureTask.get(timeout, TimeUnit.MILLISECONDS);
            } catch (Throwable t) {
                logger.error("Timeout! zookeeper server can not be connected in : " + timeout + "ms!", t);
            }
            started = true;
        } else {
            logger.warn("Zkclient has already been started!");
        }
    }

This is where new ZkClient is executed and the connection to zookeeper is made.
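The start() method bounds a potentially blocking connect by running it on a daemon thread and waiting on the future with a timeout. The same idea can be sketched with only java.util.concurrent. TimedConnectSketch and its connect helper are hypothetical names, and a plain FutureTask replaces the ListenableFutureTask used above. Like the original, the sketch swallows the failure and leaves the caller with no client (null here) rather than rethrowing.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

final class TimedConnectSketch {
    /**
     * Runs the connector on a daemon thread and waits at most timeoutMs for it.
     * Returns the connector's result, or null if it failed or did not finish in time.
     */
    static <T> T connect(Callable<T> connector, long timeoutMs) {
        FutureTask<T> task = new FutureTask<>(connector);
        Thread connectThread = new Thread(task, "SketchConnector");
        connectThread.setDaemon(true); // a stuck connect must not keep the JVM alive
        connectThread.start();
        try {
            return task.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        } catch (Exception e) {
            // TimeoutException (too slow) or ExecutionException (connector threw)
            return null;
        }
    }

    public static void main(String[] args) {
        String fast = connect(() -> "connected", 1000);
        String slow = connect(() -> {
            Thread.sleep(5000); // simulates an unreachable server
            return "too late";
        }, 100);
        System.out.println("fast = " + fast);
        System.out.println("slow = " + slow);
    }
}
```

Note the consequence of this design: after a timeout the wrapper is marked as started but holds no client, so later calls have to cope with a missing connection.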

After that the reconnection listener is set up. At this point the Registry has been created. Back to the core code of RegistryProtocol.refer:

    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
        Registry registry = registryFactory.getRegistry(url);
        ...
        return doRefer(cluster, registry, type, url);
    }

Next, the last statement runs:

    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // all attributes of REFER_KEY
        Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
        URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
        if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
                && url.getParameter(Constants.REGISTER_KEY, true)) {
            registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                    Constants.CHECK_KEY, String.valueOf(false)));
        }
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                Constants.PROVIDERS_CATEGORY
                        + "," + Constants.CONFIGURATORS_CATEGORY
                        + "," + Constants.ROUTERS_CATEGORY));
        return cluster.join(directory);
    }

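Overall steps:

  • first, create the RegistryDirectory instance;
  • then register the consumer with zk;
  • then start the listeners (the first service discovery, the long-lived connection and the netty client are all established here);
  • finally, pass the RegistryDirectory instance to cluster.join(directory) to obtain the Invoker.

First comes the creation of RegistryDirectory. The resulting instance: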

-->List<Router> routers: [MockInvokersSelector instance]
-->Registry registry: the ZookeeperRegistry instance created above (zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.0&interface=com.alibaba.dubbo.registry.RegistryService&pid=25267&timestamp=1510225984358)
-->String serviceKey: com.alibaba.dubbo.registry.RegistryService
-->String[] serviceMethods: [sayHello]
-->Class<T> serviceType: interface com.alibaba.dubbo.demo.DemoService
-->URL url: zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.0&pid=25267&refer=application=demo-consumer&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=25267&register.ip=10.10.10.10&side=consumer&timestamp=1510225913509&timestamp=1510225984358
-->URL consumerUrl: zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.0&pid=25267&refer=application=demo-consumer&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=25267&register.ip=10.10.10.10&side=consumer&timestamp=1510225913509&timestamp=1510225984358
-->URL directoryUrl: zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=25267&register.ip=10.10.10.10&side=consumer&timestamp=1510225913509
-->URL overrideDirectoryUrl: zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=25267&register.ip=10.10.10.10&side=consumer&timestamp=1510225913509
-->Map<String, String> queryMap: {side=consumer, application=demo-consumer, register.ip=10.10.10.10, methods=sayHello, dubbo=2.0.0, pid=25267, check=false, interface=com.alibaba.dubbo.demo.DemoService, timestamp=1510225913509}

The Router is created in AbstractDirectory, the parent class of RegistryDirectory:

    public AbstractDirectory(URL url, URL consumerUrl, List<Router> routers) {
        if (url == null)
            throw new IllegalArgumentException("url == null");
        this.url = url;
        this.consumerUrl = consumerUrl;
        setRouters(routers);
    }

    protected void setRouters(List<Router> routers) {
        // copy list
        routers = routers == null ? new ArrayList<Router>() : new ArrayList<Router>(routers);
        // append url router
        String routerkey = url.getParameter(Constants.ROUTER_KEY);
        if (routerkey != null && routerkey.length() > 0) {
            RouterFactory routerFactory = ExtensionLoader.getExtensionLoader(RouterFactory.class).getExtension(routerkey);
            routers.add(routerFactory.getRouter(url));
        }
        // append mock invoker selector
        routers.add(new MockInvokersSelector());
        Collections.sort(routers);
        this.routers = routers;
    }

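Next the consumer is registered with the registry, in the same way as a service provider. FailbackRegistry.register is called first; it invokes doRegister() of the subclass ZookeeperRegistry, and if that fails the url is added to the failed-registration list (a repair thread re-registers it in the background).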

    public void register(URL url) {
        if (destroyed.get()){
            return;
        }
        super.register(url);
        failedRegistered.remove(url);
        failedUnregistered.remove(url);
        try {
            // send the registration request to the server
            doRegister(url);
        } catch (Exception e) {
            Throwable t = e;

            // if the startup check is enabled, throw the exception directly
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                    && url.getParameter(Constants.CHECK_KEY, true)
                    && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }

            // record the failed registration request in the failed list and retry periodically
            failedRegistered.add(url);
        }
    }
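The failback idea in register can be reduced to: try once, remember the url on failure, and let a periodic task try again. The sketch below illustrates only that core. FailbackSketch, retry() and pending() are hypothetical names, URLs are plain strings, the check/skipFailback branch is omitted, and retry() is called by hand where a scheduled task would normally call it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

final class FailbackSketch {
    private final Set<String> failedRegistered = ConcurrentHashMap.newKeySet();
    private final Consumer<String> doRegister; // hypothetical registry call; throws on failure

    FailbackSketch(Consumer<String> doRegister) {
        this.doRegister = doRegister;
    }

    /** Tries to register; on failure remembers the url so retry() can try again. */
    void register(String url) {
        failedRegistered.remove(url);
        try {
            doRegister.accept(url);
        } catch (RuntimeException e) {
            failedRegistered.add(url);
        }
    }

    /** What a periodic retry task would do: re-attempt every remembered url. */
    void retry() {
        for (String url : failedRegistered) {
            try {
                doRegister.accept(url);
                failedRegistered.remove(url);
            } catch (RuntimeException e) {
                // still failing; keep the url for the next round
            }
        }
    }

    int pending() {
        return failedRegistered.size();
    }

    public static void main(String[] args) {
        boolean[] registryUp = {false};
        List<String> registered = new ArrayList<>();
        FailbackSketch registry = new FailbackSketch(url -> {
            if (!registryUp[0]) {
                throw new IllegalStateException("registry unreachable");
            }
            registered.add(url);
        });
        registry.register("consumer://10.10.10.10/com.alibaba.dubbo.demo.DemoService");
        System.out.println("pending after failure: " + registry.pending());
        registryUp[0] = true;
        registry.retry();
        System.out.println("pending after retry: " + registry.pending());
    }
}
```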

Finally, here is ZookeeperRegistry's doRegister method:

    protected void doRegister(URL url) {
        try {
            zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

This creates an ephemeral node on zk: /dubbo/com.alibaba.dubbo.demo.DemoService/consumers/consumer://10.10.10.10/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=consumers&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=25267&side=consumer&timestamp=1510225913509
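The node path is made of four parts: the root group, the service interface, the category, and the consumer url itself as the leaf name. Because "/" separates levels in a zookeeper path, the leaf has to be escaped before it can serve as a single node name. The sketch below URL-encodes it; that the real toUrlPath encodes in exactly this way is an assumption, and UrlPathSketch and its parameters are hypothetical.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

final class UrlPathSketch {
    /** Builds root/service/category/encoded-url, so the url becomes one leaf node name. */
    static String toUrlPath(String root, String serviceInterface, String category, String fullUrl) {
        try {
            return root + "/" + serviceInterface + "/" + category + "/"
                    + URLEncoder.encode(fullUrl, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available on the JVM
        }
    }

    public static void main(String[] args) {
        System.out.println(toUrlPath(
                "/dubbo",
                "com.alibaba.dubbo.demo.DemoService",
                "consumers",
                "consumer://10.10.10.10/com.alibaba.dubbo.demo.DemoService?side=consumer"));
    }
}
```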

The consumer is now registered. Next, directory.subscribe performs the subscription. RegistryDirectory.subscribe(URL url):

    public void subscribe(URL url) {
        setConsumerUrl(url);
        registry.subscribe(url, this);
    }

Core code of FailbackRegistry.subscribe(URL url, NotifyListener listener):

    public void subscribe(URL url, NotifyListener listener) {
        ...
        super.subscribe(url, listener);
        removeFailedSubscribed(url, listener);
        try {
            // send the subscription request to the server
            doSubscribe(url, listener);
        } catch (Exception e) {
            ...
            // record the failed subscription request in the failed list and retry periodically
            addFailedSubscribed(url, listener);
        }
    }

ZookeeperRegistry.doSubscribe(final URL url, final NotifyListener listener):

    protected void doSubscribe(final URL url, final NotifyListener listener) {
        try {
            ...
            List<URL> urls = new ArrayList<URL>();
            for (String path : toCategoriesPath(url)) {
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                if (listeners == null) {
                    zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                    listeners = zkListeners.get(url);
                }
                ChildListener zkListener = listeners.get(listener);
                if (zkListener == null) {
                    listeners.putIfAbsent(listener, new ChildListener() {
                        public void childChanged(String parentPath, List<String> currentChilds) {
                            ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                        }
                    });
                    zkListener = listeners.get(listener);
                }
                zkClient.create(path, false);
                List<String> children = zkClient.addChildListener(path, zkListener);
                if (children != null) {
                    urls.addAll(toUrlsWithEmpty(url, path, children));
                }
            }
            notify(url, listener, urls);
        } catch (Throwable e) {
            throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

The for loop here runs 3 times, once per path:

  • /dubbo/com.alibaba.dubbo.demo.DemoService/providers
  • /dubbo/com.alibaba.dubbo.demo.DemoService/configurators
  • /dubbo/com.alibaba.dubbo.demo.DemoService/routers
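The three paths follow directly from the category=providers,configurators,routers parameter of the subscribe url: one path per comma-separated category. Here is a minimal sketch of that expansion. CategoriesPathSketch is a hypothetical helper working on plain strings, and it leaves out any special cases the real toCategoriesPath may handle.

```java
import java.util.ArrayList;
import java.util.List;

final class CategoriesPathSketch {
    /** Expands a comma-separated category parameter into one zookeeper path per category. */
    static List<String> toCategoriesPath(String root, String serviceInterface, String categoryParam) {
        List<String> paths = new ArrayList<>();
        for (String category : categoryParam.split(",")) {
            paths.add(root + "/" + serviceInterface + "/" + category.trim());
        }
        return paths;
    }

    public static void main(String[] args) {
        for (String path : toCategoriesPath(
                "/dubbo", "com.alibaba.dubbo.demo.DemoService", "providers,configurators,routers")) {
            System.out.println(path);
        }
    }
}
```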

After the for loop above finishes, the state is as follows.

ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners:

{
consumer://10.10.10.10/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=providers,configurators,routers&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=25267&side=consumer&timestamp=1510225913509
=
{RegistryDirectory instance=anonymous inner-class ChildListener instance from ZookeeperRegistry}
}

List<URL> urls: (4 elements)

[
dubbo://10.211.55.5:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.5.7&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=318&revision=2.5.7&side=provider&timestamp=1510225244315, 

dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=25215&side=provider&timestamp=1510225334486, 

empty://10.10.10.10/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=configurators&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=25267&side=consumer&timestamp=1510225913509, 

empty://10.10.10.10/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=routers&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=25267&side=consumer&timestamp=1510225913509
]

Note: the first two elements come from List<String> children = zkClient.addChildListener(path, zkListener), which returns the nodes currently under path (this is in effect the first service discovery).
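The last two elements of the list use the empty protocol: the configurators and routers paths have no children, so each is represented by a placeholder url carrying its category. A minimal sketch of that substitution follows. EmptyUrlSketch is hypothetical and works on plain strings, and the placeholder format is modeled on the list above rather than taken from the real toUrlsWithEmpty.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class EmptyUrlSketch {
    /** Returns the children as-is, or a single empty:// placeholder when there are none. */
    static List<String> toUrlsWithEmpty(String consumerHost, String service,
                                        String category, List<String> children) {
        if (children != null && !children.isEmpty()) {
            return new ArrayList<>(children);
        }
        return Collections.singletonList(
                "empty://" + consumerHost + "/" + service + "?category=" + category);
    }

    public static void main(String[] args) {
        String service = "com.alibaba.dubbo.demo.DemoService";
        System.out.println(toUrlsWithEmpty("10.10.10.10", service, "providers",
                Arrays.asList("dubbo://10.211.55.5:20880/" + service)));
        System.out.println(toUrlsWithEmpty("10.10.10.10", service, "routers",
                Collections.<String>emptyList()));
    }
}
```

With the placeholder, every subscribed category appears in the notification even when its path is empty, so the listener is still told about that category.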

Execution then proceeds to AbstractRegistry.notify(URL url, NotifyListener listener, List<URL> urls):

    protected void notify(URL url, NotifyListener listener, List<URL> urls) {
        ...
        Map<String, List<URL>> result = new HashMap<String, List<URL>>();
        for (URL u : urls) {
            if (UrlUtils.isMatch(url, u)) {
                String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
                List<URL> categoryList = result.get(category);
                if (categoryList == null) {
                    categoryList = new ArrayList<URL>();
                    result.put(category, categoryList);
                }
                categoryList.add(u);
            }
        }
        if (result.size() == 0) {
            return;
        }
        Map<String, List<URL>> categoryNotified = notified.get(url);
        if (categoryNotified == null) {
            notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
            categoryNotified = notified.get(url);
        }
        for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
            String category = entry.getKey();
            List<URL> categoryList = entry.getValue();
            categoryNotified.put(category, categoryList);
            saveProperties(url);
            listener.notify(categoryList);
        }
    }
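The first loop in notify groups the urls by their category parameter, so the listener is notified once per category. Applied to the four urls listed earlier, that gives three groups. The sketch below reproduces only the grouping step on plain strings. CategoryGroupingSketch is hypothetical, the UrlUtils.isMatch filter is omitted, and the "providers" default is an assumption about Constants.DEFAULT_CATEGORY.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class CategoryGroupingSketch {
    /** Reads the category parameter from a url string, defaulting to "providers". */
    static String categoryOf(String url) {
        int q = url.indexOf('?');
        if (q >= 0) {
            for (String pair : url.substring(q + 1).split("&")) {
                if (pair.startsWith("category=")) {
                    return pair.substring("category=".length());
                }
            }
        }
        return "providers";
    }

    /** Groups urls by category, keeping first-seen category order. */
    static Map<String, List<String>> groupByCategory(List<String> urls) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String u : urls) {
            result.computeIfAbsent(categoryOf(u), k -> new ArrayList<>()).add(u);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> urls = Arrays.asList(
                "dubbo://10.211.55.5:20880/com.alibaba.dubbo.demo.DemoService?side=provider",
                "dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?side=provider",
                "empty://10.10.10.10/com.alibaba.dubbo.demo.DemoService?category=configurators",
                "empty://10.10.10.10/com.alibaba.dubbo.demo.DemoService?category=routers");
        for (Map.Entry<String, List<String>> e : groupByCategory(urls).entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue().size() + " url(s)");
        }
    }
}
```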

