dubbo源码浅读(中篇)

服务注册到注册中心
服务起好之后,接下来是把提供者注册到注册中心,和注册中心建立联系。回到RegistryProtocol.export 方法,框架由RegistryFactory 加载出一个Registry 组件来处理注册中心相关的逻辑。
registryFactory 是啥时候设置进来的?dubbo 框架在创建组件实例的时候,它会扫描所有的set 方法,如果set 方法的参数类型是可扩展的组件类型,会加载对应的扩展点实例并设置进去。
com.alibaba.dubbo.common.extension.ExtensionLoader
Dubbo 内置支持zookeeper redis 、广播注册中心。实例配置的zookeeper 注册中心,对应ZookeeperRegistry 。提供者会在zookeeper 注册中心生成一个节点,节点路径为/dubbo/ interfaceClass/providers/{providerUrl} ,存储了服务提供方ip 、端口、group 、接口名称、版本、应用名称(redis 注册中心通过set 命令把这些信息缓存到redis 服务器)。
com.alibaba.dubbo.registry.zookeeper.ZookeeperRegistry.doRegister(URL)
为了感知注册中心的一些配置变化,提供者会监听注册中心路径 /dubbo/${interfaceClass}/configurators 的节点,监听该节点在注册中心的一些配置信息变更。 zookeeper 注册中心通过 zookeeper 框架的监听回调接口进行监听( redis 注册中心通过订阅命令( subscribe )监听),服务器缓存注册中心的配置,当配置发生变更时,服务会刷新本地缓存,代码在 ZookeeperRegistry doSubscribe 方法。
上面是服务提供者在初始化时做的一些主要动作,其实框架做的事远比这多了,先暂时先忽略一些细节,专项功能有时间再逐个击破。
4.服务消费者初始化
在分析标签解析的时候知道框架会把dubbo:reference 解析成一个ReferenceBean ,它是一个FactoryBean ,消费者的初始化在它的init 方法中执行,这个方法在两种情况下会被调用:
  1. 消费者设置了立即初始化(init属性设置成true),那么bean加载时会立刻调用消费者初始化。
  2. 消费者bean被使用者调用时,调用getObject->get->init
消费者初始化时主要做的事情就是引用对应的远程服务,执行了以下步骤:
  1. 监听注册中心
  2. 连接服务提供端
  3. 创建消费端服务代理
监听注册中心
消费者在初始化时也会生成一个registryUrl ,消费端的信息:side=consumer dubbo 版本、时间戳、应用名等拼成query 串放在registryUrl refer 参数中,消费者初始化时引用远程服务也由Protocol 组件来处理,加载自适应的Protocol 实现:
com.alibaba.dubbo.config.ReferenceConfig.createProxy(Map<String, String>)
传入的url 参数是注册中心URL ,同样地也会加载到ProtocolListenerWrapper ProtocolFilterWrapper 这两个Wrapper 扩展点,然后依次调用ProtocolListenerWrapper->ProtocolFilterWrapper->RegistryProtocol 实例的refer 方法:
registry 协议的url 两个wrapper 不会进行任何处理直接进入RegistryProtocol.refer 。在这个方法中先把消费者注册到注册中心,zookeeper注册中心 ZookeeperRegistry来 处理,在zookeeper 服务器上生成一个节点,节点路径是/dubbo/ interfaceName/customs/{customUrl} ,存储了服务消费方ip group 、接口名称、版本、应用名称等,customUrl consumer://10.240.176.159/com.netease.haitao.payment.remote.api.coupon.SendCouponRemoteApi?application=… 这种形式的,在注册之前生成,把本机ip 、接口名称等添加到URL 中:
com.alibaba.dubbo.registry.integration.RegistryProtocol.doRefer(Cluster, Registry, Class<T>, URL)
消费端本地会缓存远程服务提供者(每个提供者对应一个Invoker 对象)、注册中心配置、路由配置信息。监听注册中心路径是/dubbo/${interfaceClass}/providers /dubbo/ interfaceClass/configurators,/dubbo/{interfaceClass}/routers 的节点,当提供者、配置、路由信息发生变化之后注册中心会通知消费者刷新本地缓存。Dubbo 框架通过在消费端缓存提供者的信息消除对注册中心的强依赖,即使注册中心挂了服务依然可用。
com.alibaba.dubbo.registry.integration.RegistryProtocol.doRefer(Cluster, Registry, Class<T>, URL)


框架为每个接口消费者创建一个RegistryDirectory 对象,缓存接口所有的提供端Invoker 以及注册中心接口相关的接口配置configurators ,服务提供者Invoker 保存在RegistryDirectory methodInvokerMap 中,key 是方法名称或者* ,因为大系统中同一个服务一般会由多台服务器提供,所以value 是一个Invoker 列表存储该服务接口的所有提供者,一般情况下,假如服务接口中有a b 两个接口,那么map 中的内容是:{a=invokers,b=invokers,*=invokers} ,其中invokers 列表中的内容相同,服务被调用时会根据方法名称从这个map 中查找提供者对应Invoker 。当providers 节点发生变化时通知消费端更新缓存的providers 对应的Invoker 列表。
com.alibaba.dubbo.registry.integration.RegistryDirectory.refreshInvoker(List<URL>)


接下来要给每个提供者创建一个对应的Invoker refreshInvoker 方法调用toInvokers ,在toInvokers 中加载自适应Protocol 实现,并且调用它的refer 方法,此时url 参数是从注册中心获取到的携带提供者服务信息的providerUrl ,根据扩展点加载规则,会依次调用ProtocolListenerWrapper->ProtocolFilterWrapper-> DubboProtocol refer 方法,在这两个Wrapper 中添加对应的InvokerListener 并且构建Invoker-Filter 链,最后在DubboProtocol.refer 中创建一个DubboInvoker 对象,该Invoker 对象持有服务Class providerUrl 、负责和提供端通信的ExchangeClient Invoker 对象保存在DubboProtocol invokers 集合中。
com.alibaba.dubbo.registry.integration.RegistryDirectory.toInvokers(List<URL>)


连接服务提供端
构建DubboInvoker 时,会构建一个或多个ExchangeClient 用来处理和提供端的连接,默认情况下一个url 只会创建一个ExchangeClient 负责和对应的提供端建立连接,如果应用方配了多个connections 会创建多个。如果lazy 属性没有设置成true (默认false ),则此时ExchangeClient 会马上和服务端建立连接,此时组件调用顺序:Exchanger.connect->Transporter.connect ->Client ,最终会调用Netty 框架和提供端建立连接,创建NettyClient 对象。
com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol


NettyServer 一样,NettyClient 在也会注册IO 事件处理链NettyCodecAdapter.decoder->NettyCodecAdapter.encoder->NettyHandler Netty 框架,分别负责请求响应编解码、响应处理。
com.alibaba.dubbo.remoting.transport.netty.NettyClient.doOpen()


创建消费端服务代理
回到 RegistryProtocol .doRefer 方法的最后,由 Cluster 组件来创建一个 Invoker 并返回,此处的 Cluster 也是自适应的实现,示例配置的容错模式是 failfast ,返回扩展链 MockClusterWrapper->FailfastCluster (默认是 failover ),经过扩展链处理创建 MockClusterInvoker->FailfastClusterInvoker 对象链, FailfastClusterInvoker directory 属性引用上面创建的 RegistryDirectory 对象。


返回生成的Invoker 实例之后,由ProxyFactory 生成一个持有该Invoker 实例的代理,代理回调时会激活该Invoker MockClusterWrapper 类型)调用它的invoke 方法:
com.alibaba.dubbo.rpc.proxy.javassist.JavassistProxyFactory



该代理会被注册到spring IO 容器中,之后业务中从容器中获取消费者bean 时容器会返回这个代理。
5.远程服务调用流程
消费端调用远程服务接口时,使用上和调用普通的java 接口是没有任何区别,但是服务消费者和提供者是跨JVM 和主机的,客户端如何封装请求让服务端理解请求并且解析服务端返回的接口调用结果,服务端如何解析客户端的请求并且向客户端返回调用结果,这些框架是如何实现的,下面就来看下这部分的代码。
消费端调用提供端服务的过程要执行下面几个步骤:
  1. 消费端触发请求
  2. 消费端请求编码
  3. 提供端请求解码
  4. 提供端处理请求
  5. 提供端返回结果编码
  6. 消费端返回结果解码
消费端触发请求
在消费者初始化的时候,会生成一个消费者代理注册到容器中,该代理回调中持有一个MockClusterInvoker 实例,消费调用服务接口时它的invoke 会被调用,此时会构建一个RpcInvocation 对象,把服务接口的method 对象和参数放到RpcInvocation 对象中,作为MockClusterInvoker.invoke 方法的参数,在这个invoke 方法中,判断请求是否需要mock ,是否配置了mock 属性,是强制mock 还是失败后mock ,关于mock 这里先不详细展开,这里只看下核心流程。
MockClusterInvoker.invoke 会调用FailfastClusterInvoker.invoke ,大系统中为了服务高可用同一个服务一般会有多个应用服务器提供,要先挑选一个提供者提供服务。在服务接口消费者初始化时,接口方法和提供者Invoker 对应关系保存在RegistryDirectory methodInvokerMap 中,通过调用的方法名称(或方法名称+ 第一个参数)改方法对应的提供者invoker 列表,如注册中心设置了路由规则,对这些invoker 根据路由规则进行过滤。
com.alibaba.dubbo.registry.integration.RegistryDirectory.doList(Invocation)
com.alibaba.dubbo.rpc.cluster.directory.AbstractDirectory.list(Invocation)
读取到所有符合条件的服务提供者invoker 之后,由LoadBalance 组件执行负载均衡,从中挑选一个invoker 进行调用,框架内置支持的负载均衡算法包括random (随机)、roundrobin R-R 循环)、leastactive (最不活跃)、consistenthash (一致性hash ),应用可配置,默认random
methodInvokerMap 保存的是持有DubboInvoker dubbo 协议)实例的InvokerDelegete 对象,是Invoker-Filter 链的头部,先激活Filter 连然后最终调到DubboInvoker.invoke(RpcInvocation) ,此时远程调用分三种类型:
  1. 单向调用,无需获取关注调用结果的,无需等待接口返回结果,注意调用结果不要单纯跟返回值混淆了,异常也是调用结果。
  2. 异步调用,需要关注返回结果,但是不会同步等待接口调用结束,会异步的获取返回返回结果,这种情况给调用者返回一个Future,但是不同步等待Future.get返回调用结果
  3. 同步调用,需要同步等待服务调用结束获取调用结果,给调用者返回一个Future并且Future.get等待结果,此时接口调用线程会挂起等待响应。

我们大部分使用场景都是同步调用,所以主要看一下同步调用。如果使用者配置了多个connections 按顺序选择一个ExchangeClient 和服务器通信,同步调用时调用HeaderExchangeClient.request->HeaderExchangeChannel.request
com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel.request(Object, int)
这里的request 参数是RpcInvocation 对象,包含调用的方法、参数等信息,timeout 参数是接口超时时间,把这些信息封装在Request 对象中,调用channel.send ,这个channel 对象就是和服务端打交道的NettyClient 实例,NettyClient.send 调用NettyChannel.send
com.alibaba.dubbo.remoting.transport.netty.NettyChannel.send(Object, boolean)
这里的sent 参数决定是否等待请求消息发出,sent=true 等待消息发出,消息发送失败将抛出异常,sent=false 不等待消息发出,将消息放入IO 队列,即刻返回。默认情况下都是false NettyChannel 中有channel 属性,这个channel Netty 框架中的组件,负责客户端和服务端链路上的消息传递,channel.write 把请求消息写入,这里的message 是上面封装的Request 对象。这里的IO 模型是非阻塞的,线程不用同步等待所有消息写完,而是直接返回。调用Netty 框架的IO 事件之后会触发Netty 框架的IO 事件处理链。
消费端请求编码
在消费者初始化创建NettyClient 时了解到了,NettyClient 添加了三个事件处理器组成处理器链:NettyCodecAdapter.decoder->NettyCodecAdapter.encoder->NettyHandler ,其中NettyCodecAdapter.encoder 下行事件处理器(实现了ChannelDownstreamHandler 接口),NettyCodecAdapter. decoder 是上行事件处理器(实现了ChannelUpstreamHandler 接口),NettyHandler 是上行事件+ 下行时间处理器(同时实现了ChannelUpstreamHandler ChannelDownstreamHandler 接口)。channel.write Netty 框架中是一个下行事件,所以NettyCodecAdapter.encoder NettyHandler 处理器会被回调,下行事件的事件处理器调用顺序是从后到前,即后添加的处理器先执行。
NettyHandler 没有对请求消息做任何加工,只是触发dubbo 框架的一些回调,这些回调里面没有做任何核心的事情,
com.alibaba.dubbo.remoting.transport.netty.NettyHandler.writeRequested(ChannelHandlerContext, MessageEvent)


encoder 顾名思义就是编码器,它的主要工作就是把数据按照客户端- 服务端的约定协议对请求信息和返回结果进行编码。看下它的encode 方法:
下行事件触发之后依次调用handleDownstream->doEncode->encode ,在encode 中对Request 对象进行编码。这个msg 参数就是上面被write Request 对象,这里的Codec2 组件是DubboCountCodec 实现,DubboCountCodec.encode 调用DubboCodec.Encode
com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec.encode(Channel, ChannelBuffer, Object)

根据协议,消息中写入16个字节的消息头:

1 1-2 字节,固定的魔数
2 、第3 个字节,第8 位存储数据类型是请求数据还是响应数据,其它7 位存储序列化类型,约定和服务端的序列化- 反序列化协议
3 5-12 个字节,请求id
4 13-16 个字节,请求数据长度
com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec.encodeRequest(Channel, ChannelBuffer, Request)


URL 中查找序列化扩展点名称,加载序列化组件把请求对象序列化成二进制。消费端和提供端的序列化反序列化协议要配套,所以这个序列化协议一般是在提供端指定的,指定的协议类型会在提供者和消费者初始化的时候写入到URL 对象中,框架中默认的序列化协议是hessian2 。消息体数据包含dubbo 版本号、接口名称、接口版本、方法名称、参数类型列表、参数、附加信息,把它们按顺序依次序列化,数据写入到类型为ChannelBuffer buffer 参数中,然后把ChannelBuffer 封装成Netty 框架的org.jboss.netty.buffer.ChannelBuffer 。如果参数中有回调接口,还需要在消费端启动端口监听提供端的回调,这里不展开。
com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec.encodeRequestData(Channel, ObjectOutput, Object)


然后把封装好的ChannelBuffer写到链路 发送到服务端,这里消费端前半部分的工作就完成,接下来目光要转移到服务端。 
org.jboss.netty.handler.codec.oneone.OneToOneEncoder.doEncode(ChannelHandlerContext, MessageEvent)



网易云新用户大礼包:https://www.163yun.com/gift

本文来自网易实践者社区,经作者陈云云授权发布。