勿忘初心2018-11-18 11:49此文已由作者赵计刚薪授权网易云社区发布。
欢迎访问网易云社区,了解更多网易技术产品运营经验。
为了安全:服务启动的ip全部使用10.10.10.10
远程服务的暴露总体步骤:
服务远程暴露的代码:
1 //如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露本地服务)
2 if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
3 if (logger.isInfoEnabled()) {
4 logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
5 }
6 if (registryURLs != null && registryURLs.size() > 0
7 && url.getParameter("register", true)) {
8 for (URL registryURL : registryURLs) {
9 url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
10 URL monitorUrl = loadMonitor(registryURL);
11 if (monitorUrl != null) {
12 url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
13 }
14 if (logger.isInfoEnabled()) {
15 logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
16 }
17 Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
18 Exporter<?> exporter = protocol.export(invoker);
19 exporters.add(exporter);
20 }
21 } else {
22 Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
23 Exporter<?> exporter = protocol.export(invoker);
24 exporters.add(exporter);
25 }
26 }
首先将实现类ref封装为Invoker,之后将invoker转换为exporter,最后将exporter放入缓存List<Exporter> exporters中。
一 将实现类ref封装为Invoker
1 Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
1 为registryURL拼接export=providerUrl参数
一开始的registryURL:
registry://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&client=curator&dubbo=2.0.0&pid=887®istry=zookeeper×tamp=1507096022072
registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())这句代码为registryURL添加了参数并编码:(这里给出没有编码的样子)
1 export=dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=887&side=provider×tamp=1507096024334
2 ProxyFactory$Adaptive.getInvoker(DemoServiceImpl实例, Class<DemoService>, registryURL)
1 public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws com.alibaba.dubbo.rpc.RpcException {
2 if (arg2 == null)
3 throw new IllegalArgumentException("url == null");
4 com.alibaba.dubbo.common.URL url = arg2;
5 String extName = url.getParameter("proxy", "javassist");//结果是javassist
6 if(extName == null)
7 throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
8 com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
9 return extension.getInvoker(arg0, arg1, arg2);
10 }
这里,本来是调用JavassistProxyFactory的getInvoker方法,但是JavassistProxyFactory被StubProxyFactoryWrapper给aop了。
3 StubProxyFactoryWrapper.getInvoker(DemoServiceImpl实例, Class<DemoService>, registryURL)
1 public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException {
2 return proxyFactory.getInvoker(proxy, type, url);
3 }
4 JavassistProxyFactory.getInvoker(DemoServiceImpl实例, Class<DemoService>, registryURL)
1 public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
2 // TODO Wrapper类不能正确处理带$的类名
3 final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
4 return new AbstractProxyInvoker<T>(proxy, type, url) {
5 @Override
6 protected Object doInvoke(T proxy, String methodName,
7 Class<?>[] parameterTypes,
8 Object[] arguments) throws Throwable {
9 return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
10 }
11 };
12 }
首先是创建Wrapper类:Wrapper.getWrapper(Class<DemoServiceImpl>)。该类记录了DemoServiceImpl的属性名称,方法名称等信息。关键代码如下:(完整代码见:7.2 服务本地暴露)
1 import com.alibaba.dubbo.common.bytecode.Wrapper;
2 import java.util.HashMap;
3
4 public class Wrapper1 extends Wrapper {
5
6 public static String[] pns;//property name array
7 public static java.util.Map pts = new HashMap();//<property key, property value>
8 public static String[] mns;//method names
9 public static String[] dmns;//
10 public static Class[] mts0;
55 /**
56 * @param o 实现类
57 * @param n 方法名称
58 * @param p 参数类型
59 * @param v 参数名称
60 * @return
61 * @throws java.lang.reflect.InvocationTargetException
62 */
63 public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException {
64 com.alibaba.dubbo.demo.provider.DemoServiceImpl w;
65 try {
66 w = ((com.alibaba.dubbo.demo.provider.DemoServiceImpl) o);
67 } catch (Throwable e) {
68 throw new IllegalArgumentException(e);
69 }
70 try {
71 if ("sayHello".equals(n) && p.length == 1) {
72 return ($w) w.sayHello((java.lang.String) v[0]);
73 }
74 } catch (Throwable e) {
75 throw new java.lang.reflect.InvocationTargetException(e);
76 }
77 throw new com.alibaba.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + n + "\" in class com.alibaba.dubbo.demo.provider.DemoServiceImpl.");
78 }
79 }
创建完DemoServiceImpl的Wrapper类之后(实际上该实例在本地暴露的时候已经存入缓存了,这里只是从缓存中拿出来而已),创建一个AbstractProxyInvoker实例。
1 private final T proxy;
2 private final Class<T> type;
3 private final URL url;
4
5 public AbstractProxyInvoker(T proxy, Class<T> type, URL url) {
6 if (proxy == null) {
7 throw new IllegalArgumentException("proxy == null");
8 }
9 if (type == null) {
10 throw new IllegalArgumentException("interface == null");
11 }
12 if (!type.isInstance(proxy)) {
13 throw new IllegalArgumentException(proxy.getClass().getName() + " not implement interface " + type);
14 }
15 this.proxy = proxy;
16 this.type = type;
17 this.url = url;
18 }
最后创建完成的AbstractProxyInvoker实例属性如下:
这样我们就将ref实现类转换成了Invoker,之后在调用该invoker.invoke(Invocation invocation)的时候,会调用invoker.doInvoke(T proxy, String methodName,Class<?>[] parameterTypes, Object[] arguments)的时候,就会调用相应的实现类proxy的wrapper类的invokeMethod(proxy, methodName, parameterTypes, arguments),该方法又会调用真实的实现类methodName方法。这里可以先给出AbstractProxyInvoker.invoke(Invocation invocation)源码:
1 public Result invoke(Invocation invocation) throws RpcException {
2 try {
3 return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));
4 } catch (InvocationTargetException e) {
5 return new RpcResult(e.getTargetException());
6 } catch (Throwable e) {
7 throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
8 }
9 }
这里的proxy就是上边赋好值的proxy:DemoServiceImpl实例。而方法信息会封装在Invocation对象中,该对象在服务引用时介绍。
二 将Invoker转换为Exporter
1 Exporter<?> exporter = protocol.export(invoker)
1 Protocol$Adaptive.export(com.alibaba.dubbo.rpc.Invoker AbstractProxyInvoker实例)
1 public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
2 if (arg0 == null)
3 throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
4 if (arg0.getUrl() == null)
5 throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
6 com.alibaba.dubbo.common.URL url = arg0.getUrl();
7 String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());//registry
8 if(extName == null)
9 throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
10 com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
11 return extension.export(arg0);
12 }
这里,由于aop的原因,首先调用了ProtocolListenerWrapper的export(Invoker<T> invoker),如下:
1 public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
2 if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
3 return protocol.export(invoker);
4 }
5 return new ListenerExporterWrapper<T>(protocol.export(invoker),
6 Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class).getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
7 }
由于协议是“registry”,所以不做任何处理,继续调用ProtocolFilterWrapper的export(Invoker<T> invoker),如下:
1 public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
2 if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
3 return protocol.export(invoker);
4 }
5 return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
6 }
同理,由于协议是“registry”,所以不做任何处理,继续调用RegistryProtocol.export(final Invoker<T> originInvoker),如下:
1 public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
2 //export invoker
3 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
4 //registry provider
5 final Registry registry = getRegistry(originInvoker);
6 final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
7 registry.register(registedProviderUrl);
8 // 订阅override数据
9 // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
10 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
11 final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
12 overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
13 registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
14 //保证每次export都返回一个新的exporter实例
15 return new Exporter<T>() {
16 public Invoker<T> getInvoker() {
17 return exporter.getInvoker();
18 }
19
20 public void unexport() {
21 try {
22 exporter.unexport();
23 } catch (Throwable t) {
24 logger.warn(t.getMessage(), t);
25 }
26 try {
27 registry.unregister(registedProviderUrl);
28 } catch (Throwable t) {
29 logger.warn(t.getMessage(), t);
30 }
31 try {
32 overrideListeners.remove(overrideSubscribeUrl);
33 registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
34 } catch (Throwable t) {
35 logger.warn(t.getMessage(), t);
36 }
37 }
38 };
39 }
该方法完成了远程暴露的全部流程。
2 将invoker转换为exporter并启动netty服务
1 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
doLocalExport(final Invoker<T> originInvoker)
1 /**
2 * 1 从invoker的URL中的Map<String, String> parameters中获取key为export的地址providerUrl,该地址将是服务注册在zk上的节点
3 * 2 从 Map<String, ExporterChangeableWrapper<?>> bounds 缓存中获取key为上述providerUrl的exporter,如果有,直接返回,如果没有,创建并返回
4 * @return
5 */
6 @SuppressWarnings("unchecked")
7 private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
8 String key = getCacheKey(originInvoker);//根据originInvoker获取providerUrl
9 ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
10 if (exporter == null) {
11 synchronized (bounds) {
12 exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
13 if (exporter == null) {
14 final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));//存储originInvoker和providerUrl
15 exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
16 bounds.put(key, exporter);
17 }
18 }
19 }
20 return exporter;
21 }
2.1 从originInvoker中获取providerUrl
该方法直接首先调用getCacheKey(final Invoker<?> originInvoker)中获取providerUrl,这里的originInvoker就是上述创建出来的AbstractProxyInvoker实例,注意他的url是registry协议的,该url的export参数的value就是我们要获取的providerUrl。获取providerUrl的源码如下:
1 private String getCacheKey(final Invoker<?> originInvoker) {
2 URL providerUrl = getProviderUrl(originInvoker);
3 String key = providerUrl.removeParameters("dynamic", "enabled").toFullString();
4 return key;
5 }
6
7 private URL getProviderUrl(final Invoker<?> origininvoker) {
8 String export = origininvoker.getUrl().getParameterAndDecoded(Constants.EXPORT_KEY);
9 if (export == null || export.length() == 0) {
10 throw new IllegalArgumentException("The registry export url is null! registry: " + origininvoker.getUrl());
11 }
12
13 URL providerUrl = URL.valueOf(export);
14 return providerUrl;
15 }
之后一系列的操作,就是获取该providerUrl对应的exporter,之后放入缓存Map<String, ExporterChangeableWrapper<?>> bounds中,所以一个providerUrl只会对应一个exporter。
更多网易技术、产品、运营经验分享请点击。