传统的单体平台架构会随着业务发展而变得越来越臃肿,越来越难维护,这时候微服务架构应运而生,他的理念是将庞大的单体应用进行功能拆分,形成多个微服务,各个服务间都是独立解构的,从而解决单体应用在团队开发、维护上的问题。目前,微服务架构已经广泛应用在互联网行业。
在微服务框架上,Spring Cloud 可以说是当前最为流行的框架。Spring Cloud 包含了一整套的组件,涵盖微服务框架上的各个方面。由于Spring Cloud 的使用在网上都有较为全面的教程,因此,本文重点讲解常用组件的原理。为了避免读者觉得枯燥,我就从典型的电商场景来讲。
业务场景
假设我们要开发一个电商平台,可以实现用户下单支付并且发货的功能。那么我们需要有一个订单服务,支付服务,库存服务、仓储服务、积分服务(实际电商平台肯定不止这几个哈),下定支付的业务流程是这样的:
- 查询库存服务获取商品库存
- 商品有库存,去订单服务下订单
- 下定成功后,支付服务执行支付
- 支付完成后,订单服务、库存服务更新状态
- 积分服务完成相应功能
- 仓储服务执行发货
Spring Cloud Eureka
在上面的业务场景中,假如支付服务完成相关操作后,想要调用订单服务,库存服务执行相关更新操作,该如何调用呢?我们连订单服务、库存服务的地址都不知道。这时候,就需要用到服务注册中心了。
在微服务框架中,最为重要的莫过于服务注册中心,可以理解为它是所有服务的中枢,负责服务的注册及服务间发现。有了它,微服务间才能够互相访问。而在Spring Cloud Eureka就是这样一个核心组件。
服务注册
在Spring Cloud的服务治理框架中,每个服务都有一个Eureka Client组件,他们通过Rest请求的方式向注册中心Eureka Server进行注册,并将自己的服务名、主机ip、端口号等一些信息发送给注册中心,注册中心再按服务名分类组织并维护服务清单。
服务在注册后,注册中心会维护一个注册表,那注册表究竟是怎么样的呢?接下来我们就看看源码
1 | public abstract class AbstractInstanceRegistry implements InstanceRegistry { |
可以看到,如上图所示,图中这个名字叫做registry的CocurrentHashMap,就是注册表的核心结构。不了解CocurrentHashMap的话可以查看Java7/8 中的 HashMap 和 ConcurrentHashMap 全解析,
从代码中可以看到,Eureka Server的注册表直接基于纯内存,即在内存里维护了一个数据结构。各个服务的注册、服务下线、服务故障,全部会在内存里维护和更新这个注册表。维护注册表、拉取注册表、更新心跳时间,全部发生在内存里!这是Eureka Server非常核心的一个点。
搞清楚了这个,咱们再来分析一下registry这个东西的数据结构.
- 首先,这个ConcurrentHashMap的key就是服务名称,比如“inventory-service”,就是一个服务名称。
- value则代表了一个服务的多个服务实例。
- 举例:“inventory-service”是可以有3个服务实例的,每个服务实例部署在一台机器上。
再来看看作为value的这个Map:Map<String, Lease
这个Map的key就是服务实例的idvalue是一个叫做Lease的类,它的泛型是一个叫做InstanceInfo的东东首先说下InstanceInfo,这个InstanceInfo是什么呢?
1 | public class InstanceInfo { |
通过源码可以看到,这个InstanceInfo就代表了服务实例的详细信息,比如实例id、ip地址、端口号等。
而这个Lease,里面则会维护每个服务的注册时间、启动时间以及最近一次的服务续约时间(也就是发送心跳的时间)
服务获取
假如订单服务或者仓储服务有一台机器奔溃了,那么如果后续继续向那台机器调用服务的话,肯定会失败的。要避免这种情况就必须要定时更新各个服务的清单,保证服务清单中的机器都是健康的。
在Eureka中,每个注册到注册中心的Eureka Client都需要定时向Eureka Server发送Rest请求,获取全量的服务清单。
- 如果想要定时获取服务,必须保证Eureka Client中的eureka.client.fetch-registery = true,该值默认为true
- 如果希望修改缓存清单的更新时间,可通过修改Eureka Client中的eureka.client.registry-fetch-interval-seconds参数,默认值为30秒
说到服务获取,可以还得再提一下Eureka Server的在服务清单获取上的多级缓存机制,这是为了提高并发访问性能而设计的。
一级缓存ReadOnlyCacheMap,通过ConcurrentMapl来实现。通过定时任务,根据时间间隔responseCacheUpdateIntervalMs(默认为30秒)从ReadWriteCacheMap中加载新数据
1
2
3
4
5
6
7public class ResponseCacheImpl implements ResponseCache {
......
private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();
private final LoadingCache<Key, Value> readWriteCacheMap;
......二级缓存ReadWriteCacheMap,通过Google的Gauva cache来实现。同样是通过定时任务,根据时间间隔responseCacheAutoExpirationInSeconds(默认为180秒)从上文讲到的registry中获取最新数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38public class ResponseCacheImpl implements ResponseCache {
......
private final LoadingCache<Key, Value> readWriteCacheMap;
......
ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
this.serverConfig = serverConfig;
this.serverCodecs = serverCodecs;
this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
this.registry = registry;
long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
this.readWriteCacheMap =
CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
.expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
.removalListener(new RemovalListener<Key, Value>() {
public void onRemoval(RemovalNotification<Key, Value> notification) {
Key removedKey = notification.getKey();
if (removedKey.hasRegions()) {
Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
}
}
})
.build(new CacheLoader<Key, Value>() {
public Value load(Key key) throws Exception {
if (key.hasRegions()) {
Key cloneWithNoRegions = key.cloneWithoutRegions();
regionSpecificKeys.put(cloneWithNoRegions, key);
}
Value value = generatePayload(key);
return value;
}
});
......
客户端拉取注册表:
- 首先从ReadOnlyCacheMap里查缓存的注册表
- 如果没有,就找ReadWriteCacheMap里缓存的注册表
- 如果还是没有,则会触发Gauva缓存的CacheLoader.load()方法,主要执行了generatePayload()方法从registry拉取数据并写入到ReadWriteCacheMap中
- 获取到数据后,写入ReadOnlyCacheMap中并返回
服务续约
服务提供者在注册完服务后,需要维护一个心跳用来持续告诉Eureka Server我还活着,防止Eureka Server将该服务实例从可用服务列表中剔除,该动作就叫做服务续约。
关于服务续约有两个重要属性(Eureka Client配置):
- eureka.instance.lease-renewal-interval-in-seconds,用于定义服务续约的调用间隔时间,也就是定时发送心跳的时间,默认为30秒
- eureka.instance.lease-expiration-duration-in-seconds,用于定义服务失效的时间,超过该时间没有发送心跳给Eureka Server,就会将该服务从服务列表剔除,默认为90秒
Eureka Server在启动之后会创建一个定时任务,每隔一段时间(默认为60秒)将当前服务注册表中超时(默认为90秒)没有续约的服务剔除。
接下来我们从源码中来了解服务续约的实现机制:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class DiscoveryClient implements EurekaClient {
......
private void initScheduledTasks() {
......
//判断是否应该向Eureka Server注册
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
// Heartbeat timer
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
......
}
服务续约任务的初始化在DicoveryClient中,可以看到,调度线程池、续约线程池、续约间隔、HeartbeatThread全部封装在了TimedSupervisorTask中,TimedSupervisorTask相当于一个包装类或调度类,封装了续约所需要的全部信息。TimedSupervisorTask内部实现了Runnable接口。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class DiscoveryClient implements EurekaClient {
......
/**
* The heartbeat task that renews the lease in the given intervals.
*/
private class HeartbeatThread implements Runnable {
public void run() {
if (renew()) {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
......
/**
* Renew with the eureka service by making the appropriate REST call
*/
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
REREGISTER_COUNTER.increment();
logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
long timestamp = instanceInfo.setIsDirtyWithTime();
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
return httpResponse.getStatusCode() == Status.OK.getStatusCode();
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
return false;
}
}
......
}
续约的具体执行逻辑在renew()方法中,实现较为简单,就是通过Rest请求向Eureka Server发送心跳。
参考资料: