源码精品专栏
摘要: 原创出处 http://www.iocoder.cn/Eureka/server-cluster/ 「芋道源码」欢迎转载,保留摘要,谢谢!
本文主要基于 Eureka 1.8.X 版本
- 概述
- 集群节点初始化与更新
- 2.1 集群节点启动
- 2.2 更新集群节点信息
- 2.3 集群节点
- 获取初始注册信息
- 同步注册信息
- 4.1 同步操作类型
- 4.2 发起 Eureka-Server 同步操作
- 4.3 接收 Eureka-Server 同步操作
- 4.4 处理 Eureka-Server 同步结果
1. 概述
本文主要分享 Eureka-Server 集群同步注册信息。
Eureka-Server 集群如下图:
- Eureka-Server 集群不区分主从节点或者 Primary & Secondary 节点,所有节点相同角色( 也就是没有角色 ),完全对等。
- Eureka-Client 可以向任意 Eureka-Client 发起任意读写操作,Eureka-Server 将操作复制到另外的 Eureka-Server 以达到最终一致性。注意,Eureka-Server 是选择了 AP 的组件。
Eureka-Server 可以使用直接配置所有节点的服务地址,或者基于 DNS 配置。推荐阅读:《Spring Cloud构建微服务架构(六)高可用服务注册中心》 。
本文主要类在
com.netflix.eureka.cluster
包下。
OK,让我们开始愉快的遨游在代码的海洋。
推荐 Spring Cloud 书籍:
- 请支持正版。下载盗版,等于主动编写低级 BUG 。
- 程序猿DD —— 《Spring Cloud微服务实战》
- 周立 —— 《Spring Cloud与Docker微服务架构实战》
- 两书齐买,京东包邮。
推荐 Spring Cloud 视频:
- Java 微服务实践 - Spring Boot
- Java 微服务实践 - Spring Cloud
- Java 微服务实践 - Spring Boot / Spring Cloud
ps :注意,本文提到的同步,准确来说是**复制( Replication )**。
2. 集群节点初始化与更新
com.netflix.eureka.cluster.PeerEurekaNodes
,Eureka-Server 集群节点集合 。构造方法如下 :
public class PeerEurekaNodes {
private static final Logger logger = LoggerFactory.getLogger(PeerEurekaNodes.class);
/**
* 应用实例注册表
*/
protected final PeerAwareInstanceRegistry registry;
/**
* Eureka-Server 配置
*/
protected final EurekaServerConfig serverConfig;
/**
* Eureka-Client 配置
*/
protected final EurekaClientConfig clientConfig;
/**
* Eureka-Server 编解码
*/
protected final ServerCodecs serverCodecs;
/**
* 应用实例信息管理器
*/
private final ApplicationInfoManager applicationInfoManager;
/**
* Eureka-Server 集群节点数组
*/
private volatile ListPeerEurekaNode peerEurekaNodes = Collections.emptyList();
/**
* Eureka-Server 服务地址数组
*/
private volatile SetString peerEurekaNodeUrls = Collections.emptySet();
/**
* 定时任务服务
*/
private ScheduledExecutorService taskExecutor;
@Inject
public PeerEurekaNodes(
PeerAwareInstanceRegistry registry,
EurekaServerConfig serverConfig,
EurekaClientConfig clientConfig,
ServerCodecs serverCodecs,
ApplicationInfoManager applicationInfoManager) {
this.registry = registry;
this.serverConfig = serverConfig;
this.clientConfig = clientConfig;
this.serverCodecs = serverCodecs;
this.applicationInfoManager = applicationInfoManager;
}
}
peerEurekaNodes
,peerEurekaNodeUrls
,taskExecutor
属性,在构造方法中未设置和初始化,而是在PeerEurekaNodes#start()
方法,设置和初始化,下文我们会解析这个方法。- Eureka-Server 在初始化时,调用
EurekaBootStrap#getPeerEurekaNodes(…)
方法,创建 PeerEurekaNodes ,点击 链接 查看该方法的实现。
2.1 集群节点启动
调用
PeerEurekaNodes#start()
方法,集群节点启动,主要完成两个逻辑:
- 初始化集群节点信息
- 初始化固定周期( 默认:10 分钟,可配置 )更新集群节点信息的任务
代码如下:
1: public void start() {
2: // 创建 定时任务服务
3: taskExecutor = Executors.newSingleThreadScheduledExecutor(
4: new ThreadFactory() {
5: @Override
6: public Thread newThread(Runnable r) {
7: Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
8: thread.setDaemon(true);
9: return thread;
10: }
11: }
12: );
13: try {
14: // 初始化 集群节点信息
15: updatePeerEurekaNodes(resolvePeerUrls());
16: // 初始化 初始化固定周期更新集群节点信息的任务
17: Runnable peersUpdateTask = new Runnable() {
18: @Override
19: public void run() {
20: try {
21: updatePeerEurekaNodes(resolvePeerUrls());
22: } catch (Throwable e) {
23: logger.error("Cannot update the replica Nodes", e);
24: }
25:
26: }
27: };
28: taskExecutor.scheduleWithFixedDelay(
29: peersUpdateTask,
30: serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
31: serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
32: TimeUnit.MILLISECONDS
33: );
34: } catch (Exception e) {
35: throw new IllegalStateException(e);
36: }
37: // 打印 集群节点信息
38: for (PeerEurekaNode node : peerEurekaNodes) {
39: logger.info("Replica node URL: " + node.getServiceUrl());
40: }
41: }
- 第 15 行 && 第 21 行 :调用
#updatePeerEurekaNodes()
方法,更新集群节点信息。
2.2 更新集群节点信息
调用
#resolvePeerUrls()
方法,获得 Eureka-Server 集群服务地址数组,代码如下:
1: protected ListString resolvePeerUrls() {
2: // 获得 Eureka-Server 集群服务地址数组
3: InstanceInfo myInfo = applicationInfoManager.getInfo();
4: String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);
5: ListString replicaUrls = EndpointUtils.getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));
6:
7: // 移除自己(避免向自己同步)
8: int idx = 0;
9: while (idx replicaUrls.size()) {
10: if (isThisMyUrl(replicaUrls.get(idx))) {
11: replicaUrls.remove(idx);
12: } else {
13: idx++;
14: }
15: }
16: return replicaUrls;
17: }
- 第 2 至 5 行 :获得 Eureka-Server 集群服务地址数组。
EndpointUtils#getDiscoveryServiceUrls(…)
方法,逻辑与 《Eureka 源码解析 —— EndPoint 与 解析器》「3.4 ConfigClusterResolver」 基本类似。EndpointUtils 正在逐步,猜测未来这里会替换。 - 第 7 至 15 行 :移除自身节点,避免向自己同步。
调用
#updatePeerEurekaNodes()
方法,更新集群节点信息,主要完成两部分逻辑:
- 添加新增的集群节点
- 关闭删除的集群节点
代码如下:
1: protected void updatePeerEurekaNodes(ListString newPeerUrls) {
2: if (newPeerUrls.isEmpty()) {
3: logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
4: return;
5: }
6:
7: // 计算 新增的集群节点地址
8: SetString toShutdown = new HashSet(peerEurekaNodeUrls);
9: toShutdown.removeAll(newPeerUrls);
10:
11: // 计算 删除的集群节点地址
12: SetString toAdd = new HashSet(newPeerUrls);
13: toAdd.removeAll(peerEurekaNodeUrls);
14:
15: if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
16: return;
17: }
18:
19: // 关闭删除的集群节点
20: // Remove peers no long available
21: ListPeerEurekaNode newNodeList = new ArrayList(peerEurekaNodes);
22: if (!toShutdown.isEmpty()) {
23: logger.info("Removing no longer available peer nodes {}", toShutdown);
24: int i = 0;
25: while (i newNodeList.size()) {
26: PeerEurekaNode eurekaNode = newNodeList.get(i);
27: if (toShutdown.contains(eurekaNode.getServiceUrl())) {
28: newNodeList.remove(i);
29: eurekaNode.shutDown(); // 关闭
30: } else {
31: i++;
32: }
33: }
34: }
35:
36: // 添加新增的集群节点
37: // Add new peers
38: if (!toAdd.isEmpty()) {
39: logger.info("Adding new peer nodes {}", toAdd);
40: for (String peerUrl : toAdd) {
41: newNodeList.add(createPeerEurekaNode(peerUrl));
42: }
43: }
44:
45: // 赋值
46: this.peerEurekaNodes = newNodeList;
47: this.peerEurekaNodeUrls = new HashSet(newPeerUrls);
48: }
1: protected PeerEurekaNode createPeerEurekaNode(String peerEurekaNodeUrl) {
2: HttpReplicationClient replicationClient = JerseyReplicationClient.createReplicationClient(serverConfig, serverCodecs, peerEurekaNodeUrl);
3: String targetHost = hostFromUrl(peerEurekaNodeUrl);
4: if (targetHost == null) {
5: targetHost = "host";
6: }
7: return new PeerEurekaNode(registry, targetHost, peerEurekaNodeUrl, replicationClient, serverConfig);
8: }
-
- 第 2 行 :创建 Eureka-Server 集群通信客户端,在 《Eureka 源码解析 —— 网络通信》「4.2 JerseyReplicationClient」 有详细解析。
- 第 7 行 :创建 PeerEurekaNode ,在 「2.3 PeerEurekaNode」 有详细解析。
- 第 129 行 :创建 ReplicationTaskProcessor 。在 「4.1.2 同步操作任务处理器」 详细解析
- 第 131 至 140 行 :创建批量任务分发器,在 《Eureka 源码解析 —— 任务批处理》 有详细解析。
- 第 142 至 151 行 :创建单任务分发器,用于 Eureka-Server 向亚马逊 AWS 的 ASG (
Autoscaling Group
) 同步状态。暂时跳过。
第 11 至 13 行 :计算删除的集群节点地址。
第 36 至 43 行 :添加新增的集群节点。调用
#createPeerEurekaNode(peerUrl)
方法,创建集群节点,代码如下:
2.3 集群节点
com.netflix.eureka.cluster.PeerEurekaNode
,单个集群节点。
点击 链接 查看构造方法
3. 获取初始注册信息
Eureka-Server 启动时,调用
PeerAwareInstanceRegistryImpl#syncUp()
方法,从集群的一个 Eureka-Server 节点获取初始注册信息,代码如下:
1: @Override
2: public int syncUp() {
3: // Copy entire entry from neighboring DS node
4: int count = 0;
5:
6: for (int i = 0; ((i serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
7: // 未读取到注册信息,sleep 等待
8: if (i 0) {
9: try {
10: Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
11: } catch (InterruptedException e) {
12: logger.warn("Interrupted during registry transfer..");
13: break;
14: }
15: }
16:
17: // 获取注册信息
18: Applications apps = eurekaClient.getApplications();
19: for (Application app : apps.getRegisteredApplications()) {
20: for (InstanceInfo instance : app.getInstances()) {
21: try {
22: if (isRegisterable(instance)) { // 判断是否能够注册
23: register(instance, instance.getLeaseInfo().getDurationInSecs(), true); // 注册
24: count++;
25: }
26: } catch (Throwable t) {
27: logger.error("During DS init copy", t);
28: }
29: }
30: }
31: }
32: return count;
33: }
- 第 7 至 15 行 :未获取到注册信息,
sleep
等待再次重试。 - 第 17 至 30 行 :获取注册信息,若获取到,注册到自身节点。
- 第 22 行 :判断应用实例是否能够注册到自身节点。主要用于亚马逊 AWS 环境下的判断,若非部署在亚马逊里,都返回
true
。点击 链接 查看实现。 - 第 23 行 :调用
#register()
方法,注册应用实例到自身节点。在 《Eureka 源码解析 —— 应用实例注册发现(一)之注册》 有详细解析。
若调用
#syncUp()
方法,未获取到应用实例,则 Eureka-Server 会有一段时间( 默认:5 分钟,可配 )不允许被 Eureka-Client 获取注册信息,避免影响 Eureka-Client 。
// PeerAwareInstanceRegistryImpl.java
private boolean peerInstancesTransferEmptyOnStartup = true;
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
// … 省略其他代码
if (count 0) {
this.peerInstancesTransferEmptyOnStartup = false;
}
// … 省略其他代码
}
// PeerAwareInstanceRegistryImpl.java
public boolean shouldAllowAccess(boolean remoteRegionRequired) {
if (this.peerInstancesTransferEmptyOnStartup) {
// 设置启动时间
this.startupTime = System.currentTimeMillis();
if (!(System.currentTimeMillis() this.startupTime + serverConfig.getWaitTimeInMsWhenSyncEmpty())) {
return false;
}
}
// ... 省略其他代码
return true;
}
判断 Eureka-Server 是否允许被 Eureka-Client 获取注册信息,代码如下:
// PeerAwareInstanceRegistryImpl.java
public boolean shouldAllowAccess(boolean remoteRegionRequired) {
if (this.peerInstancesTransferEmptyOnStartup) {
// 设置启动时间
this.startupTime = System.currentTimeMillis();
if (!(System.currentTimeMillis() this.startupTime + serverConfig.getWaitTimeInMsWhenSyncEmpty())) {
return false;
}
}
// ... 省略其他代码
return true;
}
4. 同步注册信息
Eureka-Server 集群同步注册信息如下图:
- Eureka-Server 接收到 Eureka-Client 的 Register、Heartbeat、Cancel、StatusUpdate、DeleteStatusOverride 操作,固定间隔( 默认值 :500 毫秒,可配 )向 Eureka-Server 集群内其他节点同步( 准实时,非实时 )。
4.1 同步操作类型
com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl.Action
,同步操作类型,代码如下:
public enum Action {
Heartbeat, Register, Cancel, StatusUpdate, DeleteStatusOverride;
// ... 省略监控相关属性
}
- Register ,在 《Eureka 源码解析 —— 应用实例注册发现(一)之注册》 有详细解析
- Heartbeat ,在 《Eureka 源码解析 —— 应用实例注册发现(二)之续租》 有详细解析
- Cancel ,在 《Eureka 源码解析 —— 应用实例注册发现(三)之下线》 有详细解析
- StatusUpdate ,在 《Eureka 源码解析 —— 应用实例注册发现(八)之覆盖状态》 有详细解析
- DeleteStatusOverride ,在 《Eureka 源码解析 —— 应用实例注册发现(八)之覆盖状态》 有详细解析
4.2 发起 Eureka-Server 同步操作
Eureka-Server 在完成 Eureka-Client 发起的上述操作在自身节点的执行后,向集群内其他 Eureka-Server 发起同步操作。以 Register 操作举例子,代码如下:
// PeerAwareInstanceRegistryImpl.java
public void register(final InstanceInfo info, final boolean isReplication) {
// 租约过期时间
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
// 注册应用实例信息
super.register(info, leaseDuration, isReplication);
// Eureka-Server 复制
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
- 最后一行,调用
#replicateToPeers(…)
方法,传递对应的同步操作类型,发起同步操作。
#replicateToPeers(...)
方法,代码如下:
1: private void replicateToPeers(Action action, String appName, String id,
2: InstanceInfo info /* optional */,
3: InstanceStatus newStatus /* optional */, boolean isReplication) {
4: Stopwatch tracer = action.getTimer().start();
5: try {
6: if (isReplication) {
7: numberOfReplicationsLastMin.increment();
8: }
9:
10: // Eureka-Server 发起的请求 或者 集群为空
11: // If it is a replication already, do not replicate again as this will create a poison replication
12: if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
13: return;
14: }
15:
16: for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
17: // If the url represents this host, do not replicate to yourself.
18: if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
19: continue;
20: }
21: replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
22: }
23: } finally {
24: tracer.stop();
25: }
26: }
- 第 10 至 14 行 :Eureka-Server 在处理上述操作( Action ),无论来自 Eureka-Client 发起请求,还是 Eureka-Server 发起同步,调用的内部方法相同,通过
isReplication=true
参数,避免死循环同步。 - 第 16 至 22 行 :循环集群内每个节点,调用
#replicateInstanceActionsToPeers(…)
方法,发起同步操作。
#replicateInstanceActionsToPeers(...)
方法,代码如下:
// ... 省略代码,太长了。
-
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
batchingDispatcher.process( taskId("${action}", appName, id), // id new InstanceReplicationTask(targetHost, Action.Cancel, appName, id) { @Override public EurekaHttpResponse<Void> execute() { return replicationClient.doString(...); } @Override public void handleFailure(int statusCode, Object responseEntity) throws Throwable { // do Something... } }, // ReplicationTask 子类 expiryTime )
@Override
public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
// do Something…
}}, // ReplicationTask 子类
expiryTime
)
- **相同应用实例的相同同步操作使用相同任务编号**。在 《Eureka 源码解析 —— 任务批处理》「2. 整体流程」 中,我们看到" 接收线程( Runner )合并任务,将相同任务编号的任务合并,只执行一次。 ",因此,相同应用实例的相同同步操作就能被合并,减少操作量。例如,Eureka-Server 同步某个应用实例的 Heartbeat 操作,接收同步的 Eureak-Server 挂了,一方面这个应用的这次操作会**重试**,另一方面,这个应用实例会发起**新的** Heartbeat 操作,通过任务编号合并,接收同步的 Eureka-Server 恢复后,减少收到**重复积压**的任务。
-
`#task(...)` 方法,生成同步操作任务**编号**。代码如下:
private static String taskId(String requestType, String appName, String id) { return requestType + '#' + appName + '/' + id; }
- `com.netflix.eureka.cluster.ReplicationTask` ,同步任务**抽象类**
-
- 点击 链接 查看 ReplicationTask 代码。
- 定义了 `#getTaskName()` **抽象**方法。
- 定义了 `#execute()` **抽象**方法,执行同步任务。
- 实现了 `#handleSuccess()` 方法,处理成功执行同步结果。
- 实现了 `#handleFailure(…)` 方法,处理失败执行同步结果。
- 点击 链接 查看 InstanceReplicationTask 代码。
- 实现了父类
#getTaskName()
抽象方法。 - TaskProcessor ,在 《Eureka 源码解析 —— 任务批处理》「10. 任务执行器【执行任务】」 有详细解析。
- 点击 链接 查看 InstanceReplicationTask 代码。
从上面
PeerEurekaNode#同步操作(...)
方法,全部实现了 InstanceReplicationTask 类的#execute()
方法,部分重写了#handleFailure(...)
方法。4.1.2 同步操作任务处理器
com.netflix.eureka.cluster.InstanceReplicationTask
,实现 TaskProcessor 接口,同步操作任务处理器。ReplicationTaskProcessor#process(task)
,处理单任务,用于 Eureka-Server 向亚马逊 AWS 的 ASG (Autoscaling Group
) 同步状态,暂时跳过,感兴趣的同学可以点击 链接 查看方法代码。ReplicationTaskProcessor#process(tasks)
,处理批量任务,用于 Eureka-Server 集群注册信息的同步操作任务,通过调用被同步的 Eureka-Server 的peerreplication/batch/
接口,一次性将批量( 多个 )的同步操作任务发起请求,代码如下:// ... 省略代码,太长了。
- 第 4 行 :创建批量提交同步操作任务的请求对象( ReplicationList ) 。比较易懂,咱就不啰嗦贴代码了。
-
- ReplicationList ,点击 链接 查看类。
- ReplicationInstance ,点击 链接 查看类。
- `#createReplicationListOf(…)` ,点击 链接 查看方法。
- `#createReplicationInstanceOf(…)` ,点击 链接 查看方法。
JerseyReplicationClient#submitBatchUpdates(…)
方法,点击 链接 查看方法。- ReplicationListResponse ,点击 链接 查看类。
- ReplicationInstanceResponse ,点击 链接 查看类。
4.3 接收 Eureka-Server 同步操作
com.netflix.eureka.resources.PeerReplicationResource
,同步操作任务 Resource ( Controller )。peerreplication/batch/
接口,映射PeerReplicationResource#batchReplication(...)
方法,代码如下:// ... 省略代码,太长了。
- 第 7 至 15 行 :逐个处理单个同步操作任务,并将处理结果( ReplicationInstanceResponse ) 添加到 ReplicationListResponse 。
- 第 23 至 50 行 :处理**单个**同步操作任务,返回处理结果( ReplicationInstanceResponse )。
- 第 24 至 25 行 :创建 ApplicationResource , InstanceResource 。我们看到,实际该方法是把单个同步操作任务提交到其他 Resource ( Controller ) 处理,Eureka-Server 收到 Eureka-Client 请求响应的 Resource ( Controller ) 是相同的逻辑。
- Register :点击 链接 查看
#handleRegister(…)
方法。 - Heartbeat :点击 链接 查看
#handleHeartbeat(…)
方法。 - Cancel :点击 链接 查看
#handleCancel(…)
方法。 - StatusUpdate :点击 链接 查看
#handleStatusUpdate(…)
方法。 - DeleteStatusOverride :点击 链接 查看
#handleDeleteStatusOverride(…)
方法。
4.4 处理 Eureka-Server 同步结果
😈 想想就有小激动,终于写到这里了。
接
ReplicationTaskProcessor#process(tasks)
方法,处理批量提交同步操作任务的响应,代码如下:// ... 省略代码,太长了。
- 第 10 行 ,调用 `#isSuccess(…)` 方法,判断请求是否成功,响应状态码是否在 [200, 300) 范围内。
- 第 11 至 13 行 :状态码 503 ,目前 Eureka-Server 返回 503 的原因是被限流。在 《Eureka 源码解析 —— 基于令牌桶算法的 RateLimiter》 详细解析。**该情况为瞬时错误,会重试该同步操作任务**,在 《Eureka 源码解析 —— 任务批处理》「3. 任务处理器」 有详细解析。
- 第 14 至 18 行 :非**预期**状态码,目前 Eureka-Server 在代码上看下来,不会返回这样的状态码。**该情况为永久错误,会重试该同步操作任务**,在 《Eureka 源码解析 —— 任务批处理》「3. 任务处理器」 有详细解析。
- 第 20 行 :请求成功,调用 `#handleBatchResponse(…)` 方法,逐个处理**每个** ReplicationTask 和 ReplicationInstanceResponse 。**这里有一点要注意下,请求成功指的是整个请求成功,实际每个 ReplicationInstanceResponse 可能返回的状态码不在 [200, 300) 范围内**。该方法下文详细解析。
-
第 23 至 25 行 :请求发生网络异常,例如网络超时,打印网络异常日志。目前日志的打印为部分采样,条件为网络发生异常每间隔 10 秒打印一条,避免网络发生异常打印超级大量的日志。**该情况为永久错误,会重试该同步操作任务**,在 《Eureka 源码解析 —— 任务批处理》「3. 任务处理器」 有详细解析。
-
- `#isNetworkConnectException(…)` ,点击 链接 查看方法。
- `#logNetworkErrorSample(…)` ,点击 链接 查看方法。
第 11 至 13 行 :状态码 503 ,目前 Eureka-Server 返回 503 的原因是被限流。在 《Eureka 源码解析 —— 基于令牌桶算法的 RateLimiter》 详细解析。该情况为瞬时错误,会重试该同步操作任务,在 《Eureka 源码解析 —— 任务批处理》「3. 任务处理器」 有详细解析。
第 20 行 :请求成功,调用
#handleBatchResponse(…)
方法,逐个处理每个 ReplicationTask 和 ReplicationInstanceResponse 。这里有一点要注意下,请求成功指的是整个请求成功,实际每个 ReplicationInstanceResponse 可能返回的状态码不在 [200, 300) 范围内。该方法下文详细解析。第 26 至 29 行 :非预期异常,目前 Eureka-Server 在代码上看下来,不会抛出这样的异常。该情况为永久错误,会重试该同步操作任务,在 《Eureka 源码解析 —— 任务批处理》「3. 任务处理器」 有详细解析。
#handleBatchResponse(...)
方法,代码如下:// ... 省略代码,太长了。
-
`ReplicationTask#handleSuccess()` 方法,无任务同步操作任务重写,是个**空方法**,代码如下:
// ReplicationTask.java public void handleSuccess() { }
-
`ReplicationTask#handleFailure()` 方法,有**两个**同步操作任务重写:
-
- x
-
Cancel :当 Eureka-Server 不存在下线的应用实例时,返回 404 状态码,此时打印错误日志,代码如下:
// PeerEurekaNode#cancel(...) @Override public void handleFailure(int statusCode, Object responseEntity) throws Throwable { super.handleFailure(statusCode, responseEntity); if (statusCode == 404) { logger.warn("{}: missing entry.", getTaskName()); } }
- Heartbeat :情况较为复杂,我们换一行继续说,避免排版有问题,影响阅读。
ReplicationTask#handleFailure()
方法,有两个同步操作任务重写:// PeerEurekaNode#cancel(...) @Override public void handleFailure(int statusCode, Object responseEntity) throws Throwable { super.handleFailure(statusCode, responseEntity); if (statusCode == 404) { logger.warn("{}: missing entry.", getTaskName()); } }
Heartbeat :情况较为复杂,我们换一行继续说,避免排版有问题,影响阅读。
噔噔噔恰,本文的重要头戏来啦!Last But Very Importment !!!
Eureka-Server 是允许同一时刻允许在任意节点被 Eureka-Client 发起写入相关的操作,网络是不可靠的资源,Eureka-Client 可能向一个 Eureka-Server 注册成功,但是网络波动,导致 Eureka-Client 误以为失败,此时恰好 Eureka-Client 变更了应用实例的状态,重试向另一个 Eureka-Server 注册,那么两个 Eureka-Server 对该应用实例的状态产生冲突。
再例如…… 我们不要继续举例子,网络波动真的很复杂。我们来看看 Eureka 是怎么处理的。
应用实例( InstanceInfo ) 的
lastDirtyTimestamp
属性,使用时间戳,表示应用实例的版本号,当请求方( 不仅仅是 Eureka-Client ,也可能是同步注册操作的 Eureka-Server ) 向 Eureka-Server 发起注册时,若 Eureka-Server 已存在拥有更大lastDirtyTimestamp
该实例( 相同应用并且相同应用实例编号被认为是相同实例 ),则请求方注册的应用实例( InstanceInfo ) 无法覆盖注册此 Eureka-Server 的该实例( 见AbstractInstanceRegistry#register(...)
方法 )。例如我们上面举的例子,第一个 Eureka-Server 向 第二个 Eureka-Server 同步注册应用实例时,不会注册覆盖,反倒是第二个 Eureka-Server 同步注册应用到第一个 Eureka-Server ,注册覆盖成功,因为lastDirtyTimestamp
( 应用实例状态变更时,可以设置lastDirtyTimestamp
为当前时间,见ApplicationInfoManager#setInstanceStatus(status)
方法 )。但是光靠注册请求判断
lastDirtyTimestamp
显然是不够的,因为网络异常情况下时,同步操作任务多次执行失败到达过期时间后,此时在 Eureka-Server 集群同步起到最终一致性最最最关键性出现了:Heartbeat 。因为 Heartbeat 会周期性的执行,通过它一方面可以判断 Eureka-Server 是否存在心跳对应的应用实例,另外一方面可以比较应用实例的lastDirtyTimestamp
。当满足下面任意条件,Eureka-Server 返回 404 状态码:- 1)Eureka-Server 应用实例不存在,点击 链接 查看触发条件代码位置。
- 2)Eureka-Server 应用实例状态为
UNKNOWN
,点击 链接 查看触发条件代码位置。为什么会是UNKNOWN
,在 《Eureka 源码解析 —— 应用实例注册发现(八)之覆盖状态》「 4.3 续租场景」 有详细解析。 - 3)请求的
lastDirtyTimestamp
更大,点击 链接 查看触发条件代码位置。
请求方接收到 404 状态码返回后,认为 Eureka-Server 应用实例实际是不存在的,重新发起应用实例的注册。以本文的 Heartbeat 为例子,代码如下:
// PeerEurekaNode#heartbeat(...) 1: @Override 2: public void handleFailure(int statusCode, Object responseEntity) throws Throwable { // ... 省略代码,太长了。 17: }
-
Cancel :当 Eureka-Server 不存在下线的应用实例时,返回 404 状态码,此时打印错误日志,代码如下:
-
第 4 至 10 行 :接收到 404 状态码,调用 `#register(...)` 方法,向该被心跳同步操作失败的 Eureka-Server 发起注册**本地的应用实例**的请求。
-
- 上述 **3)** ,会使用请求参数 `overriddenStatus` 存储到 Eureka-Server 的应用实例覆盖状态集合( `AbstractInstanceRegistry.overriddenInstanceStatusMap` ),点击 链接 查看触发条件代码位置。
- 01. 调试环境搭建
02. 项目结构一览
- 配置 Configuration
04. 核心流程一览
- 05. 拓展机制 SPI
- 线程池
- 07. 服务暴露 Export
- 08. 服务引用 Refer
- 注册中心 Registry
- 动态编译 Compile
- 动态代理 Proxy
- 服务调用 Invoke
- 调用特性
- 过滤器 Filter
- NIO 服务器
- P2P 服务器
- HTTP 服务器
- 序列化 Serialization
- 集群容错 Cluster
- 优雅停机
- 日志适配
- 状态检查
- 监控中心 Monitor
- 管理中心 Admin
- 运维命令 QOS
- 链路追踪 Tracing
- ...
第 11 至 16 行 :恰好是 3) 反过来的情况,本地的应用实例的
lastDirtyTimestamp
小于 Eureka-Server 该应用实例的,此时 Eureka-Server 返回 409 状态码,点击 链接 查看触发条件代码位置。调用#syncInstancesIfTimestampDiffers()
方法,覆盖注册本地应用实例,点击 链接 查看方法。OK,撒花!记住:Eureka 通过 Heartbeat 实现 Eureka-Server 集群同步的最终一致性。
666. 彩蛋
写的比较嗨皮,所以就送胖友一只胖友
胖友,分享我的公众号( 芋道源码 ) 给你的胖友可好?
以下是草稿,可以凑合看
eureka server 集群假定是 s1 s2
1)client 向 s1 注册,有一个 lastDirtyTime ,正常情况下成功, s1 会向 s2 同步
2)client 向 s1 注册(成功,但是网络波动),然后 client 发生状态的变化,lastDirtyTime 变化,向 s2 注册。
这个时候,s1 s2 是冲突的,但是他们会互相同步,实际 s2 = s1 的注册会真正成功,s1 = s2 的注册不会返回失败,但是实际 s2 处理的时候,用的是自身的。心跳只是最终去校验。
理论来说,心跳不应该带 lastDirtyTime 参数。带的原因就是为了做固定周期的比较。
最优解是 注册 就处理掉数据不一致
次优解是 心跳 处理掉数据不一致如果在类比,
注册,相当于 insertOrUpdate
心跳,附加了校验是否要发起【注册】如果你对 Dubbo 感兴趣,欢迎加入我的知识星球一起交流。
目前在知识星球(https://t.zsxq.com/2VbiaEu)更新了如下 Dubbo 源码解析如下:
一共 60 篇++
- 01. 调试环境搭建
Heartbeat :调用
PeerEurekaNode#heartbeat(…)
方法,点击 链接 查看实现。
StatusUpdate :调用
PeerEurekaNode#statusUpdate(…)
方法,点击 链接 查看实现。
上面的每个方法实现,我们都会看到类似这么一段代码 :
InstanceReplicationTask ,同步操作任务,在 「4.1.1 同步操作任务」 详细解析。
expiryTime
,任务过期时间。
4.1.1 同步操作任务
源码不易↓↓↓↓↓
点赞****支持老艿艿↓↓