This article is published on the NetEase Cloud Community with the authorization of its author, 赵计刚.
2.2 Creating the InvokerDelegete
    final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
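InvokerDelegete is a static nested class of RegistryProtocol. It is a delegate for originInvoker: it stores originInvoker itself, and its parent class InvokerWrapper additionally stores providerUrl. InvokerWrapper forwards invoke calls to originInvoker and also destroys it, so it manages the invoker's lifecycle.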
    public static class InvokerDelegete<T> extends InvokerWrapper<T> {
        private final Invoker<T> invoker;

        /**
         * @param invoker
         * @param url the value that getUrl() of this invoker returns
         */
        public InvokerDelegete(Invoker<T> invoker, URL url) {
            super(invoker, url);
            this.invoker = invoker;
        }

        public Invoker<T> getInvoker() {
            if (invoker instanceof InvokerDelegete) {
                return ((InvokerDelegete<T>) invoker).getInvoker();
            } else {
                return invoker;
            }
        }
    }
The core code of InvokerWrapper:
    public class InvokerWrapper<T> implements Invoker<T> {
        private final Invoker<T> invoker; // originInvoker
        private final URL url;            // providerUrl

        public InvokerWrapper(Invoker<T> invoker, URL url) {
            this.invoker = invoker;
            this.url = url;
        }

        public boolean isAvailable() {
            return invoker.isAvailable();
        }

        public Result invoke(Invocation invocation) throws RpcException {
            return invoker.invoke(invocation);
        }

        public void destroy() {
            invoker.destroy();
        }
    }
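With that, the InvokerDelegete object is created. It holds originInvoker in its own invoker field, and its parent InvokerWrapper holds originInvoker together with providerUrl.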
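The delegation and the recursive unwrapping in getInvoker() can be tried out in isolation. The following is only a minimal sketch: the Invoker interface, the Wrapper and Delegate classes, and the URL strings are simplified, hypothetical stand-ins, not Dubbo's real types.

```java
public class DelegateSketch {
    /** Simplified stand-in for Dubbo's Invoker (hypothetical, not the real interface). */
    public interface Invoker {
        String getUrl();
        String invoke(String request);
    }

    /** Stores the wrapped invoker plus a URL of its own, in the spirit of InvokerWrapper. */
    public static class Wrapper implements Invoker {
        private final Invoker invoker;
        private final String url;

        public Wrapper(Invoker invoker, String url) {
            this.invoker = invoker;
            this.url = url;
        }

        @Override
        public String getUrl() {
            return url; // the wrapper's own URL, not the wrapped invoker's
        }

        @Override
        public String invoke(String request) {
            return invoker.invoke(request); // plain forwarding
        }
    }

    /** Adds recursive unwrapping, in the spirit of InvokerDelegete.getInvoker(). */
    public static class Delegate extends Wrapper {
        private final Invoker invoker;

        public Delegate(Invoker invoker, String url) {
            super(invoker, url);
            this.invoker = invoker;
        }

        public Invoker getInvoker() {
            // Peel off nested delegates until the original invoker is reached.
            return invoker instanceof Delegate ? ((Delegate) invoker).getInvoker() : invoker;
        }
    }

    /** A made-up origin invoker used only for this demonstration. */
    public static Invoker origin() {
        return new Invoker() {
            @Override
            public String getUrl() {
                return "registry://127.0.0.1:2181";
            }

            @Override
            public String invoke(String request) {
                return "origin:" + request;
            }
        };
    }

    public static void main(String[] args) {
        Invoker origin = origin();
        Delegate inner = new Delegate(origin, "dubbo://10.10.10.10:20880");
        Delegate outer = new Delegate(inner, "dubbo://10.10.10.10:20880");
        System.out.println(outer.getInvoker() == origin);
        System.out.println(outer.getUrl());
        System.out.println(outer.invoke("hello"));
    }
}
```

Even with two layers of delegates, getInvoker() returns the original invoker, while getUrl() returns the URL handed to the wrapper.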
2.3 Using DubboProtocol to turn the InvokerDelegete into an Exporter
    exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
2.3.1 Protocol$Adaptive.export(com.alibaba.dubbo.rpc.Invoker), called with the InvokerDelegete object
    public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol()); // dubbo
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.export(arg0);
    }
This is the last time this generated code is shown in full. Next, ProtocolListenerWrapper.export(Invoker<T> invokerDelegete) is called, followed by ProtocolFilterWrapper.export(Invoker<T> invokerDelegete), which first wraps the InvokerDelegete object recursively in 8 filters and then has DubboProtocol export the wrapped object.
The source of the layer-by-layer wrapping:
    /**
     * 1. Use key to read the filter values from the url, then use those values and group
     *    to obtain the set of filters whose classes carry the @Activate annotation.
     * 2. Wrap the given invoker recursively with these filters, one invoker layer per filter
     *    (in effect a linked list).
     */
    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
        if (filters.size() > 0) {
            for (int i = filters.size() - 1; i >= 0; i--) {
                final Filter filter = filters.get(i);
                final Invoker<T> next = last;
                last = new Invoker<T>() {

                    public Class<T> getInterface() {
                        return invoker.getInterface();
                    }

                    public URL getUrl() {
                        return invoker.getUrl();
                    }

                    public boolean isAvailable() {
                        return invoker.isAvailable();
                    }

                    public Result invoke(Invocation invocation) throws RpcException {
                        return filter.invoke(next, invocation);
                    }

                    public void destroy() {
                        invoker.destroy();
                    }

                    @Override
                    public String toString() {
                        return invoker.toString();
                    }
                };
            }
        }
        return last;
    }
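The most important part of the method above is Result invoke(Invocation invocation) of the anonymous Invoker: it calls filter.invoke(next, invocation), and next may itself be an Invoker that wraps another filter. Let's open one filter and look at its source: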
    @Activate(group = Constants.PROVIDER, order = -110000)
    public class EchoFilter implements Filter {
        public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
            if (inv.getMethodName().equals(Constants.$ECHO) && inv.getArguments() != null && inv.getArguments().length == 1)
                return new RpcResult(inv.getArguments()[0]);
            return invoker.invoke(inv);
        }
    }
As you can see, unless it answers an $echo call directly, the filter calls the invoke method of the next invoker passed in.
Here is the object after recursive wrapping (referred to below as the "filter-wrapped InvokerDelegete"):
    EchoFilter
    -->ClassLoaderFilter
    -->GenericFilter
    -->ContextFilter
    -->TraceFilter
    -->TimeoutFilter
    -->MonitorFilter
    -->ExceptionFilter
    -->InvokerDelegete object
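To see why iterating from the last filter to the first puts the first filter outermost, the wrapping loop can be reproduced on toy types. This is only a sketch under simplifying assumptions: Invoker and Filter here are hypothetical one-method interfaces working on strings, and the filter names are made up for the demonstration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FilterChainSketch {
    /** Hypothetical, string-based stand-ins for Dubbo's Invoker and Filter. */
    public interface Invoker {
        String invoke(String invocation);
    }

    public interface Filter {
        String invoke(Invoker next, String invocation);
    }

    /** Wraps from the last filter to the first, so filters.get(0) ends up outermost. */
    public static Invoker buildChain(Invoker target, List<Filter> filters) {
        Invoker last = target;
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker next = last;
            last = invocation -> filter.invoke(next, invocation);
        }
        return last;
    }

    /** A filter that records its name and then passes the call on. */
    public static Filter tracing(String name, List<String> trace) {
        return (next, invocation) -> {
            trace.add(name);
            return next.invoke(invocation);
        };
    }

    /** Answers "$echo xxx" itself, otherwise passes the call on (modeled loosely on EchoFilter). */
    public static Filter echo(List<String> trace) {
        return (next, invocation) -> {
            trace.add("echo");
            return invocation.startsWith("$echo ") ? invocation.substring(6) : next.invoke(invocation);
        };
    }

    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        Invoker target = invocation -> {
            trace.add("target");
            return "result of " + invocation;
        };
        Invoker chain = buildChain(target,
                Arrays.asList(echo(trace), tracing("context", trace), tracing("exception", trace)));

        System.out.println(chain.invoke("sayHello") + " via " + trace);
        trace.clear();
        System.out.println(chain.invoke("$echo ping") + " via " + trace);
    }
}
```

A normal call passes through every filter in list order before reaching the target, while the echo call returns from the outermost filter and never reaches the inner layers.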
2.3.2 DubboProtocol.export(Invoker<T> filter-wrapped InvokerDelegete)
    /**
     * 1. Get the key of the remote service to be exported from the invoker's url:
     *    com.alibaba.dubbo.demo.DemoService:20880
     *    (the actual form is serviceGroup/serviceName:serviceVersion:port)
     *    Note: for a local export the key is just com.alibaba.dubbo.demo.DemoService
     * 2. Open the ExchangeServer
     */
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();

        // export service.
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);

        //export an stub service for dispaching event
        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }
            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }

        openServer(url);

        return exporter;
    }
First, the key is taken from the url of the filter-wrapped InvokerDelegete. This code is simple: it builds a key of the form serviceGroup/serviceName:serviceVersion:port, which here comes out as com.alibaba.dubbo.demo.DemoService:20880.
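The key format is easy to reproduce. The helper below is a hypothetical sketch written from the format described above, not Dubbo's own serviceKey implementation; it assumes that an empty group or version is simply left out, which matches the example key.

```java
public class ServiceKeySketch {
    /**
     * Builds "serviceGroup/serviceName:serviceVersion:port".
     * Assumption of this sketch: a null or empty group/version is omitted.
     */
    public static String serviceKey(int port, String serviceName, String serviceVersion, String serviceGroup) {
        StringBuilder buf = new StringBuilder();
        if (serviceGroup != null && !serviceGroup.isEmpty()) {
            buf.append(serviceGroup).append('/');
        }
        buf.append(serviceName);
        if (serviceVersion != null && !serviceVersion.isEmpty()) {
            buf.append(':').append(serviceVersion);
        }
        buf.append(':').append(port);
        return buf.toString();
    }

    public static void main(String[] args) {
        // No group and no version: only the interface name and the port remain.
        System.out.println(serviceKey(20880, "com.alibaba.dubbo.demo.DemoService", null, null));
        // Group and version (made-up values) appear before and after the name.
        System.out.println(serviceKey(20880, "com.alibaba.dubbo.demo.DemoService", "1.0.0", "g1"));
    }
}
```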
Next, the DubboExporter is created.
2.3.2.1 DubboExporter<T>(filter-wrapped InvokerDelegete, "com.alibaba.dubbo.demo.DemoService:20880", exporterMap)
    public class DubboExporter<T> extends AbstractExporter<T> {
        // serviceGroup/serviceName:serviceVersion:port, e.g. com.alibaba.dubbo.demo.DemoService:20880
        private final String key;
        // { "com.alibaba.dubbo.demo.DemoService:20880" -> the current DubboExporter instance }
        private final Map<String, Exporter<?>> exporterMap;

        public DubboExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
            super(invoker);
            this.key = key;
            this.exporterMap = exporterMap;
        }

        @Override
        public void unexport() {
            super.unexport();
            exporterMap.remove(key);
        }
    }
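Note that the constructor receives a reference to DubboProtocol's own exporterMap, so unexport() removes the entry from that same shared map.
The parent class: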
    public abstract class AbstractExporter<T> implements Exporter<T> {
        protected final Logger logger = LoggerFactory.getLogger(getClass());
        private final Invoker<T> invoker;
        private volatile boolean unexported = false;

        public AbstractExporter(Invoker<T> invoker) {
            if (invoker == null)
                throw new IllegalStateException("service invoker == null");
            if (invoker.getInterface() == null)
                throw new IllegalStateException("service type == null");
            if (invoker.getUrl() == null)
                throw new IllegalStateException("service url == null");
            this.invoker = invoker;
        }

        public Invoker<T> getInvoker() {
            return invoker;
        }

        public void unexport() {
            if (unexported) {
                return;
            }
            unexported = true;
            getInvoker().destroy();
        }

        public String toString() {
            return getInvoker().toString();
        }
    }
Here the filter-wrapped InvokerDelegete is assigned to the Invoker reference in AbstractExporter, which means the invoker can be obtained from the exporter. Finally, DubboProtocol.export(Invoker<T> invoker) executes exporterMap.put(key, exporter); which stores { "com.alibaba.dubbo.demo.DemoService:20880" -> the current DubboExporter instance }.
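The effect of sharing one map between the protocol and its exporters can be shown with a small model. This is a sketch only: the Exporter class below is a hypothetical reduction that merges the unexported flag of AbstractExporter with the map removal of DubboExporter and leaves the invoker out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ExporterMapSketch {
    /** Hypothetical, reduced exporter: keeps only the key and the shared map. */
    public static class Exporter {
        private final String key;
        private final Map<String, Exporter> exporterMap; // the same map instance the protocol holds
        private volatile boolean unexported = false;

        public Exporter(String key, Map<String, Exporter> exporterMap) {
            this.key = key;
            this.exporterMap = exporterMap;
        }

        public void unexport() {
            if (unexported) {
                return; // a second call is a no-op
            }
            unexported = true;
            exporterMap.remove(key); // visible to every holder of the map
        }
    }

    public static void main(String[] args) {
        Map<String, Exporter> exporterMap = new ConcurrentHashMap<>();
        String key = "com.alibaba.dubbo.demo.DemoService:20880";

        Exporter exporter = new Exporter(key, exporterMap);
        exporterMap.put(key, exporter);
        System.out.println(exporterMap.containsKey(key));

        exporter.unexport();
        System.out.println(exporterMap.containsKey(key));
    }
}
```

Because the exporter holds the same map instance, removing its own entry on unexport is immediately visible to whoever looks services up in that map.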
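At this point the DubboExporter instance holds the filter-wrapped InvokerDelegete as its invoker, the key "com.alibaba.dubbo.demo.DemoService:20880", and a reference to exporterMap.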
2.3.2.2 Opening the ExchangeServer
    /**
     * Look up the ExchangeServer in the cache Map<String, ExchangeServer> serverMap by "host:port";
     * if there is none, create an ExchangeServer and put it into the cache.
     * @param url
     */
    private void openServer(URL url) {
        // find server.
        String key = url.getAddress(); // 10.10.10.10:20880
        // a client can also export a service that only the server can invoke
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
        if (isServer) {
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                serverMap.put(key, createServer(url));
            } else {
                // the server supports reset, used together with the override feature
                server.reset(url);
            }
        }
    }
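First, host:port is taken from providerUrl as the key, and the ExchangeServer is looked up in the serverMap cache. If one already exists it is reset with the url; if not, an ExchangeServer is created and cached as follows:
Map<String, ExchangeServer> serverMap: { "10.10.10.10:20880" -> ExchangeServer instance }.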
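The get-or-create behavior can be modeled without any networking. The sketch below is a simplified assumption-laden model: Server is a hypothetical class that only counts resets, openServer returns the server for convenience (the original returns void), and the created counter exists only for the demonstration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ServerCacheSketch {
    /** Hypothetical stand-in for ExchangeServer that only counts resets. */
    public static class Server {
        public int resets = 0;

        public void reset() {
            resets++;
        }
    }

    private final Map<String, Server> serverMap = new ConcurrentHashMap<>();
    public int created = 0; // demo-only counter of created servers

    /** One server per "host:port": create it on first use, reset it on later calls. */
    public Server openServer(String address) {
        Server server = serverMap.get(address);
        if (server == null) {
            server = new Server();
            created++;
            serverMap.put(address, server);
        } else {
            server.reset();
        }
        return server;
    }

    public static void main(String[] args) {
        ServerCacheSketch protocol = new ServerCacheSketch();
        Server first = protocol.openServer("10.10.10.10:20880");
        Server second = protocol.openServer("10.10.10.10:20880");
        System.out.println(first == second);
        System.out.println(protocol.created + " created, " + first.resets + " reset");
    }
}
```

Several services exported on the same host and port therefore share one server, and only the first export pays the cost of creating it.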
Creating the ExchangeServer: createServer(URL providerUrl)
    private ExchangeServer createServer(URL url) {
        // by default, send a readonly event when the server shuts down
        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
        // heartbeat is enabled by default
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);

        url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
        ExchangeServer server;
        try {
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        str = url.getParameter(Constants.CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }
        return server;
    }
First, parameters are added to the original providerUrl: channel.readonly.sent=true&heartbeat=60000&codec=dubbo (the heartbeat parameter is used when HeaderExchangeServer starts its heartbeat timer).
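The difference between addParameterIfAbsent and addParameter matters when the provider has already configured one of these values. The sketch below models the URL parameters as a plain map, which is an assumption of this example and not how Dubbo's URL class is implemented; only the parameter names and values are taken from the text above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ServerUrlParamsSketch {
    /**
     * Returns a copy of the parameters with the server defaults applied:
     * the first two are added only if absent, codec is always overwritten.
     */
    public static Map<String, String> withServerDefaults(Map<String, String> params) {
        Map<String, String> result = new LinkedHashMap<>(params); // copy, leave the input untouched
        result.putIfAbsent("channel.readonly.sent", "true");
        result.putIfAbsent("heartbeat", "60000");
        result.put("codec", "dubbo");
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> configured = new LinkedHashMap<>();
        configured.put("heartbeat", "5000"); // made-up user setting
        System.out.println(withServerDefaults(configured));
    }
}
```

A heartbeat configured by the user survives, while the codec is set regardless of what was there before.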
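Then Exchangers.bind(providerUrl with the added parameters, requestHandler) creates the ExchangeServer. Let's first look at DubboProtocol#requestHandler. This handler is extremely important: after several layers of wrapping it becomes the final server-side logic handler for netty.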
    private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
        public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                Invoker<?> invoker = getInvoker(channel, inv);
                // for a callback, handle the case of a newer version calling an older version
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                    String methodsStr = invoker.getUrl().getParameters().get("methods");
                    boolean hasMethod = false;
                    if (methodsStr == null || methodsStr.indexOf(",") == -1) {
                        hasMethod = inv.getMethodName().equals(methodsStr);
                    } else {
                        String[] methods = methodsStr.split(",");
                        for (String method : methods) {
                            if (inv.getMethodName().equals(method)) {
                                hasMethod = true;
                                break;
                            }
                        }
                    }
                    if (!hasMethod) {
                        logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv);
                        return null;
                    }
                }
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                return invoker.invoke(inv);
            }
            throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
        }

        @Override
        public void received(Channel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                reply((ExchangeChannel) channel, message);
            } else {
                super.received(channel, message);
            }
        }

        @Override
        public void connected(Channel channel) throws RemotingException {
            invoke(channel, Constants.ON_CONNECT_KEY);
        }

        @Override
        public void disconnected(Channel channel) throws RemotingException {
            if (logger.isInfoEnabled()) {
                logger.info("disconected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl());
            }
            invoke(channel, Constants.ON_DISCONNECT_KEY);
        }

        private void invoke(Channel channel, String methodKey) {
            Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
            if (invocation != null) {
                try {
                    received(channel, invocation);
                } catch (Throwable t) {
                    logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
                }
            }
        }

        private Invocation createInvocation(Channel channel, URL url, String methodKey) {
            String method = url.getParameter(methodKey);
            if (method == null || method.length() == 0) {
                return null;
            }
            RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);
            invocation.setAttachment(Constants.PATH_KEY, url.getPath());
            invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY));
            invocation.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY));
            invocation.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY));
            if (url.getParameter(Constants.STUB_EVENT_KEY, false)) {
                invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString());
            }
            return invocation;
        }
    };
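As shown above, the handler defines what happens when a client connects, when it disconnects, when a message is received from it, and how to reply to a message, plus a method for creating an Invocation. The getInvoker(Channel channel, Invocation inv) method it uses is, in simplified form: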
    String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));
    DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
    return exporter.getInvoker();
This is exactly the DubboExporter we just put into exporterMap, and its invoker is exactly our filter-wrapped InvokerDelegete.
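Put together, export time and request time meet in the map: export stores an invoker under the service key, and the handler rebuilds the same key from the incoming request to find it again. The sketch below is a hypothetical reduction of that round trip. The string-based Invoker, the path:port key without group and version, and the explicit check for a missing key are all assumptions of this example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class InvokerLookupSketch {
    /** Hypothetical string-based stand-in for Dubbo's Invoker. */
    public interface Invoker {
        String invoke(String method);
    }

    private final Map<String, Invoker> exporterMap = new ConcurrentHashMap<>();

    /** Export time: remember the invoker under its service key. */
    public void export(String serviceKey, Invoker invoker) {
        exporterMap.put(serviceKey, invoker);
    }

    /** Request time: rebuild the key from the request and dispatch to the stored invoker. */
    public String reply(int port, String path, String method) {
        String serviceKey = path + ":" + port; // group and version left out in this sketch
        Invoker invoker = exporterMap.get(serviceKey);
        if (invoker == null) {
            // Added by this sketch: fail clearly when nothing was exported under the key.
            throw new IllegalStateException("No exported service for key " + serviceKey);
        }
        return invoker.invoke(method);
    }

    public static void main(String[] args) {
        InvokerLookupSketch protocol = new InvokerLookupSketch();
        protocol.export("com.alibaba.dubbo.demo.DemoService:20880", method -> "invoked " + method);
        System.out.println(protocol.reply(20880, "com.alibaba.dubbo.demo.DemoService", "sayHello"));
    }
}
```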