源码分析Dubbo事件派发机制

本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

原文链接:blog.ouyangsihai.cn >> 源码分析Dubbo事件派发机制

微信公众号:**[中间件兴趣圈]** 作者简介:《RocketMQ技术内幕》作者

本节将主要学习Dubbo是如何使用Netty来实现网络通讯的。

从官网我们得知,Dubbo协议是使用单一长连接来进行网络传输,也就是说服务调用方持久与服务提供者建立一条连接,所有的服务调用调用信息通过。
一条TCP连接进行传输,在网络层至少要考虑如下问题:

  • 服务端,客户端网络通讯模型(线程模型)
  • 传输(编码解码、序列化)
  • 服务端转发策略等
  • 传输(编码解码、序列化)

    Dubbo服务端的网络启动流程,在上篇中已给出序列图,本节还是以该图为切入点,引入本文的两个主人公:NettyServer、NettyClient。

    dubbo使用SPI机制,根据配置,可以支持如下框架实现网络通讯模型,例如netty3,netty4、mina、grizzly,本文重点分析基于Netty4的实现,包路径:dubbo-remoting-netty4。
    从上面的流程图,NettyTransport的职责就是调用new NettyServer的构造方法,从而构建NettyServer对象,在深入NettyServer对象构造过程之前,先来看一下NettyServer的类继承层次:

    NettyServer构造函数:

    
    1public NettyServer(URL url, ChannelHandler handler) throws RemotingException {  // @1
    2        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));    // @2
    3}
    

    代码@1:URL url:服务提供者URL;ChannelHandler网络事件处理器,

    也就是当相应网络事件触发时,执行的事件处理器。

  • void connected(Channel channel) throws RemotingException 连接事件,当收到客户端的连接事件时,执行该方法处理相关业务操作。

  • void disconnected(Channel channel) throws RemotingException:连接断开事件
  • void sent(Channel channel, Object message) throws RemotingException 当可写事件触发时,服务端向客户端返回响应数据,就是通过该方法发送的。

  • void received(Channel channel, Object message) throws RemotingException 当读事件触发时执行该方法,服务端在收到客户端的请求数据是,调用该方法执行解包等操作。

  • void caught(Channel channel, Throwable exception) throws RemotingException 发生异常时,调用该方法。

  • void disconnected(Channel channel) throws RemotingException:连接断开事件

    void received(Channel channel, Object message) throws RemotingException
    当读事件触发时执行该方法,服务端在收到客户端的请求数据是,调用该方法执行解包等操作。

    代码@2:调用ChannelHandlers.wrap对原生Handler进行包装,然后调用其父类的构造方法,首先,设置Dubbo服务端线程池中线程的名称,可以通过参数threadname来指定线程池中线程的前缀,默认为:DubboServerHandler + dubbo服务端IP与接口号。我比较好奇的是这里为什么需要对ChannelHandler进行包装呢?是增加了些什么逻辑呢?带着者问题,引出本节重点探讨的内容:事件派发机制。
    事件派发机制指的是网络事件(连接、读、写)等事件触发后,这些事件如何执行,是由IO线程还是派发到线程池中执行。Dubbo定义了如下5种事件派发机制:

    本文将详细分析各种事件的派发实现原理。
    ChannelHandlers#wrapInternal

    
    1protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
    2        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
    3                .getAdaptiveExtension().dispatch(handler, url)));
    4}
    

    这里是典型的装饰模式,MultiMessageHandler,多消息处理Handler,HeartbeatHandler,心跳Handler,其主要功能是处理心跳返回与心跳请求,直接在IO线程中执行,每次收到信息,更新通道的读事件戳,每次发送数据时,记录通道的写事件戳。这里的核心关键是利用SPI自适配,返回合适的事件派发机制。Dispatcher的类层次结构如图所示:

    AllDispatcher实现原理

    线程派发机制:所有的消息都派发到线程池,包括请求、响应、连接事件、断开事件、心跳等。

    
    1public class AllDispatcher implements Dispatcher {
    2    public static final String NAME = "all";
    3    @Override
    4    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
    5        return new AllChannelHandler(handler, url);
    6    }
    7}
    

    从中可以看出,事件派发类继承图分两个维度,Dispatcher(事件派发器)、与之对应的ChannelHandler,例如AllChannelHandler。

    WrappedChannelHandler

    接下来分析事件派发机制,重点关注ChannelHandler类的实现体系。

    纵观Dubbo ChannelHanler体系的设计,是经典的类装饰器模式,上述派发器主要解决的问题,是相关网络事件(连接、读(请求)、写(响应)、心跳请求、心跳响应)是在IO线程、还是在额外定义的线程池,故WrappedChannelHandler的主要职责是定义线程池相关的逻辑,具体是在IO线程上执行,还是在定义的线程池中执行,则由子类具体去定制,WrappedChannelHandler默认实现ChannelHandler的所有方法,各个方法的实现直接调用被装饰Handler的方法,见下图:

    接下来先重点关注一下WrappedChannelHandler的成员变量和构造方法的实现。

    
    1protected static final ExecutorService SHARED_EXECUTOR = Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true));
    2protected final ExecutorService executor;
    3protected final ChannelHandler handler;
    4protected final URL url;
    
  • ExecutorService SHARED_EXECUTOR:共享线程池,默认线程池,如果 ExecutorService executor为空,则使用SHARED_EXECUTOR

  • ExecutorService executor 定义的线程池
  • ChannelHandler handler:被装饰的ChannelHandler
  • URL url 服务提供者URL 接下来关注一下其构造函数:

  • ExecutorService executor 定义的线程池

    URL url 服务提供者URL
    接下来关注一下其构造函数:

    
     1public WrappedChannelHandler(ChannelHandler handler, URL url) {
     2        this.handler = handler;
     3        this.url = url;
     4        executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);    // @1
     5
     6        String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
     7        if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {
     8            componentKey = Constants.CONSUMER_SIDE;
     9        }
    10        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
    11        dataStore.put(componentKey, Integer.toString(url.getPort()), executor);  // @2
    12    }
    

    代码@1:构建线程池,这里基于SPI机制,用户可选择cached、eager、fixed、limited,将在本节下面详细介绍,这里只需要知道是构建了一个线程池。
    代码@2:将服务端都与线程池缓存起来,在服务端,线程池的缓存级别是 服务提供者协议(端口):线程池。

    AllChannelHandler

    事件派发机制:所有网络事件在线程池中执行,其实现机制肯定是重写ChannelHandler的所有网络事件方法,将调用其修饰的ChannelHanlder在线程池中执行。由于AllChannelHandler是第一个事件派发机制,故对其实现做一个详细描述。

    AllChannelHandler#connected

    
    1public void connected(Channel channel) throws RemotingException {
    2        ExecutorService cexecutor = getExecutorService();
    3        try {
    4            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
    5        } catch (Throwable t) {
    6            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
    7        }
    8    }
    

    连接事件,其主要实现是,首先先获取执行线程池,其获取逻辑是如果executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).
    getAdaptiveExtension().getExecutor(url);获取不到线程池,则使用共享线程池。可以看出,连接事件的业务调用时异步执行,基于线程池。
    注:调用时机,服务端收到客户端连接后,该方法会被调用。

    AllChannelHandler#disconnected

    
    1public void disconnected(Channel channel) throws RemotingException {
    2        ExecutorService cexecutor = getExecutorService();
    3        try {
    4            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
    5        } catch (Throwable t) {
    6            throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);
    7        }
    8    }
    

    其基本实现与connected相同,就是将具体的disconnected 事件所对应的业务扩展方法在线程池中执行。
    注:调用时机,服务端收到客户端断开连接后,该方法会被调用。

    AllChannelHandler#received

    
     1public void received(Channel channel, Object message) throws RemotingException {
     2        ExecutorService cexecutor = getExecutorService();
     3        try {
     4            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
     5        } catch (Throwable t) {
     6            //TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring
     7            //fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out
     8            if(message instanceof Request && t instanceof RejectedExecutionException){
     9                Request request = (Request)message;
    10                if(request.isTwoWay()){
    11                    String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
    12                    Response response = new Response(request.getId(), request.getVersion());
    13                    response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
    14                    response.setErrorMessage(msg);
    15                    channel.send(response);
    16                    return;
    17                }
    18            }
    19            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
    20        }
    21    }
    

    调用时机:当服务端收到客户端发送的请求后,经过IO线程(Netty)会首先从二进制流中解码出一个个的请求,参数Object message,就是调用请求,然后在提交给线程池执行,执行完后,当业务处理完毕后,组装结果后,必然会在该线程中调用通道(Channel#write,flush)方法,向通道写入响应结果。
    注:all事件派发机制,ChannelHandler#recive是在线程池中执行。

    AllChannelHandler#caught

    
    1public void caught(Channel channel, Throwable exception) throws RemotingException {
    2        ExecutorService cexecutor = getExecutorService();
    3        try {
    4            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
    5        } catch (Throwable t) {
    6            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
    7        }
    8    }
    

    当发生异常时,ChannelHandler#caught也在线程池中执行。
    令人颇感意外的是,AllChannelHandler并未重写WrappedChannelHandler的sent方法,也就是说ChannelHandler#sent事件回调方法,是在IO线程中执行。
    WrappedChannelHandler#sent

    
    1public void sent(Channel channel, Object message) throws RemotingException {
    2        handler.sent(channel, message);
    3}
    

    这个和官方文档还是有一定出入的。

    ExecutionChannelHandler

    对应事件派件器:ExecutionDispatcher,其配置值:execution,从其源码的实现来看,与AllDispatcher实现基本类似,唯一的区别是,如果executor线程池为空时,并不会使用共享线程池,目前我还想不出什么情况下,线程池会初始化失败。

    DirectDispatcher

    直接派发,也就是所有的事件全部在IO线程中执行,故其实现非常简单:

    
    1public class DirectDispatcher implements Dispatcher {
    2    public static final String NAME = "direct";
    3    @Override
    4    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
    5        return handler;
    6    }
    7}
    

    MessageOnlyDispatcher

    其对应的ChannelHandler为MessageOnlyChannelHandler。
    事件派发器:只有请求事件在线程池中执行,其他响应事件、心跳,连接,断开连接等事件在IO线程上执行,故其只需要重写recive方法即可。

    
     1@Override
     2    public void received(Channel channel, Object message) throws RemotingException {
     3        ExecutorService cexecutor = executor;
     4        if (cexecutor == null || cexecutor.isShutdown()) {
     5            cexecutor = SHARED_EXECUTOR;
     6        }
     7        try {
     8            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
     9        } catch (Throwable t) {
    10            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
    11        }
    12    }
    

    ConnectionOrderedDispatcher

    对应的事件处理器为:ConnectionOrderedChannelHandler。
    事件派发器:连接、断开连接事件排队执行,并可通过connect.queue.capacity属性设置队列长度,请求事件、异常事件在线程池中执行。

    构造方法如下:

    
     1public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) {
     2        super(handler, url);
     3        String threadName = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
     4        connectionExecutor = new ThreadPoolExecutor(1, 1,
     5                0L, TimeUnit.MILLISECONDS,
     6                new LinkedBlockingQueueRunnable(url.getPositiveParameter(Constants.CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)),
     7                new NamedThreadFactory(threadName, true),
     8                new AbortPolicyWithReport(threadName, url)
     9        );  // FIXME There's no place to release connectionExecutor!
    10        queuewarninglimit = url.getParameter(Constants.CONNECT_QUEUE_WARNING_SIZE, Constants.DEFAULT_CONNECT_QUEUE_WARNING_SIZE);
    11    }
    

    重点关注一下connectionExecutor ,用来执行连接、断开事件的线程池,线程池中只有一个线程,并且队列可以选择时有界队列,通过connect.queue.capacity属性配置,超过的事件,则拒绝执行。

    ConnectionOrderedChannelHandler#connected

    
    1public void connected(Channel channel) throws RemotingException {
    2        try {
    3            checkQueueLength();
    4            connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
    5        } catch (Throwable t) {
    6            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
    7        }
    8    }
    

    检查队列长度,如果超过警告值,则输出警告信息,然后提交连接线程池中执行,disconnected事件类似。其他received、caught事件,则与AllDispatcher相同,就不在重复。

    总结:本文主要是分析阐述了Dubbo Dispatch机制,但与官方文档存在出入,先归纳如下:

    Dubbo事件派发机制:
    所有的sent事件方法、心跳请求全部在IO线程上执行。

  • all 除sent事件回调方法、心跳外,全部在线程池上执行。
  • execution 与all类似,唯一区就是all在线程池未指定时,可以使用共享线程池,这个差别等同于没有。
  • message 只有请求事件在线程池中执行,其他在IO线程上执行。
  • connection 请求事件在线程池中执行,连接、断开连接事件排队执行(含一个线程的线程池)
  • direct 所有事件都在IO线程中执行。
  • 除sent事件回调方法、心跳外,全部在线程池上执行。

    与all类似,唯一区就是all在线程池未指定时,可以使用共享线程池,这个差别等同于没有。

    只有请求事件在线程池中执行,其他在IO线程上执行。

    请求事件在线程池中执行,连接、断开连接事件排队执行(含一个线程的线程池)

    所有事件都在IO线程中执行。

    广告:作者新书《RocketMQ技术内幕》已上市

    源码分析Dubbo事件派发机制

    《RocketMQ技术内幕》已出版上市,目前可在主流购物平台(京东、天猫等)购买,本书从源码角度深度分析了RocketMQ NameServer、消息发送、消息存储、消息消费、消息过滤、主从同步HA、事务消息;在实战篇重点介绍了RocketMQ运维管理界面与当前支持的39个运维命令;并在附录部分罗列了RocketMQ几乎所有的配置参数。本书得到了RocketMQ创始人、阿里巴巴Messaging开源技术负责人、Linux OpenMessaging 主席的高度认可并作序推荐。目前是国内第一本成体系剖析RocketMQ的书籍。
    新书7折优惠!7折优惠!7折优惠!

    更多文章请关注微信公众号:

    源码分析Dubbo事件派发机制

    推荐关注微信公众号:RocketMQ官方微信公众号:

    源码分析Dubbo事件派发机制

    原文始发于微信公众号(中间件兴趣圈):

    本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

    本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

    原文链接:blog.ouyangsihai.cn >> 源码分析Dubbo事件派发机制


     上一篇
    源码分析Dubbo线程池实现原理 源码分析Dubbo线程池实现原理
    微信公众号:**[中间件兴趣圈]** 作者简介:《RocketMQ技术内幕》作者 本文主要分析Dubbo线程池的构建过程,主要介绍官方文档中有关于ThreadPool的种类: fixed 固定大小线程池,启动时建立线程,不关闭,
    2021-04-05
    下一篇 
    Dubbo网络通讯篇概述 Dubbo网络通讯篇概述
    微信公众号:**[中间件兴趣圈]** 作者简介:《RocketMQ技术内幕》作者 从本节开始将深入学习Dubbo网络通讯的底层实现细节,在深入学习Dubbo网络模型时,首先应从整体上了解Dubbo的网络通讯模型、线程模型是怎样的?下
    2021-04-05