提供端请求解码
在看提供端初始化代码的时候看到,框架在创建NettyServer
时,也会创建netty
框架的IO
事件处理器链:NettyCodecAdapter.decoder->NettyCodecAdapter.encoder->NettyHandler
com.alibaba.dubbo.remoting.transport.netty.NettyServer.doOpen()
客户端发送数据到服务端时会触发服务端的上行IO事件并且启动处理器回调,NettyCodecAdapter.decoder
和NettyHandler
是上行事件处理器,上行事件处理器调用顺序是从前到后执行,即先添加的处理器先执行,所以先触发NettyCodecAdapter.decoder
再触发NettyHandler
。
由NettyCodecAdapter.decoder
对请求进行解码,把消息翻译成提供端可理解的,上行事件调用decoder
的handleUpstream->messageReceived
com.alibaba.dubbo.remoting.transport.netty.NettyCodecAdapter.InternalDecoder.messageReceived(ChannelHandlerContext, MessageEvent)
com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec.decode(Channel, ChannelBuffer, int, byte[])
把数据读取到ChannelBuffer
之后扔给Codec2
组件进行解码处理,这里有个半包传输处理,因为这里使用的是非阻塞式的IO
模型,非阻塞IO
的特点是线程的读取数据是事件触发式,是由一个Selector
组件轮询准备就绪的IO
事件,发现准备就绪的事件之后通知线程读取,这种模式的好处是可以极大的优化线程模型,只需少数几个线程处理所有客户端和服务端连接,而阻塞IO
需要线程和连接要一对一,但是非阻塞IO
远高于阻塞式IO
,不像阻塞式IO
读写数据时只有数据读完或者超时才会返回,这样能保证读到的数据肯定是完整,而非阻塞模式方法返回之后可能只读到一部分数据,框架的处理是在解析消息时检查消息的长度确定是否有完整的数据,如果数据不完整返回NEED_MORE_INPUT
,保存当前解析的位置等待链路的下次IO
事件,在下次IO
事件到达时从上次保存的位置开始解析。
读取到完整的数据之后解析数据头,读取魔数、序列化类型、以及请求id
,读取第3
个字节判断改数据是消费端请求数据还是提供端响应数据(因为消费端和提供端解码器代码是共用的),并且从1-7
位从读出序列化类型,并且根据此序列化类型加载序列化组件对消息进行反序列化按顺序读取消费端写入的dubbo
版本号、接口名称、接口版本、方法名称、参数类型列表、参数、附加信息,写入DecodeableRpcInvocation
对象对应的属性中。
com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec.decodeBody(Channel, InputStream, byte[])
com.alibaba.dubbo.rpc.protocol.dubbo.DecodeableRpcInvocation.decode(Channel, InputStream)
创建一个Request
对象,把DecodeableRpcInvocation
对象对象设置到Request
对象的data
属性中。
com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec.decodeBody(Channel, InputStream, byte[])
解码完成之后,激活下一个处理器的messageReceived
事件,并且把解码后的对象封装在MessageEvent
中。
com.alibaba.dubbo.remoting.transport.netty.NettyCodecAdapter.InternalDecoder.messageReceived(ChannelHandlerContext, MessageEvent)
org.jboss.netty.channel.Channels.fireMessageReceived(ChannelHandlerContext, Object, SocketAddress)
Decoder
执行完之后,事件进入到下一个处理器NettyHandler
,看下NettyHandler
中的代码:
这里直接交给handler
处理了,这个handler
封装了很多层:DecodeHandler->HeaderExchangeHandler->DubboProtocol.requestHandler
,中间封装了好几万层这里只把重要的列出来,源头是从创建NettyServer
的时候传过来的。
com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol.createServer(URL)
先会走到DecodeHandler.received
:
com.alibaba.dubbo.remoting.transport.DecodeHandler.received(Channel, Object)
这个message
是Request
类型的,要先decode
一下,因为在之前已经解码过了,所以这里不会做任何事情,直接走下一个handler.received
,这个handler
就是HeaderExchangeHandler
:
com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler.received(Channel, Object)
普通的同步接口twoWay
属性是true
走handleRequest
方法处理请求,处理结束之后调用channel.send
把结果返回到客户端。
提供端处理请求
请求处理再走下一个handler
的reply
,这个handler
就是DubboProtocol.requestHandler
,把request
对象中的data
取出来传到requestHandler
中,这个data
就是前面的解码后的DecodeableRpcInvocation
对象它是Invocation接口
的一个实现:
com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler.handleRequest(ExchangeChannel, Request)
com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol
查找提供端请求对应的
Invoker
,在接口提供者初始化时,每个接口都会创建一个
Invoker
和
Exporter
,
Exporter
持有
invoker
实例,
Exporter
对象保存在
DubboProtocol
的
exporterMap
中,
key
是由
URL
生成的
serviceKey
,此时通过
Invocation
中的信息就可还原该
serviceKey
并且找到对应的
Exporter
和
Invoker
,在分析提供者初始化代码时知道它是
Invoker-Filter
的头节点,激活
Filter
后调用由
ProxyFactory
生成的
Invoker
:
调用invoker.invoke
时,通过反射调用最终的服务实现执行相关逻辑。
服务执行结束之后,创建一个Response
对象返回给客户端。在执行服务实现时会出现两种结果:成功和失败,如果成功,把返回值设置到Response
的result
中,Response
的status
设置成OK
,如果失败,把失败异常设置到Response
的errorMessage
中,status
设置成SERVICE_ERROR
。
回到HeaderExchangeHandler.received
中的代码,在handleRequest
之后,调用channel.send
把Response
发送到客户端,这个channel
封装客户端-
服务端通信链路,最终会调用Netty
框架,把响应写回到客户端。
提供端返回结果编码
提供端要按照和消费端的协议把Response
按照特定的协议进行编码,把编码后的数据写回到消费端,从上面的代码可以看到,在NettyServer
初始化的时候,定义了三个IO
事件处理器,服务端往客户端回写响应时产生下行事件,处理下行事件处理器,NettyCodecAdapter.encoder
和NettyHandler
是下行事件处理器,先激活NettyHandler
,再激活NettyCodecAdapter. encoder
,在NettyCodecAdapter. encoder
对响应结果进行编码,还是通过Code2
组件和请求编码时使用的组件一样,把响应类型和响应结果依次写回到客户端,根据协议会写入16
个字节的数据头,包括:
1
、1-2
字节魔数
2
、第3
个字节,序列化组件类型,约定和客户端的序列化-
反序列化协议
3
、第4
个字节,响应状态,是OK
还是error
4
、5-13
个字节,响应id
,这里的id
和request
中的id
一样
5
、13-16
个字节,响应数据长度
com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec.encodeResponse(Channel, ChannelBuffer, Response)
返回结果有三种结果:
1
、没有返回值即返回类型是void
;
2
、有返回值并且执行成功;
3
、服务调用异常。
com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec.encodeResponseData(Channel, ObjectOutput, Object)
解码后的数据会写入到通信链路中。
消费端返回结果解码
服务端给客户端回写数据之后,客户端会收到IO
事件,一个上行事件。NettyClient
中有两个上行事件处理器NettyCodecAdapter.decoder
和NettyHandler
,按照顺序decoder
先执行对服务端传过来的数据进行解码,解析出序列化协议、响应状态、响应id
(即请求id
)。把响应body
数据读到DecodeableRpcResult
对象中,进行解析同时加载处理原始Request
数据,这个Request
对象在请求时会被缓存到DefaultFuture
中,加载Request
的目的是因为Request
中Invocation
中携带了服务接口的返回值类型信息,需要根据这个类型把响应解析创建对应类型的对象。
com.alibaba.dubbo.rpc.protocol.dubbo.DecodeableRpcResult.decode(Channel, InputStream)
创建Response
对象并且把解析出结果或异常设置到Response
中。
decoder
把响应解析成Response
对象中,NettyHandler
接着往下处理,同样触发它的messageReceive
事件,在提供端解码的时候看到了,它的handler
封装关系是:DecodeHandler->HeaderExchangeHandler->DubboProtocol.requestHandler
,主要处理在HeaderExchangeHandler
中:
com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler.handleResponse(Channel, Response)
com.alibaba.dubbo.remoting.exchange.support.DefaultFuture.doReceived(Response)
这里主要做的事情是唤醒调用者线程,并且把Response
设置到DefaultFuture
中,在消费者触发请求的代码中可以看到,消费端同步调用接口时请求写到提供端之后,会调用DefaultFuture.get
阻塞等待响应结果:
com.alibaba.dubbo.remoting.exchange.support.DefaultFuture.get(int)
com.alibaba.dubbo.remoting.exchange.support.DefaultFuture.isDone()
在done
这个Condition
上进行条件等待,DefaultFuture.doReceive
时,设置response
唤醒done
,此时调用线程被唤醒并且检查是否已经有了response
(避免假唤醒),唤醒之后返回response
中的result
,调用端即拿到了接口的调用结果(返回值或异常),整个远程服务接口的调用流程就完成了。
超时处理
前面说了在进行接口调用时会出现两种情况:接口调用成功、接口调用异常,其实还有一种情况就是接口调用超时。在消费端等待接口返回时,有个
timeout
参数,这个时间是使用者设置的,可在消费端设置也可以在提供端设置,
done.await
等待时,会出现两种情况跳出
while
循环,一是线程被唤醒并且已经有了
response
,二是等待时间已经超过
timeout
,此时也会跳出
while
,当跳出
while
循环并且
Future
中没有
response
时,就说明接口已超时抛出
TimeoutException
,框架把
TimeoutException
封装成
RpcException
抛给应用层。
网易云新用户大礼包:https://www.163yun.com/gift
本文来自网易实践者社区,经作者陈云云授权发布。