dubbo源码浅读(下篇)

提供端请求解码
在看提供端初始化代码的时候看到,框架在创建NettyServer 时,也会创建netty 框架的IO 事件处理器链:NettyCodecAdapter.decoder->NettyCodecAdapter.encoder->NettyHandler
com.alibaba.dubbo.remoting.transport.netty.NettyServer.doOpen()
客户端发送数据到服务端时会触发服务端的上行IO事件并且启动处理器回调,NettyCodecAdapter.decoder NettyHandler 是上行事件处理器,上行事件处理器调用顺序是从前到后执行,即先添加的处理器先执行,所以先触发NettyCodecAdapter.decoder 再触发NettyHandler
NettyCodecAdapter.decoder 对请求进行解码,把消息翻译成提供端可理解的,上行事件调用decoder handleUpstream->messageReceived
com.alibaba.dubbo.remoting.transport.netty.NettyCodecAdapter.InternalDecoder.messageReceived(ChannelHandlerContext, MessageEvent)


com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec.decode(Channel, ChannelBuffer, int, byte[])

把数据读取到ChannelBuffer 之后扔给Codec2 组件进行解码处理,这里有个半包传输处理,因为这里使用的是非阻塞式的IO 模型,非阻塞IO 的特点是线程的读取数据是事件触发式,是由一个Selector 组件轮询准备就绪的IO 事件,发现准备就绪的事件之后通知线程读取,这种模式的好处是可以极大的优化线程模型,只需少数几个线程处理所有客户端和服务端连接,而阻塞IO 需要线程和连接要一对一,但是非阻塞IO 远高于阻塞式IO ,不像阻塞式IO 读写数据时只有数据读完或者超时才会返回,这样能保证读到的数据肯定是完整,而非阻塞模式方法返回之后可能只读到一部分数据,框架的处理是在解析消息时检查消息的长度确定是否有完整的数据,如果数据不完整返回NEED_MORE_INPUT ,保存当前解析的位置等待链路的下次IO 事件,在下次IO 事件到达时从上次保存的位置开始解析。
读取到完整的数据之后解析数据头,读取魔数、序列化类型、以及请求id ,读取第3 个字节判断改数据是消费端请求数据还是提供端响应数据(因为消费端和提供端解码器代码是共用的),并且从1-7 位从读出序列化类型,并且根据此序列化类型加载序列化组件对消息进行反序列化按顺序读取消费端写入的dubbo 版本号、接口名称、接口版本、方法名称、参数类型列表、参数、附加信息,写入DecodeableRpcInvocation 对象对应的属性中。
com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec.decodeBody(Channel, InputStream, byte[])


com.alibaba.dubbo.rpc.protocol.dubbo.DecodeableRpcInvocation.decode(Channel, InputStream)


创建一个Request 对象,把DecodeableRpcInvocation 对象对象设置到Request 对象的data 属性中。
com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec.decodeBody(Channel, InputStream, byte[])
解码完成之后,激活下一个处理器的messageReceived 事件,并且把解码后的对象封装在MessageEvent 中。
com.alibaba.dubbo.remoting.transport.netty.NettyCodecAdapter.InternalDecoder.messageReceived(ChannelHandlerContext, MessageEvent)

org.jboss.netty.channel.Channels.fireMessageReceived(ChannelHandlerContext, Object, SocketAddress)


Decoder 执行完之后,事件进入到下一个处理器NettyHandler ,看下NettyHandler 中的代码:
这里直接交给handler 处理了,这个handler 封装了很多层:DecodeHandler->HeaderExchangeHandler->DubboProtocol.requestHandler ,中间封装了好几万层这里只把重要的列出来,源头是从创建NettyServer 的时候传过来的。
com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol.createServer(URL)


先会走到DecodeHandler.received
com.alibaba.dubbo.remoting.transport.DecodeHandler.received(Channel, Object)

这个message Request 类型的,要先decode 一下,因为在之前已经解码过了,所以这里不会做任何事情,直接走下一个handler.received ,这个handler 就是HeaderExchangeHandler
com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler.received(Channel, Object)

普通的同步接口twoWay 属性是true handleRequest 方法处理请求,处理结束之后调用channel.send 把结果返回到客户端。
提供端处理请求
请求处理再走下一个handler reply ,这个handler 就是DubboProtocol.requestHandler ,把request 对象中的data 取出来传到requestHandler 中,这个data 就是前面的解码后的DecodeableRpcInvocation 对象它是Invocation接口 的一个实现:
com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler.handleRequest(ExchangeChannel, Request)


com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol

