前言
在源码分析Dubbo通讯篇之网络核心类一文中已给出Dubbo netty client的启动流程,如下图:
正文
以Dubbo协议为例,DubboProtocol#refer中,在创建Invoker时,通过getClient方法,开始Client(连接的)创建过程,先重点看一下:
private ExchangeClient[] getClients(URL url) { // @1
// whether to share connection
boolean service_share_connect = false;
int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0); // @2
// if not configured, connection is shared, otherwise, one connection for one service
if (connections == 0) {
service_share_connect = true;
connections = 1;
}
ExchangeClient[] clients = new ExchangeClient[connections]; // @3
for (int i = 0; i clients.length; i++) {
if (service_share_connect) {
clients[i] = getSharedClient(url); // @4
} else {
clients[i] = initClient(url); // @5
}
}
return clients;
}
代码@1:参数URL,服务提供者URL。
代码@2:获取 dubbo:reference connections = “” /,默认0表示客户端对同一个服务提供者的所有服务,使用共享一个连接,如果该值有设置,则使用非共享的客户端,所谓的共享客户端,以Netty为例,也即客户端对同一服务提供者发起的不同服务,使用同一个客户端(NettyClient)进行请求的发送与接收。
代码@3,根据connections,创建ExchangeClients数组。
代码@4:如果使用共享连接,则调用getSharedClient获取共享连接,如果客户端未建立,则创建客户端。
代码@5,如果不使用共享连接,调用initClient创建客户端,其创建时序图如上图所示。
接下来,还是以Netty4为例,探究一下Dubbo NettyClient的创建细节。
首先从全貌上大概看一下NettyClient对象所持有的属性:
AbstractPeer
1、private final ChannelHandler handler : 事件处理Handler。
2、private volatile URL url :该协议的第一个服务提供者的URL,Server只需要用到URL中的参数,与具体某一个服务没什么关系。
AbstractEndpoint
1、private Codec2 codec :编码解码器。
2、private int timeout : 超时时间
3、private int connectTimeout :连接超时时间
注,如果通过dubbo:provider改变codec,不同的服务提供者引用的默认服务提供者参数不同,那这个只能是以第一个为主了,应该不科学?
AbstractClient
1、protected static final String CLIENT_THREAD_POOL_NAME
= “DubboClientHandler” Dubbo 客户端线程名称。
private static final AtomicInteger CLIENT_THREAD_POOL_ID = new AtomicInteger() 客户端线程池ID自增器。
2、private static final ScheduledThreadPoolExecutor
reconnectExecutorService 客户端连接重连线程池。
3、private final Lock connectLock = new ReentrantLock() 客户端连接服务端独占锁,保证一个客户端同时只会一个线程在执行连接动作。
4、private final boolean send_reconnect 消息发送时,如果当前客户端未连接,是否发起重连操作。
5、private final AtomicInteger reconnect_count = new AtomicInteger(0) 记录重连的次数。
6、private final AtomicBoolean reconnect_error_log_flag = new AtomicBoolean(false) 连接出错后是否打印过ERROR日志。
7、private final int reconnect_warning_period :对连接异常,以WARN级别日志输出的频率,默认第一次是以Error日志,然后每出现reconnect_warning_period次后,就打印一次warn级别日志。
8、private final long shutdown_timeout :关闭服务的超时时间。
9、protected volatile ExecutorService executor 客户端线程池。
10、private volatile ScheduledFuture ? reconnectExecutorFuture = null 重连的Future。
11、private long lastConnectedTime = System.currentTimeMillis() 上一次重连时间戳。
NettyClient
1、private static final NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(Constants.DEFAULT_IO_THREADS, new DefaultThreadFactory(“NettyClientWorker”, true)); IO线程组,同一个JVM中所有的客户端公用一个IO线程组,且线程数固定为(32与CPU核数+1的最小值)。
2、private Bootstrap bootstrap :Netty客户端启动实例。
3、private volatile Channel channel:客户端连接,请copy其引用使用。
**1、源码分析NettyClient **
1.1 源码分析构造函数
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException { // @1
super(url, wrapChannelHandler(url, handler)); // @2
}
代码@1:url:服务提供者URL;ChannelHandler handler:事件处理器。
代码@2:wrapChannelHandler在讲解NettyServer时已重点分析,构造其事件转发模型(Dispatch)。
接下来重点分析其父类的构造方法:
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler); // @1
send_reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
shutdown_timeout = url.getParameter(Constants.SHUTDOWN_TIMEOUT_KEY, Constants.DEFAULT_SHUTDOWN_TIMEOUT);
// The default reconnection interval is 2s, 1800 means warning interval is 1 hour.
reconnect_warning_period = url.getParameter("reconnect.waring.period", 1800); // @2
try {
doOpen(); // @3
} catch (Throwable t) {
close();
throw new RemotingException(url.toInetSocketAddress(), null,
"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
}
try {
// connect.
connect(); // @4
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());
}
} catch (RemotingException t) {
if (url.getParameter(Constants.CHECK_KEY, true)) {
close();
throw t;
} else {
logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);
}
} catch (Throwable t) {
close();
throw new RemotingException(url.toInetSocketAddress(), null,
"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
}
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class) // @5
.getDefaultExtension().get(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
ExtensionLoader.getExtensionLoader(DataStore.class)
.getDefaultExtension().remove(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
}
代码@1:调用父类的构造其,初始化url、ChannelHandler。
代码@2:初始化send_reconnect 、shutdown_timeout、reconnect_warning_period(默认1小时打印一次日志)
代码@3:调用doOpen初始化客户端调用模型,后续重点分析。
代码@4:调用connect方法,向服务端发起TCP连接。
代码@5:获取线程池,并从缓存中移除。
1.2 doOpen
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this); // @1
bootstrap = new Bootstrap(); // @2
bootstrap.group(nioEventLoopGroup) // @3
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
//.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
.channel(NioSocketChannel.class);
if (getTimeout() 3000) { // @4
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
} else {
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout());
}
bootstrap.handler(new ChannelInitializer() {
@Override
protected void initChannel(Channel ch) throws Exception {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("handler", nettyClientHandler); // @5
}
});
}
代码@1:创建NettyClientHandler。
代码@2:创建Netty客户端启动实例bootstrap.
代码@3:客户端绑定IO线程组(池),注意,一个JVM中所有的NettyClient共享其IO线程。
代码@4:设置连接超时时间,最小连接超时时间为3s。
代码@5:设置编码器、事件连接器,当触发事件后,将调用nettyClientHandler中相关的方法。
1.3 doConnect
protected void doConnect() throws Throwable {
long start = System.currentTimeMillis();
ChannelFuture future = bootstrap.connect(getConnectAddress()); // @1
try {
boolean ret = future.awaitUninterruptibly(3000, TimeUnit.MILLISECONDS); // @2
if (ret && future.isSuccess()) {
Channel newChannel = future.channel();
try {
// Close old channel
Channel oldChannel = NettyClient.this.channel; // copy reference
if (oldChannel != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
}
oldChannel.close();
} finally {
NettyChannel.removeChannelIfDisconnected(oldChannel);
}
}
} finally {
if (NettyClient.this.isClosed()) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close new netty channel " + newChannel + ", because the client closed.");
}
newChannel.close();
} finally {
NettyClient.this.channel = null;
NettyChannel.removeChannelIfDisconnected(newChannel);
}
} else {
NettyClient.this.channel = newChannel;
}
}
} else if (future.cause() != null) {
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
+ getRemoteAddress() + ", error message is:" + future.cause().getMessage(), future.cause());
} else {
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
+ getRemoteAddress() + " client-side timeout "
+ getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
}
} finally {
if (!isConnected()) {
//future.cancel(true);
}
}
}
代码@1:调用bootstrap.connect方法发起TCP连接。
代码@2:future.awaitUninterruptibly,连接事件只等待3S,这里写成固定了,显然没有与doOpen方法中ChannelOption.CONNECT_TIMEOUT_MILLIS保持一致。
文/编辑 by Justin
原创 by 丁威
原文始发于微信公众号(Justin的后端书架):