源码分析Dubbo网络通信NettyClient实现原理

本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

原文链接:blog.ouyangsihai.cn >> 源码分析Dubbo网络通信NettyClient实现原理

微信公众号:**[中间件兴趣圈]** 作者简介:《RocketMQ技术内幕》作者

在源码分析Dubbo通讯篇之网络核心类一文中已给出Dubbo netty client的启动流程,如下图:

以Dubbo协议为例,DubboProtocol#refer中,在创建Invoker时,通过getClient方法,开始Client(连接的)创建过程,先重点看一下:


 1private ExchangeClient[] getClients(URL url) {    // @1
 2        // whether to share connection
 3        boolean service_share_connect = false;
 4        int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);    // @2
 5        // if not configured, connection is shared, otherwise, one connection for one service      
 6        if (connections == 0) {
 7            service_share_connect = true;
 8            connections = 1;
 9        }
10
11        ExchangeClient[] clients = new ExchangeClient[connections];   // @3
12        for (int i = 0; i  clients.length; i++) {
13            if (service_share_connect) {
14                clients[i] = getSharedClient(url);       // @4
15            } else {
16                clients[i] = initClient(url);    // @5
17            }
18        }
19        return clients;
20    }

代码@1:参数URL,服务提供者URL。

代码@2:获取dubbo:reference connections = “” /,默认0表示客户端对同一个服务提供者的所有服务,使用共享一个连接,如果该值有设置,则使用非共享的客户端,所谓的共享客户端,以Netty为例,也即客户端对同一服务提供者发起的不同服务,使用同一个客户端(NettyClient)进行请求的发送与接收。

代码@3:根据connections,创建ExchangeClients数组。

代码@4:如果使用共享连接,则调用getSharedClient获取共享连接,如果客户端未建立,则创建客户端。

 代码@5:如果不使用共享连接,调用initClient创建客户端,其创建时序图如上图所示。

接下来,还是以Netty4为例,探究一下Dubbo NettyClient的创建细节。

