注册中心 Eureka 源码解析 —— 应用实例注册发现(六)之全量获取

本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

原文链接:blog.ouyangsihai.cn >> 注册中心 Eureka 源码解析 —— 应用实例注册发现(六)之全量获取

摘要: 原创出处 http://www.iocoder.cn/Eureka/instance-registry-fetch-all/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 Eureka 1.8.X 版本

    1. 概述
    2. 2. Eureka-Client 发起全量获取
  • 2.1 初始化全量获取

  • 2.2 定时获取

  • 2.3 刷新注册信息缓存

  • 2.4 发起获取注册信息

  • 3.1 接收全量获取请求

  • 3.2 响应缓存 ResponseCache

  • 3.3 缓存读取

  • 3.4 主动过期读写缓存

  • 3.5 被动过期读写缓存

  • 3.6 定时刷新只读缓存

1. 概述

本文主要分享 Eureka-Client 向 Eureka-Server 获取全量注册信息的过程

FROM 《深度剖析服务发现组件Netflix Eureka》

注册中心 Eureka 源码解析 —— 应用实例注册发现(六)之全量获取

Eureka-Client 获取注册信息,分成全量获取增量获取。默认配置下,Eureka-Client 启动时,首先执行一次全量获取进行本地缓存注册信息,而后每 30 秒增量获取刷新本地缓存( 非“正常”情况下会是全量获取 )。

本文重点在于全量获取

推荐 Spring Cloud 书籍

  • 请支持正版。下载盗版,等于主动编写低级 BUG 。
  • 程序猿DD —— 《Spring Cloud微服务实战》
  • 周立 —— 《Spring Cloud与Docker微服务架构实战》
  • 两书齐买,京东包邮。

推荐 Spring Cloud 视频

  • Java 微服务实践 - Spring Boot
  • Java 微服务实践 - Spring Cloud
  • Java 微服务实践 - Spring Boot / Spring Cloud

2. Eureka-Client 发起全量获取

本小节调用关系如下:

2.1 初始化全量获取

Eureka-Client 启动时,首先执行一次全量获取进行本地缓存注册信息,首先代码如下:


// DiscoveryClient.java
/**
* Applications 在本地的缓存
*/
private final AtomicReferenceApplications localRegionApps = new AtomicReferenceApplications();

DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                    ProviderBackupRegistry backupRegistryProvider) {

     // ... 省略无关代码

    // 【3.2.5】初始化应用集合在本地的缓存
    localRegionApps.set(new Applications());

    // ... 省略无关代码     

    // 【3.2.12】从 Eureka-Server 拉取注册信息
    if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
        fetchRegistryFromBackup();
    }

     // ... 省略无关代码       
}
  • `com.netflix.discovery.shared.Applications`,注册的应用集合。较为容易理解,点击 链接 链接查看带中文注释的类,这里就不啰嗦了。Applications 与 InstanceInfo 类关系如下:
    注册中心 Eureka 源码解析 —— 应用实例注册发现(六)之全量获取
  • 配置 `eureka.shouldFetchRegistry = true`,开启从 Eureka-Server 获取注册信息。默认值:`true` 。
  • 调用 `#fetchRegistry(false)` 方法,从 Eureka-Server **全量**获取注册信息,在 「2.4 发起获取注册信息」 详细解析。
  • 配置  eureka.shouldFetchRegistry = true,开启从 Eureka-Server 获取注册信息。默认值: true 。

    2.2 定时获取

    Eureka-Client 在初始化过程中,创建获取注册信息线程,固定间隔向 Eureka-Server 发起获取注册信息( fetch ),刷新本地注册信息缓存。实现代码如下:

    
    // DiscoveryClient.java
    DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                   ProviderBackupRegistry backupRegistryProvider) {
        // ... 省略无关代码
    
        // 【3.2.9】初始化线程池
        // default size of 2 - 1 each for heartbeat and cacheRefresh
        scheduler = Executors.newScheduledThreadPool(2,
             new ThreadFactoryBuilder()
                     .setNameFormat("DiscoveryClient-%d")
                     .setDaemon(true)
                     .build());
    
        cacheRefreshExecutor = new ThreadPoolExecutor(
             1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
             new SynchronousQueueRunnable(),
             new ThreadFactoryBuilder()
                     .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                     .setDaemon(true)
                     .build()
         );  // use direct handoff
    
        // ... 省略无关代码
    
        // 【3.2.14】初始化定时任务
        initScheduledTasks();
    
        // ... 省略无关代码
    }
    
    private void initScheduledTasks() {
        // 向 Eureka-Server 心跳(续租)执行器
        if (clientConfig.shouldFetchRegistry()) {
           // registry cache refresh timer
           int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
           int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
           scheduler.schedule(
                   new TimedSupervisorTask(
                           "cacheRefresh",
                           scheduler,
                           cacheRefreshExecutor,
                           registryFetchIntervalSeconds,
                           TimeUnit.SECONDS,
                           expBackOffBound,
                           new CacheRefreshThread()
                   ),
                   registryFetchIntervalSeconds, TimeUnit.SECONDS);
         }
         // ... 省略无关代码
    }
    
  • 初始化定时任务代码,和**续租**的定时任务代码类似,在 《Eureka 源码解析 —— 应用实例注册发现(二)之续租 》 有详细解析,这里不重复分享。
  • `com.netflix.discovery.DiscoveryClient.CacheRefreshThread`,注册信息缓存刷新任务,实现代码如下:
    class CacheRefreshThread implements Runnable {
        public void run() {
            refreshRegistry();
        }
    }
    
      - 调用 `#refreshRegistry(false)` 方法,刷新注册信息缓存,在 「2.3 刷新注册信息缓存」 详细解析。

      com.netflix.discovery.DiscoveryClient.CacheRefreshThread,注册信息缓存刷新任务,实现代码如下:

      2.3 刷新注册信息缓存

      调用  #refreshRegistry(false) 方法,刷新注册信息缓存,实现代码如下:

      
      // DiscoveryClient.java
        1: void refreshRegistry() {
        2:     try {
        3:         // TODO 芋艿:TODO[0009]:RemoteRegionRegistry
        4:         boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();
        5: 
        6:         boolean remoteRegionsModified = false;
        7:         // This makes sure that a dynamic change to remote regions to fetch is honored.
        8:         String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
        9:         if (null != latestRemoteRegions) {
       10:             String currentRemoteRegions = remoteRegionsToFetch.get();
       11:             if (!latestRemoteRegions.equals(currentRemoteRegions)) {
       12:                 // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
       13:                 synchronized (instanceRegionChecker.getAzToRegionMapper()) {
       14:                     if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
       15:                         String[] remoteRegions = latestRemoteRegions.split(",");
       16:                         remoteRegionsRef.set(remoteRegions);
       17:                         instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
       18:                         remoteRegionsModified = true;
       19:                     } else {
       20:                         logger.info("Remote regions to fetch modified concurrently," +
       21:                                 " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
       22:                     }
       23:                 }
       24:             } else {
       25:                 // Just refresh mapping to reflect any DNS/Property change
       26:                 instanceRegionChecker.getAzToRegionMapper().refreshMapping();
       27:             }
       28:         }
       29: 
       30:         boolean success = fetchRegistry(remoteRegionsModified);
       31:         if (success) {
       32:             // 设置 注册信息的应用实例数
       33:             registrySize = localRegionApps.get().size();
       34:             // 设置 最后获取注册信息时间
       35:             lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
       36:         }
       37: 
       38:         // 打印日志
       39:         if (logger.isDebugEnabled()) {
       40:             StringBuilder allAppsHashCodes = new StringBuilder();
       41:             allAppsHashCodes.append("Local region apps hashcode: ");
       42:             allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());
       43:             allAppsHashCodes.append(", is fetching remote regions? ");
       44:             allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
       45:             for (Map.EntryString, Applications entry : remoteRegionVsApps.entrySet()) {
       46:                 allAppsHashCodes.append(", Remote region: ");
       47:                 allAppsHashCodes.append(entry.getKey());
       48:                 allAppsHashCodes.append(" , apps hashcode: ");
       49:                 allAppsHashCodes.append(entry.getValue().getAppsHashCode());
       50:             }
       51:             logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",
       52:                     allAppsHashCodes.toString());
       53:         }
       54:     } catch (Throwable e) {
       55:         logger.error("Cannot fetch registry from server", e);
       56:     }        
       57: }
      
    • 第 3 至 28 行 :TODO[0009]:RemoteRegionRegistry
    • 第 30 行 :调用 `#fetchRegistry(false)` 方法,从 Eureka-Server 获取注册信息,在 「2.4 发起获取注册信息」 详细解析。
    • 第 31 至 36 行 :获取注册信息成功,设置注册信息的应用实例数,最后获取注册信息时间。变量代码如下:
      /**
      * 注册信息的应用实例数
      */
      private volatile int registrySize = 0;
      /**
      * 最后成功从 Eureka-Server 拉取注册信息时间戳
      */
      private volatile long lastSuccessfulRegistryFetchTimestamp = -1;
      
    • 第 38 至 53 行 :打印调试日志。
    • 第 54 至 56 行 :打印**异常**日志。
    • 第 30 行 :调用  #fetchRegistry(false) 方法,从 Eureka-Server 获取注册信息,在 「2.4 发起获取注册信息」 详细解析。

      第 38 至 53 行 :打印调试日志。

      2.4 发起获取注册信息

      调用  #fetchRegistry(false) 方法,从 Eureka-Server 获取注册信息( 根据条件判断,可能是全量,也可能是增量 ),实现代码如下:

      
        1: private boolean fetchRegistry(boolean forceFullRegistryFetch) {
        2:     Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
        3: 
        4:     try {
        5:         // 获取 本地缓存的注册的应用实例集合
        6:         // If the delta is disabled or if it is the first time, get all
        7:         // applications
        8:         Applications applications = getApplications();
        9: 
       10:         // 全量获取
       11:         if (clientConfig.shouldDisableDelta() // 禁用增量获取
       12:                 || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
       13:                 || forceFullRegistryFetch
       14:                 || (applications == null) // 空
       15:                 || (applications.getRegisteredApplications().size() == 0) // 空
       16:                 || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
       17:         {
       18:             logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
       19:             logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
       20:             logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
       21:             logger.info("Application is null : {}", (applications == null));
       22:             logger.info("Registered Applications size is zero : {}",
       23:                     (applications.getRegisteredApplications().size() == 0));
       24:             logger.info("Application version is -1: {}", (applications.getVersion() == -1));
       25:             // 执行 全量获取
       26:             getAndStoreFullRegistry();
       27:         } else {
       28:             // 执行 增量获取
       29:             getAndUpdateDelta(applications);
       30:         }
       31:         // 设置 应用集合 hashcode
       32:         applications.setAppsHashCode(applications.getReconcileHashCode());
       33:         // 打印 本地缓存的注册的应用实例数量
       34:         logTotalInstances();
       35:     } catch (Throwable e) {
       36:         logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e);
       37:         return false;
       38:     } finally {
       39:         if (tracer != null) {
       40:             tracer.stop();
       41:         }
       42:     }
       43: 
       44:     // Notify about cache refresh before updating the instance remote status
       45:     onCacheRefreshed();
       46: 
       47:     // Update remote status based on refreshed data held in the cache
       48:     updateInstanceRemoteStatus();
       49: 
       50:     // registry was fetched successfully, so return true
       51:     return true;
       52: }
      
    • 第 5 至 8 行 :获取本地缓存的注册的应用实例集合,实现代码如下:
      public Applications getApplications() {
         return localRegionApps.get();
      }
      
    • 第 10 至 26 行 :**全量**获取注册信息。
        - 第 11 行 :配置 `eureka.disableDelta = true` ,禁用**增量**获取注册信息。默认值:`false` 。 - 第 12 行 :只获得一个 `vipAddress` 对应的应用实例们的注册信息。 - 第 13 行 :方法参数 `forceFullRegistryFetch` 强制**全量**获取注册信息。 - 第 14 至 15 行 :本地缓存为空。 - 第 25 至 26 行 :调用 `#getAndStoreFullRegistry()` 方法,**全量**获取注册信息,并设置到本地缓存。下文详细解析。

        第 10 至 26 行 :全量获取注册信息。

        第 27 至 30 行 :增量获取注册信息,并刷新本地缓存,在 《Eureka 源码解析 —— 应用实例注册发现 (七)之增量获取》 详细解析。

        第 31 至 32 行 :计算应用集合  hashcode 。该变量用于校验增量获取的注册信息和 Eureka-Server 全量的注册信息是否一致( 完整 ),在 《Eureka 源码解析 —— 应用实例注册发现 (七)之增量获取》 详细解析。

        第 33 至 34 行 :打印调试日志,输出本地缓存的注册的应用实例数量。实现代码如下:

        
        private void logTotalInstances() {
           if (logger.isDebugEnabled()) {
               int totInstances = 0;
               for (Application application : getApplications().getRegisteredApplications()) {
                   totInstances += application.getInstancesAsIsFromEureka().size();
               }
               logger.debug("The total number of all instances in the client now is {}", totInstances);
           }
        }
        

        第 44 至 45 行 :触发 CacheRefreshedEvent 事件,事件监听器执行。目前 Eureka 未提供默认的该事件监听器。

        • x
        • `#onCacheRefreshed()` 方法,实现代码如下:
          /**
        • Eureka 事件监听器

        */
        private final CopyOnWriteArraySetEurekaEventListener eventListeners = new CopyOnWriteArraySet();

        protected void onCacheRefreshed() {
           fireEvent(new CacheRefreshedEvent());
        }

        protected void fireEvent(final EurekaEvent event) {
           for (EurekaEventListener listener : eventListeners) {
               listener.onEvent(event);
           }
        }

      • **笔者的YY** :你可以实现自定义的事件监听器监听 CacheRefreshedEvent 事件,以达到**持久化**最新的注册信息到存储器( 例如,本地文件 ),通过这样的方式,配合实现 BackupRegistry 接口读取存储器。BackupRegistry 接口调用如下:
        // 【3.2.12】从 Eureka-Server 拉取注册信息
        if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
            fetchRegistryFromBackup();
        }
        
      • 笔者的YY :你可以实现自定义的事件监听器监听 CacheRefreshedEvent 事件,以达到持久化最新的注册信息到存储器( 例如,本地文件 ),通过这样的方式,配合实现 BackupRegistry 接口读取存储器。BackupRegistry 接口调用如下:

        
        // 【3.2.12】从 Eureka-Server 拉取注册信息
        if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
            fetchRegistryFromBackup();
        }
        

        第47 至 48 行 :更新本地缓存的当前应用实例在 Eureka-Server 的状态。

        
          1: private volatile InstanceInfo.InstanceStatus lastRemoteInstanceStatus = InstanceInfo.InstanceStatus.UNKNOWN; 
          2: 
          3: private synchronized void updateInstanceRemoteStatus() {
          4:     // Determine this instance's status for this app and set to UNKNOWN if not found
          5:     InstanceInfo.InstanceStatus currentRemoteInstanceStatus = null;
          6:     if (instanceInfo.getAppName() != null) {
          7:         Application app = getApplication(instanceInfo.getAppName());
          8:         if (app != null) {
          9:             InstanceInfo remoteInstanceInfo = app.getByInstanceId(instanceInfo.getId());
         10:             if (remoteInstanceInfo != null) {
         11:                 currentRemoteInstanceStatus = remoteInstanceInfo.getStatus();
         12:             }
         13:         }
         14:     }
         15:     if (currentRemoteInstanceStatus == null) {
         16:         currentRemoteInstanceStatus = InstanceInfo.InstanceStatus.UNKNOWN;
         17:     }
         18: 
         19:     // Notify if status changed
         20:     if (lastRemoteInstanceStatus != currentRemoteInstanceStatus) {
         21:         onRemoteStatusChanged(lastRemoteInstanceStatus, currentRemoteInstanceStatus);
         22:         lastRemoteInstanceStatus = currentRemoteInstanceStatus;
         23:     }
         24: }
        
        • Eureka-Client 本地应用实例与 Eureka-Server 的该应用实例状态不同的原因,因为应用实例的覆盖状态,在 《Eureka 源码解析 —— 应用实例注册发现 (八)之覆盖状态》 有详细解析。
        • 第 4 至 14 行 :从注册信息中获取当前应用在 Eureka-Server 的状态。
        • 第 19 至 23 行 :对比**本地缓存**和**最新的**的当前应用实例在 Eureka-Server 的状态,若不同,更新**本地缓存**( **注意,只更新该缓存变量,不更新本地当前应用实例的状态( `instanceInfo.status` )** ),触发 StatusChangeEvent 事件,事件监听器执行。目前 Eureka 未提供默认的该事件监听器。`#onRemoteStatusChanged(...)` 实现代码如下:
          protected void onRemoteStatusChanged(InstanceInfo.InstanceStatus oldStatus, InstanceInfo.InstanceStatus newStatus) {
             fireEvent(new StatusChangeEvent(oldStatus, newStatus));
          }
          

        第 19 至 23 行 :对比本地缓存最新的的当前应用实例在 Eureka-Server 的状态,若不同,更新本地缓存注意,只更新该缓存变量,不更新本地当前应用实例的状态(  instanceInfo.status ) ),触发 StatusChangeEvent 事件,事件监听器执行。目前 Eureka 未提供默认的该事件监听器。 #onRemoteStatusChanged(...) 实现代码如下:

        2.4.1 全量获取注册信息,并设置到本地缓存

        调用  #getAndStoreFullRegistry() 方法,全量获取注册信息,并设置到本地缓存。下实现代码如下:

        
          1: private void getAndStoreFullRegistry() throws Throwable {
          2:     long currentUpdateGeneration = fetchRegistryGeneration.get();
          3: 
          4:     logger.info("Getting all instance registry info from the eureka server");
          5: 
          6:     // 全量获取注册信息
          7:     Applications apps = null;
          8:     EurekaHttpResponseApplications httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
          9:             ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
         10:             : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
         11:     if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
         12:         apps = httpResponse.getEntity();
         13:     }
         14:     logger.info("The response status is {}", httpResponse.getStatusCode());
         15: 
         16:     // 设置到本地缓存
         17:     if (apps == null) {
         18:         logger.error("The application is null for some reason. Not storing this information");
         19:     } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
         20:         localRegionApps.set(this.filterAndShuffle(apps));
         21:         logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
         22:     } else {
         23:         logger.warn("Not updating applications as another thread is updating it already");
         24:     }
         25: }
        
      • 第 6 至 14 行 :**全量**获取注册信息,实现代码如下:
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        36
        37
        38
        39
        
        // AbstractJerseyEurekaHttpClient.java
        @Override
        public EurekaHttpResponseApplications getApplications(String... regions) {
           return getApplicationsInternal("apps/", regions);
        }
         
        private EurekaHttpResponseApplications getApplicationsInternal(String urlPath, String[] regions) {
           ClientResponse response = null;
           String regionsParamValue = null;
           try {
               WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath);
               if (regions != null && regions.length  0) {
                   regionsParamValue = StringUtil.join(regions);
                   webResource = webResource.queryParam("regions", regionsParamValue);
               }
               Builder requestBuilder = webResource.getRequestBuilder();
               addExtraHeaders(requestBuilder);
               response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class); // JSON
           Applications applications = null;
           if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) {
               applications = response.getEntity(Applications.class);
           }
           return anEurekaHttpResponse(response.getStatus(), Applications.class)
                   .headers(headersOf(response))
                   .entity(applications)
                   .build();
          } finally {
              if (logger.isDebugEnabled()) {
                  logger.debug("Jersey HTTP GET {}/{}?{}; statusCode={}",
                          serviceUrl, urlPath,
                          regionsParamValue == null ? "" : "regions=" + regionsParamValue,
                          response == null ? "N/A" : response.getStatus()
                  );
              }
              if (response != null) {
                  response.close();
              }
          }
        }

        private EurekaHttpResponseApplications getApplicationsInternal(String urlPath, String[] regions) {
          ClientResponse response = null;
          String regionsParamValue = null;
          try {
              WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath);
              if (regions != null && regions.length 0) {
                  regionsParamValue = StringUtil.join(regions);
                  webResource = webResource.queryParam(“regions”, regionsParamValue);
              }
              Builder requestBuilder = webResource.getRequestBuilder();
              addExtraHeaders(requestBuilder);
              response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class); // JSON
          Applications applications = null;
          if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) {
              applications = response.getEntity(Applications.class);
          }
          return anEurekaHttpResponse(response.getStatus(), Applications.class)
                  .headers(headersOf(response))
                  .entity(applications)
                  .build();
          } finally {
              if (logger.isDebugEnabled()) {
                  logger.debug(“Jersey HTTP GET {}/{}?{}; statusCode={}”,
                          serviceUrl, urlPath,
                          regionsParamValue == null ? “” : “regions=” + regionsParamValue,
                          response == null ? “N/A” : response.getStatus()
                  );
              }
              if (response != null) {
                  response.close();
              }
          }
        }

          - 调用 `AbstractJerseyEurekaHttpClient#getApplications(...)` 方法,GET 请求 Eureka-Server 的 `apps/` 接口,参数为 `regions` ,返回格式为 JSON ,实现**全量获取注册信息**。
          
          com.netflix.eureka.resources.ApplicationsResource</code>,处理**所有**应用的请求操作的 Resource ( Controller )</p>
          接收全量获取请求,映射 `ApplicationsResource#getContainers()` 方法,实现代码如下:
          <pre style="margin-top: 0px; margin-bottom: 0px; padding: 0px; font-size: 15px; color: #3e3e3e; line-height: inherit; letter-spacing: 2px; word-spacing: 2px; background-color: #ffffff;"><code class="Java language-Java hljs" style="margin-right: 2px; margin-left: 2px; padding: 0.5em; font-size: 14px; color: #a9b7c6; line-height: 18px; border-radius: 0px; background: #282b2e; font-family: Consolas, Inconsolata, Courier, monospace; display: block; overflow-x: auto; word-spacing: 0px; letter-spacing: 0px; word-wrap: normal !important; overflow-y: auto !important;">  1: @GET
            2: public Response getContainers(@PathParam("version") String version,
            3:                               @HeaderParam(HEADER_ACCEPT) String acceptHeader,
            4:                               @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
            5:                               @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
            6:                               @Context UriInfo uriInfo,
            7:                               @Nullable @QueryParam("regions") String regionsStr) {
            8:     // TODO[0009]:RemoteRegionRegistry
            9:     boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
           10:     String[] regions = null;
           11:     if (!isRemoteRegionRequested) {
           12:         EurekaMonitors.GET_ALL.increment();
           13:     } else {
           14:         regions = regionsStr.toLowerCase().split(",");
           15:         Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
           16:         EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
           17:     }
           18: 
           19:     // 判断是否可以访问
           20:     // Check if the server allows the access to the registry. The server can
           21:     // restrict access if it is not
           22:     // ready to serve traffic depending on various reasons.
           23:     if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
           24:         return Response.status(Status.FORBIDDEN).build();
           25:     }
           26: 
           27:     // API 版本
           28:     CurrentRequestVersion.set(Version.toEnum(version));
           29: 
           30:     // 返回数据格式
           31:     KeyType keyType = Key.KeyType.JSON;
           32:     String returnMediaType = MediaType.APPLICATION_JSON;
           33:     if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
           34:         keyType = Key.KeyType.XML;
           35:         returnMediaType = MediaType.APPLICATION_XML;
           36:     }
           37: 
           38:     // 响应缓存键( KEY )
           39:     Key cacheKey = new Key(Key.EntityType.Application,
           40:             ResponseCacheImpl.ALL_APPS,
           41:             keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
           42:     );
           43: 
           44:     //
           45:     Response response;
           46:     if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
           47:         response = Response.ok(responseCache.getGZIP(cacheKey))
           48:                 .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
           49:                 .header(HEADER_CONTENT_TYPE, returnMediaType)
           50:                 .build();
           51:     } else {
           52:         response = Response.ok(responseCache.get(cacheKey))
           53:                 .build();
           54:     }
           55:     return response;
           56: }
          
        • 第 8 至 17 行 :TODO[0009]:RemoteRegionRegistry
        • 第 19 至 25 行 :Eureka-Server 启动完成,但是未处于就绪( Ready )状态,不接受请求全量应用注册信息的请求,例如,Eureka-Server 启动时,未能从其他 Eureka-Server 集群的节点获取到应用注册信息。
        • 第 27 至 28 行 :设置 API 版本号。**默认**最新 API 版本为 V2。实现代码如下:
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          
          public enum Version {
              V1, V2;
          public static Version toEnum(String v) {
              for (Version version : Version.values()) {
                  if (version.name().equalsIgnoreCase(v)) {
                      return version;
                  }
              }
              //Defaults to v2
              return V2;
          }
          }
        • 第 30 至 36 行 :设置返回数据格式,默认 JSON 。
        • 第 38 至 42 行 :创建响应缓存( ResponseCache ) 的键( KEY ),在 「3.2.1 缓存键」详细解析。
        • 第 44 至 55 行 :从响应缓存读取**全量**注册信息,在 「3.3 缓存读取」详细解析。
        • 第 19 至 25 行 :Eureka-Server 启动完成,但是未处于就绪( Ready )状态,不接受请求全量应用注册信息的请求,例如,Eureka-Server 启动时,未能从其他 Eureka-Server 集群的节点获取到应用注册信息。

          
          com.netflix.eureka.registry.ResponseCache</code>,响应缓存**接口**,接口代码如下:</p>
          <pre style="margin-top: 0px; margin-bottom: 0px; padding: 0px; font-size: 15px; color: #3e3e3e; line-height: inherit; letter-spacing: 2px; word-spacing: 2px; background-color: #ffffff;"><code class="Java language-Java hljs" style="margin-right: 2px; margin-left: 2px; padding: 0.5em; font-size: 14px; color: #a9b7c6; line-height: 18px; border-radius: 0px; background: #282b2e; font-family: Consolas, Inconsolata, Courier, monospace; display: block; overflow-x: auto; word-spacing: 0px; letter-spacing: 0px; word-wrap: normal !important; overflow-y: auto !important;">public interface ResponseCache {
          
              String get(Key key);
          
              byte[] getGZIP(Key key);
          
              void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress);
          
              AtomicLong getVersionDelta();
          
              AtomicLong getVersionDeltaWithRegions();
          
          }
          
        • 其中,`#getVersionDelta()` 和 `#getVersionDeltaWithRegions()` 已经废弃。这里保留的原因主要是考虑兼容性。判断依据来自如下代码:
          // Applications.java
          @Deprecated
          public void setVersion(Long version) {
             this.versionDelta = version;
          }
          
          

          // AbstractInstanceRegistry.java
          public Applications getApplicationDeltas() {
             // … 省略其它无关代码
             apps.setVersion(responseCache.getVersionDelta().get()); // 唯一调用到 ResponseCache#getVersionDelta() 方法的地方
             // … 省略其它无关代码
          }

        • `#get()` :获得缓存。
        • `#getGZIP()` :获得缓存,并 GZIP 。
        • `#invalidate()` :过期缓存。
        • #get() :获得缓存。

          #invalidate() :过期缓存。

          3.2.1 缓存键

          com.netflix.eureka.registry.Key,缓存键。实现代码如下:

          
          public class Key {
          
              public enum KeyType {
                  JSON, XML
              }
          
              /**
               * An enum to define the entity that is stored in this cache for this key.
               */
              public enum EntityType {
                  Application, VIP, SVIP
              }
          
              /**
               * 实体名
               */
              private final String entityName;
              /**
               * TODO[0009]:RemoteRegionRegistry
               */
              private final String[] regions;
              /**
               * 请求参数类型
               */
              private final KeyType requestType;
              /**
               * 请求 API 版本号
               */
              private final Version requestVersion;
              /**
               * hashKey
               */
              private final String hashKey;
              /**
               * 实体类型
               *
               * {@link EntityType}
               */
              private final EntityType entityType;
              /**
               * {@link EurekaAccept}
               */
              private final EurekaAccept eurekaAccept;
          
              public Key(EntityType entityType, String entityName, KeyType type, Version v, EurekaAccept eurekaAccept, @Nullable String[] regions) {
                  this.regions = regions;
                  this.entityType = entityType;
                  this.entityName = entityName;
                  this.requestType = type;
                  this.requestVersion = v;
                  this.eurekaAccept = eurekaAccept;
                  hashKey = this.entityType + this.entityName + (null != this.regions ? Arrays.toString(this.regions) : "")
                          + requestType.name() + requestVersion.name() + this.eurekaAccept.name();
              }
          
              public Key(EntityType entityType, String entityName, KeyType type, Version v, EurekaAccept eurekaAccept, @Nullable String[] regions) {
                  this.regions = regions;
                  this.entityType = entityType;
                  this.entityName = entityName;
                  this.requestType = type;
                  this.requestVersion = v;
                  this.eurekaAccept = eurekaAccept;
                  hashKey = this.entityType + this.entityName + (null != this.regions ? Arrays.toString(this.regions) : "")
                          + requestType.name() + requestVersion.name() + this.eurekaAccept.name();
              }
          
              @Override
              public int hashCode() {
                  String hashKey = getHashKey();
                  return hashKey.hashCode();
              }
          
              @Override
              public boolean equals(Object other) {
                  if (other instanceof Key) {
                      return getHashKey().equals(((Key) other).getHashKey());
                  } else {
                      return false;
                  }
              }
          
          }
          

          3.2.2 响应缓存实现类

          com.netflix.eureka.registry.ResponseCacheImpl,响应缓存实现类。

          在 ResponseCacheImpl 里,将缓存拆分成两层 :

          • 只读缓存 readOnlyCacheMap )
          • 固定过期 + 固定大小读写缓存 readWriteCacheMap )

          默认配置下,缓存读取策略如下:

          缓存过期策略如下:

          • 应用实例注册、下线、过期时,只只只过期  readWriteCacheMap 。
          • readWriteCacheMap 写入一段时间( 可配置 )后自动过期。
          • 定时任务对比  readWriteCacheMap 和  readOnlyCacheMap 的缓存值,若不一致,以前者为主。通过这样的方式,实现了  readOnlyCacheMap 的定时过期。

          注意:应用实例注册、下线、过期时,不会很快刷新到  readWriteCacheMap 缓存里。默认配置下,最大延迟在 30 秒。

          为什么可以使用缓存?

          在 CAP 的选择上,Eureka 选择了 AP ,不同于 Zookeeper 选择了 CP 。

          推荐阅读:

          • 《为什么不应该使用ZooKeeper做服务发现》
          • 《Spring Cloud Netflix Eureka源码导读与原理分析》「4. 作为服务注册中心,Eureka比Zookeeper好在哪里」

          3.3 缓存读取

          调用  ResponseCacheImpl#get(...) 方法(  #getGzip(...) 类似 ),读取缓存,实现代码如下:

          
            1: private final ConcurrentMapKey, Value readOnlyCacheMap = new ConcurrentHashMapKey, Value();
            2: 
            3: private final LoadingCacheKey, Value readWriteCacheMap;
            4: 
            5: public String get(final Key key) {
            6:     return get(key, shouldUseReadOnlyResponseCache);
            7: }
            8: 
            9: String get(final Key key, boolean useReadOnlyCache) {
           10:     Value payload = getValue(key, useReadOnlyCache);
           11:     if (payload == null || payload.getPayload().equals(EMPTY_PAYLOAD)) {
           12:         return null;
           13:     } else {
           14:         return payload.getPayload();
           15:     }
           16: }
           17: 
           18: Value getValue(final Key key, boolean useReadOnlyCache) {
           19:     Value payload = null;
           20:     try {
           21:         if (useReadOnlyCache) {
           22:             final Value currentPayload = readOnlyCacheMap.get(key);
           23:             if (currentPayload != null) {
           24:                 payload = currentPayload;
           25:             } else {
           26:                 payload = readWriteCacheMap.get(key);
           27:                 readOnlyCacheMap.put(key, payload);
           28:             }
           29:         } else {
           30:             payload = readWriteCacheMap.get(key);
           31:         }
           32:     } catch (Throwable t) {
           33:         logger.error("Cannot get value for key :" + key, t);
           34:     }
           35:     return payload;
           36: }
          
        • 第 5 至 7 行 :调用 `#get(key, useReadOnlyCache)` 方法,读取缓存。其中 `shouldUseReadOnlyResponseCache` 通过配置 `eureka.shouldUseReadOnlyResponseCache = true` (默认值 :`true` ) 开启只读缓存。如果你对数据的一致性有相对高的要求,可以关闭这个开关,当然因为少了 `readOnlyCacheMap` ,性能会有一定的下降。
        • 第 9 至 16 行 :调用 `getValue(key, useReadOnlyCache)` 方法,读取缓存。从 `readOnlyCacheMap` 和 `readWriteCacheMap` 变量可以看到缓存值的类为 `com.netflix.eureka.registry.ResponseCacheImpl.Value` ,实现代码如下:
          public class Value {
          
          

            /**
             * 原始值
             /
            private final String payload;
            /
          *
             * GZIP 压缩后的值
             */
            private byte[] gzipped;

            public Value(String payload) {
                this.payload = payload;
                if (!EMPTY_PAYLOAD.equals(payload)) {
                    // … 省略 GZIP 压缩代码
                    gzipped = bos.toByteArray();
                } else {
                    gzipped = null;
                }
            }

            public String getPayload() {
                return payload;
            }

            public byte[] getGzipped() {
                return gzipped;
            }

          }

        • 第 21 至 31 行 :读取缓存。
            - `readWriteCacheMap` 最大缓存数量为 1000 。 - 调用 `#generatePayload(key)` 方法,生成缓存值。
          • 第 21 至 28 行 :先读取 `readOnlyCacheMap` 。读取不到,读取 `readWriteCacheMap` ,并设置到 `readOnlyCacheMap` 。
          • 第 29 至 31 行 :读取 `readWriteCacheMap` 。
          • `readWriteCacheMap` 实现代码如下:
            this.readWriteCacheMap =
                  CacheBuilder.newBuilder().initialCapacity(1000)
                          .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
                          .removalListener(new RemovalListenerKey, Value() {
                              @Override
                              public void onRemoval(RemovalNotificationKey, Value notification) {
                                  // TODO[0009]:RemoteRegionRegistry
                                  Key removedKey = notification.getKey();
                                  if (removedKey.hasRegions()) {
                                      Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
                                      regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
                                  }
                              }
                          })
                          .build(new CacheLoaderKey, Value() {
                              @Override
                              public Value load(Key key) throws Exception {
                                  // // TODO[0009]:RemoteRegionRegistry
                                  if (key.hasRegions()) {
                                      Key cloneWithNoRegions = key.cloneWithoutRegions();
                                      regionSpecificKeys.put(cloneWithNoRegions, key);
                                  }
                                  Value value = generatePayload(key);
                                  return value;
                              }
                          });
            
          • 第 9 至 16 行 :调用  getValue(key, useReadOnlyCache) 方法,读取缓存。从  readOnlyCacheMap 和  readWriteCacheMap 变量可以看到缓存值的类为  com.netflix.eureka.registry.ResponseCacheImpl.Value ,实现代码如下:

            第 21 至 28 行 :先读取  readOnlyCacheMap 。读取不到,读取  readWriteCacheMap ,并设置到  readOnlyCacheMap 。

            readWriteCacheMap 实现代码如下:

            
            this.readWriteCacheMap =
                  CacheBuilder.newBuilder().initialCapacity(1000)
                          .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
                          .removalListener(new RemovalListenerKey, Value() {
                              @Override
                              public void onRemoval(RemovalNotificationKey, Value notification) {
                                  // TODO[0009]:RemoteRegionRegistry
                                  Key removedKey = notification.getKey();
                                  if (removedKey.hasRegions()) {
                                      Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
                                      regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
                                  }
                              }
                          })
                          .build(new CacheLoaderKey, Value() {
                              @Override
                              public Value load(Key key) throws Exception {
                                  // // TODO[0009]:RemoteRegionRegistry
                                  if (key.hasRegions()) {
                                      Key cloneWithNoRegions = key.cloneWithoutRegions();
                                      regionSpecificKeys.put(cloneWithNoRegions, key);
                                  }
                                  Value value = generatePayload(key);
                                  return value;
                              }
                          });
            

            #generatePayload(key) 方法,实现代码如下:

            
              1: private Value generatePayload(Key key) {
              2:     Stopwatch tracer = null;
              3:     try {
              4:         String payload;
              5:         switch (key.getEntityType()) {
              6:             case Application:
              7:                 boolean isRemoteRegionRequested = key.hasRegions();
              8: 
              9:                 if (ALL_APPS.equals(key.getName())) {
             10:                     if (isRemoteRegionRequested) { // TODO[0009]:RemoteRegionRegistry
             11:                         tracer = serializeAllAppsWithRemoteRegionTimer.start();
             12:                         payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
             13:                     } else {
             14:                         tracer = serializeAllAppsTimer.start();
             15:                         payload = getPayLoad(key, registry.getApplications());
             16:                     }
             17:                 } else if (ALL_APPS_DELTA.equals(key.getName())) {
             18:                     // ... 省略增量获取相关的代码
             19:                  } else {
             20:                     tracer = serializeOneApptimer.start();
             21:                     payload = getPayLoad(key, registry.getApplication(key.getName()));
             22:                 }
             23:                 break;
             24:             // ... 省略部分代码 
             25:         }
             26:         return new Value(payload);
             27:     } finally {
             28:         if (tracer != null) {
             29:             tracer.stop();
             30:         }
             31:     }
             32: }
            
            • 第 10 至 12 行 :TODO[0009]:RemoteRegionRegistry
            • 第 13 至 16 行 :调用  AbstractInstanceRegistry#getApplications() 方法,获得注册的应用集合。后调用  #getPayLoad() 方法,将注册的应用集合转换成缓存值。🙂 这两个方法代码较多,下面详细解析。
            • 第 17 至 18 行 :获取增量注册信息的缓存值,在 《Eureka 源码解析 —— 应用实例注册发现 (七)之增量获取》 详细解析。

            3.3.1 获得注册的应用集合

            调用  AbstractInstanceRegistry#getApplications() 方法,获得注册的应用集合,实现代码如下:

            
            // ... 省略代码,超过微信文章长度
            
          • 第 6 至 8 行 :TODO[0009]:RemoteRegionRegistry
          • 第 9 至 16 行 :调用 `#getApplicationsFromMultipleRegions(...)` 方法,获得注册的应用集合,实现代码如下:
            `// ... 省略代码,超过微信文章长度`
              - 第 2 至 第 10 行 :TODO[0009]:RemoteRegionRegistry - 第 11 至 29 行 :获得获得注册的应用集合。 - 第 30 至 59 行 :TODO[0009]:RemoteRegionRegistry - 第 61 行 :计算应用集合 `hashcode` 。该变量用于校验**增量**获取的注册信息和 Eureka-Server **全量**的注册信息是否一致( 完整 ),在 《Eureka 源码解析 —— 应用实例注册发现 (七)之增量获取》 详细解析。

              第 9 至 16 行 :调用  #getApplicationsFromMultipleRegions(...) 方法,获得注册的应用集合,实现代码如下:

              3.3.2 转换成缓存值

              调用  #getPayLoad() 方法,将注册的应用集合转换成缓存值,实现代码如下:

              
              // ... 省略代码,超过微信文章长度
              

              3.4 主动过期读写缓存

              应用实例注册、下线、过期时,调用  ResponseCacheImpl#invalidate() 方法,主动过期读写缓存(  readWriteCacheMap ),实现代码如下:

              
              // ... 省略代码,超过微信文章长度
              
            • 调用 `#invalidate(keys)` 方法,逐个过期每个缓存键值,实现代码如下:
              `// ... 省略代码,超过微信文章长度`
            • 3.5 被动过期读写缓存

              读写缓存(  readWriteCacheMap ) 写入后,一段时间自动过期,实现代码如下:

              
              expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds())
              
              • 配置  eureka.responseCacheAutoExpirationInSeconds ,设置写入过期时长。默认值 :180 秒。

              3.6 定时刷新只读缓存

              定时任务对比  readWriteCacheMap 和  readOnlyCacheMap 的缓存值,若不一致,以前者为主。通过这样的方式,实现了  readOnlyCacheMap 的定时过期。实现代码如下:

              
              // ... 省略代码,超过微信文章长度
              
              • 第 7 至 12 行 :初始化定时任务。配置  eureka.responseCacheUpdateIntervalMs,设置任务执行频率,默认值 :30 * 1000 毫秒。
              • 第 17 至 39 行 :创建定时任务。
                • 第 22 行 :循环  readOnlyCacheMap 的缓存键。为什么不循环  readWriteCacheMap 呢?  readOnlyCacheMap 的缓存过期依赖  readWriteCacheMap,因此缓存键会更多。
                • 第 28 行 至 33 行 :对比  readWriteCacheMap 和  readOnlyCacheMap 的缓存值,若不一致,以前者为主。通过这样的方式,实现了  readOnlyCacheMap 的定时过期。

                666. 彩蛋

                比预期,比想想,长老多老多的一篇文章。细思极恐。

                估计下一篇增量获取会简洁很多。

                胖友,分享我的公众号( 芋道源码 ) 给你的胖友可好?

                注册中心 Eureka 源码解析 —— 应用实例注册发现(六)之全量获取
  • 本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

    本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

    原文链接:blog.ouyangsihai.cn >> 注册中心 Eureka 源码解析 —— 应用实例注册发现(六)之全量获取


     上一篇
    注册中心 Eureka 源码解析 —— 应用实例注册发现(七)之增量获取 注册中心 Eureka 源码解析 —— 应用实例注册发现(七)之增量获取
    中文详细注释的开源项目 Java 并发源码合集 RocketMQ 源码合集 Sharding-JDBC 源码解析合集 Spring MVC 和 Security 源码合集 MyCAT 源码解析合集 本文主要基于 Eureka 1.8.X
    2021-04-05
    下一篇 
    注册中心 Eureka 源码解析 —— 应用实例注册发现 (四)之自我保护机制 注册中心 Eureka 源码解析 —— 应用实例注册发现 (四)之自我保护机制
    中文详细注释的开源项目 Java 并发源码合集 RocketMQ 源码合集 Sharding-JDBC 源码解析合集 Spring MVC 和 Security 源码合集 MyCAT 源码解析合集 本文主要基于 Eureka 1.8.X
    2021-04-05