微信公众号:**[中间件兴趣圈]** 作者简介:《RocketMQ技术内幕》作者
本节主要介绍了Dubbo协议的编码方式,涉及协议头、协议体具体的编码规则,默认使用Dubbo协议,其核心类图如下:
在Dubbo整个框架中,codec2的可选值为dubbo、thrift,本文将重点分析Dubbo协议的编码解码。
本文主要以Dubbo协议为例进行展开,其他通信方式,例如Thrift就不做过多分析,其实现思路基本是样的,Dubbo协议的编解码实现类为DubboCodec。
1@SPI
2public interface Codec2 {
3 @Adaptive({Constants.CODEC_KEY})
4 void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException;
5
6 @Adaptive({Constants.CODEC_KEY})
7 Object decode(Channel channel, ChannelBuffer buffer) throws IOException;
8
9 enum DecodeResult {
10 NEED_MORE_INPUT, SKIP_SOME_INPUT
11 }
12}
Codec2主要定义两个接口,一个枚举类型:
DecodeResult.NEED_MORE_INPUT在解码过程中如果收到的字节流不是一个完整包时,结束此次读事件处理,等待更多数据到达, SKIP_SOME_INPUT:忽略掉一部分输入数据。
Object decode(Channel channel, ChannelBuffer buffer) :解码,在消息接受端,按照协议的规范,从二进制流中解码出一个一个的请求信息,以便处理。
编码解码实现类层次职责说明(从顶到下):
AbstractCodec:编码解码抽象实现类,主要定义与协议无关的帮助类。
AbstractCodec:编码解码抽象实现类,主要定义与协议无关的帮助类。
TelnetCodec Dubbo telnet协议实现类。
DubboCodec:dubbo协议。
既然ExchangeCodec是业务协议,包含Dubbo协议的模板实现类,我们就从ExchangeCodec开始,探究Dubbo编码解码实现原理。
ExchangeCodec概述
ExchangeCodec核心属性
1// header length.
2 protected static final int HEADER_LENGTH = 16;
3 // magic header.
4 protected static final short MAGIC = (short) 0xdabb;
5 protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0];
6 protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1];
7 // message flag.
8 protected static final byte FLAG_REQUEST = (byte) 0x80;
9 protected static final byte FLAG_TWOWAY = (byte) 0x40;
10 protected static final byte FLAG_EVENT = (byte) 0x20;
11 protected static final int SERIALIZATION_MASK = 0x1f;
</code>
</pre>
属性解读如下:
<ul style="list-style-type: disc;" class="list-paddingleft-2">
<li>
HEADER_LENGTH :协议头部长度,共16个字节。
</li>
<li>
MAGIC :魔数,固定为0xdabb,2个字节。
</li>
<li>
MAGIC_HIGH:魔数的高8位。
</li>
<li>
MAGIC_LOW:魔数的低8位。
</li>
<li>
FLAG_REQUEST:消息请求类型为消息请求。
</li>
<li>
FLAG_TWOWAY :消息请求类型为心跳。
</li>
<li>
FLAG_EVENT:消息请求类型为事件。
</li>
<li>
SERIALIZATION_MASK :serialization掩码。
</li>
</ul>
<h3 style="line-height: inherit;margin: 1.5em 0px;font-weight: bold;font-size: 1.3em;margin-bottom: 2em;margin-right: 5px;padding: 8px 15px;letter-spacing: 2px;background-image: linear-gradient(to right bottom, rgb(0, 188, 212), rgb(63, 81, 181));background-color: rgb(63, 81, 181);color: rgb(255, 255, 255);border-left: 10px solid rgb(51, 51, 51);border-radius: 5px;text-shadow: rgb(102, 102, 102) 1px 1px 1px;box-shadow: rgb(102, 102, 102) 1px 1px 2px;">ExchangeCodec实现原理</h3>
ExchangeCodec#encode
<pre style="font-size: inherit;color: inherit;line-height: inherit;margin: 0px;padding: 0px;"><code class="hljs java" style="overflow-wrap: break-word;margin: 0px 2px;line-height: 15px;font-size: 11px;font-weight: normal;word-spacing: -3px;letter-spacing: 0px;font-family: Consolas, Inconsolata, Courier, monospace;border-radius: 0px;overflow-x: auto;padding: 0.5em;color: rgb(51, 51, 51);background: rgb(248, 248, 248) none repeat scroll 0% 0%;display: block !important;white-space: pre !important;word-wrap: normal !important;word-break: normal !important;overflow: auto !important;">1public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException { // @1
2 if (msg instanceof Request) { // @2
3 encodeRequest(channel, buffer, (Request) msg);
4 } else if (msg instanceof Response) { // @3
5 encodeResponse(channel, buffer, (Response) msg);
6 } else {
7 super.encode(channel, buffer, msg); // @4
8 }
9 }
MAGIC_HIGH:魔数的高8位。
FLAG_EVENT:消息请求类型为事件。
ExchangeCodec#encode
代码@1:参数说明:Channel channel:Dubbo网络通道的抽象,底层实现有NettyChannel、MinaChannel;ChannelBuffer buffer:buffer抽象类,屏蔽netty,mina等底层实现差别;Object msg:请求对象、响应对象或其他消息对象。
代码@2:如果msg是Request,则按照请求对象协议编码。
代码@3:如果是响应对象,则按照响应协议编码。
代码@4:如果是业务类对象(请求、响应),则使用父类默认的编码方式。
ExchangeCodec#encodeRequest
1Serialization serialization = getSerialization(channel); // @1
2// header.
3byte[] header = new byte[HEADER_LENGTH]; // @2
4// set magic number.
5Bytes.short2bytes(MAGIC, header); // @3
6// set request and serialization flag.
7header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId()); // @4
8
9if (req.isTwoWay()) header[2] |= FLAG_TWOWAY;
10if (req.isEvent()) header[2] |= FLAG_EVENT;
11 // set request id.
12 Bytes.long2bytes(req.getId(), header, 4); // @5
Step1:初始化协议头,同时填充部分字段。header[0]、header[1]、header[2]、header[4-11],注意,header[3]未填充。
代码@1:获取通道的序列化实现类。
代码@2:构建请求头部,header数组,长度为16个字节。
代码@3:首先填充头部的前两个字节,协议的魔数。header[0] = 魔数的高8个字节,header[1] = 魔数的低8个字节。
代码@4:头部的第3个字节存储的是消息请求标识与序列化器类别,那这8位是如何存储的呢?
首先看一下消息请求标志的定义:
1protected static final byte FLAG_REQUEST = (byte) 0x80; // 其二进制为 1000 0000
2protected static final byte FLAG_TWOWAY = (byte) 0x40; // 其二进制为 0100 0000
3protected static final byte FLAG_EVENT = (byte) 0x20; // 其二进制为 0010 0000
4protected static final int SERIALIZATION_MASK = 0x1f; // 其序列化的掩码,为什么是这样的呢?
serialization.getContentTypeId() 返回的类型如下:
CompactedJavaSerialization 4,二进制为0000 0010
FastJsonSerialization 6,二进制为0000 0110
FstSerialization 9,二进制为0000 1001
Hessian2Serialization 2,二进制为0000 0010
JavaSerialization 3,二进制为0000 0011
KryoSerialization 8,二进制为0000 1000
NativeJavaSerialization 7,二进制为0000 0111 结合代码:
FastJsonSerialization
6,二进制为0000 0110
Hessian2Serialization
2,二进制为0000 0010
KryoSerialization
8,二进制为0000 1000
1header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId())
可以得出一个结论:header[2]为8字节标志位,前4位,表示消息请求类型,依次为:请求、twoway、event,保留位。后4为:序列化的类型,也就是说dubbo协议只支持16种序列化协议。
代码@5:head[4]- head[11] 共8个字节为请求ID。Dubbo传输使用大端字节序列,也就说在接受端,首先读到的字节是高位字节。
1public static void long2bytes(long v, byte[] b, int off) {
2 b[off + 7] = (byte) v;
3 b[off + 6] = (byte) (v 8);
4 b[off + 5] = (byte) (v 16);
5 b[off + 4] = (byte) (v 24);
6 b[off + 3] = (byte) (v 32);
7 b[off + 2] = (byte) (v 40);
8 b[off + 1] = (byte) (v 48);
9 b[off + 0] = (byte) (v 56);
10 }
11
12ExchangeCodec#encodeRequest
13 //encode request data.
14 int savedWriteIndex = buffer.writerIndex();
15 buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
16 ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer); // @1
17 ObjectOutput out = serialization.serialize(channel.getUrl(), bos); // @2
18 if (req.isEvent()) { // @3
19 encodeEventData(channel, out, req.getData());
20 } else {
21 encodeRequestData(channel, out, req.getData());
22 }
23 out.flushBuffer();
24 if (out instanceof Cleanable) {
25 ((Cleanable) out).cleanup();
26 }
27 bos.flush();
28 bos.close();
29 int len = bos.writtenBytes(); //@4
30 checkPayload(channel, len);
31 Bytes.int2bytes(len, header, 12); //@5
Step2:编码请求体(body),协议的设计,一般是基于 请求头部+请求体构成。
代码@1:对buffer做一个简单封装,返回ChannelBufferOutputStream实例。
代码@2:根据序列化器,将通道的URL进行序列化,变存入buffer中。
代码@3:根据请求类型,事件或请求对Request.getData()请求体进行编码,encodeEventData、encodeRequestData不同的编码器会重写该方法,下文详细看一下DubboCode的实现。
代码@4:最后得到bos的总长度,该长度等于 (header+body)的总长度,也就是一个完整请求包的长度。
代码@5:将包总长度写入到header的header[12-15]中。
从ExchangeCodec#encodeRequest这个方法可以得知,Dubbo的整体传输协议由下图所示:
Dubbo协议体编码
Dubbo协议(body)编码规则
在ExchangeCodec#encodeRequest中,将会调用encodeRequestData对body进行编码
1protected void encodeRequestData(Channel channel, ObjectOutput out, Object data) throws IOException {
2 RpcInvocation inv = (RpcInvocation) data;
3
4 out.writeUTF(inv.getAttachment(Constants.DUBBO_VERSION_KEY, DUBBO_VERSION));
5 out.writeUTF(inv.getAttachment(Constants.PATH_KEY));
6 out.writeUTF(inv.getAttachment(Constants.VERSION_KEY));
7
8 out.writeUTF(inv.getMethodName());
9 out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));
10 Object[] args = inv.getArguments();
11 if (args != null)
12 for (int i = 0; i args.length; i++) {
13 out.writeObject(encodeInvocationArgument(channel, inv, i));
14 }
15 out.writeObject(inv.getAttachments());
16 }
该方法,依次将 dubbo、服务path(interface name)、版本号、方法名、方法参数类型描述,参数值、附加属性(例如参数回调等,该部分会在服务调用相关章节重点分析)。上述内容,根据不同的序列化实现,其组织方式不同,当然,其基本组织方式(标记位、长度 、 具体内容),将在下节中重点分析序列化的实现。
Dubbo响应数据包编码规则
1protected void encodeResponseData(Channel channel, ObjectOutput out, Object data) throws IOException {
2 Result result = (Result) data;
3
4 Throwable th = result.getException();
5 if (th == null) {
6 Object ret = result.getValue();
7 if (ret == null) {
8 out.writeByte(RESPONSE_NULL_VALUE);
9 } else {
10 out.writeByte(RESPONSE_VALUE);
11 out.writeObject(ret);
12 }
13 } else {
14 out.writeByte(RESPONSE_WITH_EXCEPTION);
15 out.writeObject(th);
16 }
17 }
1字节(请求结果),取值:RESPONSE_NULL_VALUE:表示空结果;RESPONSE_WITH_EXCEPTION:表示异常,RESPONSE_VALUE:正常响应。N字节的请求响应,使用readObject读取即可。
ExchangeCodec解码实现原理
ExchangeCodec#decode
1public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
2 int readable = buffer.readableBytes();
3 byte[] header = new byte[Math.min(readable, HEADER_LENGTH)]; // @1
4 buffer.readBytes(header); // @2
5 return decode(channel, buffer, readable, header); // @3
6 }
代码@1:创建一个byte数组,其长度为 头部长度和可读字节数取最小值。
代码@2:读取指定字节到header中。
代码@3:调用decode方法尝试解码。
ExchangeCodec#decode
1protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header)
2
Step1:解释一下方法的参数:
ChannelBuffer buffer : 通道读缓存区
byte[] header :已读字节数,(尝试读取一个完整头部)
ExchangeCodec#decode
1 // check magic number.
2 if (readable 0 && header[0] != MAGIC_HIGH
3 || readable 1 && header[1] != MAGIC_LOW) {
4 int length = header.length;
5 if (header.length readable) {
6 header = Bytes.copyOf(header, readable);
7 buffer.readBytes(header, length, readable - length);
8 }
9 for (int i = 1; i header.length - 1; i++) {
10 if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
11 buffer.readerIndex(buffer.readerIndex() - header.length + i);
12 header = Bytes.copyOf(header, i);
13 break;
14 }
15 }
16 return super.decode(channel, buffer, readable, header);
17 }
Step2:检查魔数,判断是否是dubbo协议,如果不是dubbo协议,则调用父类的解码方法,例如telnet协议。
如果至少读取到一个字节,如果第一个字节与魔数的高位字节不相等或至少读取了两个字节,并且第二个字节与魔数的地位字节不相等,则认为不是dubbo协议,则调用父类的解码方法,如果是其他协议的化,将剩余的可读字节从通道中读出,提交其父类解码。
ExchangeCodec#decode
1// check length.
2if (readable HEADER_LENGTH) {
3 return DecodeResult.NEED_MORE_INPUT;
4}
Step3:如果是dubbo协议,判断可读字节的长度是否大于协议头部的长度,如果可读字节小于头部字节,则跳过本次读事件处理,待读缓存区中更多的数据到达。
ExchangeCodec#decode
1// get data length.
2 int len = Bytes.bytes2int(header, 12);
3 checkPayload(channel, len);
4
5 int tt = len + HEADER_LENGTH;
6 if (readable tt) {
7 return DecodeResult.NEED_MORE_INPUT;
8 }
Step4:如果读取到一个完整的协议头,然后读取消息体长度,如果当前可读自己小于消息体+header的长度,返回NEED_MORE_INPUT,表示放弃本次解码,待更多数据到达缓冲区时再解码。
ExchangeCodec#decode
1// limit input stream.
2 ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len); // @1
3
4 try {
5 return decodeBody(channel, is, header); // @2
6 } finally {
7 if (is.available() 0) {
8 try {
9 if (logger.isWarnEnabled()) {
10 logger.warn("Skip input stream " + is.available());
11 }
12 StreamUtils.skipUnusedStream(is); // @3
13 } catch (IOException e) {
14 logger.warn(e.getMessage(), e);
15 }
16 }
17 }
代码@1:创建一个ChannelBufferInputStream,并限制最多只读取len长度的字节。
代码@2:调用decodeBody方法解码协议体。
代码@3:如果本次并未读取len个字节,则跳过这些字节,保证下一个包从正确的位置开始处理。
这个其实就是典型的网络编程(自定义协议)的解码实现。
由于本文只关注Dubbo协议的解码,故decodeBody方法的实现,请看DubboCodec#decodeBody。
DubboCodec#decodeBody 详解
1byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
2Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
3// get request id.
4long id = Bytes.bytes2long(header, 4);
Step1:根据协议头获取标记为(header[2])(根据协议可知,包含请求类型、序列化器)。
DubboCodec#decodeBody
1if ((flag & FLAG_REQUEST) == 0) { // @1
2 // decode response.
3 Response res = new Response(id); // @2
4 if ((flag & FLAG_EVENT) != 0) {
5 res.setEvent(Response.HEARTBEAT_EVENT); // @3
6 }
7 // get status.
8 byte status = header[3]; // @4
9 res.setStatus(status);
10 if (status == Response.OK) {
11 try {
12 Object data;
13 if (res.isHeartbeat()) {
14 data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
15 } else if (res.isEvent()) { // @5
16 data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
17 } else {
18 DecodeableRpcResult result;
19 if (channel.getUrl().getParameter(
20 Constants.DECODE_IN_IO_THREAD_KEY,
21 Constants.DEFAULT_DECODE_IN_IO_THREAD)) { // @6
22 result = new DecodeableRpcResult(channel, res, is,
23 (Invocation) getRequestData(id), proto);
24 result.decode();
25 } else {
26 result = new DecodeableRpcResult(channel, res, // @7
27 new UnsafeByteArrayInputStream(readMessageData(is)),
28 (Invocation) getRequestData(id), proto);
29 }
30 data = result;
31 }
32 res.setResult(data);
33 } catch (Throwable t) {
34 if (log.isWarnEnabled()) {
35 log.warn("Decode response failed: " + t.getMessage(), t);
36 }
37 res.setStatus(Response.CLIENT_ERROR);
38 res.setErrorMessage(StringUtils.toString(t));
39 }
40 } else {
41 res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF());
42 }
43 return res;
44}
Step2:解码响应消息请求体。
代码@1:根据flag标记相应标记为,如果与FLAG_REQUEST进行逻辑与操作,为0说明不是请求类型,那对应的就是响应数据包。
代码@2:根据请求ID,构建响应结果。
代码@3:如果是事件类型。
代码@4:获取响应状态码。
代码@5:如果是心跳事件,则直接调用readObject完成解码即可。
代码@6:获取decode.in.io的配置值,默认为true,表示在IO线程中解码消息体,如果decode.in.io设置为false,则会在DecodeHanler中执行(受Dispatch事件派发模型影响)。
代码@7:不在IO线程池中完成解码操作,实现方式也就是不在io线程中调用DecodeableRpcInvocation#decode方法。
上述介绍了协议解码的经典实现流程,下文就不详细去探究具体针对dubbo协议进行解码,因为只要从一个完整的二进制流(ByteBuffer)按格式进行字节的读取,主要就是针对ByteBuffer API的应用。
广告:作者新书《RocketMQ技术内幕》已上市
《RocketMQ技术内幕》已出版上市,目前可在主流购物平台(京东、天猫等)购买,本书从源码角度深度分析了RocketMQ NameServer、消息发送、消息存储、消息消费、消息过滤、主从同步HA、事务消息;在实战篇重点介绍了RocketMQ运维管理界面与当前支持的39个运维命令;并在附录部分罗列了RocketMQ几乎所有的配置参数。本书得到了RocketMQ创始人、阿里巴巴Messaging开源技术负责人、Linux OpenMessaging 主席的高度认可并作序推荐。目前是国内第一本成体系剖析RocketMQ的书籍。
新书7折优惠!7折优惠!7折优惠!
更多文章请关注微信公众号:
推荐关注微信公众号:RocketMQ官方微信公众号:
原文始发于微信公众号(中间件兴趣圈):