Spring Cloud Eureka原理详解

传统的单体平台架构会随着业务发展而变得越来越臃肿,越来越难维护,这时候微服务架构应运而生,他的理念是将庞大的单体应用进行功能拆分,形成多个微服务,各个服务间都是独立解构的,从而解决单体应用在团队开发、维护上的问题。目前,微服务架构已经广泛应用在互联网行业。

在微服务框架上,Spring Cloud 可以说是当前最为流行的框架。Spring Cloud 包含了一整套的组件,涵盖微服务框架上的各个方面。由于Spring Cloud 的使用在网上都有较为全面的教程,因此,本文重点讲解常用组件的原理。为了避免读者觉得枯燥,我就从典型的电商场景来讲。

业务场景

假设我们要开发一个电商平台,可以实现用户下单支付并且发货的功能。那么我们需要有一个订单服务,支付服务,库存服务、仓储服务、积分服务(实际电商平台肯定不止这几个哈),下定支付的业务流程是这样的:

  • 查询库存服务获取商品库存
  • 商品有库存,去订单服务下订单
  • 下定成功后,支付服务执行支付
  • 支付完成后,订单服务、库存服务更新状态
  • 积分服务完成相应功能
  • 仓储服务执行发货

Spring Cloud Eureka

在上面的业务场景中,假如支付服务完成相关操作后,想要调用订单服务,库存服务执行相关更新操作,该如何调用呢?我们连订单服务、库存服务的地址都不知道。这时候,就需要用到服务注册中心了。

在微服务框架中,最为重要的莫过于服务注册中心,可以理解为它是所有服务的中枢,负责服务的注册及服务间发现。有了它,微服务间才能够互相访问。而在Spring Cloud Eureka就是这样一个核心组件。

服务注册

在Spring Cloud的服务治理框架中,每个服务都有一个Eureka Client组件,他们通过Rest请求的方式向注册中心Eureka Server进行注册,并将自己的服务名、主机ip、端口号等一些信息发送给注册中心,注册中心再按服务名分类组织并维护服务清单。

服务在注册后,注册中心会维护一个注册表,那注册表究竟是怎么样的呢?接下来我们就看看源码

1
2
3
4
5
6
7
public abstract class AbstractInstanceRegistry implements InstanceRegistry {
private static final Logger logger = LoggerFactory.getLogger(AbstractInstanceRegistry.class);

private static final String[] EMPTY_STR_ARRAY = new String[0];
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
= new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
...

可以看到,如上图所示,图中这个名字叫做registry的CocurrentHashMap,就是注册表的核心结构。不了解CocurrentHashMap的话可以查看Java7/8 中的 HashMap 和 ConcurrentHashMap 全解析

从代码中可以看到,Eureka Server的注册表直接基于纯内存,即在内存里维护了一个数据结构。各个服务的注册、服务下线、服务故障,全部会在内存里维护和更新这个注册表。维护注册表、拉取注册表、更新心跳时间,全部发生在内存里!这是Eureka Server非常核心的一个点。

搞清楚了这个,咱们再来分析一下registry这个东西的数据结构.

