此文已由作者赵计刚薪授权网易云社区发布。
欢迎访问网易云社区,了解更多网易技术产品运营经验。
准备工作:
先启动两个provider:
来看一下ReferenceBean的继承实现关系图:
在执行DemoService demoService = (DemoService) context.getBean("demoService")时,由于ReferenceBean是一个FactoryBean,所以这里会通过FactoryBean.getObject方法获取Bean。
来看一下ReferenceBean的核心代码:
public Object getObject() throws Exception { return get(); } public synchronized T get() { if (destroyed) { throw new IllegalStateException("Already destroyed!"); } if (ref == null) { init(); } return ref; } private void init() { ... ref = createProxy(map); } private T createProxy(Map<String, String> map) { ... if (urls.size() == 1) { invoker = refprotocol.refer(interfaceClass, urls.get(0)); } ... // 创建服务代理 return (T) proxyFactory.getProxy(invoker); }
最核心的两行代码如上红色。
一 使用Protocol将interfaceClass转化为Invoker
1 invoker = refprotocol.refer(interfaceClass, urls.get(0))
这里的refprotocol是Protocol$Adaptive实例。
public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol { ... public com.alibaba.dubbo.rpc.Invoker refer(Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException { if (arg1 == null) throw new IllegalArgumentException("url == null"); com.alibaba.dubbo.common.URL url = arg1; String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol()); if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])"); com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName); return extension.refer(arg0, arg1); } ... }
这里extName="registry"。之后经过ProtocolListenerWrapper.refer->ProtocolFilterWrapper.refer->RegistryProtocol.refer,前两步什么都不做(registry协议)。来看RegistryProtocol.refer方法核心代码:
1 public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { 2 url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY); 3 Registry registry = registryFactory.getRegistry(url); 4 ... 5 return doRefer(cluster, registry, type, url); 6 }
参数:
第一行代码执行完成之后,替换了协议,此时的url为:
zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.0&pid=25267&refer=application=demo-consumer&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=25267®ister.ip=10.10.10.10&side=consumer×tamp=1510225913509×tamp=1510225984358
之后开始获取Registry。这里的registryFactory是RegistryFactory$Adaptive实例。
1 public class RegistryFactory$Adaptive implements com.alibaba.dubbo.registry.RegistryFactory { 2 public com.alibaba.dubbo.registry.Registry getRegistry(com.alibaba.dubbo.common.URL arg0) { 3 if (arg0 == null) 4 throw new IllegalArgumentException("url == null"); 5 com.alibaba.dubbo.common.URL url = arg0; 6 String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );//zookeeper 7 if(extName == null) 8 throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.registry.RegistryFactory) name from url(" + url.toString() + ") use keys([protocol])"); 9 com.alibaba.dubbo.registry.RegistryFactory extension = (com.alibaba.dubbo.registry.RegistryFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.registry.RegistryFactory.class).getExtension(extName); 10 return extension.getRegistry(arg0); 11 } 12 }
这里的extName是zookeeper。之后执行ZookeeperRegistryFactory的父类AbstractRegistryFactory.getRegistry,如下:
1 public Registry getRegistry(URL url) { 2 url = url.setPath(RegistryService.class.getName()) 3 .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName()) 4 .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY); 5 String key = url.toServiceString(); 6 // 锁定注册中心获取过程,保证注册中心单一实例 7 LOCK.lock(); 8 try { 9 Registry registry = REGISTRIES.get(key); 10 if (registry != null) { 11 return registry; 12 } 13 registry = createRegistry(url); 14 if (registry == null) { 15 throw new IllegalStateException("Can not create registry " + url); 16 } 17 REGISTRIES.put(key, registry); 18 return registry; 19 } finally { 20 // 释放锁 21 LOCK.unlock(); 22 } 23 }
经过处理的url为:
zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.0&interface=com.alibaba.dubbo.registry.RegistryService&pid=25267×tamp=1510225984358
之后调用ZookeeperRegistryFactory.createRegistry(URL url):
1 public Registry createRegistry(URL url) { 2 return new ZookeeperRegistry(url, zookeeperTransporter); 3 }
这里的zookeeperTransporter为ZookeeperTransporter$Adaptive实例。
1 public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) { 2 super(url); 3 if (url.isAnyHost()) { 4 throw new IllegalStateException("registry address == null"); 5 } 6 String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT); 7 if (!group.startsWith(Constants.PATH_SEPARATOR)) { 8 group = Constants.PATH_SEPARATOR + group; 9 } 10 this.root = group; 11 zkClient = zookeeperTransporter.connect(url); 12 zkClient.addStateListener(new StateListener() { 13 public void stateChanged(int state) { 14 if (state == RECONNECTED) { 15 try { 16 recover(); 17 } catch (Exception e) { 18 logger.error(e.getMessage(), e); 19 } 20 } 21 } 22 }); 23 }
通过super(url)这句代码,调用了ZookeeperRegistry的父类FailbackRegistry(启动失败处理器:注册失败/注销失败/订阅失败/反订阅失败/通知失败)和AbstractRegistry(将信息写入properties文件,进行相应通知-这里没有url的订阅器,所以没做什么事)。
然后获取ZkClient客户端,最后添加失败重连监听器。
执行zookeeperTransporter.connect(url),该类中的extName是"zkClient"(我们在provider部分使用了curator)。之后执行ZkclientZookeeperTransporter.connect:
1 public ZookeeperClient connect(URL url) { 2 return new ZkclientZookeeperClient(url); 3 }
1 public ZkclientZookeeperClient(URL url) { 2 super(url); 3 client = new ZkClientWrapper(url.getBackupAddress(), 30000); 4 client.addListener(new IZkStateListener() { 5 public void handleStateChanged(KeeperState state) throws Exception { 6 ZkclientZookeeperClient.this.state = state; 7 if (state == KeeperState.Disconnected) { 8 stateChanged(StateListener.DISCONNECTED); 9 } else if (state == KeeperState.SyncConnected) { 10 stateChanged(StateListener.CONNECTED); 11 } 12 } 13 14 public void handleNewSession() throws Exception { 15 stateChanged(StateListener.RECONNECTED); 16 } 17 }); 18 client.start(); 19 }
此处的client是ZkClientWrapper实例,来看ZkClientWrapper.start():
1 private ListenableFutureTask<ZkClient> listenableFutureTask; 2 3 public ZkClientWrapper(final String serverAddr, long timeout) { 4 this.timeout = timeout; 5 listenableFutureTask = ListenableFutureTask.create(new Callable<ZkClient>() { 6 @Override 7 public ZkClient call() throws Exception { 8 return new ZkClient(serverAddr, Integer.MAX_VALUE); 9 } 10 }); 11 } 12 13 public void start() { 14 if (!started) { 15 Thread connectThread = new Thread(listenableFutureTask); 16 connectThread.setName("DubboZkclientConnector"); 17 connectThread.setDaemon(true); 18 connectThread.start(); 19 try { 20 client = listenableFutureTask.get(timeout, TimeUnit.MILLISECONDS); 21 } catch (Throwable t) { 22 logger.error("Timeout! zookeeper server can not be connected in : " + timeout + "ms!", t); 23 } 24 started = true; 25 } else { 26 logger.warn("Zkclient has already been started!"); 27 } 28 }
此处会new ZkClient,连接zookeeper。
之后设置失败重连监听器。到此为止,创建Registry就完成了!再回到RegistryProtocol.refer方法核心代码:
1 public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { 2 url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY); 3 Registry registry = registryFactory.getRegistry(url); 4 ... 5 return doRefer(cluster, registry, type, url); 6 }
之后执行最后一行代码:
1 private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { 2 RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); 3 directory.setRegistry(registry); 4 directory.setProtocol(protocol); 5 // REFER_KEY的所有属性 6 Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters()); 7 URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters); 8 if (!Constants.ANY_VALUE.equals(url.getServiceInterface()) 9 && url.getParameter(Constants.REGISTER_KEY, true)) { 10 registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY, 11 Constants.CHECK_KEY, String.valueOf(false))); 12 } 13 directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, 14 Constants.PROVIDERS_CATEGORY 15 + "," + Constants.CONFIGURATORS_CATEGORY 16 + "," + Constants.ROUTERS_CATEGORY)); 17 return cluster.join(directory); 18 }
总体步骤:
首先是创建RegistryDirectory,创建完成的实例:
-->List<Router> routers: [MockInvokersSelector实例] -->Registry registry: 上述的ZookeeperRegistry实例(zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.0&interface=com.alibaba.dubbo.registry.RegistryService&pid=25267×tamp=1510225984358) -->String serviceKey: com.alibaba.dubbo.registry.RegistryService -->String[] serviceMethods: [sayHello] -->Class<T> serviceType: interface com.alibaba.dubbo.demo.DemoService -->URL url: zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.0&pid=25267&refer=application=demo-consumer&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=25267®ister.ip=10.10.10.10&side=consumer×tamp=1510225913509×tamp=1510225984358 -->URL consumerUrl: zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.0&pid=25267&refer=application=demo-consumer&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=25267®ister.ip=10.10.10.10&side=consumer×tamp=1510225913509×tamp=1510225984358 -->URL directoryUrl: zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=25267®ister.ip=10.10.10.10&side=consumer×tamp=1510225913509 -->URL overrideDirectoryUrl: zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=25267®ister.ip=10.10.10.10&side=consumer×tamp=1510225913509 -->Map<String, String> queryMap: {side=consumer, application=demo-consumer, register.ip=10.10.10.10, methods=sayHello, dubbo=2.0.0, pid=25267, check=false, interface=com.alibaba.dubbo.demo.DemoService, timestamp=1510225913509}
其中Router是在RegistryDirectory的父类AbstractDirectory中创建的,代码如下:
public AbstractDirectory(URL url, URL consumerUrl, List<Router> routers) { if (url == null) throw new IllegalArgumentException("url == null"); this.url = url; this.consumerUrl = consumerUrl; setRouters(routers); } protected void setRouters(List<Router> routers) { // copy list routers = routers == null ? new ArrayList<Router>() : new ArrayList<Router>(routers); // append url router String routerkey = url.getParameter(Constants.ROUTER_KEY); if (routerkey != null && routerkey.length() > 0) { RouterFactory routerFactory = ExtensionLoader.getExtensionLoader(RouterFactory.class).getExtension(routerkey); routers.add(routerFactory.getRouter(url)); } // append mock invoker selector routers.add(new MockInvokersSelector()); Collections.sort(routers); this.routers = routers; }
之后向注册中心注册消费者,注册的方式与服务提供者一样。先是通过FailbackRegistry.register,内部调用子类ZookeeperRegistry的doRegister(),如果失败,加入注册失败列表(会被修复线程后台重新注册)。
1 public void register(URL url) { 2 if (destroyed.get()){ 3 return; 4 } 5 super.register(url); 6 failedRegistered.remove(url); 7 failedUnregistered.remove(url); 8 try { 9 // 向服务器端发送注册请求 10 doRegister(url); 11 } catch (Exception e) { 12 Throwable t = e; 13 14 // 如果开启了启动时检测,则直接抛出异常 15 boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) 16 && url.getParameter(Constants.CHECK_KEY, true) 17 && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol()); 18 boolean skipFailback = t instanceof SkipFailbackWrapperException; 19 if (check || skipFailback) { 20 if (skipFailback) { 21 t = t.getCause(); 22 } 23 throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t); 24 } else { 25 logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t); 26 } 27 28 // 将失败的注册请求记录到失败列表,定时重试 29 failedRegistered.add(url); 30 } 31 }
最后来看ZookeeperRegistry的doRegister方法:
1 protected void doRegister(URL url) { 2 try { 3 zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true)); 4 } catch (Throwable e) { 5 throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); 6 } 7 }
在zk上创建临时节点:/dubbo/com.alibaba.dubbo.demo.DemoService/consumers/consumer://10.10.10.10/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=consumers&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=25267&side=consumer×tamp=1510225913509
到此,消费者注册完成!之后directory.subscribe进行订阅。RegistryDirectory.subscribe(URL url):
1 public void subscribe(URL url) { 2 setConsumerUrl(url); 3 registry.subscribe(url, this); 4 }
FailbackRegistry.subscribe(URL url, NotifyListener listener)核心代码:
1 public void subscribe(URL url, NotifyListener listener) { 2 ... 3 super.subscribe(url, listener); 4 removeFailedSubscribed(url, listener); 5 try { 6 // 向服务器端发送订阅请求 7 doSubscribe(url, listener); 8 } catch (Exception e) { 9 ... 10 // 将失败的订阅请求记录到失败列表,定时重试 11 addFailedSubscribed(url, listener); 12 } 13 }
ZookeeperRegistry.doSubscribe(final URL url, final NotifyListener listener)
1 protected void doSubscribe(final URL url, final NotifyListener listener) { 2 try { 3 ... 4 List<URL> urls = new ArrayList<URL>(); 5 for (String path : toCategoriesPath(url)) { 6 ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url); 7 if (listeners == null) { 8 zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>()); 9 listeners = zkListeners.get(url); 10 } 11 ChildListener zkListener = listeners.get(listener); 12 if (zkListener == null) { 13 listeners.putIfAbsent(listener, new ChildListener() { 14 public void childChanged(String parentPath, List<String> currentChilds) { 15 ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)); 16 } 17 }); 18 zkListener = listeners.get(listener); 19 } 20 zkClient.create(path, false); 21 List<String> children = zkClient.addChildListener(path, zkListener); 22 if (children != null) { 23 urls.addAll(toUrlsWithEmpty(url, path, children)); 24 } 25 } 26 notify(url, listener, urls); 27 } catch (Throwable e) { 28 throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); 29 } 30 }
这里的for循环是3次:
执行完上述for循环后,来看此时的:
ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners:
1 { 2 consumer://10.10.10.10/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=providers,configurators,routers&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=25267&side=consumer×tamp=1510225913509 3 = 4 {RegistryDirectory实例=ZookeeperRegistry中的匿名内部类ChildListener实例} 5 }
List<URL> urls:(4个元素)
[ dubbo://10.211.55.5:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.5.7&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=318&revision=2.5.7&side=provider×tamp=1510225244315, dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=25215&side=provider×tamp=1510225334486, empty://10.10.10.10/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=configurators&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=25267&side=consumer×tamp=1510225913509, empty://10.10.10.10/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=routers&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=25267&side=consumer×tamp=1510225913509 ]
注意:前边两个元素是在执行List<String> children = zkClient.addChildListener(path, zkListener)代码时,会返回当前path下的节点(实际上就是第一次服务发现)。
之后一路执行到AbstractRegistry.notify(URL url, NotifyListener listener, List<URL> urls)
1 protected void notify(URL url, NotifyListener listener, List<URL> urls) { 2 ... 3 Map<String, List<URL>> result = new HashMap<String, List<URL>>(); 4 for (URL u : urls) { 5 if (UrlUtils.isMatch(url, u)) { 6 String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY); 7 List<URL> categoryList = result.get(category); 8 if (categoryList == null) { 9 categoryList = new ArrayList<URL>(); 10 result.put(category, categoryList); 11 } 12 categoryList.add(u); 13 } 14 } 15 if (result.size() == 0) { 16 return; 17 } 18 Map<String, List<URL>> categoryNotified = notified.get(url); 19 if (categoryNotified == null) { 20 notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>()); 21 categoryNotified = notified.get(url); 22 } 23 for (Map.Entry<String, List<URL>> entry : result.entrySet()) { 24 String category = entry.getKey(); 25 List<URL> categoryList = entry.getValue(); 26 categoryNotified.put(category, categoryList); 27 saveProperties(url); 28 listener.notify(categoryList); 29 } 30 }
更多网易技术、产品、运营经验分享请点击。