首先从全貌上大概看一下NettyClient对象所持有的属性:

  • AbstractPeer 1、private final ChannelHandler handler

    事件处理Handler。 2、private volatile URL url

    该协议的第一个服务提供者的URL,Server只需要用到URL中的参数,与具体某一个服务没什么关系。
  • AbstractEndpoint 1、private Codec2 codec

    编码解码器。 2、private int timeout

    超时时间 3、private int connectTimeout

    连接超时时间

    注,如果通过dubbo:provider改变codec,不同的服务提供者引用的默认服务提供者参数不同,那这个只能是以第一个为主了,应该不科学?
  • AbstractClient 1、protected static final String CLIENT_THREAD_POOL_NAME = "DubboClientHandler"  

    Dubbo 客户端线程名称。 2、private static final AtomicInteger CLIENT_THREAD_POOL_ID = new AtomicInteger()  

     客户端线程池ID自增器。 3、private static final ScheduledThreadPoolExecutor reconnectExecutorService

    客户端连接重连线程池。 4、private final Lock connectLock = new ReentrantLock()    

    客户端连接服务端独占锁,保证一个客户端同时只会一个线程在执行连接动作。 5、private final boolean send_reconnect

    消息发送时,如果当前客户端未连接,是否发起重连操作。 6、private final AtomicInteger reconnect_count = new AtomicInteger(0)    记录重连的次数。 7、private final AtomicBoolean reconnect_error_log_flag = new AtomicBoolean(false)

    连接出错后是否打印过ERROR日志。 8、private final int reconnect_warning_period

    对连接异常,以WARN级别日志输出的频率,默认第一次是以Error日志,然后每出现reconnect_warning_period次后,就打印一次warn级别日志。 9、private final long shutdown_timeout

    关闭服务的超时时间。 10、protected volatile ExecutorService executor    客户端线程池。 11、private volatile ScheduledFuture ? reconnectExecutorFuture = null  重连的Future。 12、private long lastConnectedTime = System.currentTimeMillis()

    上一次重连时间戳。
  • NettyClient 1、private static final NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(Constants.DEFAULT_IO_THREADS, new DefaultThreadFactory("NettyClientWorker", true))

    IO线程组,同一个JVM中所有的客户端公用一个IO线程组,且线程数固定为(32与CPU核数+1的最小值)。 2、private Bootstrap bootstrap

    Netty客户端启动实例。 3、private volatile Channel channel

    客户端连接,请copy其引用使用。
  • 事件处理Handler。
    2、private volatile URL url

    AbstractEndpoint
    1、private Codec2 codec

    超时时间
    3、private int connectTimeout

    注,如果通过dubbo:provider改变codec,不同的服务提供者引用的默认服务提供者参数不同,那这个只能是以第一个为主了,应该不科学?

    Dubbo 客户端线程名称。
    2、private static final AtomicInteger CLIENT_THREAD_POOL_ID = new AtomicInteger()  

    客户端连接重连线程池。
    4、private final Lock connectLock = new ReentrantLock()    

    消息发送时,如果当前客户端未连接,是否发起重连操作。
    6、private final AtomicInteger reconnect_count = new AtomicInteger(0)    记录重连的次数。
    7、private final AtomicBoolean reconnect_error_log_flag = new AtomicBoolean(false)

    对连接异常,以WARN级别日志输出的频率,默认第一次是以Error日志,然后每出现reconnect_warning_period次后,就打印一次warn级别日志。
    9、private final long shutdown_timeout

    上一次重连时间戳。

    IO线程组,同一个JVM中所有的客户端公用一个IO线程组,且线程数固定为(32与CPU核数+1的最小值)。
    2、private Bootstrap bootstrap

    客户端连接,请copy其引用使用。

    源码分析NettyClient

    构造函数

    
    1public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {    // @1
    2        super(url, wrapChannelHandler(url, handler));    // @2
    3}
    

    代码@1:url:服务提供者URL;ChannelHandler handler:事件处理器。

    代码@2:wrapChannelHandler在讲解NettyServer时已重点分析,构造其事件转发模型(Dispatch)。

    接下来重点分析其父类的构造方法:

    
     1public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
     2        super(url, handler);     // @1
     3
     4        send_reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
     5
     6        shutdown_timeout = url.getParameter(Constants.SHUTDOWN_TIMEOUT_KEY, Constants.DEFAULT_SHUTDOWN_TIMEOUT);
     7
     8        // The default reconnection interval is 2s, 1800 means warning interval is 1 hour.
     9        reconnect_warning_period = url.getParameter("reconnect.waring.period", 1800);    // @2
    10
    11        try {
    12            doOpen();        // @3
    13        } catch (Throwable t) {
    14            close();
    15            throw new RemotingException(url.toInetSocketAddress(), null,
    16                    "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
    17                            + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
    18        }
    19        try {
    20            // connect.
    21            connect();      // @4
    22            if (logger.isInfoEnabled()) {
    23                logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());
    24            }
    25        } catch (RemotingException t) {
    26            if (url.getParameter(Constants.CHECK_KEY, true)) {
    27                close();
    28                throw t;
    29            } else {
    30                logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
    31                        + " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);
    32            }
    33        } catch (Throwable t) {
    34            close();
    35            throw new RemotingException(url.toInetSocketAddress(), null,
    36                    "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
    37                            + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
    38        }
    39
    40        executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class)    // @5
    41                .getDefaultExtension().get(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
    42        ExtensionLoader.getExtensionLoader(DataStore.class)
    43                .getDefaultExtension().remove(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
    44    }
    

    代码@1:调用父类的构造其,初始化url、ChannelHandler。

    代码@2:初始化send_reconnect、shutdown_timeout、reconnect_warning_period(默认1小时打印一次日志)

    代码@3:调用doOpen初始化客户端调用模型,后续重点分析。

    代码@4:调用connect方法,向服务端发起TCP连接。

    代码@5:获取线程池,并从缓存中移除。

    doOpen

    
     1protected void doOpen() throws Throwable {
     2        NettyHelper.setNettyLoggerFactory();
     3        final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);    // @1 
     4        bootstrap = new Bootstrap();                                                                                            // @2
     5        bootstrap.group(nioEventLoopGroup)                                                                              // @3
     6                .option(ChannelOption.SO_KEEPALIVE, true)
     7                .option(ChannelOption.TCP_NODELAY, true)
     8                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
     9                //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
    10                .channel(NioSocketChannel.class);
    11
    12        if (getTimeout()  3000) {     // @4
    13            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
    14        } else {
    15            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout());
    16        }
    17
    18        bootstrap.handler(new ChannelInitializer() {
    19
    20            @Override
    21            protected void initChannel(Channel ch) throws Exception {
    22                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
    23                ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
    24                        .addLast("decoder", adapter.getDecoder())
    25                        .addLast("encoder", adapter.getEncoder())
    26                        .addLast("handler", nettyClientHandler);      // @5
    27            }
    28        });
    29    }
    

    代码@1:创建NettyClientHandler。

    代码@2:创建Netty客户端启动实例bootstrap.

    代码@3:客户端绑定IO线程组(池),注意,一个JVM中所有的NettyClient共享其IO线程。

    代码@4:设置连接超时时间,最小连接超时时间为3s。

    代码@5:设置编码器、事件连接器,当触发事件后,将调用nettyClientHandler中相关的方法。

    doConnect

    
     1protected void doConnect() throws Throwable {
     2        long start = System.currentTimeMillis();
     3        ChannelFuture future = bootstrap.connect(getConnectAddress());   // @1
     4        try {
     5            boolean ret = future.awaitUninterruptibly(3000, TimeUnit.MILLISECONDS);   // @2
     6
     7            if (ret && future.isSuccess()) {
     8                Channel newChannel = future.channel();
     9                try {
    10                    // Close old channel
    11                    Channel oldChannel = NettyClient.this.channel; // copy reference
    12                    if (oldChannel != null) {
    13                        try {
    14                            if (logger.isInfoEnabled()) {
    15                                logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
    16                            }
    17                            oldChannel.close();
    18                        } finally {
    19                            NettyChannel.removeChannelIfDisconnected(oldChannel);
    20                        }
    21                    }
    22                } finally {
    23                    if (NettyClient.this.isClosed()) {
    24                        try {
    25                            if (logger.isInfoEnabled()) {
    26                                logger.info("Close new netty channel " + newChannel + ", because the client closed.");
    27                            }
    28                            newChannel.close();
    29                        } finally {
    30                            NettyClient.this.channel = null;
    31                            NettyChannel.removeChannelIfDisconnected(newChannel);
    32                        }
    33                    } else {
    34                        NettyClient.this.channel = newChannel;
    35                    }
    36                }
    37            } else if (future.cause() != null) {
    38                throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
    39                        + getRemoteAddress() + ", error message is:" + future.cause().getMessage(), future.cause());
    40            } else {
    41                throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
    42                        + getRemoteAddress() + " client-side timeout "
    43                        + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
    44                        + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
    45            }
    46        } finally {
    47            if (!isConnected()) {
    48                //future.cancel(true);
    49            }
    50        }
    51    }
    

    代码@1:调用bootstrap.connect方法发起TCP连接。

    代码@2:future.awaitUninterruptibly,连接事件只等待3S,这里写成固定了,显然没有与doOpen方法中ChannelOption.CONNECT_TIMEOUT_MILLIS保持一致。

    关于NettyClient的介绍就将到这里了,下一篇将会分析编码解码。

    广告:作者新书《RocketMQ技术内幕》已上市

    源码分析Dubbo网络通信NettyClient实现原理

    《RocketMQ技术内幕》已出版上市,目前可在主流购物平台(京东、天猫等)购买,本书从源码角度深度分析了RocketMQ NameServer、消息发送、消息存储、消息消费、消息过滤、主从同步HA、事务消息;在实战篇重点介绍了RocketMQ运维管理界面与当前支持的39个运维命令;并在附录部分罗列了RocketMQ几乎所有的配置参数。本书得到了RocketMQ创始人、阿里巴巴Messaging开源技术负责人、Linux OpenMessaging 主席的高度认可并作序推荐。目前是国内第一本成体系剖析RocketMQ的书籍。
    新书7折优惠!7折优惠!7折优惠!

    更多文章请关注微信公众号:

    源码分析Dubbo网络通信NettyClient实现原理

    推荐关注微信公众号:RocketMQ官方微信公众号:

    源码分析Dubbo网络通信NettyClient实现原理

    原文始发于微信公众号(中间件兴趣圈):

    本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

    本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

    原文链接:blog.ouyangsihai.cn >> 源码分析Dubbo网络通信NettyClient实现原理


     上一篇
    源码分析Dubbo编码解码实现原理(Dubbo协议) 源码分析Dubbo编码解码实现原理(Dubbo协议)
    微信公众号:**[中间件兴趣圈]** 作者简介:《RocketMQ技术内幕》作者 本节主要介绍了Dubbo协议的编码方式,涉及协议头、协议体具体的编码规则,默认使用Dubbo协议,其核心类图如下: 在Dubbo整个框架中,codec
    2021-04-05
    下一篇 
    源码分析Dubbo NettyServer与HeaderExchangeServer 源码分析Dubbo NettyServer与HeaderExchangeServer
    微信公众号:**[中间件兴趣圈]** 作者简介:《RocketMQ技术内幕》作者 本文主要分析一下NettyServer,HeaderExchangeServer实现细节。 NettyServerNettyServer整个类图如下:
    2021-04-05