源码分析Dubbo NettyServer与HeaderExchangeServer

本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

原文链接:blog.ouyangsihai.cn >> 源码分析Dubbo NettyServer与HeaderExchangeServer

微信公众号:**[中间件兴趣圈]** 作者简介:《RocketMQ技术内幕》作者

本文主要分析一下NettyServer,HeaderExchangeServer实现细节。

NettyServer

NettyServer整个类图如下:

 首先从全貌上大概看一下NettyServer对象所持有的属性:

  • AbstractPeer 1、private final ChannelHandler handler : 事件处理Handler。 2、private volatile URL url :该协议的第一个服务提供者的URL, Server只需要用到URL中的参数,与具体某一个服务没什么关系。

  • AbstractEndpoint 这些属性都是以第一个加载到的服务提供者的配置,不科学?超时机制需要后续关注】 1、private Codec2 codec :编码解码器。 2、private int timeout :      超时时间 3、private int connectTimeout :连接超时时间 注,如果通过dubbo:provider改变codec,不同的服务提供者引用的默 认服务提供者参数不同,那这个只能是以第一个为主了,应该不科学?

  • AbstractServer 1、private InetSocketAddress localAddress :url host:port地址。 2、private InetSocketAddress bindAddress:如果是多网卡,并且指定了 bind.ip、bind.port,如果为空,与localAddress相同。 3、private int accepts : AbstractServer#accepts未使用到。 4、private int idleTimeout = 600;  AbstractServer#accepts未使用到。

  • NettyServer 1、private Map String, Channel channels: ip:port, channel 所有通道。 2、private ServerBootstrap bootstrap : netty 服务端启动器。 3、private io.netty.channel.Channel channel:服务端监听通道。 4、private EventLoopGroup bossGroup;Netty boss线程组(负责连接事件) 5、private EventLoopGroup workerGroup : nety work线程组(负责IO事件) 上述只是简单介绍一下Server端的属性,有如下几个疑问需要关注。 AbstractEndpoint中定义的属性,为什么可以取第一个服务提供者的配置。 AbstractServer中的accepts、idleTimeout为什么是取最后一服务提供者的配置【从代码中看,这两个属性,未使用】。

  • AbstractEndpoint
    这些属性都是以第一个加载到的服务提供者的配置,不科学?超时机制需要后续关注】
    1、private Codec2 codec :编码解码器。
    2、private int timeout :      超时时间
    3、private int connectTimeout :连接超时时间
    注,如果通过dubbo:provider改变codec,不同的服务提供者引用的默
    认服务提供者参数不同,那这个只能是以第一个为主了,应该不科学?

    NettyServer
    1、private Map String, Channel channels: ip:port, channel 所有通道。
    2、private ServerBootstrap bootstrap : netty 服务端启动器。
    3、private io.netty.channel.Channel channel:服务端监听通道。
    4、private EventLoopGroup bossGroup;Netty boss线程组(负责连接事件)
    5、private EventLoopGroup workerGroup : nety work线程组(负责IO事件)
    上述只是简单介绍一下Server端的属性,有如下几个疑问需要关注。
    AbstractEndpoint中定义的属性,为什么可以取第一个服务提供者的配置。
    AbstractServer中的accepts、idleTimeout为什么是取最后一服务提供者的配置【从代码中看,这两个属性,未使用】。

    NettyServer 构造方法

    
    1public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
    2        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    3}
    

    直接调用父类的public AbstractServer(URL url, ChannelHandler handler)方法,从前面的文章中得知,ChannelHandlers.wrap方法会对ChannelHandler handler进行封装,主要是加入事件分发模式(Dispatch)。

    AbstractServer构造方法

    
     1public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
     2        super(url, handler);       // @1
     3        localAddress = getUrl().toInetSocketAddress();   // @2
     4
     5        String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
     6        int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
     7        if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
     8            bindIp = NetUtils.ANYHOST;
     9        }
    10        bindAddress = new InetSocketAddress(bindIp, bindPort);   // @3
    11        this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);  
    12        this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT); // @4
    13        try {
    14            doOpen();   // @5
    15            if (logger.isInfoEnabled()) {
    16                logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
    17            }
    18        } catch (Throwable t) {
    19            throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
    20                    + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
    21        }
    22        //fixme replace this with better method
    23        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
    24        executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
    25    }
    

    代码@1:调用父类的构造方法,主要初始化AbstractPeer(channelHandler、url)和AbstractEndpoint(codec2、timeout、idleTimeout )

    代码@2:根据URL中的host与端口,创建localAddress。

    代码@3:如果配置了dubbo:parameter key = “bind.ip” value = “”/ 与 dubbo:parameter key = “bind.port” /,则用该IP与端口创建bindAddress,通常用于多网卡,如果未配置,bindAddress与localAddress绑定的IP与端口一样。

    代码@4:初始化accepts与idleTimeout ,这两个参数未被其他地方使用。

    代码@5,调用doOpen方法,正式在相应端口建立网络监听。

    NettyServer#doOpen

    
     1protected void doOpen() throws Throwable {
     2        NettyHelper.setNettyLoggerFactory();
     3        bootstrap = new ServerBootstrap();       // @1
     4        bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));    // @2
     5        workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
     6                new DefaultThreadFactory("NettyServerWorker", true));    // @3
     7        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);   // @4
     8        channels = nettyServerHandler.getChannels();
     9        bootstrap.group(bossGroup, workerGroup)                                                                        // @5
    10                .channel(NioServerSocketChannel.class)
    11                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
    12                .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
    13                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
    14                .childHandler(new ChannelInitializerNioSocketChannel() {
    15                    @Override
    16                    protected void initChannel(NioSocketChannel ch) throws Exception {
    17                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
    18                        ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug                       
    19                                .addLast("decoder", adapter.getDecoder())   
    20                                .addLast("encoder", adapter.getEncoder())
    21                                .addLast("handler", nettyServerHandler);
    22                    }
    23                });
    24        // bind
    25        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());     // @6
    26        channelFuture.syncUninterruptibly();
    27        channel = channelFuture.channel();
    28    }
    

    代码@1:创建Netty服务端启动帮助类ServerBootstrap.

    代码@2:创建服务端Boss线程,线程名:.NettyServerBoss,主要负责客户端的连接事件,主从多Reactor线程模型中的主线程(连接事件)。

    代码@3:创建服务端Work线程组,线程名:NettyServerWorker-序号,线程个数取自参数:iothreads,默认为(CPU核数+1)与32取小值,顾名思义,IO线程数,主要处理读写事件,编码、解码都在IO线程中完成。

    代码@4:创建用户Handler,这里是NettyServerHandler。

    代码@5:Netty启动的常规写法,关注如下内容:

    
    1addLast("decoder", adapter.getDecoder())  : 添加解码器
    2addLast("encoder", adapter.getEncoder()) :添加编码器
    3addLast("handler", nettyServerHandler) :添加业务Handler。
    

    这里简单介绍一下流程:

  • 客户端建立与服务端连接,此时Boss线程的连接事件触发,建立TCP连接,并向IO线程注册该通道(Channel0)的读事件。
  • 当客户端向服务端发送请求消息后,IO线程中的读事件触发,会首先调用adapter.getDecoder() 根据对应的请求协议(例如dubbo)从二进制流中解码出一个完整的请求对象,然后传入到业务handler,例如nettyServerHandler,执行相应的事件方法,例如recive方法。
  • 当服务端向Channel写入响应结果时,首先编码器会按照协议编码成二进制流,供客户端解码。

  • **温馨提示** 如果对Netty想深入学习的话,请移步到作者的《源码分析Netty系列》https://blog.csdn.net/prestigeding/article/details/53977445

    HeaderExchangeServer

    根据Dubbo服务端初始化流程,我们可知,Dubbo为了封装各种不同的网络实现客户端(netty、mina)等,映入了Exchangers层,通用存在ExchangeServer,其实现Server并内部持有具体的Server实现端,例如NettyServer。

    接下来,我们重点来关注一下HeaderExchangeServer.
    核心属性如下:

  • ScheduledExecutorService scheduled:心跳线程数,线程名称前缀,dubbo-remoting-server-heartbeat-thread-序号
  • private final Server server:具体的Server实现类,例如NettyServer。
  • private ScheduledFuture ? heartbeatTimer:心跳调度Future,可以通过future取消心跳等动作。
  • private int heartbeat:心跳间隔时间
  • private int heartbeatTimeout:心跳超时时间,至少为heartbeat的两倍
  • private final Server server:具体的Server实现类,例如NettyServer。

    private int heartbeat:心跳间隔时间

    构造函数

    
     1public HeaderExchangeServer(Server server) {
     2        if (server == null) {
     3            throw new IllegalArgumentException("server == null");
     4        }
     5        this.server = server;
     6        this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
     7        this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
     8        if (heartbeatTimeout  heartbeat * 2) {
     9            throw new IllegalStateException("heartbeatTimeout  heartbeatInterval * 2");
    10        }
    11        startHeartbeatTimer();
    12    }
    

    说明,主要是通过heartbeat参数设置心跳间隔,如果不配置,则不启动心跳检测。从上面看来HeaderExchangeServer内部持有Server,并封装了心跳的功能,在这里就不细细分析了。

    广告:作者新书《RocketMQ技术内幕》已上市

    源码分析Dubbo NettyServer与HeaderExchangeServer

    《RocketMQ技术内幕》已出版上市,目前可在主流购物平台(京东、天猫等)购买,本书从源码角度深度分析了RocketMQ NameServer、消息发送、消息存储、消息消费、消息过滤、主从同步HA、事务消息;在实战篇重点介绍了RocketMQ运维管理界面与当前支持的39个运维命令;并在附录部分罗列了RocketMQ几乎所有的配置参数。本书得到了RocketMQ创始人、阿里巴巴Messaging开源技术负责人、Linux OpenMessaging 主席的高度认可并作序推荐。目前是国内第一本成体系剖析RocketMQ的书籍。
    新书7折优惠!7折优惠!7折优惠!

    更多文章请关注微信公众号:

    源码分析Dubbo NettyServer与HeaderExchangeServer

    推荐关注微信公众号:RocketMQ官方微信公众号:

    源码分析Dubbo NettyServer与HeaderExchangeServer

    原文始发于微信公众号(中间件兴趣圈):

    本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

    本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

    原文链接:blog.ouyangsihai.cn >> 源码分析Dubbo NettyServer与HeaderExchangeServer


     上一篇
    源码分析Dubbo网络通信NettyClient实现原理 源码分析Dubbo网络通信NettyClient实现原理
    微信公众号:**[中间件兴趣圈]** 作者简介:《RocketMQ技术内幕》作者 在源码分析Dubbo通讯篇之网络核心类一文中已给出Dubbo netty client的启动流程,如下图: 以Dubbo协议为例,DubboProt
    2021-04-05
    下一篇 
    源码分析Dubbo线程池实现原理 源码分析Dubbo线程池实现原理
    微信公众号:**[中间件兴趣圈]** 作者简介:《RocketMQ技术内幕》作者 本文主要分析Dubbo线程池的构建过程,主要介绍官方文档中有关于ThreadPool的种类: fixed 固定大小线程池,启动时建立线程,不关闭,
    2021-04-05