  • 首先,这个ConcurrentHashMap的key就是服务名称,比如“inventory-service”,就是一个服务名称。
  • value则代表了一个服务的多个服务实例。
  • 举例:“inventory-service”是可以有3个服务实例的,每个服务实例部署在一台机器上。

再来看看作为value的这个Map:Map<String, Lease>

这个Map的key就是服务实例的idvalue是一个叫做Lease的类,它的泛型是一个叫做InstanceInfo的东东首先说下InstanceInfo,这个InstanceInfo是什么呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class InstanceInfo {
...
public static final int DEFAULT_PORT = 7001;
public static final int DEFAULT_SECURE_PORT = 7002;
public static final int DEFAULT_COUNTRY_ID = 1; // US

// The (fixed) instanceId for this instanceInfo. This should be unique within the scope of the appName.
private volatile String instanceId;

private volatile String appName;
@Auto
private volatile String appGroupName;

private volatile String ipAddr;

private static final String SID_DEFAULT = "na";
@Deprecated
private volatile String sid = SID_DEFAULT;

private volatile int port = DEFAULT_PORT;
private volatile int securePort = DEFAULT_SECURE_PORT;
...

通过源码可以看到,这个InstanceInfo就代表了服务实例的详细信息,比如实例id、ip地址、端口号等。

而这个Lease,里面则会维护每个服务的注册时间、启动时间以及最近一次的服务续约时间(也就是发送心跳的时间)

服务获取

假如订单服务或者仓储服务有一台机器奔溃了,那么如果后续继续向那台机器调用服务的话,肯定会失败的。要避免这种情况就必须要定时更新各个服务的清单,保证服务清单中的机器都是健康的。
在Eureka中,每个注册到注册中心的Eureka Client都需要定时向Eureka Server发送Rest请求,获取全量的服务清单。

  • 如果想要定时获取服务,必须保证Eureka Client中的eureka.client.fetch-registery = true,该值默认为true
  • 如果希望修改缓存清单的更新时间,可通过修改Eureka Client中的eureka.client.registry-fetch-interval-seconds参数,默认值为30秒

说到服务获取,可以还得再提一下Eureka Server的在服务清单获取上的多级缓存机制,这是为了提高并发访问性能而设计的。

  • 一级缓存ReadOnlyCacheMap,通过ConcurrentMapl来实现。通过定时任务,根据时间间隔responseCacheUpdateIntervalMs(默认为30秒)从ReadWriteCacheMap中加载新数据

    1
    2
    3
    4
    5
    6
    7
    public class ResponseCacheImpl implements ResponseCache {

    ......
    private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();

    private final LoadingCache<Key, Value> readWriteCacheMap;
    ......
  • 二级缓存ReadWriteCacheMap,通过Google的Gauva cache来实现。同样是通过定时任务,根据时间间隔responseCacheAutoExpirationInSeconds(默认为180秒)从上文讲到的registry中获取最新数据

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    public class ResponseCacheImpl implements ResponseCache {

    ......
    private final LoadingCache<Key, Value> readWriteCacheMap;
    ......

    ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
    this.serverConfig = serverConfig;
    this.serverCodecs = serverCodecs;
    this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
    this.registry = registry;

    long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
    this.readWriteCacheMap =
    CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
    .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
    .removalListener(new RemovalListener<Key, Value>() {
    @Override
    public void onRemoval(RemovalNotification<Key, Value> notification) {
    Key removedKey = notification.getKey();
    if (removedKey.hasRegions()) {
    Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
    regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
    }
    }
    })
    .build(new CacheLoader<Key, Value>() {
    @Override
    public Value load(Key key) throws Exception {
    if (key.hasRegions()) {
    Key cloneWithNoRegions = key.cloneWithoutRegions();
    regionSpecificKeys.put(cloneWithNoRegions, key);
    }
    Value value = generatePayload(key);
    return value;
    }
    });
    ......

客户端拉取注册表:

  • 首先从ReadOnlyCacheMap里查缓存的注册表
  • 如果没有,就找ReadWriteCacheMap里缓存的注册表
  • 如果还是没有,则会触发Gauva缓存的CacheLoader.load()方法,主要执行了generatePayload()方法从registry拉取数据并写入到ReadWriteCacheMap中
  • 获取到数据后,写入ReadOnlyCacheMap中并返回

服务续约

服务提供者在注册完服务后,需要维护一个心跳用来持续告诉Eureka Server我还活着,防止Eureka Server将该服务实例从可用服务列表中剔除,该动作就叫做服务续约

关于服务续约有两个重要属性(Eureka Client配置):

  • eureka.instance.lease-renewal-interval-in-seconds,用于定义服务续约的调用间隔时间,也就是定时发送心跳的时间,默认为30秒
  • eureka.instance.lease-expiration-duration-in-seconds,用于定义服务失效的时间,超过该时间没有发送心跳给Eureka Server,就会将该服务从服务列表剔除,默认为90秒

Eureka Server在启动之后会创建一个定时任务,每隔一段时间(默认为60秒)将当前服务注册表中超时(默认为90秒)没有续约的服务剔除。

接下来我们从源码中来了解服务续约的实现机制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Singleton
public class DiscoveryClient implements EurekaClient {
......
private void initScheduledTasks() {
......
//判断是否应该向Eureka Server注册
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

// Heartbeat timer
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
......
}

服务续约任务的初始化在DicoveryClient中,可以看到,调度线程池、续约线程池、续约间隔、HeartbeatThread全部封装在了TimedSupervisorTask中,TimedSupervisorTask相当于一个包装类或调度类,封装了续约所需要的全部信息。TimedSupervisorTask内部实现了Runnable接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Singleton
public class DiscoveryClient implements EurekaClient {
......
/**
* The heartbeat task that renews the lease in the given intervals.
*/
private class HeartbeatThread implements Runnable {

public void run() {
if (renew()) {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
......
/**
* Renew with the eureka service by making the appropriate REST call
*/
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
REREGISTER_COUNTER.increment();
logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
long timestamp = instanceInfo.setIsDirtyWithTime();
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
return httpResponse.getStatusCode() == Status.OK.getStatusCode();
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
return false;
}
}
......
}

续约的具体执行逻辑在renew()方法中,实现较为简单,就是通过Rest请求向Eureka Server发送心跳。

参考资料: