本文主要基于 Eureka 1.8.X 版本
- 概述
- Eureka-Client 发起续租
- 2.1 初始化定时任务
- 2.2 HeartbeatThread
- 2.3 TimedSupervisorTask
- Eureka-Server 接收续租
- 3.1 接收续租请求
- 3.2 续租应用实例信息
- 彩蛋
1. 概述
本文主要分享 Eureka-Client 向 Eureka-Server 续租应用实例的过程。
FROM 《深度剖析服务发现组件Netflix Eureka》 二次编辑
- 蓝框部分,为本文重点。
- 非蓝框部分,Eureka-Server 集群间复制注册的应用实例信息,不在本文内容范畴。
推荐 Spring Cloud 书籍:
- 请支持正版。下载盗版,等于主动编写低级 BUG 。
- 程序猿DD —— 《Spring Cloud微服务实战》
- 周立 —— 《Spring Cloud与Docker微服务架构实战》
推荐 Spring Cloud 视频:
- Java 微服务实践 - Spring Boot
- Java 微服务实践 - Spring Cloud
- Java 微服务实践 - Spring Boot / Spring Cloud
2. Eureka-Client 发起续租
Eureka-Client 向 Eureka-Server 发起注册应用实例成功后获得租约 ( Lease )。
Eureka-Client 固定间隔向 Eureka-Server 发起续租( renew ),避免租约过期。
默认情况下,租约有效期为 90 秒,续租频率为 30 秒。两者比例为 1 : 3 ,保证在网络异常等情况下,有三次重试的机会。
2.1 初始化定时任务
Eureka-Client 在初始化过程中,创建心跳线程,固定间隔向 Eureka-Server 发起续租( renew )。实现代码如下:
// DiscoveryClient.java
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
ProviderBackupRegistry backupRegistryProvider) {
// ... 省略无关代码
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueueRunnable(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
// ... 省略无关代码
// 【3.2.14】初始化定时任务
initScheduledTasks();
}
private void initScheduledTasks() {
// 向 Eureka-Server 心跳(续租)执行器
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); // 续租频率
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); //
logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);
// Heartbeat timer
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
// ... 省略无关代码
}
// ... 省略无关代码
}
scheduler
,定时任务服务,用于定时触发心跳( 续租 )。细心如你,会发现任务提交的方式是ScheduledExecutorService#schedule(...)
方法,只延迟执行一次心跳,说好的固定频率执行心跳呢!!!答案在 「2.3 TimedSupervisorTask」 揭晓。heartbeatExecutor
,心跳任务执行线程池。为什么有scheduler
的情况下,还有heartbeatExecutor
???答案也在 「2.3 TimedSupervisorTask」 揭晓。- HeartbeatThread,心跳线程,在「2.2 TimedSupervisorTask」 详细解析。
2.2 HeartbeatThread
com.netflix.discovery.DiscoveryClient.HeartbeatThread
,心跳线程,实现执行 Eureka-Client 向 Eureka-Server 发起续租( renew )请求。实现代码如下:
// DiscoveryClient.java
/**
* 最后成功向 Eureka-Server 心跳时间戳
*/
private volatile long lastSuccessfulHeartbeatTimestamp = -1;
private class HeartbeatThread implements Runnable {
public void run() {
if (renew()) {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
// DiscoveryClient.java
boolean renew() {
EurekaHttpResponseInstanceInfo httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
if (httpResponse.getStatusCode() == 404) {
REREGISTER_COUNTER.increment();
logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
long timestamp = instanceInfo.setIsDirtyWithTime();
// 发起注册
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
return httpResponse.getStatusCode() == 200;
} catch (Throwable e) {
logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
return false;
}
}
-
- PUT 请求 Eureka-Server 的 `apps/${APP_NAME}/${INSTANCE_INFO_ID}` 接口,参数为 `status`、`lastDirtyTimestamp`、`overriddenstatus`,实现续租。
-
调用 `AbstractJerseyEurekaHttpClient#sendHeartBeat(...)` 方法,发起**续租请求**,实现代码如下:
// AbstractJerseyEurekaHttpClient.java @Override public EurekaHttpResponseInstanceInfo sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) { String urlPath = "apps/" + appName + '/' + id; ClientResponse response = null; try { WebResource webResource = jerseyClient.resource(serviceUrl) .path(urlPath) .queryParam("status", info.getStatus().toString()) .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString()); if (overriddenStatus != null) { webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name()); } Builder requestBuilder = webResource.getRequestBuilder(); addExtraHeaders(requestBuilder); response = requestBuilder.put(ClientResponse.class); EurekaHttpResponseBuilderInstanceInfo eurekaResponseBuilder = anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).headers(headersOf(response)); if (response.hasEntity()) { eurekaResponseBuilder.entity(response.getEntity(InstanceInfo.class)); } return eurekaResponseBuilder.build(); } finally { if (logger.isDebugEnabled()) { logger.debug("Jersey HTTP PUT {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus()); } if (response != null) { response.close(); } } }
- 调用 `AbstractJerseyEurekaHttpClient#register(...)` 方法,当 Eureka-Server **不存在租约**时,重新发起注册,在《Eureka 源码解析 —— 应用实例注册发现 (一)之注册》有详细解析。
调用
AbstractJerseyEurekaHttpClient#sendHeartBeat(...)
方法,发起续租请求,实现代码如下:
// AbstractJerseyEurekaHttpClient.java
@Override
public EurekaHttpResponseInstanceInfo sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) {
String urlPath = "apps/" + appName + '/' + id;
ClientResponse response = null;
try {
WebResource webResource = jerseyClient.resource(serviceUrl)
.path(urlPath)
.queryParam("status", info.getStatus().toString())
.queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
if (overriddenStatus != null) {
webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());
}
Builder requestBuilder = webResource.getRequestBuilder();
addExtraHeaders(requestBuilder);
response = requestBuilder.put(ClientResponse.class);
EurekaHttpResponseBuilderInstanceInfo eurekaResponseBuilder = anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).headers(headersOf(response));
if (response.hasEntity()) {
eurekaResponseBuilder.entity(response.getEntity(InstanceInfo.class));
}
return eurekaResponseBuilder.build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP PUT {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}
2.3 TimedSupervisorTask
com.netflix.discovery.TimedSupervisorTask
,监管定时任务的任务。
A supervisor task that schedules subtasks while enforce a timeout.
创建 TimedSupervisorTask 代码如下:
public class TimedSupervisorTask extends TimerTask {
private final Counter timeoutCounter;
private final Counter rejectedCounter;
private final Counter throwableCounter;
private final LongGauge threadPoolLevelGauge;
/**
* 定时任务服务
*/
private final ScheduledExecutorService scheduler;
/**
* 执行子任务线程池
*/
private final ThreadPoolExecutor executor;
/**
* 子任务执行超时时间
*/
private final long timeoutMillis;
/**
* 子任务
*/
private final Runnable task;
/**
* 当前任子务执行频率
*/
private final AtomicLong delay;
/**
* 最大子任务执行频率
*
* 子任务执行超时情况下使用
*/
private final long maxDelay;
public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor,
int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {
this.scheduler = scheduler;
this.executor = executor;
this.timeoutMillis = timeUnit.toMillis(timeout);
this.task = task;
this.delay = new AtomicLong(timeoutMillis);
this.maxDelay = timeoutMillis * expBackOffBound;
// Initialize the counters and register.
timeoutCounter = Monitors.newCounter("timeouts");
rejectedCounter = Monitors.newCounter("rejectedExecutions");
throwableCounter = Monitors.newCounter("throwables");
threadPoolLevelGauge = new LongGauge(MonitorConfig.builder("threadPoolUsed").build());
Monitors.registerObject(name, this);
}
}
scheduler
,定时任务服务,用于定时【发起】子任务。executor
,执行子任务线程池,用于【提交】子任务执行。task
,子任务。timeoutMillis
,子任务执行超时时间,单位:毫秒。delay
,当前子任务执行频率,单位:毫秒。值等于timeout
参数。maxDelay
,最大子任务执行频率,单位:毫秒。值等于timeout * expBackOffBound
参数。scheduler
初始化延迟执行 TimedSupervisorTask 。- TimedSupervisorTask 执行时,提交 `task` 到 `executor` 执行任务。
当
task
执行正常,TimedSupervisorTask 再次提交自己到scheduler
延迟timeoutMillis
执行。当
task
执行超时,重新计算延迟时间( 不允许超过maxDelay
),再次提交自己到scheduler
延迟执行。
实现代码如下:
// TimedSupervisorTask.java
1: @Override
2: public void run() {
3: Future? future = null;
4: try {
5: // 提交 任务
6: future = executor.submit(task);
7: //
8: threadPoolLevelGauge.set((long) executor.getActiveCount());
9: // 等待任务 执行完成 或 超时
10: future.get(timeoutMillis, TimeUnit.MILLISECONDS); // block until done or timeout
11: // 设置 下一次任务执行频率
12: delay.set(timeoutMillis);
13: //
14: threadPoolLevelGauge.set((long) executor.getActiveCount());
15: } catch (TimeoutException e) {
16: logger.error("task supervisor timed out", e);
17: timeoutCounter.increment(); //
18:
19: // 设置 下一次任务执行频率
20: long currentDelay = delay.get();
21: long newDelay = Math.min(maxDelay, currentDelay * 2);
22: delay.compareAndSet(currentDelay, newDelay);
23:
24: } catch (RejectedExecutionException e) {
25: if (executor.isShutdown() || scheduler.isShutdown()) {
26: logger.warn("task supervisor shutting down, reject the task", e);
27: } else {
28: logger.error("task supervisor rejected the task", e);
29: }
30:
31: rejectedCounter.increment(); //
32: } catch (Throwable e) {
33: if (executor.isShutdown() || scheduler.isShutdown()) {
34: logger.warn("task supervisor shutting down, can't accept the task");
35: } else {
36: logger.error("task supervisor threw an exception", e);
37: }
38:
39: throwableCounter.increment(); //
40: } finally {
41: // 取消 未完成的任务
42: if (future != null) {
43: future.cancel(true);
44: }
45:
46: // 调度 下次任务
47: if (!scheduler.isShutdown()) {
48: scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
49: }
50: }
51: }
- 第 5 至 6 行 :提交子任务
task
到执行子任务线程池executor
。 - 第 9 至 10 行 :等待子任务
task
执行完成或执行超时。 - 第 11 至 12 行 :子任务
task
执行完成,设置下一次执行延迟delay
。 - 第 19 至 22 行 :子任务
task
执行超时,重新计算下一次执行延迟delay
。计算公式为Math.min(maxDelay, currentDelay * 2)
。如果多次超时,超时时间不断乘以 2 ,不允许超过最大延迟时间(maxDelay
)。 - 第 41 至 44 行 :强制取消未完成的子任务。
- 第 46 至 49 行 :调度下一次 TimedSupervisorTask 。
3. Eureka-Server 接收续租
3.1 接收续租请求
com.netflix.eureka.resources.InstanceResource
,处理单个应用实例信息的请求操作的 Resource ( Controller )。
续租应用实例信息的请求,映射
InstanceResource#renewLease()
方法,实现代码如下:
1: @PUT
2: public Response renewLease(
3: @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
4: @QueryParam("overriddenstatus") String overriddenStatus,
5: @QueryParam("status") String status,
6: @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
7: boolean isFromReplicaNode = "true".equals(isReplication);
8: // 续租
9: boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
10:
11: // 续租失败
12: // Not found in the registry, immediately ask for a register
13: if (!isSuccess) {
14: logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
15: return Response.status(Status.NOT_FOUND).build();
16: }
17:
18: // 比较 InstanceInfo 的 lastDirtyTimestamp 属性
19: // Check if we need to sync based on dirty time stamp, the client
20: // instance might have changed some value
21: Response response = null;
22: if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
23: response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
24: // Store the overridden status since the validation found out the node that replicates wins
25: if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
26: && (overriddenStatus != null)
27: && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
28: && isFromReplicaNode) {
29: registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
30: }
31: } else { // 成功
32: response = Response.ok().build();
33: }
34: logger.debug("Found (Renew): {} - {}; reply status={}" + app.getName(), id, response.getStatus());
35: return response;
36: }
// PeerAwareInstanceRegistryImpl.java
public boolean renew(final String appName, final String id, final boolean isReplication) {
if (super.renew(appName, id, isReplication)) { // 续租
// Eureka-Server 复制
replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
return true;
}
return false;
}
-
- 调用父类 `AbstractInstanceRegistry#renew(...)` 方法,注册应用实例信息。
- 第 7 至 11 行 :请求的
lastDirtyTimestamp
较大,意味着请求方( 可能是 Eureka-Client ,也可能是 Eureka-Server 集群内的其他 Server )存在 InstanceInfo 和 Eureka-Server 的 InstanceInfo 的数据不一致,返回 404 响应。请求方收到 404 响应后重新发起注册。 - 第 16 至 21 行 :《Eureka 源码解析 —— Eureka-Server 集群同步》 有详细解析。
- 第 22 至 24 行 :Server 的
lastDirtyTimestamp
较大,并且请求方为 Eureka-Client,续租成功,返回 200 成功响应。 - 第 29 行 :
lastDirtyTimestamp
一致,返回 200 成功响应。 -
第 23 行 :调用 `#validateDirtyTimestamp(...)` 方法,比较 `lastDirtyTimestamp` 的差异。实现代码如下:
// InstanceResource.java 1: private Response validateDirtyTimestamp(Long lastDirtyTimestamp, boolean isReplication) { 2: // 获取 InstanceInfo 3: InstanceInfo appInfo = registry.getInstanceByAppAndId(app.getName(), id, false); 4: if (appInfo != null) { 5: if ((lastDirtyTimestamp != null) && (!lastDirtyTimestamp.equals(appInfo.getLastDirtyTimestamp()))) { 6: Object[] args = {id, appInfo.getLastDirtyTimestamp(), lastDirtyTimestamp, isReplication}; 7: // 请求 的 较大 8: if (lastDirtyTimestamp appInfo.getLastDirtyTimestamp()) { 9: logger.debug("Time to sync, since the last dirty timestamp differs -" 10: + " ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}", args); 11: return Response.status(Status.NOT_FOUND).build(); 12: // Server 的 较大 13: } else if (appInfo.getLastDirtyTimestamp() lastDirtyTimestamp) { 14: // In the case of replication, send the current instance info in the registry for the 15: // replicating node to sync itself with this one. 16: if (isReplication) { 17: logger.debug( 18: "Time to sync, since the last dirty timestamp differs -" 19: + " ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}", 20: args); 21: return Response.status(Status.CONFLICT).entity(appInfo).build(); 22: } else { 23: return Response.ok().build(); 24: } 25: } 26: } 27: 28: } 29: return Response.ok().build(); 30: }
第 11 至 16 行 :续租失败,返回 404 响应。当 Eureka-Client 收到 404 响应后,会重新发起 InstanceInfo 的注册。
第 18 至 30 行 :比较请求的
lastDirtyTimestamp
和 Server 的 InstanceInfo 的
lastDirtyTimestamp
属性差异,需要配置
eureka.syncWhenTimestampDiffers = true
( 默认开启 )。
第 24 至 30 行 :《Eureka 源码解析 —— Eureka-Server 集群同步》 有详细解析。
第 31 至 33 行 :续租成功,返回 200 成功响应。
3.2 续租应用实例信息
调用
AbstractInstanceRegistry#renew(...)
方法,续租应用实例信息,实现代码如下:
1: public boolean renew(String appName, String id, boolean isReplication) {
2: // 增加 续租次数 到 监控
3: RENEW.increment(isReplication);
4: // 获得 租约
5: MapString, LeaseInstanceInfo gMap = registry.get(appName);
6: LeaseInstanceInfo leaseToRenew = null;
7: if (gMap != null) {
8: leaseToRenew = gMap.get(id);
9: }
10: // 租约不存在
11: if (leaseToRenew == null) {
12: RENEW_NOT_FOUND.increment(isReplication);
13: logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
14: return false;
15: } else {
16: InstanceInfo instanceInfo = leaseToRenew.getHolder();
17: if (instanceInfo != null) {
18: // touchASGCache(instanceInfo.getASGName());
19: // override status
20: InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
21: instanceInfo, leaseToRenew, isReplication);
22: if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
23: logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
24: + "; re-register required", instanceInfo.getId());
25: RENEW_NOT_FOUND.increment(isReplication);
26: return false;
27: }
28: if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
29: Object[] args = {
30: instanceInfo.getStatus().name(),
31: instanceInfo.getOverriddenStatus().name(),
32: instanceInfo.getId()
33: };
34: logger.info(
35: "The instance status {} is different from overridden instance status {} for instance {}. "
36: + "Hence setting the status to overridden status", args);
37: instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
38: }
39: }
40: // 新增 续租每分钟次数
41: renewsLastMin.increment();
42: // 设置 租约最后更新时间(续租)
43: leaseToRenew.renew();
44: return true;
45: }
46: }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 | // AbstractInstanceRegistry.java /** * 续租每分钟次数 */ private final MeasuredRate renewsLastMin; // MeasuredRate.java public class MeasuredRate { /** * 上一个间隔次数 */ private final AtomicLong lastBucket = new AtomicLong(0); /** * 当前间隔次数 */ private final AtomicLong currentBucket = new AtomicLong(0); /** * 间隔 */ private final long sampleInterval; /** * 定时器 */ private final Timer timer; private volatile boolean isActive; public MeasuredRate(long sampleInterval) { this.sampleInterval = sampleInterval; this.timer = new Timer("Eureka-MeasureRateTimer", true); this.isActive = false; } public synchronized void start() { if (!isActive) { timer.schedule(new TimerTask() { @Override public void run() { try { // Zero out the current bucket. lastBucket.set(currentBucket.getAndSet(0)); } catch (Throwable e) { logger.error("Cannot reset the Measured Rate", e); } } }, sampleInterval, sampleInterval); isActive = true; } } public synchronized void stop() { if (isActive) { timer.cancel(); isActive = false; } } /** * Returns the count in the last sample interval. */ public long getCount() { return lastBucket.get(); } /** * Increments the count in the current sample interval. */ public void increment() { currentBucket.incrementAndGet(); } } |
// MeasuredRate.java
public class MeasuredRate {
/**
* 上一个间隔次数
/
private final AtomicLong lastBucket = new AtomicLong(0);
/*
* 当前间隔次数
/
private final AtomicLong currentBucket = new AtomicLong(0);
/*
* 间隔
/
private final long sampleInterval;
/*
* 定时器
*/
private final Timer timer;
private volatile boolean isActive;
public MeasuredRate(long sampleInterval) {
this.sampleInterval = sampleInterval;
this.timer = new Timer(“Eureka-MeasureRateTimer”, true);
this.isActive = false;
}
public synchronized void start() {
if (!isActive) {
timer.schedule(new TimerTask() {
@Override
public void run() {
try {
// Zero out the current bucket.
lastBucket.set(currentBucket.getAndSet(0));
} catch (Throwable e) {
logger.error(“Cannot reset the Measured Rate”, e);
}
}
}, sampleInterval, sampleInterval);
isActive = true;
}
}
public synchronized void stop() {
if (isActive) {
timer.cancel();
isActive = false;
}
}
/**
- Returns the count in the last sample interval.
*/
public long getCount() {
return lastBucket.get();
}
/**
- Increments the count in the current sample interval.
*/
public void increment() {
currentBucket.incrementAndGet();
}
}
-
- 配合 Netflix Servo 实现监控信息采集**续租每分钟次数**。
- Eureka-Server 运维界面的显示**续租每分钟次数**。
- 自我保护机制,在 《Eureka 源码解析 —— 应用实例注册发现 (四)之自我保护机制》 详细解析。
- `timer` ,定时器,负责每个 `sampleInterval` 间隔重置**当前次数**( `currentBucket` ),并将**原当前次数**设置到**上一个次数**( `lastBucket` )。
- `#increment()` 方法,返回**当前次数**( `currentBucket` )。
- `#getCount()` 方法,返回**上一个次数**( `lastBucket` )。
- `renewsLastMin` 有如下用途:
- 续租每分钟次数
- Returns the count in the last sample interval.
- Increments the count in the current sample interval.
第 4 至 9 行 :获得租约( Lease )。
第 19 至 21 行 :获得应用实例的最终状态。在《应用实例注册发现 (八)之覆盖状态》详细解析。
第 28 至 37 行 :应用实例的状态与最终状态不相等,使用最终状态覆盖应用实例的状态。在《应用实例注册发现 (八)之覆盖状态》详细解析。
// AbstractInstanceRegistry.java
/**
*/
private final MeasuredRate renewsLastMin;
// MeasuredRate.java
public class MeasuredRate {
/**
* 上一个间隔次数
/
private final AtomicLong lastBucket = new AtomicLong(0);
/*
* 当前间隔次数
/
private final AtomicLong currentBucket = new AtomicLong(0);
/*
* 间隔
/
private final long sampleInterval;
/*
* 定时器
*/
private final Timer timer;
private volatile boolean isActive;
public MeasuredRate(long sampleInterval) {
this.sampleInterval = sampleInterval;
this.timer = new Timer(“Eureka-MeasureRateTimer”, true);
this.isActive = false;
}
public synchronized void start() {
if (!isActive) {
timer.schedule(new TimerTask() {
@Override
public void run() {
try {
// Zero out the current bucket.
lastBucket.set(currentBucket.getAndSet(0));
} catch (Throwable e) {
logger.error(“Cannot reset the Measured Rate”, e);
}
}
}, sampleInterval, sampleInterval);
isActive = true;
}
}
public synchronized void stop() {
if (isActive) {
timer.cancel();
isActive = false;
}
}
/**
*/
public long getCount() {
return lastBucket.get();
}
/**
*/
public void increment() {
currentBucket.incrementAndGet();
}
}
第 42 至 43 行 :调用
Lease#renew()
方法,设置租约最后更新时间( 续租 ),实现代码如下:
123
public void renew() { lastUpdateTimestamp = System.currentTimeMillis() + duration;}
public void renew() {
lastUpdateTimestamp = System.currentTimeMillis() + duration;
}
第 44 行 :返回续租成功(
true
)。
整个过程修改的租约的过期时间,即使并发请求,也不会对数据的一致性产生不一致的影响,因此像注册操作一样加锁。
666. 彩蛋
效率比想象的低一些,加油继续更新下一篇。
胖友,分享我的公众号( 芋道源码 ) 给你的胖友可好?