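勿忘初心 2018-11-18 11:50. This article is published on the NetEase Cloud Community (网易云社区) with the authorization of its author, Zhao Jigang (赵计刚).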
2.2 Creating the InvokerDelegete
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
InvokerDelegete is a static nested class of RegistryProtocol that acts as a delegate for originInvoker. It stores originInvoker itself, and its parent class InvokerWrapper additionally stores the providerUrl. InvokerWrapper forwards invoke calls to originInvoker and also destroys it, so the wrapper manages the invoker's lifecycle.
public static class InvokerDelegete<T> extends InvokerWrapper<T> {
    private final Invoker<T> invoker;

    /**
     * @param invoker
     * @param url the value returned by invoker.getUrl
     */
    public InvokerDelegete(Invoker<T> invoker, URL url) {
        super(invoker, url);
        this.invoker = invoker;
    }

    public Invoker<T> getInvoker() {
        if (invoker instanceof InvokerDelegete) {
            return ((InvokerDelegete<T>) invoker).getInvoker();
        } else {
            return invoker;
        }
    }
}
The core code of InvokerWrapper:
public class InvokerWrapper<T> implements Invoker<T> {
    private final Invoker<T> invoker; // originInvoker
    private final URL url;            // providerUrl

    public InvokerWrapper(Invoker<T> invoker, URL url) {
        this.invoker = invoker;
        this.url = url;
    }

    public boolean isAvailable() {
        return invoker.isAvailable();
    }

    public Result invoke(Invocation invocation) throws RpcException {
        return invoker.invoke(invocation);
    }

    public void destroy() {
        invoker.destroy();
    }
}
This completes the InvokerDelegete object: it holds originInvoker, and through its parent InvokerWrapper also the providerUrl.
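The delegation and the recursive unwrapping in getInvoker() can be tried out in isolation. The following is only a minimal sketch: SimpleInvoker and Delegate are hypothetical, simplified stand-ins for Dubbo's Invoker and InvokerDelegete/InvokerWrapper, and the URLs are plain strings made up for illustration.

```java
final class DelegateSketch {
    // Hypothetical, simplified stand-in for Dubbo's Invoker interface.
    interface SimpleInvoker {
        String invoke(String request);
    }

    // Plays the role of InvokerDelegete and InvokerWrapper in one class.
    static final class Delegate implements SimpleInvoker {
        private final SimpleInvoker invoker; // the wrapped (origin) invoker
        private final String url;            // stands in for providerUrl

        Delegate(SimpleInvoker invoker, String url) {
            this.invoker = invoker;
            this.url = url;
        }

        String getUrl() {
            return url;
        }

        // Peel off nested delegates until the innermost invoker is reached.
        SimpleInvoker getInvoker() {
            if (invoker instanceof Delegate) {
                return ((Delegate) invoker).getInvoker();
            }
            return invoker;
        }

        @Override
        public String invoke(String request) {
            return invoker.invoke(request); // plain forwarding to the wrapped invoker
        }
    }

    public static void main(String[] args) {
        SimpleInvoker origin = request -> "echo:" + request;
        Delegate inner = new Delegate(origin, "dubbo://10.10.10.10:20880/inner");
        Delegate outer = new Delegate(inner, "dubbo://10.10.10.10:20880/outer");

        System.out.println(outer.getInvoker() == origin); // true
        System.out.println(outer.getUrl());               // the outer delegate's own url
        System.out.println(outer.invoke("hello"));        // echo:hello
    }
}
```

Even with two layers of delegates, getInvoker() returns the original invoker, while getUrl() reports the url given to the outermost delegate.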
2.3 Converting the InvokerDelegete into an Exporter with DubboProtocol
exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
2.3.1 Protocol$Adaptive.export(com.alibaba.dubbo.rpc.Invoker InvokerDelegete object)
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
    if (arg0 == null)
        throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
    if (arg0.getUrl() == null)
        throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
    com.alibaba.dubbo.common.URL url = arg0.getUrl();
    String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol()); // dubbo
    if (extName == null)
        throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
    com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
    return extension.export(arg0);
}
This is the last time this code will be pasted. Next, ProtocolListenerWrapper.export(Invoker<T> invoker) is called with the InvokerDelegete, followed by ProtocolFilterWrapper.export(Invoker<T> invoker). The latter first wraps the InvokerDelegete object recursively in 8 filters, and then uses DubboProtocol to export the wrapped object.
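The dispatch idea behind Protocol$Adaptive (pick an extension by the protocol name in the url, falling back to "dubbo") can be sketched without Dubbo on the classpath. In this sketch the EXTENSIONS map is a hypothetical replacement for ExtensionLoader, urls are plain strings, and the wrapper classes are left out entirely.

```java
import java.util.HashMap;
import java.util.Map;

final class AdaptiveDispatchSketch {
    // Hypothetical, simplified stand-in for com.alibaba.dubbo.rpc.Protocol.
    interface Protocol {
        String export(String url);
    }

    // Stands in for ExtensionLoader: extension name -> implementation.
    private static final Map<String, Protocol> EXTENSIONS = new HashMap<>();
    static {
        EXTENSIONS.put("dubbo", url -> "exported by dubbo: " + url);
        EXTENSIONS.put("registry", url -> "exported by registry: " + url);
    }

    // Use the scheme in front of "://" as the extension name; default to "dubbo".
    static String extensionName(String url) {
        int idx = url.indexOf("://");
        return idx > 0 ? url.substring(0, idx) : "dubbo";
    }

    static String export(String url) {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        String extName = extensionName(url);
        Protocol extension = EXTENSIONS.get(extName);
        if (extension == null) {
            throw new IllegalStateException("No such extension: " + extName);
        }
        return extension.export(url);
    }

    public static void main(String[] args) {
        System.out.println(export("dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService"));
        System.out.println(export("registry://10.10.10.10:2181/some.Registry"));
    }
}
```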
Source code for the layer-by-layer wrapping:
/**
 * 1. Use key to read the filter values from the url, then use those values and the group
 *    to collect the filters whose classes carry the @Activate annotation.
 * 2. Wrap the given invoker recursively with these filters, layer by layer
 *    (the result is effectively a linked list).
 */
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
    Invoker<T> last = invoker;
    List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
    if (filters.size() > 0) {
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker<T> next = last;
            last = new Invoker<T>() {

                public Class<T> getInterface() {
                    return invoker.getInterface();
                }

                public URL getUrl() {
                    return invoker.getUrl();
                }

                public boolean isAvailable() {
                    return invoker.isAvailable();
                }

                public Result invoke(Invocation invocation) throws RpcException {
                    return filter.invoke(next, invocation);
                }

                public void destroy() {
                    invoker.destroy();
                }

                @Override
                public String toString() {
                    return invoker.toString();
                }
            };
        }
    }
    return last;
}
The most important part of the method above is the Invoker's Result invoke(Invocation invocation): it calls filter.invoke(next, invocation), where next may in turn lead to another filter. Let's open one filter and look at its source:
@Activate(group = Constants.PROVIDER, order = -110000)
public class EchoFilter implements Filter {
    public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
        if (inv.getMethodName().equals(Constants.$ECHO) && inv.getArguments() != null && inv.getArguments().length == 1)
            return new RpcResult(inv.getArguments()[0]);
        return invoker.invoke(inv);
    }
}
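As shown, the filter calls the invoke method of the next invoker passed to it.
Here is the object produced by the recursive wrapping (referred to below as the "filter object of InvokerDelegete"):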
EchoFilter
--> ClassLoaderFilter
--> GenericFilter
--> ContextFilter
--> TraceFilter
--> TimeoutFilter
--> MonitorFilter
--> ExceptionFilter
--> InvokerDelegete object
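To see why iterating the filter list backwards yields a chain that runs in list order, here is a minimal sketch of the same wrapping loop. The Invoker and Filter interfaces are hypothetical simplifications (strings instead of Invocation and Result), and the trace list exists only to record the call order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class FilterChainSketch {
    // Hypothetical, simplified stand-ins for Dubbo's Invoker and Filter.
    interface Invoker {
        String invoke(String request, List<String> trace);
    }

    interface Filter {
        String invoke(Invoker next, String request, List<String> trace);
    }

    // A filter that records its name and then passes the call on to next.
    static Filter named(String name) {
        return (next, request, trace) -> {
            trace.add(name);
            return next.invoke(request, trace);
        };
    }

    // Same loop shape as buildInvokerChain: wrap from the last filter to the first.
    static Invoker buildChain(Invoker target, List<Filter> filters) {
        Invoker last = target;
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker next = last;
            last = (request, trace) -> filter.invoke(next, request, trace);
        }
        return last;
    }

    public static void main(String[] args) {
        Invoker target = (request, t) -> {
            t.add("target");
            return "result:" + request;
        };
        Invoker chain = buildChain(target, Arrays.asList(named("A"), named("B"), named("C")));

        List<String> trace = new ArrayList<>();
        System.out.println(chain.invoke("ping", trace)); // result:ping
        System.out.println(trace);                       // [A, B, C, target]
    }
}
```

Because the last filter is wrapped first, the first filter in the list ends up outermost, which matches the EchoFilter-first chain listed above.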
2.3.2 DubboProtocol.export(Invoker<T> filter object of InvokerDelegete)
/**
 * 1. Get the key of the remote service to be exported from the invoker's url:
 *    com.alibaba.dubbo.demo.DemoService:20880 (the full form is serviceGroup/serviceName:serviceVersion:port).
 *    Note: the key for local export is simply com.alibaba.dubbo.demo.DemoService.
 * 2. Open the ExchangeServer.
 */
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();
    // export service.
    String key = serviceKey(url);
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    exporterMap.put(key, exporter);
    //export an stub service for dispaching event
    Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
    Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
    if (isStubSupportEvent && !isCallbackservice) {
        String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
        if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
            if (logger.isWarnEnabled()) {
                logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                        "], has set stubproxy support event ,but no stub methods founded."));
            }
        } else {
            stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
        }
    }
    openServer(url);
    return exporter;
}
First, the key is read from the url of the "filter object of InvokerDelegete". This code is simple: it produces a key of the form serviceGroup/serviceName:serviceVersion:port, which here comes out as com.alibaba.dubbo.demo.DemoService:20880.
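The key format can be illustrated with a small helper. This is a sketch based only on the serviceGroup/serviceName:serviceVersion:port form described above; the rule that an empty group or version is simply left out is an assumption inferred from the example key, and Dubbo's real serviceKey method may apply further rules.

```java
final class ServiceKeySketch {
    // Builds "group/serviceName:version:port", omitting group and version when empty (assumed rule).
    static String serviceKey(int port, String serviceName, String version, String group) {
        StringBuilder buf = new StringBuilder();
        if (group != null && !group.isEmpty()) {
            buf.append(group).append('/');
        }
        buf.append(serviceName);
        if (version != null && !version.isEmpty()) {
            buf.append(':').append(version);
        }
        buf.append(':').append(port);
        return buf.toString();
    }

    public static void main(String[] args) {
        // No group, no version: com.alibaba.dubbo.demo.DemoService:20880
        System.out.println(serviceKey(20880, "com.alibaba.dubbo.demo.DemoService", null, null));
        // With a made-up group and version: g1/com.alibaba.dubbo.demo.DemoService:1.0.0:20880
        System.out.println(serviceKey(20880, "com.alibaba.dubbo.demo.DemoService", "1.0.0", "g1"));
    }
}
```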
Next, the DubboExporter is created.
2.3.2.1 DubboExporter<T>(filter object of InvokerDelegete, "com.alibaba.dubbo.demo.DemoService:20880", exporterMap)
public class DubboExporter<T> extends AbstractExporter<T> {
    // serviceGroup/serviceName:serviceVersion:port, e.g. com.alibaba.dubbo.demo.DemoService:20880
    private final String key;
    // { "com.alibaba.dubbo.demo.DemoService:20880" -> the current DubboExporter instance }
    private final Map<String, Exporter<?>> exporterMap;

    public DubboExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
        super(invoker);
        this.key = key;
        this.exporterMap = exporterMap;
    }

    @Override
    public void unexport() {
        super.unexport();
        exporterMap.remove(key);
    }
}
Note that the exporterMap passed in here is a reference to the same map held by DubboProtocol, not a copy.
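The consequence of sharing the map is that an exporter can remove its own entry when it is unexported. The sketch below shows this with a hypothetical, stripped-down Exporter class (no invoker, no generics); only the map handling mirrors the code above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ExporterMapSketch {
    // Hypothetical, stripped-down stand-in for DubboExporter.
    static final class Exporter {
        private final String key;
        private final Map<String, Exporter> exporterMap; // same map instance as the owner's
        private volatile boolean unexported = false;

        Exporter(String key, Map<String, Exporter> exporterMap) {
            this.key = key;
            this.exporterMap = exporterMap;
        }

        void unexport() {
            if (unexported) {
                return; // a second call does nothing
            }
            unexported = true;
            exporterMap.remove(key); // visible to the owner because the map is shared
        }
    }

    public static void main(String[] args) {
        Map<String, Exporter> exporterMap = new ConcurrentHashMap<>();
        String key = "com.alibaba.dubbo.demo.DemoService:20880";

        Exporter exporter = new Exporter(key, exporterMap);
        exporterMap.put(key, exporter);
        System.out.println(exporterMap.containsKey(key)); // true

        exporter.unexport();
        System.out.println(exporterMap.containsKey(key)); // false
    }
}
```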
The parent class:
public abstract class AbstractExporter<T> implements Exporter<T> {
    protected final Logger logger = LoggerFactory.getLogger(getClass());
    private final Invoker<T> invoker;
    private volatile boolean unexported = false;

    public AbstractExporter(Invoker<T> invoker) {
        if (invoker == null)
            throw new IllegalStateException("service invoker == null");
        if (invoker.getInterface() == null)
            throw new IllegalStateException("service type == null");
        if (invoker.getUrl() == null)
            throw new IllegalStateException("service url == null");
        this.invoker = invoker;
    }

    public Invoker<T> getInvoker() {
        return invoker;
    }

    public void unexport() {
        if (unexported) {
            return;
        }
        unexported = true;
        getInvoker().destroy();
    }

    public String toString() {
        return getInvoker().toString();
    }
}
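Here the "filter object of InvokerDelegete" is assigned to the Invoker field of AbstractExporter, so the invoker can be obtained from the exporter. Finally, DubboProtocol.export(Invoker<T> invoker) executes exporterMap.put(key, exporter), which stores { "com.alibaba.dubbo.demo.DemoService:20880" -> the current DubboExporter instance }.
At this point the DubboExporter instance holds the invoker, the key, and the shared exporterMap.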
2.3.2.2 Opening the ExchangeServer
/**
 * Look up the ExchangeServer in the cache Map<String, ExchangeServer> serverMap by "host:port";
 * if none exists, create one and put it in the cache.
 * @param url
 */
private void openServer(URL url) {
    // find server.
    String key = url.getAddress(); // 10.10.10.10:20880
    // A client can also export a service that only the server can call.
    boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
    if (isServer) {
        ExchangeServer server = serverMap.get(key);
        if (server == null) {
            serverMap.put(key, createServer(url));
        } else {
            // The server supports reset, used together with the override feature.
            server.reset(url);
        }
    }
}
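First, host:port is taken from the providerUrl as the key, and the ExchangeServer is looked up in the serverMap cache. If none exists, one is created and cached as shown below; if one already exists, it is reset with the url instead.
Map<String, ExchangeServer> serverMap: { "10.10.10.10:20880" -> ExchangeServer instance }.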
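The create-or-reset behaviour means that several services exported on the same host:port share one server. A minimal sketch under simplifying assumptions: Server is a hypothetical stand-in for ExchangeServer, addresses are plain strings, and openServer returns the server (the method above returns void) purely so the result can be inspected.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ServerCacheSketch {
    // Hypothetical stand-in for ExchangeServer.
    static final class Server {
        final String address;
        int resetCount; // how many times reset() was called

        Server(String address) {
            this.address = address;
        }

        void reset() {
            resetCount++;
        }
    }

    private final Map<String, Server> serverMap = new ConcurrentHashMap<>();
    int created; // number of servers actually created

    // Create a server for an unseen address; reset the cached one otherwise.
    // Like the excerpt, the get-then-put sequence is not atomic.
    Server openServer(String address) {
        Server server = serverMap.get(address);
        if (server == null) {
            server = new Server(address);
            created++;
            serverMap.put(address, server);
        } else {
            server.reset();
        }
        return server;
    }

    public static void main(String[] args) {
        ServerCacheSketch cache = new ServerCacheSketch();
        Server first = cache.openServer("10.10.10.10:20880");
        Server second = cache.openServer("10.10.10.10:20880");

        System.out.println(first == second);   // true
        System.out.println(cache.created);     // 1
        System.out.println(first.resetCount);  // 1
    }
}
```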
Creating the ExchangeServer: createServer(URL providerUrl)
private ExchangeServer createServer(URL url) {
    // By default, send a readonly event when the server shuts down
    url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
    // Heartbeat is enabled by default
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
    String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

    if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
        throw new RpcException("Unsupported server type: " + str + ", url: " + url);

    url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
    ExchangeServer server;
    try {
        server = Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }
    str = url.getParameter(Constants.CLIENT_KEY);
    if (str != null && str.length() > 0) {
        Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
        if (!supportedTypes.contains(str)) {
            throw new RpcException("Unsupported client type: " + str);
        }
    }
    return server;
}
First, parameters are added to the original providerUrl: channel.readonly.sent=true&heartbeat=60000&codec=dubbo (the heartbeat parameter is used later when HeaderExchangeServer starts its heartbeat timer).
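The difference between the two calls used in createServer matters here: addParameterIfAbsent only supplies a default, while addParameter sets the value unconditionally. The sketch below models the url parameters as a plain map; the helper names are hypothetical, the copy-on-write style is an assumption suggested by the url = url.add... reassignments, and the values are the ones quoted above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class UrlParamsSketch {
    // Returns a copy in which key is set only if it was missing (a default).
    static Map<String, String> addIfAbsent(Map<String, String> params, String key, String value) {
        Map<String, String> copy = new LinkedHashMap<>(params);
        copy.putIfAbsent(key, value);
        return copy;
    }

    // Returns a copy in which key is always set, replacing any previous value.
    static Map<String, String> add(Map<String, String> params, String key, String value) {
        Map<String, String> copy = new LinkedHashMap<>(params);
        copy.put(key, value);
        return copy;
    }

    // Applies the three parameters from the text in the same order as createServer.
    static Map<String, String> withServerDefaults(Map<String, String> params) {
        Map<String, String> result = addIfAbsent(params, "channel.readonly.sent", "true");
        result = addIfAbsent(result, "heartbeat", "60000");
        return add(result, "codec", "dubbo");
    }

    public static void main(String[] args) {
        Map<String, String> userParams = new LinkedHashMap<>();
        userParams.put("heartbeat", "5000"); // a made-up user setting

        Map<String, String> result = withServerDefaults(userParams);
        System.out.println(result.get("heartbeat"));             // 5000
        System.out.println(result.get("channel.readonly.sent")); // true
        System.out.println(result.get("codec"));                 // dubbo
    }
}
```

A heartbeat configured by the user survives, whereas the codec is always set by the protocol.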
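Next, Exchangers.bind("providerUrl with the added parameters", requestHandler) creates the ExchangeServer. Let's first look at DubboProtocol#requestHandler. This class is extremely important: after several layers of wrapping, it becomes the final server-side logic handler for netty.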
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
    public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
        if (message instanceof Invocation) {
            Invocation inv = (Invocation) message;
            Invoker<?> invoker = getInvoker(channel, inv);
            // For a callback, handle the case of a newer version calling an older version
            if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                String methodsStr = invoker.getUrl().getParameters().get("methods");
                boolean hasMethod = false;
                if (methodsStr == null || methodsStr.indexOf(",") == -1) {
                    hasMethod = inv.getMethodName().equals(methodsStr);
                } else {
                    String[] methods = methodsStr.split(",");
                    for (String method : methods) {
                        if (inv.getMethodName().equals(method)) {
                            hasMethod = true;
                            break;
                        }
                    }
                }
                if (!hasMethod) {
                    logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv);
                    return null;
                }
            }
            RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
            return invoker.invoke(inv);
        }
        throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        if (message instanceof Invocation) {
            reply((ExchangeChannel) channel, message);
        } else {
            super.received(channel, message);
        }
    }

    @Override
    public void connected(Channel channel) throws RemotingException {
        invoke(channel, Constants.ON_CONNECT_KEY);
    }

    @Override
    public void disconnected(Channel channel) throws RemotingException {
        if (logger.isInfoEnabled()) {
            logger.info("disconected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl());
        }
        invoke(channel, Constants.ON_DISCONNECT_KEY);
    }

    private void invoke(Channel channel, String methodKey) {
        Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
        if (invocation != null) {
            try {
                received(channel, invocation);
            } catch (Throwable t) {
                logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
            }
        }
    }

    private Invocation createInvocation(Channel channel, URL url, String methodKey) {
        String method = url.getParameter(methodKey);
        if (method == null || method.length() == 0) {
            return null;
        }
        RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);
        invocation.setAttachment(Constants.PATH_KEY, url.getPath());
        invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY));
        invocation.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY));
        invocation.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY));
        if (url.getParameter(Constants.STUB_EVENT_KEY, false)) {
            invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString());
        }
        return invocation;
    }
};
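As shown above, this handler defines the methods for a client connecting, a client disconnecting, receiving a client message, and replying to a message, as well as a method that creates an Invocation. The getInvoker(Channel channel, Invocation inv) method it calls is, in abbreviated form: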
String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));
DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
return exporter.getInvoker();
This is exactly the DubboExporter we just put into exporterMap, and its invoker is exactly our "filter object of InvokerDelegete".
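Putting export and request handling side by side, the round trip through exporterMap can be sketched as follows. Everything here is a simplified assumption: Invoker is a one-method stand-in, the map stores invokers directly instead of DubboExporter instances, the attachment names "version" and "group" are assumed values of the corresponding constants, and the exception thrown on a miss is a choice made for this sketch.

```java
import java.util.HashMap;
import java.util.Map;

final class InvokerLookupSketch {
    // Hypothetical, simplified stand-in for Dubbo's Invoker.
    interface Invoker {
        String invoke(String methodName);
    }

    private final Map<String, Invoker> exporterMap = new HashMap<>();

    // Same key shape as in the text: group/path:version:port, empty parts omitted (assumed rule).
    static String key(int port, String path, String version, String group) {
        StringBuilder buf = new StringBuilder();
        if (group != null && !group.isEmpty()) {
            buf.append(group).append('/');
        }
        buf.append(path);
        if (version != null && !version.isEmpty()) {
            buf.append(':').append(version);
        }
        return buf.append(':').append(port).toString();
    }

    // Export side: register the invoker under its service key.
    void export(String serviceKey, Invoker invoker) {
        exporterMap.put(serviceKey, invoker);
    }

    // Request side: rebuild the key from the request data and look the invoker up.
    Invoker getInvoker(int port, String path, Map<String, String> attachments) {
        String serviceKey = key(port, path, attachments.get("version"), attachments.get("group"));
        Invoker invoker = exporterMap.get(serviceKey);
        if (invoker == null) {
            throw new IllegalStateException("No exported service for key: " + serviceKey);
        }
        return invoker;
    }

    public static void main(String[] args) {
        InvokerLookupSketch protocol = new InvokerLookupSketch();
        protocol.export("com.alibaba.dubbo.demo.DemoService:20880", method -> "called " + method);

        Invoker found = protocol.getInvoker(20880, "com.alibaba.dubbo.demo.DemoService", new HashMap<>());
        System.out.println(found.invoke("sayHello")); // called sayHello
    }
}
```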