- 精尽 Dubbo 原理与源码专栏( 已经完成 69+ 篇,预计总共 75+ 篇 )
- 中文详细注释的开源项目
- Java 并发源码合集
- RocketMQ 源码合集
- Sharding-JDBC 源码解析合集
- Spring MVC 和 Security 源码合集
- MyCAT 源码解析合集
本文主要基于 Eureka 1.8.X 版本
- 概述
- EndPoint
- 2.1 EurekaEndpoint
- 2.2 DefaultEndpoint
- 2.3 AwsEndpoint
- 解析器
- 3.1 ClusterResolver
- 3.2 ClosableResolver
- 3.3 DnsTxtRecordClusterResolver
- 3.4 ConfigClusterResolver
- 3.5 ZoneAffinityClusterResolver
- 3.6 AsyncResolver
- 3.6.1 定时任务
- 3.6.2 解析 EndPoint 集群
1. 概述
本文主要分享 EndPoint 与 解析器。
- EndPoint ,服务端点。例如,Eureka-Server 的访问地址。
- EndPoint 解析器,将配置的 Eureka-Server 的访问地址解析成 EndPoint 。
目前有多种 Eureka-Server 访问地址的配置方式,本文只分享 Eureka 1.x 的配置,不包含 Eureka 1.x 对 Eureka 2.x 的兼容配置:
- 第一种,直接配置实际访问地址。例如,
eureka.serviceUrl.defaultZone=http://127.0.0.1:8080/v2
。 - 第二种,基于 DNS 解析出访问地址。例如,
eureka.shouldUseDns=true
并且eureka.eurekaServer.domainName=eureka.iocoder.cn
。
本文涉及类在
com.netflix.discovery.shared.resolver
包下,涉及到主体类的类图如下( 打开大图 ):
- 红色部分 —— EndPoint
- 黄色部分 —— EndPoint 解析器
推荐 Spring Cloud 书籍:
- 请支持正版。下载盗版,等于主动编写低级 BUG 。
- 程序猿DD —— 《Spring Cloud微服务实战》
- 周立 —— 《Spring Cloud与Docker微服务架构实战》
- 两书齐买,京东包邮。
推荐 Spring Cloud 视频:
- Java 微服务实践 - Spring Boot
- Java 微服务实践 - Spring Cloud
- Java 微服务实践 - Spring Boot / Spring Cloud
2. EndPoint
2.1 EurekaEndpoint
com.netflix.discovery.shared.resolver.EurekaEndpoint
,Eureka 服务端点接口,实现代码如下:
public interface EurekaEndpoint extends ComparableObject {
/**
* @return 完整的服务 URL
*/
String getServiceUrl();
/**
* @deprecated use {@link #getNetworkAddress()}
*/
@Deprecated
String getHostName();
/**
* @return 网络地址
*/
String getNetworkAddress();
/**
* @return 端口
*/
int getPort();
/**
* @return 是否安全( https )
*/
boolean isSecure();
/**
* @return 相对路径
*/
String getRelativeUri();
}
2.2 DefaultEndpoint
com.netflix.discovery.shared.resolver.DefaultEndpoint
,默认 Eureka 服务端点实现类。实现代码如下:
public class DefaultEndpoint implements EurekaEndpoint {
/**
* 网络地址
*/
protected final String networkAddress;
/**
* 端口
*/
protected final int port;
/**
* 是否安全( https )
*/
protected final boolean isSecure;
/**
* 相对地址
*/
protected final String relativeUri;
/**
* 完整的服务 URL
*/
protected final String serviceUrl;
public DefaultEndpoint(String serviceUrl) {
this.serviceUrl = serviceUrl;
// 将 serviceUrl 分解成 几个属性
try {
URL url = new URL(serviceUrl);
this.networkAddress = url.getHost();
this.port = url.getPort();
this.isSecure = "https".equals(url.getProtocol());
this.relativeUri = url.getPath();
} catch (Exception e) {
throw new IllegalArgumentException("Malformed serviceUrl: " + serviceUrl);
}
}
public DefaultEndpoint(String networkAddress, int port, boolean isSecure, String relativeUri) {
this.networkAddress = networkAddress;
this.port = port;
this.isSecure = isSecure;
this.relativeUri = relativeUri;
// 几个属性 拼接成 serviceUrl
StringBuilder sb = new StringBuilder().append(isSecure ? "https" : "http").append("://").append(networkAddress);
if (port = 0) {
sb.append(':').append(port);
}
if (relativeUri != null) {
if (!relativeUri.startsWith("/")) {
sb.append('/');
}
sb.append(relativeUri);
}
this.serviceUrl = sb.toString();
}
}
- 重写了
#equals(…)
和#hashCode(…)
方法,标准实现方式,这里就不贴代码了。 - 重写了
#compareTo(…)
方法,基于serviceUrl
属性做比较。
2.3 AwsEndpoint
com.netflix.discovery.shared.resolver.aws.AwsEndpoint
,基于
region
、
zone
的 Eureka 服务端点实现类 ( 请不要在意 AWS 开头 )。实现代码如下:
public class AwsEndpoint extends DefaultEndpoint {
/**
* 区域
*/
protected final String region;
/**
* 可用区
*/
protected final String zone;
}
- 重写了
#equals(…)
和#hashCode(…)
方法,标准实现方式,这里就不贴代码了。
3. 解析器
EndPoint 解析器使用委托设计模式实现。所以,上文图片中我们看到好多个解析器,实际代码非常非常非常清晰。
FROM 《委托模式》 委托模式是软件设计模式中的一项基本技巧。在委托模式中,有两个对象参与处理同一个请求,接受请求的对象将请求委托给另一个对象来处理。委托模式是一项基本技巧,许多其他的模式,如状态模式、策略模式、访问者模式本质上是在更特殊的场合采用了委托模式。委托模式使得我们可以用聚合来替代继承,它还使我们可以模拟mixin。
我们在上图的基础上,增加委托的关系,如下图:
3.1 ClusterResolver
com.netflix.discovery.shared.resolver.ClusterResolver
,集群解析器接口。接口代码如下:
public interface ClusterResolverT extends EurekaEndpoint {
/**
* @return 地区
*/
String getRegion();
/**
* @return EndPoint 集群( 数组 )
*/
List getClusterEndpoints();
}
3.2 ClosableResolver
com.netflix.discovery.shared.resolver.ClosableResolver
,可关闭的解析器接口,继承自 ClusterResolver 接口。接口代码如下:
public interface ClosableResolverT extends EurekaEndpoint extends ClusterResolverT {
/**
* 关闭
*/
void shutdown();
}
3.3 DnsTxtRecordClusterResolver
com.netflix.discovery.shared.resolver.aws.DnsTxtRecordClusterResolver
,基于 DNS TXT 记录类型的集群解析器。类属性代码如下:
public class DnsTxtRecordClusterResolver implements ClusterResolverAwsEndpoint {
/**
* 地区
*/
private final String region;
/**
* 集群根地址,例如 txt.default.eureka.iocoder.cn
*/
private final String rootClusterDNS;
/**
* 是否解析可用区( zone )
*/
private final boolean extractZoneFromDNS;
/**
* 端口
*/
private final int port;
/**
* 是否安全
*/
private final boolean isSecure;
/**
* 相对地址
*/
private final String relativeUri;
}
-
- 主机记录 :格式为 `TXT.${ZONE}.${自定义二级域名}` 或者 `${ZONE}.${自定义二级域名}`。
- 记录类型 :**TXT 记录类型**。
- 记录值 :EndPoint 的网络地址。如有多个 EndPoint,使用**空格**分隔。
- 主机记录 :格式为 `TXT.${REGION}.${自定义二级域名}` 。
- 记录类型 :**TXT 记录类型**。
- 记录值 :第二层的**主机记录**。如有多个第二层级,使用**空格**分隔。
- 第一层 :
- 第二层:
举个例子:
rootClusterDNS
,集群根地址。例如:
txt.default.eureka.iocoder.cn
,其·
txt.default.eureka
为 DNS 解析记录的第一层的主机记录。
region
:地区。需要和
rootClusterDNS
的
${REGION}
一致。
extractZoneFromDNS
:是否解析 DNS 解析记录的第二层级的主机记录的
${ZONE}
可用区。
#getClusterEndpoints(...)
方法,实现代码如下:
1: @Override
2: public ListAwsEndpoint getClusterEndpoints() {
3: ListAwsEndpoint eurekaEndpoints = resolve(region, rootClusterDNS, extractZoneFromDNS, port, isSecure, relativeUri);
4: if (logger.isDebugEnabled()) {
5: logger.debug("Resolved {} to {}", rootClusterDNS, eurekaEndpoints);
6: }
7: return eurekaEndpoints;
8: }
9:
10: private static ListAwsEndpoint resolve(String region, String rootClusterDNS, boolean extractZone, int port, boolean isSecure, String relativeUri) {
11: try {
12: // 解析 第一层 DNS 记录
13: SetString zoneDomainNames = resolve(rootClusterDNS);
14: if (zoneDomainNames.isEmpty()) {
15: throw new ClusterResolverException("Cannot resolve Eureka cluster addresses; there are no data in TXT record for DN " + rootClusterDNS);
16: }
17: // 记录 第二层 DNS 记录
18: ListAwsEndpoint endpoints = new ArrayList();
19: for (String zoneDomain : zoneDomainNames) {
20: String zone = extractZone ? ResolverUtils.extractZoneFromHostName(zoneDomain) : null; //
21: SetString zoneAddresses = resolve(zoneDomain);
22: for (String address : zoneAddresses) {
23: endpoints.add(new AwsEndpoint(address, port, isSecure, relativeUri, region, zone));
24: }
25: }
26: return endpoints;
27: } catch (NamingException e) {
28: throw new ClusterResolverException("Cannot resolve Eureka cluster addresses for root: " + rootClusterDNS, e);
29: }
30: }
1: private static SetString resolve(String rootClusterDNS) throws NamingException {
2: SetString result;
3: try {
4: result = DnsResolver.getCNamesFromTxtRecord(rootClusterDNS);
5: // TODO 芋艿:这块是bug,不需要这一段
6: if (!rootClusterDNS.startsWith("txt.")) {
7: result = DnsResolver.getCNamesFromTxtRecord("txt." + rootClusterDNS);
8: }
9: } catch (NamingException e) {
10: if (!rootClusterDNS.startsWith("txt.")) {
11: result = DnsResolver.getCNamesFromTxtRecord("txt." + rootClusterDNS);
12: } else {
13: throw e;
14: }
15: }
16: return result;
17: }
-
- 第 4 行 : 调用 `DnsResolver#getCNamesFromTxtRecord(…)` 方法,解析 TXT 主机记录。点击链接查看带中文注释的 DnsResolver 的代码,比较解析,笔者就不啰嗦了。
- 第 5 至 8 行 :当传递参数 `rootClusterDNS` 不以 `txt.` 开头时,即使第 4 行解析成功,也会报错,此时是个 Eureka 的 BUG 。因此,配置 DNS 解析记录时,主机记录暂时必须以 `txt.` 开头。
- 第 20 行 :解析可用区(
zone
)。 - 第 21 行 :调用
#resolve(rootClusterDNS)
解析第二层 DNS 记录。
第 17 至 25 行 :循环第一层 DNS 记录的解析结果,进一步解析第二层 DNS 记录。
3.4 ConfigClusterResolver
com.netflix.discovery.shared.resolver.aws.ConfigClusterResolver
,基于配置文件的集群解析器。类属性代码如下:
public class ConfigClusterResolver implements ClusterResolverAwsEndpoint {
private final EurekaClientConfig clientConfig;
private final InstanceInfo myInstanceInfo;
public ConfigClusterResolver(EurekaClientConfig clientConfig, InstanceInfo myInstanceInfo) {
this.clientConfig = clientConfig;
this.myInstanceInfo = myInstanceInfo;
}
}
#getClusterEndpoints(...)
方法,实现代码如下:
// ... 省略代码,超过微信文章的长度
`// ... 省略代码,超过微信文章的长度`
-
- 必须配置 `eureka.shouldUseDns=true` ,开启基于 DNS 获取 EndPoint 集群。
- 必须配置 `eureka.eurekaServer.domainName=${xxxxx}` ,配置集群根地址。
- 选填配 `eureka.eurekaServer.port` ,`eureka.eurekaServer.context` 。
- 从代码中我们可以看出,使用 DnsTxtRecordClusterResolver 解析出 EndPoint 集群。
第 9 至 13 行 :直接配置文件填写实际 EndPoint 集群,调用
#getClusterEndpointsFromConfig()
方法,实现代码如下:
// ... 省略代码,超过微信文章的长度
`// ... 省略代码,超过微信文章的长度`
-
- 当方法参数 `preferSameZone=true` ,即 `eureka.preferSameZone=true`( 默认值 :`true` ) 时,**开始位置**为可用区数组( `availZones` )的**第一个**和应用实例所在的可用区( `myZone` )【**相等**】元素的位置。
- 当方法参数 `preferSameZone=false` ,即 `eureka.preferSameZone=false`( 默认值 :`true` ) 时,**开始位置**为可用区数组( `availZones` )的**第一个**和应用实例所在的可用区( `myZone` )【**不相等**】元素的位置。
-
第 13 行 :获得**开始位置**。实现代码如下:
`// ... 省略代码,超过微信文章的长度`
- 第 20 至 33 行 :从开始位置**顺序**将剩余的可用区的 `serviceUrls` 添加到结果。**顺序**理解如下图:
第 5 行 :调用
InstanceInfo#getZone(…)
方法,获得应用实例自己所在的可用区(
zone
)。非亚马逊 AWS 环境下,可用区数组的第一个元素就是应用实例自己所在的可用区。
第 13 行 :获得开始位置。实现代码如下:
// ... 省略代码,超过微信文章的长度
第 9 至 18 行 :拼装 EndPoint 集群结果。
3.5 ZoneAffinityClusterResolver
com.netflix.discovery.shared.resolver.aws.ZoneAffinityClusterResolver
,使用可用区亲和的集群解析器。类属性代码如下:
// ... 省略代码,超过微信文章的长度
- 属性
delegate
,委托的解析器。目前代码里使用的是 ConfigClusterResolver 。 - 属性 `zoneAffinity` ,是否可用区亲和。
true
:EndPoint 可用区为本地的优先被放在前面。false
:EndPoint 可用区非本地的优先被放在前面。
#getClusterEndpoints(...)
方法,实现代码如下:
// ... 省略代码,超过微信文章的长度
`// ... 省略代码,超过微信文章的长度`
-
- 多个主机,实现对同一个 EndPoint 集群负载均衡的效果。
- 单个主机,同一个 EndPoint 集群按照固定顺序访问。Eureka-Server 不是强一致性的注册中心,Eureka-Client 对同一个 Eureka-Server 拉取注册信息,保证两者之间增量同步的一致性。
- **注意,`ResolverUtils#randomize(…)` 使用以本机IP为随机种子**,有如下好处:
第 8 行 :调用
#randomizeAndMerge(...)
方法,分别随机打乱每个 EndPoint 集群,并进行合并数组,实现代码如下:
第 10 至 12 行 :非可用区亲和,将非本地的可用区的 EndPoint 集群放在前面。
3.6 AsyncResolver
com.netflix.discovery.shared.resolver.AsyncResolver
,异步执行解析的集群解析器。AsyncResolver 属性较多,而且复杂的多,我们拆分到具体方法里分享。
3.6.1 定时任务
AsyncResolver 内置定时任务,定时刷新 EndPoint 集群解析结果。
为什么要刷新?例如,Eureka-Server 的
serviceUrls
基于 DNS 配置。
定时任务代码如下:
// ... 省略代码,超过微信文章的长度
-
- `delegate` ,委托的解析器,目前代码为 ZoneAffinityClusterResolver。
- TimedSupervisorTask ,在 《Eureka 源码解析 —— 应用实例注册发现(二)之续租》「2.3 TimedSupervisorTask」 有详细解析。
-
`updateTask` 实现代码如下:
`// ... 省略代码,超过微信文章的长度`
- 后台任务的**发起**在 `#getClusterEndpoints()` 方法,在 「3.6.2 解析 EndPoint 集群」 详细解析。
TimedSupervisorTask ,在 《Eureka 源码解析 —— 应用实例注册发现(二)之续租》「2.3 TimedSupervisorTask」 有详细解析。
后台任务的发起在
#getClusterEndpoints()
方法,在 「3.6.2 解析 EndPoint 集群」 详细解析。
3.6.2 解析 EndPoint 集群
调用
#getClusterEndpoints()
方法,解析 EndPoint 集群,实现代码如下:
// ... 省略代码,超过微信文章的长度
`// ... 省略代码,超过微信文章的长度`
-
- 调用 `updateTask` ,解析 EndPoint 集群。
第 10 至 13 行 : 若未调度定时任务,进行调度,调用
#scheduleTask()
方法,实现代码如下:
// ... 省略代码,超过微信文章的长度
- x
第 15 行 :返回 EndPoint 集群。当第一次预热失败,会返回空,直到定时任务获得到结果。
4. 初始化解析器
Eureka-Client 在初始化时,调用
DiscoveryClient#scheduleServerEndpointTask()
方法,初始化 AsyncResolver 解析器。实现代码如下:
// ... 省略代码,超过微信文章的长度
` // ... 省略代码,超过微信文章的长度`
-
- x
- 第 10 至 23 行 :组合解析器,用于 Eureka 1.x 对 Eureka 2.x 的兼容配置,暂时不需要了解。TODO[0028]写入集群和读取集群
- 第 26 行 :调用 `#defaultBootstrapResolver()` 方法,创建默认的解析器 AsyncResolver 。
- 第 40 至 45 行 :创建 ZoneAffinityClusterResolver 。在 ZoneAffinityClusterResolver 构造方法的参数,我们看到创建 ConfigClusterResolver 作为 `delegate` 参数。
- 第 48 行 :调用 `ZoneAffinityClusterResolver#getClusterEndpoints()` 方法,**第一次 Eureka-Server EndPoint 集群解析**。
-
第 51 至 55 行 :解析不到 Eureka-Server EndPoint 集群时,可以通过配置( `eureka.experimental.clientTransportFailFastOnInit=true` ),使 Eureka-Client 初始化失败。`#failFastOnInitCheck(...)` 方法,实现代码如下:
// potential future feature, guarding with experimental flag for now // ... 省略代码,超过微信文章的长度
第 10 至 23 行 :组合解析器,用于 Eureka 1.x 对 Eureka 2.x 的兼容配置,暂时不需要了解。TODO[0028]写入集群和读取集群
第 40 至 45 行 :创建 ZoneAffinityClusterResolver 。在 ZoneAffinityClusterResolver 构造方法的参数,我们看到创建 ConfigClusterResolver 作为
delegate
参数。
第 51 至 55 行 :解析不到 Eureka-Server EndPoint 集群时,可以通过配置(
eureka.experimental.clientTransportFailFastOnInit=true
),使 Eureka-Client 初始化失败。
#failFastOnInitCheck(...)
方法,实现代码如下:
// potential future feature, guarding with experimental flag for now
// ... 省略代码,超过微信文章的长度
第 58 至 64 行 :创建 AsyncResolver 。从代码上,我们可以看到,
AsyncResolver.resultsRef
属性一开始已经用
initialValue
传递给 AsyncResolver 构造方法。实现代码如下:
Java public AsyncResolver(String name, ClusterResolver delegate, List initialValues, int executorThreadPoolSize, int refreshIntervalMs) { this( name, delegate, initialValues, executorThreadPoolSize, refreshIntervalMs, 0 ); ¨K78K }
- x
666. 彩蛋
T T 一开始看解析器,没反应过来是委托设计模式,一脸懵逼+一脸懵逼+一脸懵逼。后面理顺了,发现超级奈斯( Nice ) 啊 !!!!
胖友,你学会了么?
胖友,分享我的公众号( 芋道源码 ) 给你的胖友可好?
如果你对 Dubbo 感兴趣,欢迎加入我的知识星球一起交流。
目前在知识星球(https://t.zsxq.com/2VbiaEu)更新了如下 Dubbo 源码解析如下:
02. 项目结构一览
- 配置 Configuration
04. 核心流程一览
- 05. 拓展机制 SPI
- 线程池
- 07. 服务暴露 Export
- 08. 服务引用 Refer
- 注册中心 Registry
- 动态编译 Compile
- 动态代理 Proxy
- 服务调用 Invoke
- 调用特性
- 过滤器 Filter
- NIO 服务器
- P2P 服务器
- HTTP 服务器
- 序列化 Serialization
- 集群容错 Cluster
- 优雅停机
- 日志适配
- 状态检查
- 监控中心 Monitor
- 管理中心 Admin
- 运维命令 QOS
- 链路追踪 Tracing
- ...
一共 60 篇++
源码不易↓↓↓↓↓
点赞****支持老艿艿↓↓