Remote Service Export - Creating the Exporter and Starting the Netty Server (3)

勿忘初心 2018-11-18 11:52

This article is published by the NetEase Cloud Community with the authorization of its author, 赵计刚.



Creating the ExchangeServer with Exchangers.bind(providerUrl, ExchangeHandlerAdapter object)

    public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        return getExchanger(url).bind(url, handler);
    }

    public static Exchanger getExchanger(URL url) {
        String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);//header
        return getExchanger(type);
    }

    public static Exchanger getExchanger(String type) {
        return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
    }

getExchanger(URL url) returns a HeaderExchanger instance (the default exchanger type is "header"), so the creation of the ExchangeServer is delegated to HeaderExchanger.
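The lookup above boils down to "read a type name from the URL, fall back to a default, then resolve an implementation by name". A minimal sketch of that idea follows. It is not Dubbo's ExtensionLoader: the class ExchangerLookupSketch, the in-memory REGISTRY, and the assumption that Constants.EXCHANGER_KEY has the value "exchanger" are all hypothetical stand-ins for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a name-keyed extension lookup; not Dubbo's ExtensionLoader.
final class ExchangerLookupSketch {
    static final String EXCHANGER_KEY = "exchanger";   // assumed value of Constants.EXCHANGER_KEY
    static final String DEFAULT_EXCHANGER = "header";  // default noted in the article

    // Simulated registry: extension name -> implementation name.
    private static final Map<String, String> REGISTRY = new HashMap<>();
    static {
        REGISTRY.put("header", "HeaderExchanger");
    }

    // Reads the type from the URL parameters, falls back to the default, then resolves it.
    static String resolve(Map<String, String> urlParams) {
        String type = urlParams.get(EXCHANGER_KEY);
        if (type == null || type.isEmpty()) {
            type = DEFAULT_EXCHANGER;
        }
        String impl = REGISTRY.get(type);
        if (impl == null) {
            throw new IllegalStateException("No such extension: " + type);
        }
        return impl;
    }

    public static void main(String[] args) {
        // No "exchanger" parameter on the URL, so the default type is used.
        System.out.println(resolve(new HashMap<>()));
    }
}
```

Because providerUrl carries no exchanger parameter in this walkthrough, the default branch is the one taken.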

 

HeaderExchanger.bind(providerUrl, ExchangeHandlerAdapter object)

    /**
     * Wraps the handler twice: first the ExchangeHandlerAdapter is assigned to the ExchangeHandler handler field of HeaderExchangeHandler; then the resulting HeaderExchangeHandler is assigned to the ChannelHandler handler field of AbstractChannelHandlerDelegate, the parent class of DecodeHandler.
     */
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

Notes:

  • The ExchangeHandlerAdapter passed in is first wrapped twice, yielding a DecodeHandler instance;
  • Then Transporters.bind(providerUrl, DecodeHandler object) creates a NettyServer;
  • Finally, HeaderExchangeServer wraps that NettyServer and starts the heartbeat timer.
    • The HeaderExchangeServer instance is also the ExchangeServer that is finally returned; it ends up stored in Map<String, ExchangeServer> serverMap: { "10.10.10.10:20880" <-> HeaderExchangeServer instance }

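Wrapping the ExchangeHandlerAdapter to obtain the DecodeHandler instance is simple, so the code is not listed here.

The resulting DecodeHandler instance has the following hierarchy:

DecodeHandler instance
-->HeaderExchangeHandler instance
   -->ExchangeHandlerAdapter instance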
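The hierarchy above is a plain decorator chain: each handler holds a reference to the next one and delegates to it. The sketch below illustrates only that delegation order. The Handler interface, the Wrapper and Adapter classes, and the trace list are hypothetical; the real Dubbo handlers do actual decoding and request/response work before delegating.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative decorator chain; the class names only echo the Dubbo handlers.
final class HandlerChainSketch {
    interface Handler {
        void received(String msg, List<String> trace);
    }

    // Innermost handler: stands in for the ExchangeHandlerAdapter.
    static final class Adapter implements Handler {
        public void received(String msg, List<String> trace) {
            trace.add("ExchangeHandlerAdapter");
        }
    }

    // Generic wrapper: records its own name, then delegates to the wrapped handler.
    static final class Wrapper implements Handler {
        private final String name;
        private final Handler delegate;

        Wrapper(String name, Handler delegate) {
            this.name = name;
            this.delegate = delegate;
        }

        public void received(String msg, List<String> trace) {
            trace.add(name);
            delegate.received(msg, trace);
        }
    }

    // Builds the two-layer wrapping described above and returns the call order.
    static List<String> traceFor(String msg) {
        Handler chain = new Wrapper("DecodeHandler",
                new Wrapper("HeaderExchangeHandler", new Adapter()));
        List<String> trace = new ArrayList<>();
        chain.received(msg, trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(traceFor("ping"));
    }
}
```

The outermost wrapper sees a message first, which is why DecodeHandler sits at the top of the hierarchy.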

 

Creating a NettyServer with Transporters.bind(providerUrl, DecodeHandler object)

Transporters.bind(providerUrl, DecodeHandler object)

    public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        return getTransporter().bind(url, handler);
    }

    public static Transporter getTransporter() {
        return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
    }

 

Transporter$Adaptive.bind(providerUrl, DecodeHandler object)

    public com.alibaba.dubbo.remoting.Server bind(com.alibaba.dubbo.common.URL arg0, com.alibaba.dubbo.remoting.ChannelHandler arg1) throws com.alibaba.dubbo.remoting.RemotingException {
        if (arg0 == null)
            throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg0;
        String extName = url.getParameter("server", url.getParameter("transporter", "netty"));//netty
        if(extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" + url.toString() + ") use keys([server, transporter])");
        com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension(extName);
        return extension.bind(arg0, arg1);
    }
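The nested getParameter call in the generated adapter resolves the extension name with a two-level fallback: "server" wins over "transporter", and "netty" is used when neither is set. The following sketch reproduces only that fallback over a plain Map. The class AdaptiveNameSketch and its helper are hypothetical, and treating an empty value like a missing one is an assumption about URL.getParameter.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the key-priority fallback used by the generated adaptive class.
final class AdaptiveNameSketch {
    // Returns the value for key, or defaultValue when it is missing or empty (assumed behavior).
    static String getParameter(Map<String, String> params, String key, String defaultValue) {
        String value = params.get(key);
        return (value == null || value.isEmpty()) ? defaultValue : value;
    }

    // "server" takes priority, then "transporter", then the default "netty".
    static String transporterName(Map<String, String> params) {
        return getParameter(params, "server", getParameter(params, "transporter", "netty"));
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        System.out.println(transporterName(params)); // neither key is set
        params.put("transporter", "mina");
        System.out.println(transporterName(params)); // only "transporter" is set
    }
}
```

With the providerUrl used in this walkthrough neither key is present, so the name resolves to netty and NettyTransporter is selected.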

 

In the end, the NettyServer is created by NettyTransporter.

NettyTransporter.bind(providerUrl, DecodeHandler object)

    public class NettyTransporter implements Transporter {
        public static final String NAME = "netty";

        public Server bind(URL url, ChannelHandler listener) throws RemotingException {
            return new NettyServer(url, listener);
        }

        public Client connect(URL url, ChannelHandler listener) throws RemotingException {
            return new NettyClient(url, listener);
        }
    }

 

new NettyServer(providerUrl, DecodeHandler object)

    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }

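First, a parameter is added to providerUrl: threadname=DubboServerHandler-10.10.10.10:20880 (via ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)). Only the URL handed to ChannelHandlers.wrap carries this parameter; the URL passed to the parent constructor is the original one;

Next, ChannelHandlers.wrap(DecodeHandler object, providerUrl) wraps the DecodeHandler object in three more layers, yielding a MultiMessageHandler instance;

Finally, the parent class constructors are called to initialize the NettyServer's fields, and then netty is started.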
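The threadname value mentioned in the first step looks like a prefix joined to the server address with a hyphen. The sketch below shows that composition under this assumption. It is not the source of ExecutorUtil.setThreadName; the class ThreadNameSketch and its method are hypothetical and only reproduce the resulting string.

```java
// Sketch of how the threadname value could be composed (assumed: prefix + "-" + host:port).
final class ThreadNameSketch {
    // Uses the configured name when present, otherwise the default prefix, then appends the address.
    static String threadName(String configured, String defaultName, String address) {
        String base = (configured == null || configured.isEmpty()) ? defaultName : configured;
        return base + "-" + address;
    }

    public static void main(String[] args) {
        System.out.println(threadName(null, "DubboServerHandler", "10.10.10.10:20880"));
    }
}
```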

First, look at

ChannelHandlers.wrap(DecodeHandler object, providerUrl), which delegates to wrapInternal:

    /**
     * Layer upon layer of wrapping again:
     * MultiMessageHandler
     * --HeartbeatHandler
     *   --AllChannelHandler
     *     --DecodeHandler
     *       --HeaderExchangeHandler
     *         --ExchangeHandlerAdapter
     * @param handler
     * @param url
     * @return
     */
    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                .getAdaptiveExtension().dispatch(handler, url)));
    }

ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension() returns a Dispatcher$Adaptive adapter class.

Dispatcher$Adaptive.dispatch(DecodeHandler object, providerUrl)

    public com.alibaba.dubbo.remoting.ChannelHandler dispatch(com.alibaba.dubbo.remoting.ChannelHandler arg0, com.alibaba.dubbo.common.URL arg1) {
        if (arg1 == null)
            throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg1;
        String extName = url.getParameter("dispatcher", url.getParameter("dispather", url.getParameter("channel.handler", "all")));//all
        if(extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Dispatcher) name from url(" + url.toString() + ") use keys([dispatcher, dispather, channel.handler])");
        com.alibaba.dubbo.remoting.Dispatcher extension = (com.alibaba.dubbo.remoting.Dispatcher)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Dispatcher.class).getExtension(extName);
        return extension.dispatch(arg0, arg1);
    }

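This resolves to AllDispatcher. The Dispatcher determines Dubbo's threading model, that is, which work is done on which threads. This will be covered when Dubbo communication is discussed.

AllDispatcher.dispatch(DecodeHandler object, providerUrl)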

    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new AllChannelHandler(handler, url);
    }

new AllChannelHandler(DecodeHandler object, providerUrl)

    public AllChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

Now look at the constructor of its parent class, WrappedChannelHandler:

WrappedChannelHandler(DecodeHandler object, providerUrl)

    protected static final ExecutorService SHARED_EXECUTOR = Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true));
    protected final ExecutorService executor;
    protected final ChannelHandler handler;
    protected final URL url;

    public WrappedChannelHandler(ChannelHandler handler, URL url) {
        this.handler = handler;
        this.url = url;
        executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);

        String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
        if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {
            componentKey = Constants.CONSUMER_SIDE;
        }
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        dataStore.put(componentKey, Integer.toString(url.getPort()), executor);//{"java.util.concurrent.ExecutorService":{"20880":executor}}
    }

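First, a shared thread pool, SHARED_EXECUTOR, is created (it is a static field);

Then handler, url, and executor are assigned. The executor is a fixed thread pool with 200 threads (queue size 0, i.e., a SynchronousQueue);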

    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);//defaults to dubbo, but here it is DubboServerHandler-10.10.10.10:20880 (the threadname set on the url earlier)
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);//200
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);//0
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
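A practical consequence of the queue choice above is that, with queues == 0, a task submitted while every thread is busy is rejected rather than queued. The sketch below demonstrates this with the JDK's ThreadPoolExecutor. The class FixedPoolSketch is hypothetical, the pool is kept tiny for the demo, and the stock AbortPolicy stands in for Dubbo's AbortPolicyWithReport.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Sketch of a fixed pool whose queue type depends on the "queues" setting.
final class FixedPoolSketch {
    // Same selection rule as the listing: 0 -> SynchronousQueue, <0 -> unbounded, >0 -> bounded.
    static BlockingQueue<Runnable> queueFor(int queues) {
        if (queues == 0) {
            return new SynchronousQueue<>();
        }
        return queues < 0 ? new LinkedBlockingQueue<>() : new LinkedBlockingQueue<>(queues);
    }

    static ThreadPoolExecutor newPool(int threads, int queues) {
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                queueFor(queues), new ThreadPoolExecutor.AbortPolicy());
    }

    // Occupies every thread, submits one more task, and reports whether it was rejected.
    static boolean rejectsWhenSaturated(int threads, int queues) {
        ThreadPoolExecutor pool = newPool(threads, queues);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < threads; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            try {
                pool.execute(() -> { });
                return false; // accepted: the task was queued
            } catch (RejectedExecutionException e) {
                return true;  // rejected: no idle thread and no queue capacity
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("queues=0 rejects: " + rejectsWhenSaturated(2, 0));
        System.out.println("queues=1 rejects: " + rejectsWhenSaturated(2, 1));
    }
}
```

With the defaults from the article (200 threads, queues=0), the 201st concurrent request would hit the rejection policy instead of waiting.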

Next, a data store is obtained: SimpleDataStore;

Finally, {"java.util.concurrent.ExecutorService":{"20880": executor}} is stored in SimpleDataStore's ConcurrentMap<String, ConcurrentMap<String, Object>> data structure. In other words, each port has one thread pool.
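The two-level map described above can be sketched in a few lines. This is an illustration of the data structure only, not the source of SimpleDataStore; the class DataStoreSketch is hypothetical, and a String stands in for the executor.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Sketch of a component-name -> (key -> value) store, as described in the article.
final class DataStoreSketch {
    private final ConcurrentMap<String, ConcurrentMap<String, Object>> data = new ConcurrentHashMap<>();

    // Creates the inner map on first use, then stores the value under the key.
    void put(String componentName, String key, Object value) {
        data.computeIfAbsent(componentName, k -> new ConcurrentHashMap<>()).put(key, value);
    }

    // Returns null when either the component or the key is unknown.
    Object get(String componentName, String key) {
        ConcurrentMap<String, Object> component = data.get(componentName);
        return component == null ? null : component.get(key);
    }

    public static void main(String[] args) {
        DataStoreSketch store = new DataStoreSketch();
        store.put("java.util.concurrent.ExecutorService", "20880", "executor-for-20880");
        System.out.println(store.get("java.util.concurrent.ExecutorService", "20880"));
    }
}
```

Keying the inner map by port is what lets AbstractServer later fetch the executor with nothing but the port number.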

Note: why can SimpleDataStore be used as a cache?

    public T getExtension(String name) {
        if (name == null || name.length() == 0)
            throw new IllegalArgumentException("Extension name == null");
        if ("true".equals(name)) {
            return getDefaultExtension();
        }
        Holder<Object> holder = cachedInstances.get(name);
        if (holder == null) {
            cachedInstances.putIfAbsent(name, new Holder<Object>());
            holder = cachedInstances.get(name);
        }
        Object instance = holder.get();
        if (instance == null) {
            synchronized (holder) {
                instance = holder.get();
                if (instance == null) {
                    instance = createExtension(name);
                    holder.set(instance);
                }
            }
        }
        return (T) instance;
    }

Put simply, the SimpleDataStore instance is stored in the cachedInstances cache, so it is not created again next time; the cached instance is returned directly.
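The caching behavior comes from the holder-per-name, double-checked locking pattern in getExtension. A self-contained sketch of that pattern follows. The class ExtensionCacheSketch, its Holder, and the creation counter are hypothetical and exist only to make the "created once" property observable.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of per-name lazy singletons using a holder and double-checked locking.
final class ExtensionCacheSketch {
    // volatile makes a fully constructed instance visible to the unsynchronized first check.
    static final class Holder<T> {
        private volatile T value;
        T get() { return value; }
        void set(T value) { this.value = value; }
    }

    private final ConcurrentMap<String, Holder<Object>> cachedInstances = new ConcurrentHashMap<>();
    private final AtomicInteger created = new AtomicInteger();

    Object getExtension(String name) {
        Holder<Object> holder = cachedInstances.computeIfAbsent(name, k -> new Holder<>());
        Object instance = holder.get();
        if (instance == null) {
            synchronized (holder) {          // lock only this name's holder
                instance = holder.get();     // second check under the lock
                if (instance == null) {
                    instance = createExtension(name);
                    holder.set(instance);
                }
            }
        }
        return instance;
    }

    // Stand-in for real instantiation; counts how often it runs.
    private Object createExtension(String name) {
        created.incrementAndGet();
        return new Object();
    }

    int createdCount() {
        return created.get();
    }

    public static void main(String[] args) {
        ExtensionCacheSketch cache = new ExtensionCacheSketch();
        Object first = cache.getExtension("simple");
        Object second = cache.getExtension("simple");
        System.out.println((first == second) + ", created " + cache.createdCount() + " time(s)");
    }
}
```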

 

With that, an AllChannelHandler instance is complete. Its fields are:

  • WrappedChannelHandler.url: dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&channel.readonly.sent=true&codec=dubbo&dubbo=2.0.0&generic=false&heartbeat=60000&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=1287&side=provider&threadname=DubboServerHandler-10.10.10.10:20880&timestamp=1507116859919
  • WrappedChannelHandler.handler: DecodeHandler object
  • WrappedChannelHandler.executor: FixedThreadPool instance

There is also the class variable WrappedChannelHandler.SHARED_EXECUTOR = CachedThreadPool instance.

 

Next, the AllChannelHandler instance is wrapped by HeartbeatHandler, and the HeartbeatHandler instance is in turn wrapped by MultiMessageHandler. The resulting MultiMessageHandler instance has the following hierarchy:

MultiMessageHandler
-->handler: HeartbeatHandler
   -->handler: AllChannelHandler
         -->url: providerUrl
         -->executor: FixedThreadPool
         -->handler: DecodeHandler
            -->handler: HeaderExchangeHandler
               -->handler: ExchangeHandlerAdapter

 

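Once the MultiMessageHandler instance has been created, NettyServer calls its parent classes in turn to initialize their fields. NettyServer's parent class hierarchy is: NettyServer -> AbstractServer -> AbstractEndpoint -> AbstractPeer.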

AbstractServer:

    protected static final String SERVER_THREAD_POOL_NAME = "DubboServerHandler";
    ExecutorService executor;
    private InetSocketAddress localAddress;
    private InetSocketAddress bindAddress;
    private int accepts;
    private int idleTimeout = 600;

    public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);
        localAddress = getUrl().toInetSocketAddress();
        String host = url.getParameter(Constants.ANYHOST_KEY, false)
                || NetUtils.isInvalidLocalHost(getUrl().getHost())
                ? NetUtils.ANYHOST : getUrl().getHost();
        bindAddress = new InetSocketAddress(host, getUrl().getPort());
        this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
        this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
        try {
            doOpen();
            if (logger.isInfoEnabled()) {
                logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
            }
        } catch (Throwable t) {
            throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                    + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
        }
        //fixme replace this with better method
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
    }

The parent class is called first to initialize its fields, and then the server is started via doOpen().
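The host selection in the AbstractServer constructor explains why the bind address can differ from the exported address: because || binds tighter than ?:, the expression reads as (anyhost || invalid local host) ? ANYHOST : host. The sketch below reproduces that decision. The class BindHostSketch is hypothetical, and its isInvalidLocalHost is a simplified assumption about what NetUtils.isInvalidLocalHost checks.

```java
// Sketch of the bind-host decision made in the AbstractServer constructor.
final class BindHostSketch {
    static final String ANYHOST = "0.0.0.0";

    // Simplified assumption: empty, localhost, the wildcard, and loopback addresses are "invalid".
    static boolean isInvalidLocalHost(String host) {
        return host == null
                || host.isEmpty()
                || host.equalsIgnoreCase("localhost")
                || host.equals(ANYHOST)
                || host.startsWith("127.");
    }

    // Binds to all interfaces when anyhost=true or the URL host is unusable.
    static String bindHost(boolean anyhost, String urlHost) {
        return (anyhost || isInvalidLocalHost(urlHost)) ? ANYHOST : urlHost;
    }

    public static void main(String[] args) {
        System.out.println(bindHost(true, "10.10.10.10"));
        System.out.println(bindHost(false, "10.10.10.10"));
    }
}
```

In this walkthrough providerUrl contains anyhost=true, so the server binds to 0.0.0.0:20880 while still exporting 10.10.10.10:20880.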


AbstractEndpoint:

    private Codec2 codec;
    private int timeout;
    private int connectTimeout;

    public AbstractEndpoint(URL url, ChannelHandler handler) {
        super(url, handler);
        this.codec = getChannelCodec(url);
        this.timeout = url.getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);//1000
        this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT);//3000
    }

AbstractPeer:

    private final ChannelHandler handler;
    private volatile URL url;

    public AbstractPeer(URL url, ChannelHandler handler) {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        this.url = url;
        this.handler = handler;
    }

Here is the fully initialized NettyServer instance:

  • url: providerUrl (dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&channel.readonly.sent=true&codec=dubbo&dubbo=2.0.0&generic=false&heartbeat=60000&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=1287&side=provider&timestamp=1507116859919)
  • handler: MultiMessageHandler instance
  • codec: DubboCountCodec instance
  • timeout: 1000
  • connectTimeout: 3000
  • idleTimeout: 600*1000
  • localAddress: 10.10.10.10:20880
  • bindAddress: 0.0.0.0:20880
  • accepts: 0
  • executor: null (the executor has not been assigned yet; only after the netty server is up is it fetched from the cache, namely the 200-thread FixedThreadPool instance stored earlier in SimpleDataStore)

 

Next, the netty server is started.

    /**
     * Start the netty server and listen for client connections
     */
    @Override
    protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
        ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
        ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
        ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
        bootstrap = new ServerBootstrap(channelFactory);

        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        channels = nettyHandler.getChannels();
        // https://issues.jboss.org/browse/NETTY-365
        // https://issues.jboss.org/browse/NETTY-379
        // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                ChannelPipeline pipeline = Channels.pipeline();
                /*int idleTimeout = getIdleTimeout();
                if (idleTimeout > 10000) {
                    pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
                }*/
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
            }
        });
        // bind
        channel = bootstrap.bind(getBindAddress());
    }

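Notes:

  • The number of boss threads defaults to one;
  • The number of worker threads is Runtime.getRuntime().availableProcessors() + 1, i.e., the number of CPU cores plus one;
  • The server-side logic handler is NettyHandler;
  • The encoder is an InternalEncoder instance, which internally uses NettyServer's DubboCountCodec instance to encode;
  • The decoder is an InternalDecoder instance, which internally uses NettyServer's DubboCountCodec instance to decode.

NettyHandler: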

    @Sharable
    public class NettyHandler extends SimpleChannelHandler {
        private final Map<String, Channel> channels = new ConcurrentHashMap<String, Channel>(); // <ip:port, channel>
        private final URL url;
        private final ChannelHandler handler;

        public NettyHandler(URL url, ChannelHandler handler) {
            if (url == null) {
                throw new IllegalArgumentException("url == null");
            }
            if (handler == null) {
                throw new IllegalArgumentException("handler == null");
            }
            this.url = url;
            this.handler = handler;
        }

        public Map<String, Channel> getChannels() {
            return channels;
        }

        @Override
        public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
            NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
            try {
                if (channel != null) {
                    channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.getChannel().getRemoteAddress()), channel);
                }
                handler.connected(channel);
            } finally {
                NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
            }
        }

        @Override
        public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
            NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
            try {
                channels.remove(NetUtils.toAddressString((InetSocketAddress) ctx.getChannel().getRemoteAddress()));
                handler.disconnected(channel);
            } finally {
                NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
            }
        }

        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
            NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
            try {
                handler.received(channel, e.getMessage());
            } finally {
                NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
            }
        }

        @Override
        public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
            super.writeRequested(ctx, e);
            NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
            try {
                handler.sent(channel, e.getMessage());
            } finally {
                NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
            }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
            NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
            try {
                handler.caught(channel, e.getCause());
            } finally {
                NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
            }
        }
    }

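Notes:

Fields

  • handler: the current NettyServer instance
  • url: providerUrl
  • channels: holds the channels of incoming connections

NettyHandler listens for the connected, disconnected, message-received, message-sent, and exception-caught events and hands each one to the NettyServer instance for processing; NettyServer in turn calls the MultiMessageHandler instance (this handler field lives in NettyServer's parent class AbstractPeer).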
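The bookkeeping NettyHandler performs can be reduced to "track the channel by remote address, then forward the event". The sketch below shows that shape without any Netty types. The class ConnectionTrackerSketch is hypothetical, plain strings and objects stand in for channels, and the events list stands in for the delegate handler that NettyHandler would call.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Sketch of per-connection bookkeeping followed by delegation.
final class ConnectionTrackerSketch {
    // remote "ip:port" -> channel, mirroring the channels map in NettyHandler
    private final Map<String, Object> channels = new ConcurrentHashMap<>();
    // records forwarded events; stands in for calls on the wrapped handler
    private final List<String> events = new CopyOnWriteArrayList<>();

    void channelConnected(String remoteAddress, Object channel) {
        channels.put(remoteAddress, channel);
        events.add("connected:" + remoteAddress);
    }

    void channelDisconnected(String remoteAddress) {
        channels.remove(remoteAddress);
        events.add("disconnected:" + remoteAddress);
    }

    int activeChannels() {
        return channels.size();
    }

    List<String> events() {
        return events;
    }

    public static void main(String[] args) {
        ConnectionTrackerSketch tracker = new ConnectionTrackerSketch();
        tracker.channelConnected("10.10.10.11:50000", new Object());
        tracker.channelDisconnected("10.10.10.11:50000");
        System.out.println(tracker.events() + ", active=" + tracker.activeChannels());
    }
}
```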

 

Now look at the encoder and decoder:

NettyCodecAdapter(DubboCountCodec instance, providerUrl, current NettyServer instance)

    final class NettyCodecAdapter {
        private final ChannelHandler encoder = new InternalEncoder();
        private final ChannelHandler decoder = new InternalDecoder();
        private final Codec2 codec;
        private final URL url;
        private final int bufferSize;
        private final com.alibaba.dubbo.remoting.ChannelHandler handler;

        public NettyCodecAdapter(Codec2 codec, URL url, com.alibaba.dubbo.remoting.ChannelHandler handler) {
            this.codec = codec;
            this.url = url;
            this.handler = handler;
            int b = url.getPositiveParameter(Constants.BUFFER_KEY, Constants.DEFAULT_BUFFER_SIZE);//8*1024
            this.bufferSize = b >= Constants.MIN_BUFFER_SIZE && b <= Constants.MAX_BUFFER_SIZE ? b : Constants.DEFAULT_BUFFER_SIZE;//8*1024
        }

        public ChannelHandler getEncoder() {
            return encoder;
        }

        public ChannelHandler getDecoder() {
            return decoder;
        }

        @Sharable
        private class InternalEncoder extends OneToOneEncoder {
            @Override
            protected Object encode(ChannelHandlerContext ctx, Channel ch, Object msg) throws Exception {
                ...
                codec.encode(channel, buffer, msg);
                ...
            }
        }

        private class InternalDecoder extends SimpleChannelUpstreamHandler {
            @Override
            public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
                ...
                msg = codec.decode(channel, message);
                ...
            }
            ...
        }
    }

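Only the core code is listed. As shown, the InternalEncoder and InternalDecoder instances still use NettyServer's DubboCountCodec instance internally for encoding and decoding. Dubbo's codec design is very well done and will be covered in a later article.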
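One detail in the NettyCodecAdapter constructor is easy to miss: a configured buffer size outside the allowed range is not clamped to the nearest bound but replaced by the default. The sketch below reproduces that rule. The class BufferSizeSketch is hypothetical; the default of 8*1024 comes from the listing, while the 1 KB minimum and 16 KB maximum are assumed values for Constants.MIN_BUFFER_SIZE and Constants.MAX_BUFFER_SIZE.

```java
// Sketch of the buffer-size validation in the NettyCodecAdapter constructor.
final class BufferSizeSketch {
    static final int DEFAULT_BUFFER_SIZE = 8 * 1024;  // from the listing's comment
    static final int MIN_BUFFER_SIZE = 1 * 1024;      // assumed lower bound
    static final int MAX_BUFFER_SIZE = 16 * 1024;     // assumed upper bound

    static int effectiveBufferSize(int configured) {
        // Mirrors getPositiveParameter: non-positive values fall back to the default.
        int b = configured > 0 ? configured : DEFAULT_BUFFER_SIZE;
        // Out-of-range values are replaced by the default, not clamped.
        return (b >= MIN_BUFFER_SIZE && b <= MAX_BUFFER_SIZE) ? b : DEFAULT_BUFFER_SIZE;
    }

    public static void main(String[] args) {
        System.out.println(effectiveBufferSize(4096));    // in range, kept
        System.out.println(effectiveBufferSize(1 << 20)); // too large, default used
    }
}
```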

