Remote Service Export - Creating the Exporter and Starting the Netty Server (2)

勿忘初心 2018-11-18 11:50

This article is published on the NetEase Cloud Community with the authorization of its author, Zhao Jigang (赵计刚).



2.2  Creating the InvokerDelegete

final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));

InvokerDelegete is a static nested class of RegistryProtocol that acts as a delegate for originInvoker. It stores originInvoker itself, and its parent class InvokerWrapper additionally stores providerUrl. InvokerWrapper forwards invoke calls to originInvoker and also destroys it, so it can manage the invoker's lifecycle.

    public static class InvokerDelegete<T> extends InvokerWrapper<T> {
        private final Invoker<T> invoker;

        /**
         * @param invoker
         * @param url     the value returned by invoker.getUrl()
         */
        public InvokerDelegete(Invoker<T> invoker, URL url) {
            super(invoker, url);
            this.invoker = invoker;
        }

        public Invoker<T> getInvoker() {
            if (invoker instanceof InvokerDelegete) {
                return ((InvokerDelegete<T>) invoker).getInvoker();
            } else {
                return invoker;
            }
        }
    }

The core code of InvokerWrapper:

public class InvokerWrapper<T> implements Invoker<T> {
    private final Invoker<T> invoker;//originInvoker
    private final URL url;//providerUrl

    public InvokerWrapper(Invoker<T> invoker, URL url) {
        this.invoker = invoker;
        this.url = url;
    }

    public boolean isAvailable() {
        return invoker.isAvailable();
    }

    public Result invoke(Invocation invocation) throws RpcException {
        return invoker.invoke(invocation);
    }

    public void destroy() {
        invoker.destroy();
    }

    // getInterface() and getUrl() are omitted from this excerpt
}

With that, an InvokerDelegete object has been created. Its fields are:

  • invoker: originInvoker (an AbstractProxyInvoker object)
  • InvokerWrapper.invoker: originInvoker (an AbstractProxyInvoker object)
  • url: providerUrl (dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=1035&side=provider&timestamp=1507101286063)
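The delegate relationship described above can be pictured with a small stand-alone sketch. It does not use Dubbo's classes: `Invoker`, `Wrapper` and `Delegate` are hypothetical, trimmed-down stand-ins for `Invoker`, `InvokerWrapper` and `InvokerDelegete`, URLs are plain strings, and the URL values are made up. It only illustrates that the wrapper reports its own URL while forwarding calls, and that `getInvoker()` unwraps nested delegates.

```java
import java.util.Objects;

class InvokerDelegateSketch {
    /** Hypothetical, trimmed-down stand-in for Dubbo's Invoker (the URL is a plain String here). */
    interface Invoker {
        String getUrl();
        String invoke(String invocation);
    }

    /** Plays the role of InvokerWrapper: forwards calls, but reports its own URL. */
    static class Wrapper implements Invoker {
        private final Invoker invoker;
        private final String url;

        Wrapper(Invoker invoker, String url) {
            this.invoker = Objects.requireNonNull(invoker);
            this.url = url;
        }

        public String getUrl() { return url; }
        public String invoke(String invocation) { return invoker.invoke(invocation); }
    }

    /** Plays the role of InvokerDelegete: also keeps the invoker so it can be unwrapped. */
    static class Delegate extends Wrapper {
        private final Invoker invoker;

        Delegate(Invoker invoker, String url) {
            super(invoker, url);
            this.invoker = invoker;
        }

        Invoker getInvoker() {
            // Unwrap nested delegates until the original invoker is reached.
            return (invoker instanceof Delegate) ? ((Delegate) invoker).getInvoker() : invoker;
        }
    }

    /** A stand-in for originInvoker, whose own URL differs from the provider URL. */
    static Invoker origin() {
        return new Invoker() {
            public String getUrl() { return "registry://example"; }
            public String invoke(String invocation) { return "origin:" + invocation; }
        };
    }

    public static void main(String[] args) {
        Invoker origin = origin();
        Delegate inner = new Delegate(origin, "dubbo://provider-a");
        Delegate outer = new Delegate(inner, "dubbo://provider-b");
        System.out.println(outer.getUrl());                // the URL given to the outer delegate
        System.out.println(outer.invoke("sayHello"));      // forwarded down to the origin
        System.out.println(outer.getInvoker() == origin);  // nested delegates are unwrapped
    }
}
```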

2.3  Using DubboProtocol to convert the InvokerDelegete into an Exporter

exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);

2.3.1  Protocol$Adaptive.export(com.alibaba.dubbo.rpc.Invoker: the InvokerDelegete object)

    public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());//dubbo
        if(extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.export(arg0);
    }
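The selection step in the adaptive class (pick an extension by the URL's protocol, falling back to "dubbo") can be sketched without Dubbo as a plain map lookup. Everything here is an assumption made for illustration: the `Protocol` interface is reduced to one string method, `EXTENSIONS` stands in for `ExtensionLoader`, and the wrapper classes that the real `getExtension` places around the extension are not modelled.

```java
import java.util.HashMap;
import java.util.Map;

class AdaptiveDispatchSketch {
    /** Hypothetical one-method stand-in for Dubbo's Protocol. */
    interface Protocol {
        String export(String url);
    }

    // Stand-in for the extension registry; the real lookup goes through ExtensionLoader.
    private static final Map<String, Protocol> EXTENSIONS = new HashMap<>();
    static {
        EXTENSIONS.put("dubbo", url -> "DubboProtocol exported " + url);
        EXTENSIONS.put("registry", url -> "RegistryProtocol exported " + url);
    }

    /** Returns the scheme in front of "://", or null when the URL has none. */
    static String protocolOf(String url) {
        int idx = url.indexOf("://");
        return idx > 0 ? url.substring(0, idx) : null;
    }

    /** Chooses the extension named by the URL's protocol, defaulting to "dubbo". */
    static String export(String url) {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        String protocol = protocolOf(url);
        String extName = (protocol == null) ? "dubbo" : protocol;
        Protocol extension = EXTENSIONS.get(extName);
        if (extension == null) {
            throw new IllegalStateException("No such extension: " + extName);
        }
        return extension.export(url);
    }

    public static void main(String[] args) {
        System.out.println(export("dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService"));
        System.out.println(export("registry://10.10.10.10:2181/registry"));
    }
}
```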

This is the last time this code will be pasted. Next, ProtocolListenerWrapper.export(Invoker<T> InvokerDelegete) is called, followed by ProtocolFilterWrapper.export(Invoker<T> InvokerDelegete), which first wraps the InvokerDelegete object recursively in 8 filters and then uses DubboProtocol to export the wrapped InvokerDelegete object.

The source code for the layer-by-layer wrapping:

    /**
     * 1 Use the key to read the filter values from the url, then use those values and the group to collect the filters whose classes carry the @Activate annotation
     * 2 Then wrap the passed-in invoker recursively in these filters, one layer per filter (the result is effectively a linked list)
     */
    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
        if (filters.size() > 0) {
            for (int i = filters.size() - 1; i >= 0; i--) {
                final Filter filter = filters.get(i);
                final Invoker<T> next = last;
                last = new Invoker<T>() {

                    public Class<T> getInterface() {
                        return invoker.getInterface();
                    }

                    public URL getUrl() {
                        return invoker.getUrl();
                    }

                    public boolean isAvailable() {
                        return invoker.isAvailable();
                    }

                    public Result invoke(Invocation invocation) throws RpcException {
                        return filter.invoke(next, invocation);
                    }

                    public void destroy() {
                        invoker.destroy();
                    }

                    @Override
                    public String toString() {
                        return invoker.toString();
                    }
                };
            }
        }
        return last;
    }

The most important part of the method above is the Invoker's Result invoke(Invocation invocation), which calls filter.invoke(next, invocation); the next here may in turn wrap another filter. Let's open one filter and look at its source:

@Activate(group = Constants.PROVIDER, order = -110000)
public class EchoFilter implements Filter {
    public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
        if (inv.getMethodName().equals(Constants.$ECHO) && inv.getArguments() != null && inv.getArguments().length == 1)
            return new RpcResult(inv.getArguments()[0]);
        return invoker.invoke(inv);
    }
}

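As we can see, unless the call is an $echo request that it answers itself, this filter calls the invoke method of the next that was passed in.

Here is the object that results from the recursive wrapping (referred to below as the "InvokerDelegete filter object"):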

EchoFilter
-->ClassLoaderFilter
   -->GenericFilter
      -->ContextFilter
         -->TraceFilter
            -->TimeoutFilter
               -->MonitorFilter
                  -->ExceptionFilter
                     -->InvokerDelegete object
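To see why iterating from the last filter to the first produces this nesting, here is a minimal sketch of the same loop outside Dubbo. The `Invoker` and `Filter` interfaces are hypothetical single-method stand-ins that work on strings, and the `echo` and `tracing` helpers are made up for the demonstration; only the wrapping loop mirrors buildInvokerChain.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class FilterChainSketch {
    /** Hypothetical single-method stand-ins for Dubbo's Invoker and Filter. */
    interface Invoker { String invoke(String invocation); }
    interface Filter { String invoke(Invoker next, String invocation); }

    /** Wraps the invoker from the last filter to the first, so the first filter ends up outermost. */
    static Invoker buildChain(Invoker invoker, List<Filter> filters) {
        Invoker last = invoker;
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker next = last;
            last = invocation -> filter.invoke(next, invocation);
        }
        return last;
    }

    /** A filter that records its name in the trace and then passes the call on. */
    static Filter tracing(String name, List<String> trace) {
        return (next, invocation) -> {
            trace.add(name);
            return next.invoke(invocation);
        };
    }

    /** A short-circuiting filter in the spirit of EchoFilter: it answers "$echo ..." itself. */
    static Filter echo(List<String> trace) {
        return (next, invocation) -> {
            trace.add("echo");
            return invocation.startsWith("$echo ") ? invocation.substring(6) : next.invoke(invocation);
        };
    }

    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        Invoker target = invocation -> {
            trace.add("target");
            return "result of " + invocation;
        };
        Invoker chain = buildChain(target,
                Arrays.asList(echo(trace), tracing("context", trace), tracing("exception", trace)));

        System.out.println(chain.invoke("sayHello") + " via " + trace);
        trace.clear();
        System.out.println(chain.invoke("$echo ping") + " via " + trace);
    }
}
```

A normal call passes through every filter in list order before reaching the target, while the echo call is answered by the outermost filter and never reaches the layers below it.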

2.3.2  DubboProtocol.export(Invoker<T>: the InvokerDelegete filter object)

    /**
     * 1 Get the key of the remote service to be exported from the invoker's url: com.alibaba.dubbo.demo.DemoService:20880 (in general: serviceGroup/serviceName:serviceVersion:port)
     * Note: the key for local export is just: com.alibaba.dubbo.demo.DemoService
     * 2 Open the ExchangeServer
     */
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();
        // export service.
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);

        //export a stub service for dispatching events
        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }
            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }

        openServer(url);

        return exporter;
    }

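First, the key is obtained from the url of the InvokerDelegete filter object. This code is simple: it builds a key of the form serviceGroup/serviceName:serviceVersion:port. The providerUrl in this example carries no group or version parameter, so the key obtained here is com.alibaba.dubbo.demo.DemoService:20880.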
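As an illustration of that key format, the following sketch assembles serviceGroup/serviceName:serviceVersion:port from its parts. It is written from the format described here, not copied from Dubbo: the method name and parameter order are borrowed from the serviceKey call quoted later in this article, and the rule for skipping an empty group or version is an assumption.

```java
class ServiceKeySketch {
    /**
     * Builds "group/name:version:port".
     * Assumption: the group and version parts are left out when they are null or empty.
     */
    static String serviceKey(int port, String serviceName, String version, String group) {
        StringBuilder buf = new StringBuilder();
        if (group != null && !group.isEmpty()) {
            buf.append(group).append('/');
        }
        buf.append(serviceName);
        if (version != null && !version.isEmpty()) {
            buf.append(':').append(version);
        }
        buf.append(':').append(port);
        return buf.toString();
    }

    public static void main(String[] args) {
        // No group and no version, as in the example URL of this article.
        System.out.println(serviceKey(20880, "com.alibaba.dubbo.demo.DemoService", null, null));
        // With a made-up group and version.
        System.out.println(serviceKey(20880, "com.alibaba.dubbo.demo.DemoService", "1.0.0", "g1"));
    }
}
```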

Next, the DubboExporter is created.

2.3.2.1 DubboExporter<T>(the InvokerDelegete filter object, "com.alibaba.dubbo.demo.DemoService:20880", exporterMap)

public class DubboExporter<T> extends AbstractExporter<T> {
    //serviceGroup/serviceName:serviceVersion:port, e.g. com.alibaba.dubbo.demo.DemoService:20880
    private final String key;
    //{ "com.alibaba.dubbo.demo.DemoService:20880" -> the current DubboExporter instance }
    private final Map<String, Exporter<?>> exporterMap;

    public DubboExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
        super(invoker);
        this.key = key;
        this.exporterMap = exporterMap;
    }

    @Override
    public void unexport() {
        super.unexport();
        exporterMap.remove(key);
    }
}

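Note that exporterMap is passed in as a reference: the exporter holds the very same map instance as DubboProtocol, not a copy.

The parent class: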

public abstract class AbstractExporter<T> implements Exporter<T> {
    protected final Logger logger = LoggerFactory.getLogger(getClass());
    private final Invoker<T> invoker;
    private volatile boolean unexported = false;

    public AbstractExporter(Invoker<T> invoker) {
        if (invoker == null)
            throw new IllegalStateException("service invoker == null");
        if (invoker.getInterface() == null)
            throw new IllegalStateException("service type == null");
        if (invoker.getUrl() == null)
            throw new IllegalStateException("service url == null");
        this.invoker = invoker;
    }

    public Invoker<T> getInvoker() {
        return invoker;
    }

    public void unexport() {
        if (unexported) {
            return;
        }
        unexported = true;
        getInvoker().destroy();
    }

    public String toString() {
        return getInvoker().toString();
    }
}

Here an InvokerDelegete filter object is assigned to AbstractExporter's Invoker reference, which means the invoker can be retrieved from the exporter. Finally, DubboProtocol.export(Invoker<T> invoker) executes exporterMap.put(key, exporter); and thereby stores { "com.alibaba.dubbo.demo.DemoService:20880" -> the current DubboExporter instance }.

Let's look at the DubboExporter instance as it stands now:

  • key: com.alibaba.dubbo.demo.DemoService:20880
  • invoker: the InvokerDelegete filter object
  • exporterMap: { "com.alibaba.dubbo.demo.DemoService:20880" -> the current DubboExporter instance }
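The interplay between the exporter and the shared map can be sketched in isolation. `SimpleExporter` below is a hypothetical miniature that folds DubboExporter and AbstractExporter into one class, and a counter stands in for destroying the invoker. It shows two things from the code above: the exporter removes its own entry from the map it was given, and the destroy step runs only once even if unexport() is called again.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ExporterMapSketch {
    /** Hypothetical miniature exporter: remembers its key and the map it is registered in. */
    static class SimpleExporter {
        private final String key;
        private final Map<String, SimpleExporter> exporterMap;
        private volatile boolean unexported = false;
        private int destroyCount = 0;

        SimpleExporter(String key, Map<String, SimpleExporter> exporterMap) {
            this.key = key;
            this.exporterMap = exporterMap; // the caller's map instance, not a copy
        }

        void unexport() {
            if (!unexported) {      // guard: the destroy step runs only once
                unexported = true;
                destroyCount++;     // stands in for getInvoker().destroy()
            }
            exporterMap.remove(key);
        }

        int destroyCount() { return destroyCount; }
    }

    public static void main(String[] args) {
        Map<String, SimpleExporter> exporterMap = new ConcurrentHashMap<>();
        String key = "com.alibaba.dubbo.demo.DemoService:20880";
        SimpleExporter exporter = new SimpleExporter(key, exporterMap);
        exporterMap.put(key, exporter);

        exporter.unexport();
        exporter.unexport(); // a second call is harmless
        System.out.println(exporterMap.containsKey(key) + " " + exporter.destroyCount());
    }
}
```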

2.3.2.2 Opening the ExchangeServer

    /**
     * Look up the ExchangeServer by "host:port" in the cache Map<String, ExchangeServer> serverMap; if there is none, create an ExchangeServer and put it into the cache.
     * @param url
     */
    private void openServer(URL url) {
        // find server.
        String key = url.getAddress();//10.10.10.10:20880
        //a client can also export a service that only the server can invoke.
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
        if (isServer) {
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                serverMap.put(key, createServer(url));
            } else {
                //the server supports reset, which is used together with the override feature
                server.reset(url);
            }
        }
    }

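First, host:port is taken from providerUrl as the key, and the ExchangeServer is looked up in the serverMap cache. If none exists, an ExchangeServer is created and finally put into the cache as follows:

Map<String, ExchangeServer> serverMap: { "10.10.10.10:20880" -> ExchangeServer instance }.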
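The effect of this cache is that every service exported on the same host:port shares one server. A minimal sketch of that create-or-reset logic follows, with assumptions stated plainly: `Server` is a hypothetical placeholder for ExchangeServer, URLs are plain strings of the form protocol://host:port/path, and `addressOf` is a simplified substitute for url.getAddress().

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ServerCacheSketch {
    /** Hypothetical placeholder for ExchangeServer; it only counts resets. */
    static class Server {
        final String address;
        int resetCount = 0;

        Server(String address) { this.address = address; }

        void reset(String url) { resetCount++; }
    }

    private final Map<String, Server> serverMap = new ConcurrentHashMap<>();

    /** Extracts "host:port"; assumes a URL shaped like protocol://host:port/path. */
    static String addressOf(String url) {
        int start = url.indexOf("://") + 3;
        int end = url.indexOf('/', start);
        return end < 0 ? url.substring(start) : url.substring(start, end);
    }

    /** Creates a server for a new address, or resets the existing one for a known address. */
    Server openServer(String url) {
        String key = addressOf(url);
        Server server = serverMap.get(key);
        if (server == null) {
            server = new Server(key);
            serverMap.put(key, server);
        } else {
            server.reset(url);
        }
        return server;
    }

    int serverCount() { return serverMap.size(); }

    public static void main(String[] args) {
        ServerCacheSketch cache = new ServerCacheSketch();
        Server first = cache.openServer("dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService");
        Server second = cache.openServer("dubbo://10.10.10.10:20880/com.example.OtherService");
        System.out.println((first == second) + " " + cache.serverCount() + " " + first.resetCount);
    }
}
```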

Creating the ExchangeServer: createServer(URL providerUrl)

    private ExchangeServer createServer(URL url) {
        //by default, send a readonly event when the server shuts down
        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
        //heartbeat is enabled by default
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);

        url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
        ExchangeServer server;
        try {
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        str = url.getParameter(Constants.CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }
        return server;
    }

First, parameters are added to the original providerUrl: channel.readonly.sent=true&heartbeat=60000&codec=dubbo (the heartbeat parameter is used later, when HeaderExchangeServer starts its heartbeat timer).
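The difference between addParameterIfAbsent and addParameter matters here: the first two parameters are only defaults, while the codec is always set. The sketch below imitates that on a plain map instead of Dubbo's URL class. The parameter names and values are the ones listed above; the `prepare` helper is hypothetical, and it assumes the non-compatible codec name "dubbo".

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ServerUrlParamsSketch {
    /** Returns a copy of the provider parameters with the server-side additions applied. */
    static Map<String, String> prepare(Map<String, String> providerParams) {
        Map<String, String> params = new LinkedHashMap<>(providerParams);
        // Defaults: only applied when the provider URL does not set them already.
        params.putIfAbsent("channel.readonly.sent", "true");
        params.putIfAbsent("heartbeat", "60000");
        // Unconditional: any existing codec value is replaced.
        params.put("codec", "dubbo");
        return params;
    }

    public static void main(String[] args) {
        Map<String, String> providerParams = new LinkedHashMap<>();
        providerParams.put("side", "provider");
        providerParams.put("heartbeat", "5000"); // an explicit setting survives
        System.out.println(prepare(providerParams));
    }
}
```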

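Then Exchangers.bind("providerUrl with the added parameters", requestHandler) is used to create the ExchangeServer. Let's first look at DubboProtocol#requestHandler. This object is extremely important: after several layers of wrapping, it becomes the final server-side logic handler for netty.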

    private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
        public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                Invoker<?> invoker = getInvoker(channel, inv);
                //for a callback, handle the case of a higher version calling a lower version
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                    String methodsStr = invoker.getUrl().getParameters().get("methods");
                    boolean hasMethod = false;
                    if (methodsStr == null || methodsStr.indexOf(",") == -1) {
                        hasMethod = inv.getMethodName().equals(methodsStr);
                    } else {
                        String[] methods = methodsStr.split(",");
                        for (String method : methods) {
                            if (inv.getMethodName().equals(method)) {
                                hasMethod = true;
                                break;
                            }
                        }
                    }
                    if (!hasMethod) {
                        logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv);
                        return null;
                    }
                }
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                return invoker.invoke(inv);
            }
            //note: because + binds tighter than ==, the condition below is ("Unsupported request: " + message) == null, which is always false
            throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
        }

        @Override
        public void received(Channel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                reply((ExchangeChannel) channel, message);
            } else {
                super.received(channel, message);
            }
        }

        @Override
        public void connected(Channel channel) throws RemotingException {
            invoke(channel, Constants.ON_CONNECT_KEY);
        }

        @Override
        public void disconnected(Channel channel) throws RemotingException {
            if (logger.isInfoEnabled()) {
                logger.info("disconected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl());
            }
            invoke(channel, Constants.ON_DISCONNECT_KEY);
        }

        private void invoke(Channel channel, String methodKey) {
            Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
            if (invocation != null) {
                try {
                    received(channel, invocation);
                } catch (Throwable t) {
                    logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
                }
            }
        }

        private Invocation createInvocation(Channel channel, URL url, String methodKey) {
            String method = url.getParameter(methodKey);
            if (method == null || method.length() == 0) {
                return null;
            }
            RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);
            invocation.setAttachment(Constants.PATH_KEY, url.getPath());
            invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY));
            invocation.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY));
            invocation.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY));
            if (url.getParameter(Constants.STUB_EVENT_KEY, false)) {
                invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString());
            }
            return invocation;
        }
    };

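As the code above shows, this handler defines the methods for a client connecting, a client disconnecting, receiving a client message and replying to a message, as well as a method that creates an Invocation. The getInvoker(Channel channel, Invocation inv) method, in abbreviated form, looks like this: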

        String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));
        DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
        return exporter.getInvoker();

This is exactly the DubboExporter we just put into exporterMap, and its invoker is exactly our InvokerDelegete filter object.
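Putting the two halves together, export time registers an exporter under the service key, and request time looks it up by the same key and calls its invoker. The following sketch shows that round trip in a few lines. It is a simplification under stated assumptions: `Invoker` and `Exporter` are hypothetical string-based stand-ins, the key is passed in directly instead of being computed from the channel and attachments, and the error raised for an unknown key is this sketch's own choice, since the abbreviated code above does not show that case.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class RequestDispatchSketch {
    /** Hypothetical string-based stand-in for Dubbo's Invoker. */
    interface Invoker { String invoke(String methodName); }

    /** Hypothetical stand-in for DubboExporter: it only hands out its invoker. */
    static class Exporter {
        private final Invoker invoker;

        Exporter(Invoker invoker) { this.invoker = invoker; }

        Invoker getInvoker() { return invoker; }
    }

    private final Map<String, Exporter> exporterMap = new ConcurrentHashMap<>();

    /** Export time: register the exporter under its service key. */
    void export(String serviceKey, Invoker invoker) {
        exporterMap.put(serviceKey, new Exporter(invoker));
    }

    /** Request time: find the exporter by key and hand the call to its invoker. */
    String reply(String serviceKey, String methodName) {
        Exporter exporter = exporterMap.get(serviceKey);
        if (exporter == null) {
            // Assumption of this sketch: an unknown key is reported as an error.
            throw new IllegalStateException("No exported service for key: " + serviceKey);
        }
        return exporter.getInvoker().invoke(methodName);
    }

    public static void main(String[] args) {
        RequestDispatchSketch protocol = new RequestDispatchSketch();
        String key = "com.alibaba.dubbo.demo.DemoService:20880";
        protocol.export(key, methodName -> "called " + methodName);
        System.out.println(protocol.reply(key, "sayHello"));
    }
}
```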

