微信公众号:**[中间件兴趣圈]** 作者简介:《RocketMQ技术内幕》作者;
本文继续上文Dubbo服务提供者启动流程,在上篇文章中详细梳理了基于dubbo spring文件的配置方式,Dubbo是如何加载配置文件,服务提供者dubbo:service标签服务暴露全流程,本节重点关注RegistryProtocol#export中调用doLocalExport方法,根据服务暴露协议建立网络通讯服务器,在特定端口建立监听,监听来自消息消费端服务的请求。
RegistryProtocol#doLocalExport:
1private ExporterChangeableWrapper doLocalExport(final Invoker originInvoker) {
2 String key = getCacheKey(originInvoker);
3 ExporterChangeableWrapper exporter = (ExporterChangeableWrapper) bounds.get(key);
4 if (exporter == null) {
5 synchronized (bounds) {
6 exporter = (ExporterChangeableWrapper) bounds.get(key);
7 if (exporter == null) {
8 final Invoker? invokerDelegete = new InvokerDelegete(originInvoker, getProviderUrl(originInvoker)); // @1
9 exporter = new ExporterChangeableWrapper((Exporter) protocol.export(invokerDelegete), originInvoker); // @2
10 bounds.put(key, exporter);
11 }
12 }
13 }
14 return exporter;
15 }
代码@1:如果服务提供者以dubbo协议暴露服务,getProviderUrl(originInvoker)返回的URL将以dubbo://开头。
代码@2:根据Dubbo内置的SPI机制,将调用DubboProtocol#export方法。
源码分析DubboProtocol#export
1public Exporter export(Invoker invoker) throws RpcException {
2 URL url = invoker.getUrl(); // @1
3 // export service.
4 String key = serviceKey(url); // @2
5 DubboExporter exporter = new DubboExporter(invoker, key, exporterMap);
6 exporterMap.put(key, exporter);
7
8 //export an stub service for dispatching event
9 Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT); //@3 start
10 Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
11 if (isStubSupportEvent && !isCallbackservice) {
12 String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
13 if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
14 if (logger.isWarnEnabled()) {
15 logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
16 "], has set stubproxy support event ,but no stub methods founded."));
17 }
18 } else {
19 stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
20 }
21 } // @3 end
22
23 openServer(url); // @4
24 optimizeSerialization(url); // @5
25 return exporter;
26 }
代码@1:获取服务提供者URL,以协议名称,这里是dubbo://开头。
代码@2:从服务提供者URL中获取服务名,key: interface:port,例如:com.alibaba.dubbo.demo.DemoService:20880。
代码@3:是否将转发事件导出成stub。
代码@4:根据url打开服务,下面将详细分析其实现。
代码@5:根据url优化器序列化方式。
源码分析DubboProtocol#openServer
1private void openServer(URL url) {
2 // find server.
3 String key = url.getAddress(); // @1
4 //client can export a service which's only for server to invoke
5 boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
6 if (isServer) {
7 ExchangeServer server = serverMap.get(key); // @2
8 if (server == null) {
9 serverMap.put(key, createServer(url)); //@3
10 } else {
11 // server supports reset, use together with override
12 server.reset(url); //@4
13 }
14 }
15 }
代码@1:根据url获取网络地址:ip:port,例如:192.168.56.1:20880,服务提供者IP与暴露服务端口号。
代码@2:根据key从服务器缓存中获取,如果存在,则执行代码@4,如果不存在,则执行代码@3。
代码@3:根据URL创建一服务器,Dubbo服务提供者服务器实现类为ExchangeServer。
代码@4:如果服务器已经存在,用当前URL重置服务器,这个不难理解,因为一个Dubbo服务中,会存在多个dubbo:service标签,这些标签都会在服务台提供者的同一个IP地址、端口号上暴露服务。
源码分析DubboProtocol#createServer
1private ExchangeServer createServer(URL url) {
2 // send readonly event when server closes, it's enabled by default
3 url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()); // @1
4 // enable heartbeat by default
5 url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); // @2
6 String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); // @3
7
8 if (str != null && str.length() 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) // @4
9 throw new RpcException("Unsupported server type: " + str + ", url: " + url);
10
11 url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME); // @5
12 ExchangeServer server;
13 try {
14 server = Exchangers.bind(url, requestHandler); // @6
15 } catch (RemotingException e) {
16 throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
17 }
18 str = url.getParameter(Constants.CLIENT_KEY); //@7
19 if (str != null && str.length() 0) {
20 SetString supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
21 if (!supportedTypes.contains(str)) {
22 throw new RpcException("Unsupported client type: " + str);
23 }
24 }
25 return server;
26 }
代码@1:为服务提供者url增加channel.readonly.sent属性,默认为true,表示在发送请求时,是否等待将字节写入socket后再返回,默认为true。
代码@2:为服务提供者url增加heartbeat属性,表示心跳间隔时间,默认为60*1000,表示60s。
代码@3:为服务提供者url增加server属性,可选值为netty,mina等等,默认为netty。
代码@4:根据SPI机制,判断server属性是否支持。
代码@5:为服务提供者url增加codec属性,默认值为dubbo,协议编码方式。
代码@6:根据服务提供者URI,服务提供者命令请求处理器requestHandler构建ExchangeServer实例。requestHandler的实现具体在以后详细分析Dubbo服务调用时再详细分析。
代码@7:验证客户端类型是否可用。
源码分析Exchangers.bind
根据URL、ExchangeHandler构建服务器
1public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
2 if (url == null) {
3 throw new IllegalArgumentException("url == null");
4 }
5 if (handler == null) {
6 throw new IllegalArgumentException("handler == null");
7 }
8 url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
9 return getExchanger(url).bind(url, handler);
10 }
上述代码不难看出,首先根据url获取Exchanger实例,然后调用bind方法构建ExchangeServer,Exchanger接口如下
ExchangeClient connect(URL url, ExchangeHandler handler):服务消费者调用。
dubbo提供的实现类为:HeaderExchanger,其bind方法如下:
HeaderExchanger#bind
1public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
2 return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
3}
从这里可以看出,端口的绑定由Transporters的bind方法实现。
源码分析Transporters.bind方法
1public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
2 if (url == null) {
3 throw new IllegalArgumentException("url == null");
4 }
5 if (handlers == null || handlers.length == 0) {
6 throw new IllegalArgumentException("handlers == null");
7 }
8 ChannelHandler handler;
9 if (handlers.length == 1) {
10 handler = handlers[0];
11 } else {
12 handler = new ChannelHandlerDispatcher(handlers);
13 }
14 return getTransporter().bind(url, handler);
15 }
16
17public static Transporter getTransporter() {
18 return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
19}
从这里得知,Dubbo网络传输的接口有Transporter接口实现,其继承类图所示:
NettyTransporter源码如下:
1public class NettyTransporter implements Transporter {
2
3 public static final String NAME = "netty";
4
5 @Override
6 public Server bind(URL url, ChannelHandler listener) throws RemotingException {
7 return new NettyServer(url, listener);
8 }
9
10 @Override
11 public Client connect(URL url, ChannelHandler listener) throws RemotingException {
12 return new NettyClient(url, listener);
13 }
14}
NettyServer建立网络连接的实现方法为:
1protected void doOpen() throws Throwable {
2 NettyHelper.setNettyLoggerFactory();
3 ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
4 ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
5 ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
6 bootstrap = new ServerBootstrap(channelFactory);
7
8 final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); // @1
9 channels = nettyHandler.getChannels();
10 // https://issues.jboss.org/browse/NETTY-365
11 // https://issues.jboss.org/browse/NETTY-379
12 // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
13 bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
14 @Override
15 public ChannelPipeline getPipeline() {
16 NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
17 ChannelPipeline pipeline = Channels.pipeline();
18 /*int idleTimeout = getIdleTimeout();
19 if (idleTimeout 10000) {
20 pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
21 }*/
22 pipeline.addLast("decoder", adapter.getDecoder());
23 pipeline.addLast("encoder", adapter.getEncoder());
24 pipeline.addLast("handler", nettyHandler); // @2
25 return pipeline;
26 }
27 });
28 // bind
29 channel = bootstrap.bind(getBindAddress());
30 }
熟悉本方法需要具备Netty的知识,有关源码:阅读Netty系列文章,这里不对每一行代码进行解读,对于与网络相关的参数,将在后续文章中详细讲解,本方法@1、@2引起了我的注意,首先创建NettyServer必须传入一个服务提供者URL,但从DubboProtocol#createServer中可以看出,Server是基于网络套接字(ip:port)缓存的,一个JVM应用中,必然会存在多个dubbo:server标签,就会有多个URL,这里为什么可以这样做呢?从DubboProtocol#createServer中可以看出,在解析第二个dubbo:service标签时并不会调用createServer,而是会调用Server#reset方法,是不是这个方法有什么魔法,在reset方法时能将URL也注册到Server上,那接下来分析NettyServer#reset方法是如何实现的。
源码分析DdubboProtocol#reset
reset方法最终将用Server的reset方法,同样还是以netty版本的NettyServer为例,查看reset方法的实现原理。NettyServer#reset-父类(AbstractServer)
AbstractServer#reset
1public void reset(URL url) {
2 if (url == null) {
3 return;
4 }
5 try { // @1 start
6 if (url.hasParameter(Constants.ACCEPTS_KEY)) {
7 int a = url.getParameter(Constants.ACCEPTS_KEY, 0);
8 if (a 0) {
9 this.accepts = a;
10 }
11 }
12 } catch (Throwable t) {
13 logger.error(t.getMessage(), t);
14 }
15 try {
16 if (url.hasParameter(Constants.IDLE_TIMEOUT_KEY)) {
17 int t = url.getParameter(Constants.IDLE_TIMEOUT_KEY, 0);
18 if (t 0) {
19 this.idleTimeout = t;
20 }
21 }
22 } catch (Throwable t) {
23 logger.error(t.getMessage(), t);
24 }
25 try {
26 if (url.hasParameter(Constants.THREADS_KEY)
27 && executor instanceof ThreadPoolExecutor && !executor.isShutdown()) {
28 ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
29 int threads = url.getParameter(Constants.THREADS_KEY, 0);
30 int max = threadPoolExecutor.getMaximumPoolSize();
31 int core = threadPoolExecutor.getCorePoolSize();
32 if (threads 0 && (threads != max || threads != core)) {
33 if (threads core) {
34 threadPoolExecutor.setCorePoolSize(threads);
35 if (core == max) {
36 threadPoolExecutor.setMaximumPoolSize(threads);
37 }
38 } else {
39 threadPoolExecutor.setMaximumPoolSize(threads);
40 if (core == max) {
41 threadPoolExecutor.setCorePoolSize(threads);
42 }
43 }
44 }
45 }
46 } catch (Throwable t) {
47 logger.error(t.getMessage(), t);
48 } // @1 end
49 super.setUrl(getUrl().addParameters(url.getParameters())); // @2
50 }
代码@1:首先是调整线程池的相关线程数量,这个好理解。、
代码@2:然后设置调用setUrl覆盖原先NettyServer的private volatile URL url的属性,那为什么不会影响原先注册的dubbo:server呢?
原来NettyHandler上加了注解:@Sharable,由该注解去实现线程安全。
Dubbo服务提供者启动流程将分析到这里了,本文并未对网络细节进行详细分析,旨在梳理出启动流程,有关Dubbo服务网络实现原理将在后续章节中详细分析,敬请期待。
广告:作者的新书《RocketMQ技术内幕》已上市
《RocketMQ技术内幕》已出版上市,目前可在主流购物平台(京东、天猫等)购买,本书从源码角度深度分析了RocketMQ NameServer、消息发送、消息存储、消息消费、消息过滤、主从同步HA、事务消息;在实战篇重点介绍了RocketMQ运维管理界面与当前支持的39个运维命令;并在附录部分罗列了RocketMQ几乎所有的配置参数。本书得到了RocketMQ创始人、阿里巴巴Messaging开源技术负责人、Linux OpenMessaging 主席的高度认可并作序推荐。目前是国内第一本成体系剖析RocketMQ的书籍。
新书7折优惠!7折优惠!7折优惠!
更多文章请关注微信公众号:
推荐关注微信公众号:RocketMQ官方微信公众号
原文始发于微信公众号(中间件兴趣圈):