Eureka-12丨服务实例故障下线

Posted by jiefang on November 29, 2020

服务实例故障下线

Eureka 依靠服务实例定期发送的心跳来感知服务实例是否已经宕机。如果在一段时间内没有接收到某个实例的心跳,则将该服务实例从注册表中移除。

源码

在eureka server初始化时,会初始化实例移除的线程和定时器。 EurekaBootStrap.initEurekaServerContext()->PeerAwareInstanceRegistryImpl.openForTraffic()->AbstractInstanceRegistry.postInit()

EurekaBootStrap.initEurekaServerContext()

1
2
3
4
5
/**
 * Abridged excerpt: only the statements relevant to eviction start-up are shown.
 * Pulls the registry from a neighbouring node and then opens this server for traffic.
 *
 * @throws Exception declared by the signature; the visible statements do not show what raises it
 */
protected void initEurekaServerContext() throws Exception {
    // Copy registry from neighboring eureka node; the returned count is handed on unchanged.
    int syncedInstanceCount = registry.syncUp();
    registry.openForTraffic(applicationInfoManager, syncedInstanceCount);
}

PeerAwareInstanceRegistryImpl.openForTraffic()

1
2
3
4
// Abridged excerpt: everything before the final call is elided in this listing (the "..." line),
// so nothing is claimed here about how the two parameters are used.
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
    ...
    // Last visible step: delegate to the superclass postInit(); per the call chain described in
    // this article that is AbstractInstanceRegistry.postInit(), which schedules the eviction task.
    super.postInit();
}

AbstractInstanceRegistry.postInit()

1
2
3
4
5
6
7
8
9
10
11
12
/**
 * Starts the renewal counter and (re)schedules the periodic eviction task.
 * A previously registered eviction task, if any, is cancelled before the new one is installed.
 */
protected void postInit() {
    renewsLastMin.start();

    // Cancel whatever task an earlier call may have left in the reference.
    EvictionTask previousTask = evictionTaskRef.get();
    if (previousTask != null) {
        previousTask.cancel();
    }

    // Install a fresh task object (a timer task, not a thread of its own).
    EvictionTask freshTask = new EvictionTask();
    evictionTaskRef.set(freshTask);

    // The configured interval serves as both the initial delay and the repeat period
    // (60 s according to the original annotation; the code only reads the configured value).
    long intervalMs = serverConfig.getEvictionIntervalTimerInMs();
    evictionTimer.schedule(freshTask, intervalMs, intervalMs);
}

EvictionTask.run()

移除故障服务实例任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
 * Timer callback: performs one eviction pass.
 * The compensation time (how late this run is relative to the configured interval) is computed
 * first and passed to {@code evict} as extra lease time.
 */
@Override
public void run() {
    try {
        // How much later than scheduled this execution started, in milliseconds.
        final long lateByMs = getCompensationTimeMs();
        logger.info("Running the evict task with compensationTime {}ms", lateByMs);
        evict(lateByMs);
    } catch (Throwable failure) {
        // Log and swallow everything so that nothing escapes from run().
        logger.error("Could not run the evict task", failure);
    }
}

/**
 * Computes by how many milliseconds the time since the previous invocation exceeded the
 * configured eviction interval, and records the current time as the new "previous" time.
 *
 * @return 0 on the first invocation (no previous timestamp recorded) or when the elapsed time
 *         did not exceed the interval; otherwise the positive excess in milliseconds
 */
long getCompensationTimeMs() {
    final long nowNanos = getCurrentTimeNano();
    // Swap in the current timestamp and fetch the one stored by the previous run.
    final long previousNanos = lastExecutionNanosRef.getAndSet(nowNanos);
    if (previousNanos == 0L) {
        return 0L;
    }

    // excess = (now - previous run) - configured interval, clamped at zero
    final long sinceLastRunMs = TimeUnit.NANOSECONDS.toMillis(nowNanos - previousNanos);
    final long excessMs = sinceLastRunMs - serverConfig.getEvictionIntervalTimerInMs();
    return Math.max(0L, excessMs);
}

evict(long additionalLeaseMs)

遍历注册表中的所有服务实例,通过判断实例是否过期,过期的实例放入失效实例list.

  • 分批失效机制:每次最多移除“注册表实例总数 × (1 − renewalPercentThreshold)”个失效实例,按默认阈值 0.85 计算,即实例总数的 15%;
  • 随机失效机制:从失效实例中随机选取可以移除的服务实例;
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    
    /**
     * Runs one eviction pass over the registry.
     *
     * Collects every lease that reports itself expired (and still has a holder), caps the number
     * to evict at {@code registrySize - (int) (registrySize * renewalPercentThreshold)}, and then
     * cancels that many leases picked in random order.
     *
     * @param additionalLeaseMs extra grace time, in milliseconds, passed to each lease's expiry check
     */
    public void evict(long additionalLeaseMs) {
        logger.debug("Running the evict task");

        if (!isLeaseExpirationEnabled()) {
            logger.debug("DS: lease expiration is currently disabled.");
            return;
        }

        // Gather all expired leases first so they can be evicted in random order. Evicting
        // app by app could wipe out whole applications before self preservation kicks in;
        // randomizing spreads the impact evenly across applications.
        List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
        for (Map<String, Lease<InstanceInfo>> leasesOfApp : registry.values()) {
            if (leasesOfApp == null) {
                continue;
            }
            for (Lease<InstanceInfo> candidate : leasesOfApp.values()) {
                // Expiry check; the original annotation flags this check as slightly buggy
                // (see the discussion of Lease.isExpired() in this article).
                if (candidate.isExpired(additionalLeaseMs) && candidate.getHolder() != null) {
                    expiredLeases.add(candidate);
                }
            }
        }

        // To compensate for GC pauses or drifting local time, the current registry size is the
        // base for the self-preservation threshold; otherwise the full registry could be wiped.
        int registrySize = (int) getLocalRegistrySize();
        int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
        // Upper bound for a single pass: not every expired instance is removed at once
        // (the original annotation puts this at 15% of all instances).
        int evictionLimit = registrySize - registrySizeThreshold;

        int toEvict = Math.min(expiredLeases.size(), evictionLimit);
        if (toEvict <= 0) {
            return;
        }

        logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);

        // Choose the victims at random from the expired list.
        Random random = new Random(System.currentTimeMillis());
        for (int slot = 0; slot < toEvict; slot++) {
            // Pick a random item (Knuth shuffle algorithm): swap a random element from the
            // not-yet-visited tail into the current slot, then evict the element in that slot.
            int picked = slot + random.nextInt(expiredLeases.size() - slot);
            Collections.swap(expiredLeases, slot, picked);

            InstanceInfo holder = expiredLeases.get(slot).getHolder();
            String appName = holder.getAppName();
            String id = holder.getId();
            EXPIRED.increment();
            logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
            // The actual removal goes through internalCancel, the same routine the original
            // annotation identifies as the instance-cancel (deregistration) path.
            internalCancel(appName, id, false);
        }
    }
    

    Lease.isExpired()

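    duration 的默认值是 90s。由于 renew() 已经把 lastUpdateTimestamp 设置为“当前时间 + duration”,而 isExpired() 判断时又加了一次 duration,导致实例在最后一次心跳之后实际要经过 2*duration(即 180s)才会被判定为过期。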

1
2
3
4
5
6
/**
 * Records a renewal by storing "current time + duration" in {@code lastUpdateTimestamp}.
 *
 * Note: because the stored value already includes {@code duration}, and {@code isExpired}
 * adds {@code duration} once more, a lease effectively stays valid for 2 * duration after
 * the last renewal. This quirk is what the surrounding article describes and is kept as is.
 */
public void renew() {
    final long now = System.currentTimeMillis();
    lastUpdateTimestamp = now + duration;
}
/**
 * Tells whether this lease counts as expired.
 *
 * A lease is expired when an eviction timestamp has been set, or when the current time is past
 * {@code lastUpdateTimestamp + duration + additionalLeaseMs}. Since {@code renew()} already
 * stores "now + duration", the effective window after the last renewal is 2 * duration
 * plus {@code additionalLeaseMs}.
 *
 * @param additionalLeaseMs extra grace time in milliseconds added to the expiry deadline
 * @return {@code true} if the lease is expired under the rule above
 */
public boolean isExpired(long additionalLeaseMs) {
    // A positive eviction timestamp short-circuits the time comparison.
    if (evictionTimestamp > 0) {
        return true;
    }
    final long now = System.currentTimeMillis();
    return now > (lastUpdateTimestamp + duration + additionalLeaseMs);
}