此文已由作者赵计刚薪授权网易云社区发布。
欢迎访问网易云社区,了解更多网易技术产品运营经验。
使用Exchangers.bind(providerUrl, ExchangeHandlerAdapter对象)创建ExchangeServer
1 public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { 2 if (url == null) { 3 throw new IllegalArgumentException("url == null"); 4 } 5 if (handler == null) { 6 throw new IllegalArgumentException("handler == null"); 7 } 8 url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); 9 return getExchanger(url).bind(url, handler); 10 } 11 12 public static Exchanger getExchanger(URL url) { 13 String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);//header 14 return getExchanger(type); 15 } 16 17 public static Exchanger getExchanger(String type) { 18 return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type); 19 }
getExchanger(URL url)返回一个HeaderExchanger实例。所以ExchangeServer的创建交由HeaderExchanger来实现。
HeaderExchanger.bind(providerUrl, ExchangeHandlerAdapter对象)
1 /** 2 * 1 对handler进行三次包装:首先将ExchangeHandlerAdapter赋给HeaderExchangeHandler中的ExchangeHandler handler属性;然后将创建出来的HeaderExchangeHandler赋给DecodeHandler的父类AbstractChannelHandlerDelegate的ChannelHandler handler属性 3 */ 4 public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { 5 return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); 6 }
说明:
包装ExchangeHandlerAdapter,获取DecodeHandler实例。代码比较简单,不列出来了。
最终获取到的DecodeHandler实例的层级关系:
1 DecodeHandler实例 2 -->HeaderExchangeHandler实例 3 -->ExchangeHandlerAdapter实例
使用Transporters.bind(providerUrl, DecodeHandler对象)创建了一个NettyServer
Transporters.bind(providerUrl, DecodeHandler对象)
1 public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException { 2 if (url == null) { 3 throw new IllegalArgumentException("url == null"); 4 } 5 if (handlers == null || handlers.length == 0) { 6 throw new IllegalArgumentException("handlers == null"); 7 } 8 ChannelHandler handler; 9 if (handlers.length == 1) { 10 handler = handlers[0]; 11 } else { 12 handler = new ChannelHandlerDispatcher(handlers); 13 } 14 return getTransporter().bind(url, handler); 15 } 16 17 public static Transporter getTransporter() { 18 return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension(); 19 }
Transporter$Adaptive.bind(providerUrl, DecodeHandler对象)
1 public com.alibaba.dubbo.remoting.Server bind(com.alibaba.dubbo.common.URL arg0, com.alibaba.dubbo.remoting.ChannelHandler arg1) throws com.alibaba.dubbo.remoting.RemotingException { 2 if (arg0 == null) 3 throw new IllegalArgumentException("url == null"); 4 com.alibaba.dubbo.common.URL url = arg0; 5 String extName = url.getParameter("server", url.getParameter("transporter", "netty"));//netty 6 if(extName == null) 7 throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" + url.toString() + ") use keys([server, transporter])"); 8 com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension(extName); 9 return extension.bind(arg0, arg1); 10 }
最后NettyServer的创建由NettyTransporter来创建。
NettyTransporter.bind(providerUrl, DecodeHandler对象)
1 public class NettyTransporter implements Transporter { 2 public static final String NAME = "netty"; 3 4 public Server bind(URL url, ChannelHandler listener) throws RemotingException { 5 return new NettyServer(url, listener); 6 } 7 8 public Client connect(URL url, ChannelHandler listener) throws RemotingException { 9 return new NettyClient(url, listener); 10 } 11 }
new NettyServer(providerUrl, DecodeHandler对象)
1 public NettyServer(URL url, ChannelHandler handler) throws RemotingException { 2 super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); 3 }
这里首先为providerUrl添加参数:threadname=DubboServerHandler-10.10.10.10:20880(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME));
之后,使用ChannelHandlers.wrap(DecodeHandler对象, providerUrl)对DecodeHandler对象进行了三层包装,最终得到MultiMessageHandler实例;
最后调用父类的构造器初始化NettyServer的各个属性,最后启动netty。
先看一下
ChannelHandlers.wrap(DecodeHandler对象, providerUrl)
1 /** 2 * 这里又是层层包裹: 3 * MultiMessageHandler 4 * --HeartbeatHandler 5 * --AllChannelHandler 6 * --DecodeHandler 7 * --HeaderExchangeHandler 8 * --ExchangeHandlerAdapter 9 * @param handler 10 * @param url 11 * @return 12 */ 13 protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) { 14 return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class) 15 .getAdaptiveExtension().dispatch(handler, url))); 16 }
ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension()获取到一个Dispatcher$Adaptive适配类。
Dispatcher$Adaptive.dispatch(DecodeHandler对象, providerUrl)
1 public com.alibaba.dubbo.remoting.ChannelHandler dispatch(com.alibaba.dubbo.remoting.ChannelHandler arg0, com.alibaba.dubbo.common.URL arg1) { 2 if (arg1 == null) 3 throw new IllegalArgumentException("url == null"); 4 com.alibaba.dubbo.common.URL url = arg1; 5 String extName = url.getParameter("dispatcher", url.getParameter("dispather", url.getParameter("channel.handler", "all")));//all 6 if(extName == null) 7 throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Dispatcher) name from url(" + url.toString() + ") use keys([dispatcher, dispather, channel.handler])"); 8 com.alibaba.dubbo.remoting.Dispatcher extension = (com.alibaba.dubbo.remoting.Dispatcher)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Dispatcher.class).getExtension(extName); 9 return extension.dispatch(arg0, arg1); 10 }
这里获取到AllDispatcher,Dispatcher决定了dubbo的线程模型,指定了哪些做什么,哪些线程做什么。讲到dubbo通信的时候再写。
AllDispatcher.dispatch(DecodeHandler对象, providerUrl)
1 public ChannelHandler dispatch(ChannelHandler handler, URL url) { 2 return new AllChannelHandler(handler, url); 3 }
new AllChannelHandler(DecodeHandler对象, providerUrl)
1 public AllChannelHandler(ChannelHandler handler, URL url) { 2 super(handler, url); 3 }
来看其父类的WrappedChannelHandler的构造器:
WrappedChannelHandler(DecodeHandler对象, providerUrl)
1 protected static final ExecutorService SHARED_EXECUTOR = Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true)); 2 protected final ExecutorService executor; 3 protected final ChannelHandler handler; 4 protected final URL url; 5 6 public WrappedChannelHandler(ChannelHandler handler, URL url) { 7 this.handler = handler; 8 this.url = url; 9 executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url); 10 11 String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY; 12 if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) { 13 componentKey = Constants.CONSUMER_SIDE; 14 } 15 DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension(); 16 dataStore.put(componentKey, Integer.toString(url.getPort()), executor);//{"java.util.concurrent.ExecutorService":{"20880":executor}} 17 }
首先创建了一个共享线程池:SHARED_EXECUTOR;
之后为handler/url/executor赋值,其中executor是一个200个线程数的fixed线程池(队列为0,即同步队列);
1 public Executor getExecutor(URL url) { 2 String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);//默认为dubbo,但是我们这里是DubboServerHandler-10.10.10.10:20880(就是之前设置到url上的threadname) 3 int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);//200 4 int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);//0 5 return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, 6 queues == 0 ? new SynchronousQueue<Runnable>() : 7 (queues < 0 ? new LinkedBlockingQueue<Runnable>() 8 : new LinkedBlockingQueue<Runnable>(queues)), 9 new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url)); 10 }
之后获取了一个数据存储器:SimpleDataStore;
最后将{"java.util.concurrent.ExecutorService":{"20880": executor}}数据存储在SimpleDataStore的ConcurrentMap<String, ConcurrentMap<String, Object>> data数据结构中。也就是说:每一个端口,有一个线程池。
注意:为什么SimpleDataSource可以做缓存来使用?
1 public T getExtension(String name) { 2 if (name == null || name.length() == 0) 3 throw new IllegalArgumentException("Extension name == null"); 4 if ("true".equals(name)) { 5 return getDefaultExtension(); 6 } 7 Holder<Object> holder = cachedInstances.get(name); 8 if (holder == null) { 9 cachedInstances.putIfAbsent(name, new Holder<Object>()); 10 holder = cachedInstances.get(name); 11 } 12 Object instance = holder.get(); 13 if (instance == null) { 14 synchronized (holder) { 15 instance = holder.get(); 16 if (instance == null) { 17 instance = createExtension(name); 18 holder.set(instance); 19 } 20 } 21 } 22 return (T) instance; 23 }
其实,就是这样SimpleDataStore实例会存储在cachedInstances缓存中,下一次不会再创建,而是直接获取该缓存。
这样之后,一个AllChannelHandler实例就完成了,该实例属性如下:
当然还有一个类变量WrappedChannelHandler.SHARED_EXECUTOR=CachedThreadPool实例。
之后AllChannelHandler实例会被HeartbeatHandler进行包裹,之后HeartbeatHandler实例又会被MultiMessageHandler所包裹,最后得到的MultiMessageHandler实例的层级结构如下:
1 MultiMessageHandler 2 -->handler: HeartbeatHandler 3 -->handler: AllChannelHandler 4 -->url: providerUrl 5 -->executor: FixedExecutor 6 -->handler: DecodeHandler 7 -->handler: HeaderExchangeHandler 8 -->handler: ExchangeHandlerAdapter
MultiMessageHandler实例创建出来之后,NettyServer就开始调用其各个父类进行属性的初始化了。首先来看一下NettyServer的父类层级图:
AbstractServer:
1 protected static final String SERVER_THREAD_POOL_NAME = "DubboServerHandler"; 2 ExecutorService executor; 3 private InetSocketAddress localAddress; 4 private InetSocketAddress bindAddress; 5 private int accepts; 6 private int idleTimeout = 600; 7 8 public AbstractServer(URL url, ChannelHandler handler) throws RemotingException { 9 super(url, handler); 10 localAddress = getUrl().toInetSocketAddress(); 11 String host = url.getParameter(Constants.ANYHOST_KEY, false) 12 || NetUtils.isInvalidLocalHost(getUrl().getHost()) 13 ? NetUtils.ANYHOST : getUrl().getHost(); 14 bindAddress = new InetSocketAddress(host, getUrl().getPort()); 15 this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS); 16 this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT); 17 try { 18 doOpen(); 19 if (logger.isInfoEnabled()) { 20 logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress()); 21 } 22 } catch (Throwable t) { 23 throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName() 24 + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t); 25 } 26 //fixme replace this with better method 27 DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension(); 28 executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort())); 29 }
首先调用父类初始化属性,之后启动服务。
AbstractEndpoint:
1 private Codec2 codec; 2 private int timeout; 3 private int connectTimeout; 4 5 public AbstractEndpoint(URL url, ChannelHandler handler) { 6 super(url, handler); 7 this.codec = getChannelCodec(url); 8 this.timeout = url.getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);//1000 9 this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT);//3000 10 }
AbstractPeer:
1 private final ChannelHandler handler; 2 private volatile URL url; 3 4 public AbstractPeer(URL url, ChannelHandler handler) { 5 if (url == null) { 6 throw new IllegalArgumentException("url == null"); 7 } 8 if (handler == null) { 9 throw new IllegalArgumentException("handler == null"); 10 } 11 this.url = url; 12 this.handler = handler; 13 }
来看一下最后初始化好的NettyServer实例:
之后,就要启动netty服务了。
1 /** 2 * 启动netty服务,监听客户端连接 3 */ 4 @Override 5 protected void doOpen() throws Throwable { 6 NettyHelper.setNettyLoggerFactory(); 7 ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true)); 8 ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true)); 9 ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS)); 10 bootstrap = new ServerBootstrap(channelFactory); 11 12 final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); 13 channels = nettyHandler.getChannels(); 14 // https://issues.jboss.org/browse/NETTY-365 15 // https://issues.jboss.org/browse/NETTY-379 16 // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true)); 17 bootstrap.setPipelineFactory(new ChannelPipelineFactory() { 18 public ChannelPipeline getPipeline() { 19 NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); 20 ChannelPipeline pipeline = Channels.pipeline(); 21 /*int idleTimeout = getIdleTimeout(); 22 if (idleTimeout > 10000) { 23 pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0)); 24 }*/ 25 pipeline.addLast("decoder", adapter.getDecoder()); 26 pipeline.addLast("encoder", adapter.getEncoder()); 27 pipeline.addLast("handler", nettyHandler); 28 return pipeline; 29 } 30 }); 31 // bind 32 channel = bootstrap.bind(getBindAddress()); 33 }
说明:
NettyHandler:
1 @Sharable 2 public class NettyHandler extends SimpleChannelHandler { 3 private final Map<String, Channel> channels = new ConcurrentHashMap<String, Channel>(); // <ip:port, channel> 4 private final URL url; 5 private final ChannelHandler handler; 6 7 public NettyHandler(URL url, ChannelHandler handler) { 8 if (url == null) { 9 throw new IllegalArgumentException("url == null"); 10 } 11 if (handler == null) { 12 throw new IllegalArgumentException("handler == null"); 13 } 14 this.url = url; 15 this.handler = handler; 16 } 17 18 public Map<String, Channel> getChannels() { 19 return channels; 20 } 21 22 @Override 23 public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { 24 NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler); 25 try { 26 if (channel != null) { 27 channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.getChannel().getRemoteAddress()), channel); 28 } 29 handler.connected(channel); 30 } finally { 31 NettyChannel.removeChannelIfDisconnected(ctx.getChannel()); 32 } 33 } 34 35 @Override 36 public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { 37 NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler); 38 try { 39 channels.remove(NetUtils.toAddressString((InetSocketAddress) ctx.getChannel().getRemoteAddress())); 40 handler.disconnected(channel); 41 } finally { 42 NettyChannel.removeChannelIfDisconnected(ctx.getChannel()); 43 } 44 } 45 46 @Override 47 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { 48 NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler); 49 try { 50 handler.received(channel, e.getMessage()); 51 } finally { 52 NettyChannel.removeChannelIfDisconnected(ctx.getChannel()); 53 } 54 } 55 56 @Override 57 public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception { 58 super.writeRequested(ctx, e); 59 NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler); 60 try { 61 handler.sent(channel, e.getMessage()); 62 } finally { 63 NettyChannel.removeChannelIfDisconnected(ctx.getChannel()); 64 } 65 } 66 67 @Override 68 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { 69 NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler); 70 try { 71 handler.caught(channel, e.getCause()); 72 } finally { 73 NettyChannel.removeChannelIfDisconnected(ctx.getChannel()); 74 } 75 } 76 }
说明:
属性
监听连接完成/连接断开/接收到消息/发送完消息/异常捕捉事件,之后使用NettyServer实例进行相应的处理,NettyServer又会调用MultiMessageHandler实例(该handler属性位于NettyServer的父类AbstractPeer中)进行处理。
在来看编码器和解码器:
NettyCodecAdapter(DubboCountCodec实例, providerUrl, 当前的NettyServer实例)
1 final class NettyCodecAdapter { 2 private final ChannelHandler encoder = new InternalEncoder(); 3 private final ChannelHandler decoder = new InternalDecoder(); 4 private final Codec2 codec; 5 private final URL url; 6 private final int bufferSize; 7 private final com.alibaba.dubbo.remoting.ChannelHandler handler; 8 9 public NettyCodecAdapter(Codec2 codec, URL url, com.alibaba.dubbo.remoting.ChannelHandler handler) { 10 this.codec = codec; 11 this.url = url; 12 this.handler = handler; 13 int b = url.getPositiveParameter(Constants.BUFFER_KEY, Constants.DEFAULT_BUFFER_SIZE);//8*1024 14 this.bufferSize = b >= Constants.MIN_BUFFER_SIZE && b <= Constants.MAX_BUFFER_SIZE ? b : Constants.DEFAULT_BUFFER_SIZE;//8*1024 15 } 16 17 public ChannelHandler getEncoder() { 18 return encoder; 19 } 20 21 public ChannelHandler getDecoder() { 22 return decoder; 23 } 24 25 @Sharable 26 private class InternalEncoder extends OneToOneEncoder { 27 @Override 28 protected Object encode(ChannelHandlerContext ctx, Channel ch, Object msg) throws Exception { 29 ... 30 codec.encode(channel, buffer, msg); 31 ... 32 } 33 } 34 35 private class InternalDecoder extends SimpleChannelUpstreamHandler { 36 @Override 37 public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception { 38 ... 39 msg = codec.decode(channel, message); 40 ... 41 } 42 ... 43 } 44 }
只列出核心代码:可以看到,InternalEncoder实例和InternalDecoder实例内部还是使用NettyServer的DubboCountCodec实例来编解码的。dubbo的编解码做的非常好,后续会写。
更多网易技术、产品、运营经验分享请点击。