查找提供端请求对应的 Invoker ,在接口提供者初始化时,每个接口都会创建一个 Invoker Exporter Exporter 持有 invoker 实例, Exporter 对象保存在 DubboProtocol exporterMap 中, key 是由 URL 生成的 serviceKey ,此时通过 Invocation 中的信息就可还原该 serviceKey 并且找到对应的 Exporter Invoker ,在分析提供者初始化代码时知道它是 Invoker-Filter 的头节点,激活 Filter 后调用由 ProxyFactory 生成的 Invoker
调用invoker.invoke 时,通过反射调用最终的服务实现执行相关逻辑。
服务执行结束之后,创建一个Response 对象返回给客户端。在执行服务实现时会出现两种结果:成功和失败,如果成功,把返回值设置到Response result 中,Response status 设置成OK ,如果失败,把失败异常设置到Response errorMessage 中,status 设置成SERVICE_ERROR
回到HeaderExchangeHandler.received 中的代码,在handleRequest 之后,调用channel.send Response 发送到客户端,这个channel 封装客户端- 服务端通信链路,最终会调用Netty 框架,把响应写回到客户端。
提供端返回结果编码
提供端要按照和消费端的协议把Response 按照特定的协议进行编码,把编码后的数据写回到消费端,从上面的代码可以看到,在NettyServer 初始化的时候,定义了三个IO 事件处理器,服务端往客户端回写响应时产生下行事件,处理下行事件处理器,NettyCodecAdapter.encoder NettyHandler 是下行事件处理器,先激活NettyHandler ,再激活NettyCodecAdapter. encoder ,在NettyCodecAdapter. encoder 对响应结果进行编码,还是通过Code2 组件和请求编码时使用的组件一样,把响应类型和响应结果依次写回到客户端,根据协议会写入16 个字节的数据头,包括:
1 1-2 字节魔数
2 、第3 个字节,序列化组件类型,约定和客户端的序列化- 反序列化协议
3 、第4 个字节,响应状态,是OK 还是error
4 5-13 个字节,响应id ,这里的id request 中的id 一样
5 13-16 个字节,响应数据长度
com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec.encodeResponse(Channel, ChannelBuffer, Response)


返回结果有三种结果:
1 、没有返回值即返回类型是void
2 、有返回值并且执行成功;
3 、服务调用异常。
com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec.encodeResponseData(Channel, ObjectOutput, Object)


解码后的数据会写入到通信链路中。
消费端返回结果解码
服务端给客户端回写数据之后,客户端会收到IO 事件,一个上行事件。NettyClient 中有两个上行事件处理器NettyCodecAdapter.decoder NettyHandler ,按照顺序decoder 先执行对服务端传过来的数据进行解码,解析出序列化协议、响应状态、响应id (即请求id )。把响应body 数据读到DecodeableRpcResult 对象中,进行解析同时加载处理原始Request 数据,这个Request 对象在请求时会被缓存到DefaultFuture 中,加载Request 的目的是因为Request Invocation 中携带了服务接口的返回值类型信息,需要根据这个类型把响应解析创建对应类型的对象。
com.alibaba.dubbo.rpc.protocol.dubbo.DecodeableRpcResult.decode(Channel, InputStream)


创建Response 对象并且把解析出结果或异常设置到Response 中。
decoder 把响应解析成Response 对象中,NettyHandler 接着往下处理,同样触发它的messageReceive 事件,在提供端解码的时候看到了,它的handler 封装关系是:DecodeHandler->HeaderExchangeHandler->DubboProtocol.requestHandler ,主要处理在HeaderExchangeHandler 中:
com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler.handleResponse(Channel, Response)


com.alibaba.dubbo.remoting.exchange.support.DefaultFuture.doReceived(Response)


这里主要做的事情是唤醒调用者线程,并且把Response 设置到DefaultFuture 中,在消费者触发请求的代码中可以看到,消费端同步调用接口时请求写到提供端之后,会调用DefaultFuture.get 阻塞等待响应结果: 
com.alibaba.dubbo.remoting.exchange.support.DefaultFuture.get(int)
com.alibaba.dubbo.remoting.exchange.support.DefaultFuture.isDone()


done 这个Condition 上进行条件等待,DefaultFuture.doReceive 时,设置response 唤醒done ,此时调用线程被唤醒并且检查是否已经有了response (避免假唤醒),唤醒之后返回response 中的result ,调用端即拿到了接口的调用结果(返回值或异常),整个远程服务接口的调用流程就完成了。
超时处理
前面说了在进行接口调用时会出现两种情况:接口调用成功、接口调用异常,其实还有一种情况就是接口调用超时。在消费端等待接口返回时,有个 timeout 参数,这个时间是使用者设置的,可在消费端设置也可以在提供端设置, done.await 等待时,会出现两种情况跳出 while 循环,一是线程被唤醒并且已经有了 response ,二是等待时间已经超过 timeout ,此时也会跳出 while ,当跳出 while 循环并且 Future 中没有 response 时,就说明接口已超时抛出 TimeoutException ,框架把 TimeoutException 封装成 RpcException 抛给应用层。


网易云新用户大礼包:https://www.163yun.com/gift

本文来自网易实践者社区,经作者陈云云授权发布。