摘要: 原创出处 http://www.iocoder.cn/Eureka/instance-registry-fetch-all/ 「芋道源码」欢迎转载,保留摘要,谢谢!
本文主要基于 Eureka 1.8.X 版本
- 概述
- 2. Eureka-Client 发起全量获取
2.1 初始化全量获取
2.2 定时获取
2.3 刷新注册信息缓存
2.4 发起获取注册信息
3.1 接收全量获取请求
3.2 响应缓存 ResponseCache
3.3 缓存读取
3.4 主动过期读写缓存
3.5 被动过期读写缓存
3.6 定时刷新只读缓存
1. 概述
本文主要分享 Eureka-Client 向 Eureka-Server 获取全量注册信息的过程。
FROM 《深度剖析服务发现组件Netflix Eureka》
Eureka-Client 获取注册信息,分成全量获取和增量获取。默认配置下,Eureka-Client 启动时,首先执行一次全量获取进行本地缓存注册信息,而后每 30 秒增量获取刷新本地缓存( 非“正常”情况下会是全量获取 )。
本文重点在于全量获取。
推荐 Spring Cloud 书籍:
- 请支持正版。下载盗版,等于主动编写低级 BUG 。
- 程序猿DD —— 《Spring Cloud微服务实战》
- 周立 —— 《Spring Cloud与Docker微服务架构实战》
- 两书齐买,京东包邮。
推荐 Spring Cloud 视频:
- Java 微服务实践 - Spring Boot
- Java 微服务实践 - Spring Cloud
- Java 微服务实践 - Spring Boot / Spring Cloud
2. Eureka-Client 发起全量获取
本小节调用关系如下:
2.1 初始化全量获取
Eureka-Client 启动时,首先执行一次全量获取进行本地缓存注册信息,首先代码如下:
// DiscoveryClient.java
/**
* Applications 在本地的缓存
*/
private final AtomicReferenceApplications localRegionApps = new AtomicReferenceApplications();
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
ProviderBackupRegistry backupRegistryProvider) {
// ... 省略无关代码
// 【3.2.5】初始化应用集合在本地的缓存
localRegionApps.set(new Applications());
// ... 省略无关代码
// 【3.2.12】从 Eureka-Server 拉取注册信息
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
fetchRegistryFromBackup();
}
// ... 省略无关代码
}
配置
eureka.shouldFetchRegistry = true
,开启从 Eureka-Server 获取注册信息。默认值:
true
。
2.2 定时获取
Eureka-Client 在初始化过程中,创建获取注册信息线程,固定间隔向 Eureka-Server 发起获取注册信息( fetch ),刷新本地注册信息缓存。实现代码如下:
// DiscoveryClient.java
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
ProviderBackupRegistry backupRegistryProvider) {
// ... 省略无关代码
// 【3.2.9】初始化线程池
// default size of 2 - 1 each for heartbeat and cacheRefresh
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueueRunnable(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
// ... 省略无关代码
// 【3.2.14】初始化定时任务
initScheduledTasks();
// ... 省略无关代码
}
private void initScheduledTasks() {
// 向 Eureka-Server 心跳(续租)执行器
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
// ... 省略无关代码
}
class CacheRefreshThread implements Runnable {
public void run() {
refreshRegistry();
}
}
-
- 调用 `#refreshRegistry(false)` 方法,刷新注册信息缓存,在 「2.3 刷新注册信息缓存」 详细解析。
com.netflix.discovery.DiscoveryClient.CacheRefreshThread
,注册信息缓存刷新任务,实现代码如下:
2.3 刷新注册信息缓存
调用
#refreshRegistry(false)
方法,刷新注册信息缓存,实现代码如下:
// DiscoveryClient.java
1: void refreshRegistry() {
2: try {
3: // TODO 芋艿:TODO[0009]:RemoteRegionRegistry
4: boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();
5:
6: boolean remoteRegionsModified = false;
7: // This makes sure that a dynamic change to remote regions to fetch is honored.
8: String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
9: if (null != latestRemoteRegions) {
10: String currentRemoteRegions = remoteRegionsToFetch.get();
11: if (!latestRemoteRegions.equals(currentRemoteRegions)) {
12: // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
13: synchronized (instanceRegionChecker.getAzToRegionMapper()) {
14: if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
15: String[] remoteRegions = latestRemoteRegions.split(",");
16: remoteRegionsRef.set(remoteRegions);
17: instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
18: remoteRegionsModified = true;
19: } else {
20: logger.info("Remote regions to fetch modified concurrently," +
21: " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
22: }
23: }
24: } else {
25: // Just refresh mapping to reflect any DNS/Property change
26: instanceRegionChecker.getAzToRegionMapper().refreshMapping();
27: }
28: }
29:
30: boolean success = fetchRegistry(remoteRegionsModified);
31: if (success) {
32: // 设置 注册信息的应用实例数
33: registrySize = localRegionApps.get().size();
34: // 设置 最后获取注册信息时间
35: lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
36: }
37:
38: // 打印日志
39: if (logger.isDebugEnabled()) {
40: StringBuilder allAppsHashCodes = new StringBuilder();
41: allAppsHashCodes.append("Local region apps hashcode: ");
42: allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());
43: allAppsHashCodes.append(", is fetching remote regions? ");
44: allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
45: for (Map.EntryString, Applications entry : remoteRegionVsApps.entrySet()) {
46: allAppsHashCodes.append(", Remote region: ");
47: allAppsHashCodes.append(entry.getKey());
48: allAppsHashCodes.append(" , apps hashcode: ");
49: allAppsHashCodes.append(entry.getValue().getAppsHashCode());
50: }
51: logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",
52: allAppsHashCodes.toString());
53: }
54: } catch (Throwable e) {
55: logger.error("Cannot fetch registry from server", e);
56: }
57: }
/**
* 注册信息的应用实例数
*/
private volatile int registrySize = 0;
/**
* 最后成功从 Eureka-Server 拉取注册信息时间戳
*/
private volatile long lastSuccessfulRegistryFetchTimestamp = -1;
第 30 行 :调用
#fetchRegistry(false)
方法,从 Eureka-Server 获取注册信息,在 「2.4 发起获取注册信息」 详细解析。
第 38 至 53 行 :打印调试日志。
2.4 发起获取注册信息
调用
#fetchRegistry(false)
方法,从 Eureka-Server 获取注册信息( 根据条件判断,可能是全量,也可能是增量 ),实现代码如下:
1: private boolean fetchRegistry(boolean forceFullRegistryFetch) {
2: Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
3:
4: try {
5: // 获取 本地缓存的注册的应用实例集合
6: // If the delta is disabled or if it is the first time, get all
7: // applications
8: Applications applications = getApplications();
9:
10: // 全量获取
11: if (clientConfig.shouldDisableDelta() // 禁用增量获取
12: || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
13: || forceFullRegistryFetch
14: || (applications == null) // 空
15: || (applications.getRegisteredApplications().size() == 0) // 空
16: || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
17: {
18: logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
19: logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
20: logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
21: logger.info("Application is null : {}", (applications == null));
22: logger.info("Registered Applications size is zero : {}",
23: (applications.getRegisteredApplications().size() == 0));
24: logger.info("Application version is -1: {}", (applications.getVersion() == -1));
25: // 执行 全量获取
26: getAndStoreFullRegistry();
27: } else {
28: // 执行 增量获取
29: getAndUpdateDelta(applications);
30: }
31: // 设置 应用集合 hashcode
32: applications.setAppsHashCode(applications.getReconcileHashCode());
33: // 打印 本地缓存的注册的应用实例数量
34: logTotalInstances();
35: } catch (Throwable e) {
36: logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e);
37: return false;
38: } finally {
39: if (tracer != null) {
40: tracer.stop();
41: }
42: }
43:
44: // Notify about cache refresh before updating the instance remote status
45: onCacheRefreshed();
46:
47: // Update remote status based on refreshed data held in the cache
48: updateInstanceRemoteStatus();
49:
50: // registry was fetched successfully, so return true
51: return true;
52: }
public Applications getApplications() {
return localRegionApps.get();
}
-
- 第 11 行 :配置 `eureka.disableDelta = true` ,禁用**增量**获取注册信息。默认值:`false` 。
- 第 12 行 :只获得一个 `vipAddress` 对应的应用实例们的注册信息。
- 第 13 行 :方法参数 `forceFullRegistryFetch` 强制**全量**获取注册信息。
- 第 14 至 15 行 :本地缓存为空。
- 第 25 至 26 行 :调用 `#getAndStoreFullRegistry()` 方法,**全量**获取注册信息,并设置到本地缓存。下文详细解析。
第 10 至 26 行 :全量获取注册信息。
第 27 至 30 行 :增量获取注册信息,并刷新本地缓存,在 《Eureka 源码解析 —— 应用实例注册发现 (七)之增量获取》 详细解析。
第 31 至 32 行 :计算应用集合
hashcode
。该变量用于校验增量获取的注册信息和 Eureka-Server 全量的注册信息是否一致( 完整 ),在 《Eureka 源码解析 —— 应用实例注册发现 (七)之增量获取》 详细解析。
第 33 至 34 行 :打印调试日志,输出本地缓存的注册的应用实例数量。实现代码如下:
private void logTotalInstances() {
if (logger.isDebugEnabled()) {
int totInstances = 0;
for (Application application : getApplications().getRegisteredApplications()) {
totInstances += application.getInstancesAsIsFromEureka().size();
}
logger.debug("The total number of all instances in the client now is {}", totInstances);
}
}
第 44 至 45 行 :触发 CacheRefreshedEvent 事件,事件监听器执行。目前 Eureka 未提供默认的该事件监听器。
- x
-
`#onCacheRefreshed()` 方法,实现代码如下:
/**
- Eureka 事件监听器
*/
private final CopyOnWriteArraySetEurekaEventListener eventListeners = new CopyOnWriteArraySet();
protected void onCacheRefreshed() {
fireEvent(new CacheRefreshedEvent());
}
protected void fireEvent(final EurekaEvent event) {
for (EurekaEventListener listener : eventListeners) {
listener.onEvent(event);
}
}
// 【3.2.12】从 Eureka-Server 拉取注册信息
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
fetchRegistryFromBackup();
}
笔者的YY :你可以实现自定义的事件监听器监听 CacheRefreshedEvent 事件,以达到持久化最新的注册信息到存储器( 例如,本地文件 ),通过这样的方式,配合实现 BackupRegistry 接口读取存储器。BackupRegistry 接口调用如下:
// 【3.2.12】从 Eureka-Server 拉取注册信息
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
fetchRegistryFromBackup();
}
第47 至 48 行 :更新本地缓存的当前应用实例在 Eureka-Server 的状态。
1: private volatile InstanceInfo.InstanceStatus lastRemoteInstanceStatus = InstanceInfo.InstanceStatus.UNKNOWN;
2:
3: private synchronized void updateInstanceRemoteStatus() {
4: // Determine this instance's status for this app and set to UNKNOWN if not found
5: InstanceInfo.InstanceStatus currentRemoteInstanceStatus = null;
6: if (instanceInfo.getAppName() != null) {
7: Application app = getApplication(instanceInfo.getAppName());
8: if (app != null) {
9: InstanceInfo remoteInstanceInfo = app.getByInstanceId(instanceInfo.getId());
10: if (remoteInstanceInfo != null) {
11: currentRemoteInstanceStatus = remoteInstanceInfo.getStatus();
12: }
13: }
14: }
15: if (currentRemoteInstanceStatus == null) {
16: currentRemoteInstanceStatus = InstanceInfo.InstanceStatus.UNKNOWN;
17: }
18:
19: // Notify if status changed
20: if (lastRemoteInstanceStatus != currentRemoteInstanceStatus) {
21: onRemoteStatusChanged(lastRemoteInstanceStatus, currentRemoteInstanceStatus);
22: lastRemoteInstanceStatus = currentRemoteInstanceStatus;
23: }
24: }
- Eureka-Client 本地应用实例与 Eureka-Server 的该应用实例状态不同的原因,因为应用实例的覆盖状态,在 《Eureka 源码解析 —— 应用实例注册发现 (八)之覆盖状态》 有详细解析。
- 第 4 至 14 行 :从注册信息中获取当前应用在 Eureka-Server 的状态。
-
第 19 至 23 行 :对比**本地缓存**和**最新的**的当前应用实例在 Eureka-Server 的状态,若不同,更新**本地缓存**( **注意,只更新该缓存变量,不更新本地当前应用实例的状态( `instanceInfo.status` )** ),触发 StatusChangeEvent 事件,事件监听器执行。目前 Eureka 未提供默认的该事件监听器。`#onRemoteStatusChanged(...)` 实现代码如下:
protected void onRemoteStatusChanged(InstanceInfo.InstanceStatus oldStatus, InstanceInfo.InstanceStatus newStatus) { fireEvent(new StatusChangeEvent(oldStatus, newStatus)); }
第 19 至 23 行 :对比本地缓存和最新的的当前应用实例在 Eureka-Server 的状态,若不同,更新本地缓存( 注意,只更新该缓存变量,不更新本地当前应用实例的状态(
instanceInfo.status
) ),触发 StatusChangeEvent 事件,事件监听器执行。目前 Eureka 未提供默认的该事件监听器。
#onRemoteStatusChanged(...)
实现代码如下:
2.4.1 全量获取注册信息,并设置到本地缓存
调用
#getAndStoreFullRegistry()
方法,全量获取注册信息,并设置到本地缓存。下实现代码如下:
1: private void getAndStoreFullRegistry() throws Throwable {
2: long currentUpdateGeneration = fetchRegistryGeneration.get();
3:
4: logger.info("Getting all instance registry info from the eureka server");
5:
6: // 全量获取注册信息
7: Applications apps = null;
8: EurekaHttpResponseApplications httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
9: ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
10: : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
11: if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
12: apps = httpResponse.getEntity();
13: }
14: logger.info("The response status is {}", httpResponse.getStatusCode());
15:
16: // 设置到本地缓存
17: if (apps == null) {
18: logger.error("The application is null for some reason. Not storing this information");
19: } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
20: localRegionApps.set(this.filterAndShuffle(apps));
21: logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
22: } else {
23: logger.warn("Not updating applications as another thread is updating it already");
24: }
25: }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 | // AbstractJerseyEurekaHttpClient.java @Override public EurekaHttpResponseApplications getApplications(String... regions) { return getApplicationsInternal("apps/", regions); } private EurekaHttpResponseApplications getApplicationsInternal(String urlPath, String[] regions) { ClientResponse response = null; String regionsParamValue = null; try { WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath); if (regions != null && regions.length 0) { regionsParamValue = StringUtil.join(regions); webResource = webResource.queryParam("regions", regionsParamValue); } Builder requestBuilder = webResource.getRequestBuilder(); addExtraHeaders(requestBuilder); response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class); // JSON Applications applications = null; if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) { applications = response.getEntity(Applications.class); } return anEurekaHttpResponse(response.getStatus(), Applications.class) .headers(headersOf(response)) .entity(applications) .build(); } finally { if (logger.isDebugEnabled()) { logger.debug("Jersey HTTP GET {}/{}?{}; statusCode={}", serviceUrl, urlPath, regionsParamValue == null ? "" : "regions=" + regionsParamValue, response == null ? "N/A" : response.getStatus() ); } if (response != null) { response.close(); } } } |
private EurekaHttpResponseApplications getApplicationsInternal(String urlPath, String[] regions) {
ClientResponse response = null;
String regionsParamValue = null;
try {
WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath);
if (regions != null && regions.length 0) {
regionsParamValue = StringUtil.join(regions);
webResource = webResource.queryParam(“regions”, regionsParamValue);
}
Builder requestBuilder = webResource.getRequestBuilder();
addExtraHeaders(requestBuilder);
response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class); // JSON
Applications applications = null;
if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) {
applications = response.getEntity(Applications.class);
}
return anEurekaHttpResponse(response.getStatus(), Applications.class)
.headers(headersOf(response))
.entity(applications)
.build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug(“Jersey HTTP GET {}/{}?{}; statusCode={}”,
serviceUrl, urlPath,
regionsParamValue == null ? “” : “regions=” + regionsParamValue,
response == null ? “N/A” : response.getStatus()
);
}
if (response != null) {
response.close();
}
}
}
-
- 调用 `AbstractJerseyEurekaHttpClient#getApplications(...)` 方法,GET 请求 Eureka-Server 的 `apps/` 接口,参数为 `regions` ,返回格式为 JSON ,实现**全量获取注册信息**。
com.netflix.eureka.resources.ApplicationsResource</code>,处理**所有**应用的请求操作的 Resource ( Controller )。</p>
接收全量获取请求,映射 `ApplicationsResource#getContainers()` 方法,实现代码如下:
<pre style="margin-top: 0px; margin-bottom: 0px; padding: 0px; font-size: 15px; color: #3e3e3e; line-height: inherit; letter-spacing: 2px; word-spacing: 2px; background-color: #ffffff;"><code class="Java language-Java hljs" style="margin-right: 2px; margin-left: 2px; padding: 0.5em; font-size: 14px; color: #a9b7c6; line-height: 18px; border-radius: 0px; background: #282b2e; font-family: Consolas, Inconsolata, Courier, monospace; display: block; overflow-x: auto; word-spacing: 0px; letter-spacing: 0px; word-wrap: normal !important; overflow-y: auto !important;"> 1: @GET
2: public Response getContainers(@PathParam("version") String version,
3: @HeaderParam(HEADER_ACCEPT) String acceptHeader,
4: @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
5: @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
6: @Context UriInfo uriInfo,
7: @Nullable @QueryParam("regions") String regionsStr) {
8: // TODO[0009]:RemoteRegionRegistry
9: boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
10: String[] regions = null;
11: if (!isRemoteRegionRequested) {
12: EurekaMonitors.GET_ALL.increment();
13: } else {
14: regions = regionsStr.toLowerCase().split(",");
15: Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
16: EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
17: }
18:
19: // 判断是否可以访问
20: // Check if the server allows the access to the registry. The server can
21: // restrict access if it is not
22: // ready to serve traffic depending on various reasons.
23: if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
24: return Response.status(Status.FORBIDDEN).build();
25: }
26:
27: // API 版本
28: CurrentRequestVersion.set(Version.toEnum(version));
29:
30: // 返回数据格式
31: KeyType keyType = Key.KeyType.JSON;
32: String returnMediaType = MediaType.APPLICATION_JSON;
33: if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
34: keyType = Key.KeyType.XML;
35: returnMediaType = MediaType.APPLICATION_XML;
36: }
37:
38: // 响应缓存键( KEY )
39: Key cacheKey = new Key(Key.EntityType.Application,
40: ResponseCacheImpl.ALL_APPS,
41: keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
42: );
43:
44: //
45: Response response;
46: if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
47: response = Response.ok(responseCache.getGZIP(cacheKey))
48: .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
49: .header(HEADER_CONTENT_TYPE, returnMediaType)
50: .build();
51: } else {
52: response = Response.ok(responseCache.get(cacheKey))
53: .build();
54: }
55: return response;
56: }
1 2 3 4 5 6 7 8 9 10 11 12 | public enum Version { V1, V2; public static Version toEnum(String v) { for (Version version : Version.values()) { if (version.name().equalsIgnoreCase(v)) { return version; } } //Defaults to v2 return V2; } } |
第 19 至 25 行 :Eureka-Server 启动完成,但是未处于就绪( Ready )状态,不接受请求全量应用注册信息的请求,例如,Eureka-Server 启动时,未能从其他 Eureka-Server 集群的节点获取到应用注册信息。
com.netflix.eureka.registry.ResponseCache</code>,响应缓存**接口**,接口代码如下:</p>
<pre style="margin-top: 0px; margin-bottom: 0px; padding: 0px; font-size: 15px; color: #3e3e3e; line-height: inherit; letter-spacing: 2px; word-spacing: 2px; background-color: #ffffff;"><code class="Java language-Java hljs" style="margin-right: 2px; margin-left: 2px; padding: 0.5em; font-size: 14px; color: #a9b7c6; line-height: 18px; border-radius: 0px; background: #282b2e; font-family: Consolas, Inconsolata, Courier, monospace; display: block; overflow-x: auto; word-spacing: 0px; letter-spacing: 0px; word-wrap: normal !important; overflow-y: auto !important;">public interface ResponseCache {
String get(Key key);
byte[] getGZIP(Key key);
void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress);
AtomicLong getVersionDelta();
AtomicLong getVersionDeltaWithRegions();
}
// Applications.java
@Deprecated
public void setVersion(Long version) {
this.versionDelta = version;
}
// AbstractInstanceRegistry.java
public Applications getApplicationDeltas() {
// … 省略其它无关代码
apps.setVersion(responseCache.getVersionDelta().get()); // 唯一调用到 ResponseCache#getVersionDelta() 方法的地方
// … 省略其它无关代码
}
#get()
:获得缓存。
#invalidate()
:过期缓存。
3.2.1 缓存键
com.netflix.eureka.registry.Key
,缓存键。实现代码如下:
public class Key {
public enum KeyType {
JSON, XML
}
/**
* An enum to define the entity that is stored in this cache for this key.
*/
public enum EntityType {
Application, VIP, SVIP
}
/**
* 实体名
*/
private final String entityName;
/**
* TODO[0009]:RemoteRegionRegistry
*/
private final String[] regions;
/**
* 请求参数类型
*/
private final KeyType requestType;
/**
* 请求 API 版本号
*/
private final Version requestVersion;
/**
* hashKey
*/
private final String hashKey;
/**
* 实体类型
*
* {@link EntityType}
*/
private final EntityType entityType;
/**
* {@link EurekaAccept}
*/
private final EurekaAccept eurekaAccept;
public Key(EntityType entityType, String entityName, KeyType type, Version v, EurekaAccept eurekaAccept, @Nullable String[] regions) {
this.regions = regions;
this.entityType = entityType;
this.entityName = entityName;
this.requestType = type;
this.requestVersion = v;
this.eurekaAccept = eurekaAccept;
hashKey = this.entityType + this.entityName + (null != this.regions ? Arrays.toString(this.regions) : "")
+ requestType.name() + requestVersion.name() + this.eurekaAccept.name();
}
public Key(EntityType entityType, String entityName, KeyType type, Version v, EurekaAccept eurekaAccept, @Nullable String[] regions) {
this.regions = regions;
this.entityType = entityType;
this.entityName = entityName;
this.requestType = type;
this.requestVersion = v;
this.eurekaAccept = eurekaAccept;
hashKey = this.entityType + this.entityName + (null != this.regions ? Arrays.toString(this.regions) : "")
+ requestType.name() + requestVersion.name() + this.eurekaAccept.name();
}
@Override
public int hashCode() {
String hashKey = getHashKey();
return hashKey.hashCode();
}
@Override
public boolean equals(Object other) {
if (other instanceof Key) {
return getHashKey().equals(((Key) other).getHashKey());
} else {
return false;
}
}
}
3.2.2 响应缓存实现类
com.netflix.eureka.registry.ResponseCacheImpl
,响应缓存实现类。
在 ResponseCacheImpl 里,将缓存拆分成两层 :
- 只读缓存(
readOnlyCacheMap
) - 固定过期 + 固定大小的读写缓存(
readWriteCacheMap
)
默认配置下,缓存读取策略如下:
缓存过期策略如下:
- 应用实例注册、下线、过期时,只只只过期
readWriteCacheMap
。 readWriteCacheMap
写入一段时间( 可配置 )后自动过期。- 定时任务对比
readWriteCacheMap
和readOnlyCacheMap
的缓存值,若不一致,以前者为主。通过这样的方式,实现了readOnlyCacheMap
的定时过期。
注意:应用实例注册、下线、过期时,不会很快刷新到
readWriteCacheMap
缓存里。默认配置下,最大延迟在 30 秒。
为什么可以使用缓存?
在 CAP 的选择上,Eureka 选择了 AP ,不同于 Zookeeper 选择了 CP 。
推荐阅读:
- 《为什么不应该使用ZooKeeper做服务发现》
- 《Spring Cloud Netflix Eureka源码导读与原理分析》「4. 作为服务注册中心,Eureka比Zookeeper好在哪里」
3.3 缓存读取
调用
ResponseCacheImpl#get(...)
方法(
#getGzip(...)
类似 ),读取缓存,实现代码如下:
1: private final ConcurrentMapKey, Value readOnlyCacheMap = new ConcurrentHashMapKey, Value();
2:
3: private final LoadingCacheKey, Value readWriteCacheMap;
4:
5: public String get(final Key key) {
6: return get(key, shouldUseReadOnlyResponseCache);
7: }
8:
9: String get(final Key key, boolean useReadOnlyCache) {
10: Value payload = getValue(key, useReadOnlyCache);
11: if (payload == null || payload.getPayload().equals(EMPTY_PAYLOAD)) {
12: return null;
13: } else {
14: return payload.getPayload();
15: }
16: }
17:
18: Value getValue(final Key key, boolean useReadOnlyCache) {
19: Value payload = null;
20: try {
21: if (useReadOnlyCache) {
22: final Value currentPayload = readOnlyCacheMap.get(key);
23: if (currentPayload != null) {
24: payload = currentPayload;
25: } else {
26: payload = readWriteCacheMap.get(key);
27: readOnlyCacheMap.put(key, payload);
28: }
29: } else {
30: payload = readWriteCacheMap.get(key);
31: }
32: } catch (Throwable t) {
33: logger.error("Cannot get value for key :" + key, t);
34: }
35: return payload;
36: }
public class Value {
/**
* 原始值
/
private final String payload;
/*
* GZIP 压缩后的值
*/
private byte[] gzipped;
public Value(String payload) {
this.payload = payload;
if (!EMPTY_PAYLOAD.equals(payload)) {
// … 省略 GZIP 压缩代码
gzipped = bos.toByteArray();
} else {
gzipped = null;
}
}
public String getPayload() {
return payload;
}
public byte[] getGzipped() {
return gzipped;
}
}
-
- `readWriteCacheMap` 最大缓存数量为 1000 。
- 调用 `#generatePayload(key)` 方法,生成缓存值。
- 第 21 至 28 行 :先读取 `readOnlyCacheMap` 。读取不到,读取 `readWriteCacheMap` ,并设置到 `readOnlyCacheMap` 。
- 第 29 至 31 行 :读取 `readWriteCacheMap` 。
-
`readWriteCacheMap` 实现代码如下:
this.readWriteCacheMap = CacheBuilder.newBuilder().initialCapacity(1000) .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS) .removalListener(new RemovalListenerKey, Value() { @Override public void onRemoval(RemovalNotificationKey, Value notification) { // TODO[0009]:RemoteRegionRegistry Key removedKey = notification.getKey(); if (removedKey.hasRegions()) { Key cloneWithNoRegions = removedKey.cloneWithoutRegions(); regionSpecificKeys.remove(cloneWithNoRegions, removedKey); } } }) .build(new CacheLoaderKey, Value() { @Override public Value load(Key key) throws Exception { // // TODO[0009]:RemoteRegionRegistry if (key.hasRegions()) { Key cloneWithNoRegions = key.cloneWithoutRegions(); regionSpecificKeys.put(cloneWithNoRegions, key); } Value value = generatePayload(key); return value; } });
第 9 至 16 行 :调用
getValue(key, useReadOnlyCache)
方法,读取缓存。从
readOnlyCacheMap
和
readWriteCacheMap
变量可以看到缓存值的类为
com.netflix.eureka.registry.ResponseCacheImpl.Value
,实现代码如下:
第 21 至 28 行 :先读取
readOnlyCacheMap
。读取不到,读取
readWriteCacheMap
,并设置到
readOnlyCacheMap
。
readWriteCacheMap
实现代码如下:
this.readWriteCacheMap =
CacheBuilder.newBuilder().initialCapacity(1000)
.expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
.removalListener(new RemovalListenerKey, Value() {
@Override
public void onRemoval(RemovalNotificationKey, Value notification) {
// TODO[0009]:RemoteRegionRegistry
Key removedKey = notification.getKey();
if (removedKey.hasRegions()) {
Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
}
}
})
.build(new CacheLoaderKey, Value() {
@Override
public Value load(Key key) throws Exception {
// // TODO[0009]:RemoteRegionRegistry
if (key.hasRegions()) {
Key cloneWithNoRegions = key.cloneWithoutRegions();
regionSpecificKeys.put(cloneWithNoRegions, key);
}
Value value = generatePayload(key);
return value;
}
});
#generatePayload(key)
方法,实现代码如下:
1: private Value generatePayload(Key key) {
2: Stopwatch tracer = null;
3: try {
4: String payload;
5: switch (key.getEntityType()) {
6: case Application:
7: boolean isRemoteRegionRequested = key.hasRegions();
8:
9: if (ALL_APPS.equals(key.getName())) {
10: if (isRemoteRegionRequested) { // TODO[0009]:RemoteRegionRegistry
11: tracer = serializeAllAppsWithRemoteRegionTimer.start();
12: payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
13: } else {
14: tracer = serializeAllAppsTimer.start();
15: payload = getPayLoad(key, registry.getApplications());
16: }
17: } else if (ALL_APPS_DELTA.equals(key.getName())) {
18: // ... 省略增量获取相关的代码
19: } else {
20: tracer = serializeOneApptimer.start();
21: payload = getPayLoad(key, registry.getApplication(key.getName()));
22: }
23: break;
24: // ... 省略部分代码
25: }
26: return new Value(payload);
27: } finally {
28: if (tracer != null) {
29: tracer.stop();
30: }
31: }
32: }
- 第 10 至 12 行 :TODO[0009]:RemoteRegionRegistry
- 第 13 至 16 行 :调用
AbstractInstanceRegistry#getApplications()
方法,获得注册的应用集合。后调用#getPayLoad()
方法,将注册的应用集合转换成缓存值。🙂 这两个方法代码较多,下面详细解析。 - 第 17 至 18 行 :获取增量注册信息的缓存值,在 《Eureka 源码解析 —— 应用实例注册发现 (七)之增量获取》 详细解析。
3.3.1 获得注册的应用集合
调用
AbstractInstanceRegistry#getApplications()
方法,获得注册的应用集合,实现代码如下:
// ... 省略代码,超过微信文章长度
`// ... 省略代码,超过微信文章长度`
-
- 第 2 至 第 10 行 :TODO[0009]:RemoteRegionRegistry
- 第 11 至 29 行 :获得获得注册的应用集合。
- 第 30 至 59 行 :TODO[0009]:RemoteRegionRegistry
- 第 61 行 :计算应用集合 `hashcode` 。该变量用于校验**增量**获取的注册信息和 Eureka-Server **全量**的注册信息是否一致( 完整 ),在 《Eureka 源码解析 —— 应用实例注册发现 (七)之增量获取》 详细解析。
第 9 至 16 行 :调用
#getApplicationsFromMultipleRegions(...)
方法,获得注册的应用集合,实现代码如下:
3.3.2 转换成缓存值
调用
#getPayLoad()
方法,将注册的应用集合转换成缓存值,实现代码如下:
// ... 省略代码,超过微信文章长度
3.4 主动过期读写缓存
应用实例注册、下线、过期时,调用
ResponseCacheImpl#invalidate()
方法,主动过期读写缓存(
readWriteCacheMap
),实现代码如下:
// ... 省略代码,超过微信文章长度
`// ... 省略代码,超过微信文章长度`
3.5 被动过期读写缓存
读写缓存(
readWriteCacheMap
) 写入后,一段时间自动过期,实现代码如下:
expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds())
- 配置
eureka.responseCacheAutoExpirationInSeconds
,设置写入过期时长。默认值 :180 秒。
3.6 定时刷新只读缓存
定时任务对比
readWriteCacheMap
和
readOnlyCacheMap
的缓存值,若不一致,以前者为主。通过这样的方式,实现了
readOnlyCacheMap
的定时过期。实现代码如下:
// ... 省略代码,超过微信文章长度
- 第 7 至 12 行 :初始化定时任务。配置
eureka.responseCacheUpdateIntervalMs
,设置任务执行频率,默认值 :30 * 1000 毫秒。 - 第 17 至 39 行 :创建定时任务。
- 第 22 行 :循环
readOnlyCacheMap
的缓存键。为什么不循环readWriteCacheMap
呢?readOnlyCacheMap
的缓存过期依赖readWriteCacheMap
,因此缓存键会更多。 - 第 28 行 至 33 行 :对比
readWriteCacheMap
和readOnlyCacheMap
的缓存值,若不一致,以前者为主。通过这样的方式,实现了readOnlyCacheMap
的定时过期。
666. 彩蛋
比预期,比想想,长老多老多的一篇文章。细思极恐。
估计下一篇增量获取会简洁很多。
胖友,分享我的公众号( 芋道源码 ) 给你的胖友可好?