【RPC 专栏】深入理解 RPC 之传输篇

本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

原文链接:blog.ouyangsihai.cn >> 【RPC 专栏】深入理解 RPC 之传输篇

点击上方“芋道源码”,选择“置顶公众号”

技术文章第一时间送达!

源码精品专栏

  • [**中文详细注释的开源项目**](http://mp.weixin.qq.com/s?__biz=MzUzMTA2NTU2Ng==&mid=2247484404&idx=1&sn=109f263e51b81ca9f270846dd16f6b3a&chksm=fa497c45cd3ef55358b09beb6e18ba04737799d3c0bc32baaa0796dc707b1275c0c555a249ba&scene=21#wechat_redirect)

  • **[Java 并发源码合集](http://mp.weixin.qq.com/s?__biz=MzUzMTA2NTU2Ng==&mid=2247484341&idx=1&sn=91d6fc7e8841a0f6046e1c2f4693a537&chksm=fa497c04cd3ef512f9249a5deb305a28b68d3ba44467f13fa8c6068711540b2f3e0a6f622ae3&scene=21#wechat_redirect)**
  • [**RocketMQ 源码合集**](http://mp.weixin.qq.com/s?__biz=MzUzMTA2NTU2Ng==&mid=2247484334&idx=1&sn=761e2659f474f06e7db935eae26e2b03&chksm=fa497c1fcd3ef509a02890b8e9f6bddb02e714f9c7e70cfbc37cd5bd75be64855225497fd3de&scene=21#wechat_redirect)
  • [**Sharding-JDBC 源码解析合集**](http://mp.weixin.qq.com/s?__biz=MzUzMTA2NTU2Ng==&mid=2247484360&idx=1&sn=0dae84944d2c388fdc1bbed868ac5b99&chksm=fa497c79cd3ef56f8487dda6d53e3772e0aa9812ee66376993c3445bc94920c01a03dd4a4b8f&scene=21#wechat_redirect)
  • [**Spring MVC 和 Security 源码合集**](http://mp.weixin.qq.com/s?__biz=MzUzMTA2NTU2Ng==&mid=2247484380&idx=1&sn=b4e0da1a314d77dcd170a25ed1ebb4c5&chksm=fa497c6dcd3ef57bcfb69a52c594bcb72e35d9bbe89fa87601b2a6c9f266d656b1ad2a5d4da4&scene=21#wechat_redirect)

  • [**MyCAT 源码解析合集**](http://mp.weixin.qq.com/s?__biz=MzUzMTA2NTU2Ng==&mid=2247484377&idx=3&sn=1323ac1a4099fac49c96686e58d1960d&chksm=fa497c68cd3ef57e5c3b683f9ead89f06ea5d01947672bfff8341cff2ab0c39c03274723c49a&scene=21#wechat_redirect)
  • RpcRequest 和 RpcResponse
  • Socket传输
  • Netty 传输
  • 同步与异步 阻塞与非阻塞
  • 总结
  • RPC 被称为“远程过程调用”,表明了一个方法调用会跨越网络,跨越进程,所以传输层是不可或缺的。一说到网络传输,一堆名词就蹦了出来:TCP、UDP、HTTP,同步 or 异步,阻塞 or 非阻塞,长连接 or 短连接…

    本文介绍两种传输层的实现:使用 Socket 和使用 Netty。前者实现的是阻塞式的通信,是一个较为简单的传输层实现方式,借此可以了解传输层的工作原理及工作内容;后者是非阻塞式的,在一般的 RPC 场景下,性能会表现的很好,所以被很多开源 RPC 框架作为传输层的实现方式。

    RpcRequest 和 RpcResponse

    传输层传输的主要对象其实就是这两个类,它们封装了请求 id,方法名,方法参数,返回值,异常等 RPC 调用中需要的一系列信息。

    
    public class RpcRequest implements Serializable {
        private String interfaceName;
        private String methodName;
        private String parametersDesc;
        private Object[] arguments;
        private MapString, String attachments;
        private int retries = 0;
        private long requestId;
        private byte rpcProtocolVersion;
    }
    
    
    public class RpcResponse implements Serializable {
        private Object value;
        private Exception exception;
        private long requestId;
        private long processTime;
        private int timeout;
        private MapString, String attachments;// rpc协议版本兼容时可以回传一些额外的信息
        private byte rpcProtocolVersion;
    }
    

    Socket传输

    Server

    
    public class RpcServerSocketProvider {
        public static void main(String[] args) throws Exception {
            //序列化层实现参考之前的章节
            Serialization serialization = new Hessian2Serialization();
            ServerSocket serverSocket = new ServerSocket(8088);
            ExecutorService executorService = Executors.newFixedThreadPool(10);
            while (true) {
                final Socket socket = serverSocket.accept();
                executorService.execute(() - {
                    try {
                        InputStream is = socket.getInputStream();
                        OutputStream os = socket.getOutputStream();
                        try {
                            DataInputStream dis = new DataInputStream(is);
                            int length = dis.readInt();
                            byte[] requestBody = new byte[length];
                            dis.read(requestBody);
                            //反序列化requestBody = RpcRequest
                            RpcRequest rpcRequest = serialization.deserialize(requestBody, RpcRequest.class);
                            //反射调用生成响应 并组装成 rpcResponse
                            RpcResponse rpcResponse = invoke(rpcRequest);
                            //序列化rpcResponse = responseBody
                            byte[] responseBody = serialization.serialize(rpcResponse);
                            DataOutputStream dos = new DataOutputStream(os);
                            dos.writeInt(responseBody.length);
                            dos.write(responseBody);
                            dos.flush();
                        } catch (Exception e) {
                            e.printStackTrace();
                        } finally {
                            is.close();
                            os.close();
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        try {
                            socket.close();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
        }
        public static RpcResponse invoke(RpcRequest rpcRequest) {
            //模拟反射调用
            RpcResponse rpcResponse = new RpcResponse();
            rpcResponse.setRequestId(rpcRequest.getRequestId());
            //... some operation
            return rpcResponse;
        }
    }
    

    Client

    
    public class RpcSocketConsumer {
        public static void main(String[] args) throws Exception {
            //序列化层实现参考之前的章节
            Serialization serialization = new Hessian2Serialization();
            Socket socket = new Socket("localhost", 8088);
            InputStream is = socket.getInputStream();
            OutputStream os = socket.getOutputStream();
            //封装rpc请求
            RpcRequest rpcRequest = new RpcRequest();
            rpcRequest.setRequestId(12345L);
            //序列化 rpcRequest = requestBody
            byte[] requestBody = serialization.serialize(rpcRequest);
            DataOutputStream dos = new DataOutputStream(os);
            dos.writeInt(requestBody.length);
            dos.write(requestBody);
            dos.flush();
            DataInputStream dis = new DataInputStream(is);
            int length = dis.readInt();
            byte[] responseBody = new byte[length];
            dis.read(responseBody);
            //反序列化 responseBody = rpcResponse
            RpcResponse rpcResponse = serialization.deserialize(responseBody, RpcResponse.class);
            is.close();
            os.close();
            socket.close();
            System.out.println(rpcResponse.getRequestId());
        }
    }
    

    dis.readInt() 和 dis.read(byte[] bytes) 决定了使用 Socket 通信是一种阻塞式的操作,报文头+报文体的传输格式是一种常见的格式,除此之外,使用特殊的字符如空行也可以划分出报文结构。在示例中,我们使用一个 int(4字节)来传递报问题的长度,之后传递报文体,在复杂的通信协议中,报文头除了存储报文体还会额外存储一些信息,包括协议名称,版本,心跳标识等。

    在网络传输中,只有字节能够被识别,所以我们在开头引入了 Serialization 接口,负责完成 RpcRequest 和 RpcResponse 与字节的相互转换。(Serialization 的工作机制可以参考之前的文章)

    使用 Socket 通信可以发现:每次 Server 处理 Client 请求都会从线程池中取出一个线程来处理请求,这样的开销对于一般的 Rpc 调用是不能够接受的,而 Netty 一类的网络框架便派上了用场。

    Netty 传输

    Server 和 ServerHandler

    
    public class RpcNettyProvider {
        public static void main(String[] args) throws Exception{
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                // 创建并初始化 Netty 服务端 Bootstrap 对象
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup);
                bootstrap.channel(NioServerSocketChannel.class);
                bootstrap.childHandler(new ChannelInitializerSocketChannel() {
                    @Override
                    public void initChannel(SocketChannel channel) throws Exception {
                        ChannelPipeline pipeline = channel.pipeline();
                        pipeline.addLast(new RpcDecoder(RpcRequest.class)); // 解码 RPC 请求
                        pipeline.addLast(new RpcEncoder(RpcResponse.class)); // 编码 RPC 响应
                        pipeline.addLast(new RpcServerHandler()); // 处理 RPC 请求
                    }
                });
                bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
                bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
                ChannelFuture future = bootstrap.bind("127.0.0.1", 8087).sync();
                // 关闭 RPC 服务器
                future.channel().closeFuture().sync();
            } finally {
                workerGroup.shutdownGracefully();
                bossGroup.shutdownGracefully();
            }
        }
    }
    
    
    public class RpcServerHandler extends SimpleChannelInboundHandlerRpcRequest {
        @Override
        public void channelRead0(final ChannelHandlerContext ctx, RpcRequest request) throws Exception {
            RpcResponse rpcResponse = invoke(request);
            // 写入 RPC 响应对象并自动关闭连接
            ctx.writeAndFlush(rpcResponse).addListener(ChannelFutureListener.CLOSE);
        }
        private RpcResponse invoke(RpcRequest rpcRequest) {
            //模拟反射调用
            RpcResponse rpcResponse = new RpcResponse();
            rpcResponse.setRequestId(rpcRequest.getRequestId());
            //... some operation
            return rpcResponse;
        }
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
    }
    

    Client 和 ClientHandler

    
    public class RpcNettyConsumer {
        public static void main(String[] args) throws Exception{
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                // 创建并初始化 Netty 客户端 Bootstrap 对象
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(group);
                bootstrap.channel(NioSocketChannel.class);
                bootstrap.handler(new ChannelInitializerSocketChannel() {
                    @Override
                    public void initChannel(SocketChannel channel) throws Exception {
                        ChannelPipeline pipeline = channel.pipeline();
                        pipeline.addLast(new RpcEncoder(RpcRequest.class)); // 编码 RPC 请求
                        pipeline.addLast(new RpcDecoder(RpcResponse.class)); // 解码 RPC 响应
                        pipeline.addLast(new RpcClientHandler()); // 处理 RPC 响应
                    }
                });
                bootstrap.option(ChannelOption.TCP_NODELAY, true);
                // 连接 RPC 服务器
                ChannelFuture future = bootstrap.connect("127.0.0.1", 8087).sync();
                // 写入 RPC 请求数据并关闭连接
                Channel channel = future.channel();
                RpcRequest rpcRequest = new RpcRequest();
                rpcRequest.setRequestId(123456L);
                channel.writeAndFlush(rpcRequest).sync();
                channel.closeFuture().sync();
            } finally {
                group.shutdownGracefully();
            }
        }
    }
    
    
    public class RpcClientHandler extends SimpleChannelInboundHandlerRpcResponse {
        @Override
        public void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws Exception {
            System.out.println(response.getRequestId());//处理响应
        }
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    

    使用 Netty 的好处是很方便地实现了非阻塞式的调用,关键部分都给出了注释。上述的代码虽然很多,并且和我们熟悉的 Socket 通信代码大相径庭,但大多数都是 Netty 的模板代码,启动服务器,配置编解码器等。真正的 RPC 封装操作大多集中在 Handler 的 channelRead 方法(负责读取)以及 channel.writeAndFlush 方法(负责写入)中。

    
    public class RpcEncoder extends MessageToByteEncoder {
        private Class? genericClass;
        Serialization serialization = new Hessian2Serialization();
        public RpcEncoder(Class? genericClass) {
            this.genericClass = genericClass;
        }
        @Override
        public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {
            if (genericClass.isInstance(in)) {
                byte[] data = serialization.serialize(in);
                out.writeInt(data.length);
                out.writeBytes(data);
            }
        }
    }
    
    
    public class RpcDecoder extends ByteToMessageDecoder {
        private Class? genericClass;
        public RpcDecoder(Class? genericClass) {
            this.genericClass = genericClass;
        }
        Serialization serialization = new Hessian2Serialization();
        @Override
        public void decode(ChannelHandlerContext ctx, ByteBuf in, ListObject out) throws Exception {
            if (in.readableBytes()  4) {
                return;
            }
            in.markReaderIndex();
            int dataLength = in.readInt();
            if (in.readableBytes()  dataLength) {
                in.resetReaderIndex();
                return;
            }
            byte[] data = new byte[dataLength];
            in.readBytes(data);
            out.add(serialization.deserialize(data, genericClass));
        }
    }
    

    使用 Netty 不能保证返回的字节大小,所以需要加上 in.readableBytes() 4 这样的判断,以及 in.markReaderIndex() 这样的标记,用来区分报文头和报文体。

    同步与异步 阻塞与非阻塞

    这两组传输特性经常被拿来做对比,很多文章声称 Socket 是同步阻塞的,Netty 是异步非阻塞,其实有点问题。

    其实这两组并没有必然的联系,同步阻塞,同步非阻塞,异步非阻塞都有可能(同步非阻塞倒是没见过),而大多数使用 Netty 实现的 RPC 调用其实应当是同步非阻塞的(当然一般 RPC 也支持异步非阻塞)。

    同步和异步关注的是**消息通信机制** 所谓同步,就是在发出一个调用时,在没有得到结果之前,该调用就不返回。但是一旦调用返回,就得到返回值了。 换句话说,就是由调用者主动等待这个调用的结果。

    而异步则是相反,调用在发出之后,这个调用就直接返回了,所以没有返回结果。换句话说,当一个异步过程调用发出后,调用者不会立刻得到结果。而是在调用发出后,被调用者通过状态、通知来通知调用者,或通过回调函数处理这个调用。

    而异步则是相反,调用在发出之后,这个调用就直接返回了,所以没有返回结果。换句话说,当一个异步过程调用发出后,调用者不会立刻得到结果。而是在调用发出后,被调用者通过状态、通知来通知调用者,或通过回调函数处理这个调用。

    如果需要 RPC 调用返回一个结果,该结果立刻被使用,那意味着着大概率需要是一个同步调用。如果不关心其返回值,则可以将其做成异步接口,以提升效率。

    阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态.

    阻塞调用是指调用结果返回之前,当前线程会被挂起。调用线程只有在得到结果之后才会返回。 非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程。

    阻塞调用是指调用结果返回之前,当前线程会被挂起。调用线程只有在得到结果之后才会返回。
    非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程。

    在上述的例子中可以看出 Socket 通信我们显示声明了一个包含10个线程的线程池,每次请求到来,分配一个线程,等待客户端传递报文头和报文体的行为都会阻塞该线程,可以见得其整体是阻塞的。而在 Netty 通信的例子中,每次请求并没有分配一个线程,而是通过 Handler 的方式处理请求(联想 NIO 中 Selector),是非阻塞的。

    使用同步非阻塞方式的通信机制并不一定同步阻塞式的通信强,所谓没有最好,只有更合适,而一般的同步非阻塞 通信适用于 1.网络连接数量多 2.每个连接的io不频繁 的场景,与 RPC 调用较为契合。而成熟的 RPC 框架的传输层和协议层通常也会提供多种选择,以应对不同的场景。

    总结

    本文堆砌了一些代码,而难点主要是对 Socket 的理解,和 Netty 框架的掌握。Netty 的学习有一定的门槛,但实际需要掌握的知识点其实并不多(仅仅针对 RPC 框架所涉及的知识点而言),学习 Netty ,个人推荐《Netty IN ACTION》以及 https://waylau.gitbooks.io/netty-4-user-guide/Getting%20Started/Before%20Getting%20Started.html 该网站的例子。

    参考资料:

    http://javatar.iteye.com/blog/1123915 – 梁飞

    https://gitee.com/huangyong/rpc – 黄勇

    666. 彩蛋

    如果你对 RPC 并发感兴趣,欢迎加入我的知识一起交流。

    目前在知识星球(https://t.zsxq.com/2VbiaEu)更新了如下 Dubbo 源码解析如下:

    01. 调试环境搭建
    02. 项目结构一览
    03. API 配置(一)之应用
    04. API 配置(二)之服务提供者
    05. API 配置(三)之服务消费者
    06. 属性配置
    07. XML 配置
    08. 核心流程一览

    一共 60 篇++

    本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

    本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

    原文链接:blog.ouyangsihai.cn >> 【RPC 专栏】深入理解 RPC 之传输篇