Eureka源码原理分析
文章目录
- Eureka源码原理分析
- 一:启动过程源码
- 1:初始化环境
- 2:初始化上下文
- 2.1:加载erueka-server配置文件
- 2.2:构造实例信息管理器
- 2.3:初始化erueka-client
- 2.4:处理注册相关的流程
- 2.5:初始化上下文
- 2.6:其他
- 二:客户端注册
- 1:初始化配置
- 1.1:初始化变量
- 1.2:获取配置文件的配置
- 1.3:初始化实例信息
- 1.4:初始化实例信息管理器
- 2:构造EurekaClient
- 3:Eureka Client注册
- 4:总结一下
- 三:Map和注册表
- 1:注册入口(jersey)
- 2:接收注册请求
- 3:存放注册信息
- 4:值得学习的地方
- 5:Client 怎么获取其他客户注册信息
- 5.1:首次获取注册信息
- 5.2:Server 端的注册表缓存
- 四:客户端增量获取注册表
- 1:增量获取引发的问题
- 2:隔多久进行一次增量获取
- 3:变化的数据存放在哪?
- 4:注册表合并
- 五:多级缓存机制
- 1:本地缓存registry
- 2:读写缓存
- 2.1:定时过期
- 2.2:实时过期
- 3:只读缓存
- 4:缓存相关配置
- 5:缓存带来的问题
- 六:心跳机制
- 1:谁发送的心跳请求
- 2:多久发送一次
- 3:如何发送心跳请求
一:启动过程源码
首先呢,Eureka 服务的启动入口在这里:EurekaBootStrap.java 的 contextInitialized ()
方法。
/**
* 在servlet容器初始化时调用,用于启动Eureka服务器上下文
* 此方法中完成了Eureka环境初始化和Eureka服务器上下文的初始化,并将服务器上下文绑定到servlet上下文中
*
* @param event ServletContextEvent对象,包含servlet上下文等信息
*/
public void contextInitialized(ServletContextEvent event) {
try {
// 初始化Eureka运行环境,包括配置、数据目录等
this.initEurekaEnvironment();
// 初始化Eureka服务器上下文,为Eureka服务器的运行做准备
this.initEurekaServerContext();
// 获取servlet上下文对象
ServletContext sc = event.getServletContext();
// 将Eureka服务器上下文对象绑定到servlet上下文中,以便其他部分可以共享
sc.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
} catch (Throwable var3) {
// 记录初始化Eureka服务器过程中的异常信息
logger.error("Cannot bootstrap eureka server :", var3);
// 抛出运行时异常,终止Eureka服务器的启动
throw new RuntimeException("Cannot bootstrap eureka server :", var3);
}
}
1:初始化环境
初始化环境的方法 initEurekaEnvironment(),点进去看下这个方法做了什么。
可以发现就是一个获取配置类的单例
/**
* 初始化Eureka环境配置
* 本方法从配置中读取Eureka的数据中心和环境设置,
* 如果这些设置不存在,则会设置默认值
*
* @throws Exception 如果无法获取配置信息或设置配置属性时抛出异常
*/
protected void initEurekaEnvironment() throws Exception {
// 设置Eureka的配置信息
logger.info("Setting the eureka configuration..");
// 尝试获取Eureka数据中心配置
String dataCenter = ConfigurationManager.getConfigInstance().getString("eureka.datacenter");
if (dataCenter == null) {
// 如果未设置数据中心,则将其设置为默认值
logger.info("Eureka data center value eureka.datacenter is not set, defaulting to default");
ConfigurationManager.getConfigInstance().setProperty("archaius.deployment.datacenter", "default");
} else {
// 如果设置了数据中心,则使用配置的值
ConfigurationManager.getConfigInstance().setProperty("archaius.deployment.datacenter", dataCenter);
}
// 尝试获取Eureka环境配置
String environment = ConfigurationManager.getConfigInstance().getString("eureka.environment");
if (environment == null) {
// 如果未设置环境,则将其设置为测试环境
ConfigurationManager.getConfigInstance().setProperty("archaius.deployment.environment", "test");
logger.info("Eureka environment value eureka.environment is not set, defaulting to test");
}
}
// 单例模式
static volatile AbstractConfiguration instance = null;
public static AbstractConfiguration getConfigInstance() {
if (instance == null) {
synchronized (ConfigurationManager.class) {
if (instance == null) {
instance = getConfigInstance(false));
}
}
}
return instance;
}
2:初始化上下文
/**
* 初始化Eureka服务器上下文
* 该方法负责设置Eureka服务器的配置,初始化客户端,设置编码解码器,以及配置AWS特定功能
* 它还负责与对等节点的交互和实例注册
*/
protected void initEurekaServerContext() throws Exception {
// 创建并配置Eureka服务器配置对象
EurekaServerConfig eurekaServerConfig = new DefaultEurekaServerConfig();
// 注册JSON和XML转换器,以支持V1实例信息的转换
JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), 10000);
XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), 10000);
// 日志记录Eureka客户端初始化
logger.info("Initializing the eureka client...");
logger.info(eurekaServerConfig.getJsonCodecName());
// 创建服务器编码解码器实例
ServerCodecs serverCodecs = new DefaultServerCodecs(eurekaServerConfig);
// 应用信息管理器和注册表初始化
ApplicationInfoManager applicationInfoManager = null;
Object registry;
// 如果Eureka客户端未初始化,则进行初始化
if (this.eurekaClient == null) {
// 根据部署环境配置选择合适的实例配置
registry = this.isCloud(ConfigurationManager.getDeploymentContext()) ? new CloudInstanceConfig() : new MyDataCenterInstanceConfig();
applicationInfoManager = new ApplicationInfoManager((EurekaInstanceConfig)registry, (new EurekaConfigBasedInstanceInfoProvider((EurekaInstanceConfig)registry)).get());
EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig();
this.eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);
} else {
applicationInfoManager = this.eurekaClient.getApplicationInfoManager();
}
// 如果运行在AWS环境,则配置AWS特定的实例注册和绑定
if (this.isAws(applicationInfoManager.getInfo())) {
registry = new AwsInstanceRegistry(eurekaServerConfig, this.eurekaClient.getEurekaClientConfig(), serverCodecs, this.eurekaClient);
this.awsBinder = new AwsBinderDelegate(eurekaServerConfig, this.eurekaClient.getEurekaClientConfig(), (PeerAwareInstanceRegistry)registry, applicationInfoManager);
this.awsBinder.start();
} else {
// 否则,使用默认的实例注册实现
registry = new PeerAwareInstanceRegistryImpl(eurekaServerConfig, this.eurekaClient.getEurekaClientConfig(), serverCodecs, this.eurekaClient);
}
// 获取对等Eureka节点配置
PeerEurekaNodes peerEurekaNodes = this.getPeerEurekaNodes((PeerAwareInstanceRegistry)registry, eurekaServerConfig, this.eurekaClient.getEurekaClientConfig(), serverCodecs, applicationInfoManager);
// 创建和设置Eureka服务器上下文
this.serverContext = new DefaultEurekaServerContext(eurekaServerConfig, serverCodecs, (PeerAwareInstanceRegistry)registry, peerEurekaNodes, applicationInfoManager);
EurekaServerContextHolder.initialize(this.serverContext);
this.serverContext.initialize();
logger.info("Initialized server context");
// 与对等节点同步并更新实例注册表
int registryCount = ((PeerAwareInstanceRegistry)registry).syncUp();
((PeerAwareInstanceRegistry)registry).openForTraffic(applicationInfoManager, registryCount);
// 注册所有监控统计
EurekaMonitors.registerAllStats();
}
初始化上下文的时序图如下:
2.1:加载erueka-server配置文件
基于接口的方式,获取配置项。
initEurekaServerContext 方法创建了一个 eurekaServerConfig 对象:
EurekaServerConfig eurekaServerConfig = new DefaultEurekaServerConfig();
EurekaServerConfig 是一个接口,里面定义了很多获取配置项的方法。
和定义常量来获取配置项的方式不同。比如获取 AccessId 和 SecretKey。
String getAWSAccessId();
String getAWSSecretKey();
还有另外一种获取配置项的方式:Config.get(Constants.XX_XX)
,这种方式和上面的接口的方式相比:
- 常量的方式较容易取错变量。
- 常量的方式不易于修改。
创建默认的eureka server配置
new DefaultEurekaServerConfig (),会创建出一个默认的 server 配置,构造方法会调用 init 方法:
public DefaultEurekaServerConfig() {
init();
}
/**
* 初始化方法,用于配置Eureka的环境属性和属性文件加载
*
* 该方法首先从配置管理器中读取eureka.environment属性,如果没有设置则默认为"test"
* 然后设置archaius.deployment.environment属性,使其用于部署环境标识
* 接着尝试从预定义的Eureka属性文件中加载配置,如果文件不存在,则记录警告信息
* 这对于依赖其他环境特定属性或不同配置机制的情况可能是可接受的
*/
private void init() {
// 获取环境配置,如果没有指定,则默认为"test"
String env = ConfigurationManager.getConfigInstance().getString("eureka.environment", "test");
// 设置部署环境属性,以便Archaius识别环境
ConfigurationManager.getConfigInstance().setProperty("archaius.deployment.environment", env);
// 获取Eureka属性文件名
String eurekaPropsFile = EUREKA_PROPS_FILE.get();
try {
// 从资源中加载Eureka属性文件
ConfigurationManager.loadCascadedPropertiesFromResources(eurekaPropsFile);
} catch (IOException var4) {
// 如果找不到属性文件,记录警告信息
logger.warn("Cannot find the properties specified : {}. This may be okay if there are other environment specific properties or the configuration is installed with a different mechanism.", eurekaPropsFile);
}
}
前两行是设置环境名称,后面几行是关键语句:获取配置文件,并放到 ConfigurationManager 单例中。
来看下 EUREKA_PROPS_FILE.get(); 做了什么。首先 EUREKA_PROPS_FILE 是这样定义的:
private static final DynamicStringProperty EUREKA_PROPS_FILE = DynamicPropertyFactory
.getInstance().getStringProperty("eureka.server.props", "eureka-server");
用单例工厂 DynamicPropertyFactory 设置了默认值 eureka-server,然后 EUREKA_PROPS_FILE.get() 就会从缓存里面这个默认值。
然后再调用 loadCascadedPropertiesFromResources 方法,来加载配置文件。(其实就是加载eureka-server.properties
)
真正的配置项在哪里?
上面可以看到 eureka-server.properties 都是空的,那配置项都配置在哪呢?
DefaultEurekaServerConfig 是实现了 EurekaServerConfig 接口的,如下所示:
public class DefaultEurekaServerConfig implements EurekaServerConfig
在 EurekaServerConfig 接口里面定义很多 get 方法,而 DefaultEurekaServerConfig 实现了这些 get 方法,来看下怎么实现的
@Override
public int getWaitTimeInMsWhenSyncEmpty() {
return configInstance.getIntProperty(
namespace + "waitTimeInMsWhenSyncEmpty", (1000 * 60 * 5)).get();
}
里面的类似这样的 getXX 的方法,都有一个 default value,比如上面的是 1000 * 60 * 5
所以我们可以知道,配置项是在 DefaultEurekaServerConfig 类中定义的。
所以如果配置文件中没有配置,则用 DefaultEurekaServerConfig 定义的默认值
小结一下:
(1)创建一个 DefaultEurekaServerConfig 对象,实现了 EurekaServerConfig 接口,里面有很多获取配置项的方法。
(2)DefaultEurekaServerConfig 构造函数中调用了 init 方法。
(3)init 方法会加载 eureka-server.properties 配置文件,把里面的配置项都放到一个 map 中,然后交给 ConfigurationManager 来管理。
(4)DefaultEurekaServerConfig 对象里面有很多 get 方法,里面通过硬编码定义了配置项的名称,当调用 get 方法时,调用的是 DynamicPropertyFactory 的获取配置项的方法,这些配置项如果在配置文件中有,则用配置项的。
(5)配置文件中的配置项是通过ConfigurationManager 赋值给 DynamicPropertyFactory 的。
(6)当要获取配置项时,就调用对应的 get 方法,如果配置文件没有配置,则用默认值
2.2:构造实例信息管理器
创建了一个 ApplicationInfoManager 对象,服务配置管理器
Application 可以理解为一个 Eureka client,作为一个应用程序向 Eureka 服务注册的
构造服务器实例instanceInfo
另外一个参数是 EurekaConfigBasedInstanceInfoProvider,这个 Provider 是用来构造 instanceInfo(服务实例)
怎么构造出来的呢?用到了设计模式中的构造器模式
,而用到的配置信息就是从 EurekaInstanceConfig 里面获取到的。
InstanceInfo.Builder builder = InstanceInfo.Builder.newBuilder(vipAddressResolver);
builder.setXX
...
instanceInfo = builder.build();
小结
(1)初始化服务实例的配置 instanceConfig 。
(2)用构造器模式初始化服务实例 instanceInfo。
(3)将 instanceConfig 和 instanceInfo 传给了 ApplicationInfoManager,交由它来管理
2.3:初始化erueka-client
eurekaClient 是包含在 eureka-server 服务中的,用来跟其他 eureka-server 进行通信的
为什么还会有其他 eureka-server,因为在集群环境中,是会有多个 eureka 服务的,而服务之间是需要相互通信的。
初始化eureka代码:
EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig();
eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);
第一行又是初始化了一个配置,和之前初始化 server config,instance config 的地方很相似。
也是通过接口方法里面的 DynamicPropertyFactory 来获取配置项的值。
eureka-client 也有一个加载配置文件的方法:
Archaius1Utils.initConfig(CommonConstants.CONFIG_FILE_NAME);
这个文件就是 eureka-client.properties
。
初始化配置的时候还初始化了一个 DefaultEurekaTransportConfig()
,可以理解为传输的配置
第二行代码,创建了一个 DiscoveryClient 对象,赋值给了 eurekaClient
创建 DiscoveryClient 对象的过程非常复杂,我们来细看下。
(1)拿到 eureka-client 的 config 、transport 的 config、instance 实例信息。
(2)判断是否要获取注册表信息,默认会获取。
(3)判断是否要把自己注册到其他 eureka 上,默认会注册
(4)创建了一个支持任务调度的线程池 - scheduler
(5)创建了一个支持心跳检测的线程池 - heartbeatExecutor
(6)创建了一个支持缓存刷新的线程池 - cacheRefreshExecutor
(7)创建了一个支持 eureka client 和 eureka server 进行通信的对象
(8)初始化调度任务
- 这个里面就会根据 fetch-registry 来判断是否需要开始调度执行刷新注册表信息,默认 30 s 调度一次。
- 这个刷新的操作是由一个 CacheRefreshThread 线程来执行的。
- 同样的,也会根据 register-with-eureka 来判断是否需要开始调度执行发送心跳,默认 30 s 调度一次。
- 这个发送心跳的操作由一个 HeartbeatThread 线程来执行的。
- 然后还创建了一个实例信息的副本,用来将自己本地的 instanceInfo 实例信息传给其他服务。什么时候发送这些信息呢?
- 又创建了一个监听器 statusChangeListener,这个监听器监听到状态改变时,就调用副本的 onDemandUpdate() 方法,将 instanceInfo 传给其他服务。
2.4:处理注册相关的流程
创建了一个 PeerAwareInstanceRegistryImpl
对象,通过名字可以知道是可以感知集群实例注册表的实现类
- 处理所有的拷贝操作到其他节点,让他们保持同步。复制的操作包含 注册,续约,摘除,过期和状态变更。
- 当 eureka server 启动后,它尝试着从集群节点去获取所有的注册信息。如果获取失败了,当前 eureka server 在一段时间内不会让其他应用获取注册信息,默认 5 分钟。
- 自我保护机制:如果应用丢失续约的占比在一定时间内超过了设定的百分比,则 eureka 会报警,然后停止执行过期应用
registry = new PeerAwareInstanceRegistryImpl(
eurekaServerConfig,
eurekaClient.getEurekaClientConfig(),
serverCodecs,
eurekaClient
);
PeerAwareInstanceRegistryImpl 继承 AbstractInstanceRegistry 抽象类,构造函数主要做了以下事情:
- 初始化 server config 和 client config 的配置信息。
- 初始化摘除的队列,队列长度为 1000。
- 初始化注册的队列。
// AbstractInstanceRegistry
protected AbstractInstanceRegistry(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs) {
this.overriddenInstanceStatusMap = CacheBuilder.newBuilder().initialCapacity(500).expireAfterAccess(1L, TimeUnit.HOURS).build().asMap();
this.recentlyChangedQueue = new ConcurrentLinkedQueue();
this.readWriteLock = new ReentrantReadWriteLock();
this.read = this.readWriteLock.readLock();
this.write = this.readWriteLock.writeLock();
this.lock = new Object();
this.deltaRetentionTimer = new Timer("Eureka-DeltaRetentionTimer", true);
this.evictionTimer = new Timer("Eureka-EvictionTimer", true);
this.evictionTaskRef = new AtomicReference();
this.allKnownRemoteRegions = EMPTY_STR_ARRAY;
// 初始化 server config 和 client config 的配置信息。
this.serverConfig = serverConfig;
this.clientConfig = clientConfig;
this.serverCodecs = serverCodecs;
// 初始化摘除的队列,队列长度为 1000。
this.recentCanceledQueue = new CircularQueue(1000);
// 初始化注册的队列。
this.recentRegisteredQueue = new CircularQueue(1000);
this.renewsLastMin = new MeasuredRate(60000L);
this.deltaRetentionTimer.schedule(this.getDeltaRetentionTask(), serverConfig.getDeltaRetentionTimerIntervalInMs(), serverConfig.getDeltaRetentionTimerIntervalInMs());
}
2.5:初始化上下文
创建了一个PeerEurekaNodes
,它是一个帮助类,来管理集群节点的生命周期。
PeerEurekaNodes peerEurekaNodes = getPeerEurekaNodes(
registry,
eurekaServerConfig,
eurekaClient.getEurekaClientConfig(),
serverCodecs,
applicationInfoManager
);
创建了一个 DefaultEurekaServerContext
默认上下文。
serverContext = new DefaultEurekaServerContext(
eurekaServerConfig,
serverCodecs,
registry,
peerEurekaNodes,
applicationInfoManager
);
创建了一个 holder,用来持有上下文。
其他地方想要获取上下文,就通过 holder 来获取。用到了单例模式
EurekaServerContextHolder.initialize(serverContext);
// holder 的 initialize() 初始化方法是一个线程安全的方法。
public static synchronized void initialize(EurekaServerContext serverContext) {
holder = new EurekaServerContextHolder(serverContext);
}
// 定义了一个静态的私有的 holder 变量
private static EurekaServerContextHolder holder;
// 其他地方想获取 holder 的话,就通过 getInstance() 方法来获取 holder
public static EurekaServerContextHolder getInstance() {
return holder;
}
// 然后想要获取上下文的就调用 holder 的 getServerContext() 方法
public EurekaServerContext getServerContext() {
return this.serverContext;
}
调用 serverContext 的 initialize() 方法来初始化。
public void initialize() throws Exception {
logger.info("Initializing ...");
// 这个里面就是启动了一个定时任务,将集群节点的 URL 放到集合里面,这个集合不包含本地节点的 url
// 每隔一定时间,就更新 eureka server 集群的信息
peerEurekaNodes.start();
// 这个里面会初始化注册表,将集群中的 注册信息获取下,然后放到注册表里面
registry.init(peerEurekaNodes);
logger.info("Initialized");
}
2.6:其他
从相邻节点拷贝注册信息
int registryCount = registry.syncUp();
/**
* 同步上级注册中心的数据
* 该方法尝试将当前注册中心的数据与上级注册中心同步
* 它会重试一定次数,每次重试前可能会等待一段时间
*
* @return 同步的实例数量
*/
public int syncUp() {
int count = 0;
// 遍历注册中心配置的重试次数
for(int i = 0; i < this.serverConfig.getRegistrySyncRetries() && count == 0; ++i) {
// 如果不是第一次尝试,则等待一段时间后重试
if (i > 0) {
try {
Thread.sleep(this.serverConfig.getRegistrySyncRetryWaitMs());
} catch (InterruptedException var10) {
logger.warn("Interrupted during registry transfer..");
break;
}
}
// 获取上级注册中心的应用信息
Applications apps = this.eurekaClient.getApplications();
Iterator var4 = apps.getRegisteredApplications().iterator();
// 遍历每个应用
while(var4.hasNext()) {
Application app = (Application)var4.next();
Iterator var6 = app.getInstances().iterator();
// 遍历每个实例
while(var6.hasNext()) {
InstanceInfo instance = (InstanceInfo)var6.next();
try {
// 如果实例是可注册的,则尝试注册
if (this.isRegisterable(instance)) {
this.register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
++count;
}
} catch (Throwable var9) {
logger.error("During DS init copy", var9);
}
}
}
}
// 返回同步的实例数量
return count;
}
eureka监控
EurekaMonitors.registerAllStats();
/**
* 注册所有监控项
* 本方法遍历EurekaMonitors枚举中的所有监控项,并使用Monitors类的registerObject方法将它们注册为监控对象
* 这是为了确保所有定义在EurekaMonitors枚举中的监控项都能被系统识别并监控
*/
public static void registerAllStats() {
// 获取所有监控项
EurekaMonitors[] var0 = values();
// 获取监控项的数量
int var1 = var0.length;
// 遍历所有监控项
for(int var2 = 0; var2 < var1; ++var2) {
// 获取当前监控项
EurekaMonitors c = var0[var2];
// 注册当前监控项
Monitors.registerObject(c.getName(), c);
}
}
二:客户端注册
Eureka Client 就是客户端,可以是 Eureka Server 自身,也可以是要注册的服务实例,比如订单服务、商品服务等。
后续讲到 @EnableEurekaClient 注解时,其实是将当前 Application 当作一个 eureka client,注册到 eureka 服务上。
那么 Eureka Client 是如何注册的呢?
我们可以通过 Eureka 源码提供的示例类 ExampleEurekaClient 来看下 Eureka Client 的构造和注册过程。
public static void main(String[] args) throws UnknownHostException {
injectEurekaConfiguration();
ExampleEurekaClient sampleClient = new ExampleEurekaClient();
// create the client
ApplicationInfoManager applicationInfoManager = initializeApplicationInfoManager(new MyDataCenterInstanceConfig());
EurekaClient client = initializeEurekaClient(applicationInfoManager, new DefaultEurekaClientConfig());
// shutdown the client
eurekaClient.shutdown();
}
1:初始化配置
1.1:初始化变量
injectEurekaConfiguration() 方法初始化了 Eureka 的一些变量,比如端口号、当前服务的访问路径、是否需要抓取注册表信息等等
private static void injectEurekaConfiguration() throws UnknownHostException {
String myHostName = InetAddress.getLocalHost().getHostName();
String myServiceUrl = "http://" + myHostName + ":8080/v2/";
System.setProperty("eureka.region", "default");
System.setProperty("eureka.name", "eureka");
System.setProperty("eureka.vipAddress", "eureka.mydomain.net");
System.setProperty("eureka.port", "8080");
System.setProperty("eureka.preferSameZone", "false");
System.setProperty("eureka.shouldUseDns", "false");
System.setProperty("eureka.shouldFetchRegistry", "false");
System.setProperty("eureka.serviceUrl.defaultZone", myServiceUrl);
System.setProperty("eureka.serviceUrl.default.defaultZone", myServiceUrl);
System.setProperty("eureka.awsAccessId", "fake_aws_access_id");
System.setProperty("eureka.awsSecretKey", "fake_aws_secret_key");
System.setProperty("eureka.numberRegistrySyncRetries", "0");
}
1.2:获取配置文件的配置
在这一行代码中,将配置文件 eureka-client.properties 中的配置读取后,放到了 EurekaInstanceConfig 中。
这个 EurekaInstanceConfig 是用来初始化 applicationInfoManager 信息管理器的。
看下面代码,创建了一个 MyDataCenterInstanceConfig,其实就是创建了 EurekaInstanceConfig。
new MyDataCenterInstanceConfig()
那 MyDataCenterInstanceConfig 和 EurekaInstanceConfig 是什么关系呢?
从类图关系中可以看到 MyDataCenterInstanceConfig 继承 PropertiesInstanceConfig 类,实现了 EurekaInstanceConfig 接口。
这种接口之前专门讲过,通过接口来获取配置信息,类似这种方法 getXX()。
然后在 PropertiesInstanceConfig 类的构造函数调用了一个工具类,读取了配置文件 eureka-client.properties 中的值
Archaius1Utils.initConfig(CommonConstants.CONFIG_FILE_NAME);
1.3:初始化实例信息
InstanceInfo instanceInfo = new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get();
1.4:初始化实例信息管理器
就是将 instanceConfig 和 instanceInfo 交给实例信息管理器来管理。
applicationInfoManager = new ApplicationInfoManager(instanceConfig, instanceInfo);
2:构造EurekaClient
eurekaClient = new DiscoveryClient(applicationInfoManager, clientConfig);
DiscoveryClient 是 EurekaClient 的子类,构造 DiscoveryClient做了以下几件事:
- 加载配置文件
- 初始化网络传输组件
- 将服务实例配置、配置文件配置、网络传输组件都赋值给了 DiscoveryClient。
- 初始化两个线程,一个用来心跳检测,一个用来刷新缓存。
- 初始化网络通信组件 EurekaTransport
- 尝试抓取注册表信息,如果没有抓取到,则从备用的注册表中获取。
- 初始化调度任务的方法中,启动了定时调度任务:心跳检测 heartbeat、缓存刷新 cacheRefresh
- 初始化调度任务的方法中,初始化了一个 InstanceInfoReplicator,用来向 eureka server 注册的。
- 初始化调度任务的方法中,初始化了一个状态变更的监听器 StatusChangeListener,这个里面也有注册的逻辑。
/**
* 初始化调度任务
* 根据客户端配置,初始化与Eureka服务器的定时交互任务,包括缓存刷新和心跳任务
*/
private void initScheduledTasks() {
// 定义重续间隔时间和指数退避界限变量
int renewalIntervalInSecs;
int expBackOffBound;
// 如果需要从Eureka服务器获取注册表信息
if (this.clientConfig.shouldFetchRegistry()) {
// 获取注册表获取间隔时间和缓存刷新执行器的指数退避界限
renewalIntervalInSecs = this.clientConfig.getRegistryFetchIntervalSeconds();
expBackOffBound = this.clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
// 创建缓存刷新任务并安排定时执行
this.cacheRefreshTask = new TimedSupervisorTask("cacheRefresh", this.scheduler, this.cacheRefreshExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread());
this.scheduler.schedule(this.cacheRefreshTask, (long)renewalIntervalInSecs, TimeUnit.SECONDS);
}
// 如果需要向Eureka服务器注册当前实例
if (this.clientConfig.shouldRegisterWithEureka()) {
// 获取实例租约的重续间隔时间和心跳执行器的指数退避界限
renewalIntervalInSecs = this.instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
expBackOffBound = this.clientConfig.getHeartbeatExecutorExponentialBackOffBound();
// 记录心跳任务的日志信息
logger.info("Starting heartbeat executor: renew interval is: {}", renewalIntervalInSecs);
// 创建心跳任务并安排定时执行
this.heartbeatTask = new TimedSupervisorTask("heartbeat", this.scheduler, this.heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread());
this.scheduler.schedule(this.heartbeatTask, (long)renewalIntervalInSecs, TimeUnit.SECONDS);
// 初始化实例信息复制器和状态变更监听器
this.instanceInfoReplicator = new InstanceInfoReplicator(this, this.instanceInfo, this.clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2);
this.statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
public String getId() {
return "statusChangeListener";
}
public void notify(StatusChangeEvent statusChangeEvent) {
// 当本地状态变更时,触发实例信息的即时更新
DiscoveryClient.logger.info("Saw local status change event {}", statusChangeEvent);
DiscoveryClient.this.instanceInfoReplicator.onDemandUpdate();
}
};
// 如果配置为按需更新状态变更
if (this.clientConfig.shouldOnDemandUpdateStatusChange()) {
this.applicationInfoManager.registerStatusChangeListener(this.statusChangeListener);
}
// 启动实例信息复制器
this.instanceInfoReplicator.start(this.clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
// 如果配置为不向Eureka服务器注册,记录日志信息
logger.info("Not registering with Eureka server per configuration");
}
}
3:Eureka Client注册
Eureka Client 向 Server 注册的代码隐藏的比较深,很难找到,不是直接调用注册的方法,而是通过一个后台线程去做的
而且调用注册方法的类的名字起得也有争议,叫做 InstanceInfoReplicator
,“Replicator” 是拷贝副本的意思,而注册其实不是拷贝副本,而是将新的注册信息发送到 eureka server 上去的,所以这个类的名字起得不太好,这也是容易造成找不到注册代码的一个问题。
下面来看下 eureka client 是怎么向 eureka server 注册的。
- 注册是通过 InstanceInfoReplicator 类来注册的。它是在构造 DiscoveryClient 时创建出来的
- 然后将一个标志位设置为 true,用来标记是否注册过了
- 然后调用注册的方法
- 发送 post 注册请求
4:总结一下
Eureka Client 向 Eureka Server 注册的过程:
(1)Eureka Client 初始化了一个 DiscoveryClient,抓取注册表,执行调度任务。
(2)InstanceInfoReplicator 对象启动了一个延迟 40 s 的后台线程,执行注册。
(3)然后使用 AbstractJersey2EurekaHttpClient 发送 post 请求,将 instanceInfo 实例信息发送给 Eureka Server。
三:Map和注册表
1:注册入口(jersey)
Eureka Client 是通过发送 http 请求来注册的,那么肯定是有一个地方来接收这个 http 请求的,也就是注册入口。这是怎么玩的呢?
其实是用到了 jersey 框架,这个框架不用深究,我们只需要知道这个框架在哪引用以及做什么事情的就可以了
可以把 jersey 类比 mvc 框架,jersey 有 servlet 专门处理 http 请求。引用 jersey 框架的地方:
\eureka\eureka-server\src\main\webapp\WEB-INF\web.xml
然后处理 HTTP 请求的 controller 在哪呢?
其实是在 eureka-core 项目的 resources 目录下,里面定义了很多的 Resource 结尾的类,它们就是用来处理 HTTP 请求的。
\eureka\eureka-core\src\main\java\com\netflix\eureka\resources
通过XxResource 类的英文注释我们也可以知道,这个 jersey resource 类是用来处理 HTTP 请求的。
A jersey resource that handles request
2:接收注册请求
ddInstance 方法里面的核心代码就是
registry.register(info, true);
registry
就是 PeerAwareInstanceRegistryImpl 的实例对象。它实现了 PeerAwareInstanceRegistry 接口。
调用它的 register() 方法后会调用抽象类 AbstractInstanceRegistry 的 register() 方法,核心代码就是在这个抽象类的 register() 方法。
另外要说下的就是上面的抽象类和接口分别实现和继承了接口 InstanceRegistry。
3:存放注册信息
我们看到源码里面定义了一个 gNewMap,是 ConcurrentHashMap,然后赋值给了 gMap 变量
ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap
所以其实是用 gMap 变量来存注册信息的。我们来分析 gMap 的结构。
首先 gMap 是 ConcurrentHashMap 结构,所以就是 key-value 这种键值对的。
key
就是一个 唯一 id,String 类型。值类似这种:i-00000004
value
里面存的是Lease<InstanceInfo>
。Lease
是一个类,里面持有一个 instanceInfo 的 holder。这个 instanceInfo 就是注册过来的服务实例信息,包含 ip 地址,端口号等。
把服务实例信息放到 gMap 中也很简单,调用 put 方法就可以了。
gMap.put(registrant.getId(), least);
4:值得学习的地方
为什么使用ConcurrentHashMap而不是hashMap
- 并发安全,在并发编程中使用 HashMap 可能造成死循环 ( JDK 1.7 和 1.8 可能会造成数据丢失
- HashTable 效率非常较低
ConcurrentHashMap的底层原理
在JDK1.7中,ConcurrentHashMap使用Segment数组+HashEntry数组+单向链表的方式实现。
而Segment继承了ReentrantLock,所以Segment对象也可以作为ConcurrentHashMap中的锁资源使用。
如上,ConcurrentHashMap的每个Segment(段)相当于一个HashTable容器【所有的方法都是synchronized的,所以是全局锁】
而Segment数组长度默认为16,但在创建时可以指定段数,必须为2的次幂,如果不为2的次幂则会自优化。
在写入数据时都会分段上锁,每个段之间互不影响。而当有线程读取数据时则不会加锁,但是在一个数据在读的时候发生了修改则会重新加锁读取一次。
在JDK1.8+,采用了更轻量级的数组+链表(Node)+红黑树(TreeBin -> TreeNode)+CAS+Synchronized关键字实现。
// Node节点数组,该数组中每个位置要存储的元素为每个链表的头节点
transient volatile Node<K,V>[] table;
// 在扩容时过渡用的table表,扩容时节点会暂时转迁到这个数组
private transient volatile Node<K,V>[] nextTable;
// 计数器值=baseCount+每个CounterCell[i].value。所以baseCount只是计数器的一部分
private transient volatile long baseCount;
// 这个值在不同情况时存放值都不同,主要有如下几种情况:
// 1. 数组没新建时,暂时存储数组容量大小的数值
// 2. 数组正在新建时,该成员为-1
// 3. 数组正常情况时,存放阈值
// 4. 数组扩容时,高16bit存放旧容量唯一对应的一个标签值,低16bit存放进行扩容的线程数量
private transient volatile int sizeCtl;
//扩容时使用,正常情况时=0,扩容刚开始时为容量,代表下一次领取的扩容任务的索引上界
private transient volatile int transferIndex;
//CounterCell相配套一个独占锁
private transient volatile int cellsBusy;
//counterCells也是计数器的一部分
private transient volatile CounterCell[] counterCells;
// 三种特殊的节点哈希值,一个节点正常的哈希值都为>=0的正数
// 此节点是扩容期间的转发节点,这个节点持有新table的引用
static final int MOVED = -1;
// 代表此节点是红黑树的节点
static final int TREEBIN = -2;
// 代表此节点是一个占位节点,不包含任何实际数据
static final int RESERVED = -3;
为什么分成读锁和写锁
在没有读写锁之前,假设使用普通的 ReentrantLock,那么虽然保证了线程安全,但是也浪费了一定的资源
因为如果多个读操作同时进行,其实并没有线程安全问题,可以允许让多个读操作并行,以便提高程序效率。
但是写操作不是线程安全的,如果多个线程同时写,或者在写的同时进行读操作,便会造成线程安全问题。
读写锁就解决了这样的问题,它设定了一套规则,既可以保证多个线程同时读的效率,同时又可以保证有写入操作时的线程安全。
读锁: 允许多个线程获取读锁,同时访问同一个资源。
写锁: 只允许一个线程获取写锁,不允许同时访问同一个资源。
读读共享、写写互斥、读写互斥、写读互斥
5:Client 怎么获取其他客户注册信息
5.1:首次获取注册信息
首先我们想一下,服务 B 发送注册请求到注册中心了,那服务 A 就得获取注册表了吧,服务 A 本地一开始肯定是没有注册表信息的,那肯定就得到注册中心把注册表全部拉取一遍了。(这里服务 A 也称作 Eureka 客户端)
服务 A 对于注册中心来说,就是
初次见面
,服务 A 想把所有
注册信息都在自己本地存一份,方便后续的 API 调用。
首次获取注册信息就是用在 DiscoveryClient 初始化的时候获取的。我们可以从源码中找到如下判断:
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
fetchRegistryFromBackup();
}
上面代码的意思就是如果配置了注册表将会发送全量获取注册表的请求
就是先根据是否配置了 shouldFetchRegistry,如果配置了,则会调用 fetchRegistry 方法获取注册表。
因为是新的 client,所以肯定是没有注册信息的,所以已注册的 client 的个数是否等于 0。
然后根据几个条件来判断是否需要全量获取注册表,满足其中一个条件就会全量获取:
- 条件一:是否强制全量获取。传的 false,不需要全量。
- 条件二:注册表信息是否为空。applications== null,为空,需要全量获取。
- 条件三:获取已注册的 client 的个数是否等于 0。是的,需要全量获取。
5.2:Server 端的注册表缓存
Server 端会把注册表放到缓存里面,读取注册表其实是从缓存里面读取出来的。
分为两级缓存,只读缓存 readOnlyCacheMap
和读写缓存 readOnlyCacheMap
。
if (userReadOnlyCache) {
final Value currentPayload = readOnlyCacheMap.get(key);
if (currentPayload != null) {
payload = currentPayload;
} else {
payload = readWriteCacheMap.get(key);
readOnlyCacheMap.put(key, payload);
}
} else {
payload = readWriteCacheMap.get(key);
}
然后,Eureka Client 获取注册表信息后,就会存到本地 localRegionApps 变量中。
这样 Client 就会有一份 Server 的注册表信息了
localRegionApps.set(this.filterAndShuffle(apps));
四:客户端增量获取注册表
1:增量获取引发的问题
上面我们说到,当第一次获取全量信息后,本地就有注册信息了。
那如果 Server 的注册表有更新,比如有服务注册、下线,Client 必须要重新获取一次注册表信息才行。
那是否可以重新全量拉取一次呢?
可以是可以,但是,如果注册表信息很大呢?比如有几百个微服务都注册上去了,那一次拉取是非常耗时的,而且占用网络带宽,性能较差,这种方案是不靠谱的。
所以我们就需要用增量拉取注册信息表的方式,也就是说只拉取变化的数据,这样数据量就比较小了。如下图所示:
从源码里面我们可以看到,Eureka Client 通过调用 getAndUpdateDelta 方法获取增量的变化的注册表数据,Eureka Server 将变化的数据返回给 Client。
这里就有几个问题:
(1)Client 隔多久进行一次增量获取?
(2)Server 将变化的数据存放在哪里?
(3)Client 如何将变化的数据合并到本地注册表里面?
2:隔多久进行一次增量获取
默认每隔 30 s 执行一次同步,如下图所示:
Eureka 每 30 s 会调用一个后台线程去拉取增量注册表,这个后台线程的名字叫做:cacheRefresh
client发送拉取注册表的请求
就是调用 getDelta 方法,发送 HTTP请求调用 jersey 的 restful 接口。
然后 Server 端的 Jersey 框架就会去处理这个请求了。
发送请求的方法 getDelta 如下所示:
eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
restful 接口的地址就长这样:
http://localhost:8080/v2/apps/delta
那么 Server 端如何过滤出增量的注册表信息呢?我们可以找到这个方法:getContainerDifferential。
/**
* 获取容器的差异信息
*
* @param version 版本信息,用于指定返回的数据格式版本
* @param acceptHeader 客户端可接受的媒体类型
* @param acceptEncoding 客户端可接受的编码方式,用于压缩响应内容
* @param eurekaAccept 与Eureka相关的Accept头信息
* @param uriInfo URI信息,可用于构建响应的URL
* @param regionsStr 可选参数,指定需要返回差异信息的区域,多个区域以逗号分隔
* @return 返回一个Response对象,包含容器的差异信息如果请求被禁止,则返回403状态码
*/
@Path("delta")
@GET
public Response getContainerDifferential(@PathParam("version") String version, @HeaderParam("Accept") String acceptHeader, @HeaderParam("Accept-Encoding") String acceptEncoding, @HeaderParam("X-Eureka-Accept") String eurekaAccept, @Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) {
// 判断是否有远程区域的请求
boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
// 检查是否应禁用差异信息功能,以及是否允许访问远程区域
if (!this.serverConfig.shouldDisableDelta() && this.registry.shouldAllowAccess(isRemoteRegionRequested)) {
String[] regions = null;
// 如果没有请求远程区域,增加本地区域的监控计数器
if (!isRemoteRegionRequested) {
EurekaMonitors.GET_ALL_DELTA.increment();
} else {
// 对请求的远程区域进行处理和监控
regions = regionsStr.toLowerCase().split(",");
Arrays.sort(regions);
EurekaMonitors.GET_ALL_DELTA_WITH_REMOTE_REGIONS.increment();
}
// 设置当前请求的版本信息
CurrentRequestVersion.set(Version.toEnum(version));
// 默认使用JSON格式和键类型
Key.KeyType keyType = KeyType.JSON;
String returnMediaType = "application/json";
// 根据Accept头信息确定返回格式
if (acceptHeader == null || !acceptHeader.contains("json")) {
keyType = KeyType.XML;
returnMediaType = "application/xml";
}
// 构建缓存键
Key cacheKey = new Key(EntityType.Application, "ALL_APPS_DELTA", keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions);
// 根据Accept-Encoding头信息,决定是否压缩响应内容
Response response;
if (acceptEncoding != null && acceptEncoding.contains("gzip")) {
response = Response.ok(this.responseCache.getGZIP(cacheKey)).header("Content-Encoding", "gzip").header("Content-Type", returnMediaType).build();
} else {
response = Response.ok(this.responseCache.get(cacheKey)).build();
}
// 清除当前请求的版本信息
CurrentRequestVersion.remove();
return response;
} else {
// 如果差异信息功能被禁用或不允许访问指定区域,返回403状态码
return Response.status(Status.FORBIDDEN).build();
}
}
3:变化的数据存放在哪?
其实就是放在这个队列里面:recentlyChangedQueue。
它的数据结构是一个并发安全的链表队列 ConcurrentLinkedQueue。链表里面存放的元素就是最近变化的注册信息 RecentlyChangedItem。
ConcurrentLinkedQueue<RecentlyChangedItem>
ConcurrentLinkedQueue 是由链表结构组成的线程安全的先进先出无界队列。如下图所示:
下面是增量数据的内部构造
什么样的数据才是最近改变的数据呢?
通过源码我们找到了这个默认配置,三分钟
刷新一次,也就是 180s 刷新一次。
那刷新了什么?刷新其实是会遍历这个队列:recentlyChangedQueue。
将队列里面的所有元素都遍历一遍,比对每个对象的最后更新时间是否超过了三分钟,如果超过了,就移除这个元素。【最后更新时间 + 180s < 当前时间】
当元素的最后更新时间超过 3 分钟未更新,则移除该元素。
检查间隔是30s,也就是每过30s就会调用一次检查任务。
- Client 每隔 30 秒调用一次增量获取注册表的接口。
- Server 每隔 30 秒调用检查一次队列。
- 如果队列中有元素在 3 分钟以内都没有更新过,则从队列中移除该元素
4:注册表合并
这里有个问题:客户端首次拿到的全量注册表,存放本地了。第二次拿到的是增量的注册表,怎么将两次的数据合并在一起呢?如下图所示:
- 首先就会遍历增量注册表,检查其中的每一项,不论 actionType 是新增、删除还是更新,如果本地本来就有,则执行后续的类型判断逻辑。
- 如果实例信息的名字在本地不存在则会先往本地注册表新增一个注册信息。然后本地肯定存在注册信息了,执行后续的判断逻辑。
- 当类型字段 actionType 等于新增或更新时,先删除后增加。
- 当类型字段 actionType 等于删除时,直接进行删除。
对比注册表
经过重重判断 + 合并操作,客户端终于完成了本地注册表的刷新,理论上来说,这个时候客户端的注册表应该和注册中心的注册表一致了。
但是如何确定是一致的呢?这里我们来考虑几种方案:
- 再全量拉取一次注册表,和本地注册表进行比对。但是既然又要做一次全量拉取,那之前的增量拉取就没有必要了。
- 拉取增量注册表,Server 返回全量注册表的实例 id,客户端比对每个实例 id 是否存在,以及检查本地是否有多余的,如果能匹配上,则认为是一致的。但是这里也有一个问题,对于新增和更新的注册实例,得把更新的实例信息的字段一一比对才能确定是否一致,这就太麻烦了。另外还有一个致命的问题:如果客户端因为网络故障下线了,上一次最近 3 分钟的增量数据没有拉取到,那么相当于丢失了一次增量数据,这个时候,就不是完整的注册表信息了。
有没有既方便又准确的比对方式呢?
有的,那就是哈希比对
。哈希比对的意思就是将两个对象经过哈希算法计算出两个 hash 值,如果两个 hash 值相等,则认为这两个对象相等。
这种方式在代码中也非常常见,比如类的 hashcode() 方法。
从源码中,我们看到 Eureka Server 返回注册表时,会返回一个 hash 值,是将全量注册表 hash 之后的值。调用的是这个方法:getReconcileHashCode()。
如下图所示,获取增量注册表的接口,会返回增量注册表和 hashcode。
五:多级缓存机制
Eureka 注册中心的 Server 端有三级缓存来保存注册信息,可以利用缓存的快速读取来提高系统性能。我们再来细看下:
一级缓存:只读缓存 readOnlyCacheMap
,数据结构 ConcurrentHashMap。相当于数据库。
二级缓存:读写缓存 readOnlyCacheMap
,Guava Cache。相当于 Redis 主从架构中主节点,既可以进行读也可以进行写。
三级缓存:本地注册表 registry
,数据结构 ConcurentHashMap。相当于 Redis 主从架构的从节点,只负责读。
if (userReadOnlyCache) {
final Value currentPayload = readOnlyCacheMap.get(key);
if (currentPayload != null) {
payload = currentPayload;
} else {
payload = readWriteCacheMap.get(key);
readOnlyCacheMap.put(key, payload);
}
} else {
payload = readWriteCacheMap.get(key);
}
- 默认会先从
只读
缓存里面找。 - 没有的话,再从
读写
缓存里面找。 - 找到了的话就更新只读缓存,并返回找到的缓存。
- 还找不到的话,就从本地缓存 registry 中加载进来。
三级缓存数据怎么来的?缓存数据如何更新的?缓存如何过期?
1:本地缓存registry
我们先来看下本地缓存 registry,它是一种定义为 ConcurrentHashMap 的数据结构,之前也详细讲解过。
当客户端发起注册请求的时候,就会把注册信息放到 registry 中。如下代码所示:
registry.putIfAbsent(app)
经过 putIfAbsent 操作就把客户端的注册信息放到 registry 中了。
2:读写缓存
读写缓存,顾名思义,就是既可以进行读,也可以进行写的缓存。
- 读主要是给只读缓存来读取的。
- 写主要是将缓存更新到自己的 Map 中。
读写缓存用的是 Guava Cache工具类,这篇不会深究。简单来说就是当访问读写缓存时,如果这个 key 在缓存中不存在,则从本地去查,查到后再放回缓存。
然后又实现抽象方法 load(key),这个方法的作用就是当读写缓存中没有,则从本地 registry 缓存中拿。
读写缓存过期的时候其实分两种:定时过期和实时过期。
2.1:定时过期
当构建这个读写缓存时,就会定义间隔多久过期整个读写缓存。如下代码所示,180 s 会定时过期读写缓存:
expireAfterWrite(180s)
2.2:实时过期
当有新的服务实例进行注册或者下线、发生故障时,就会把这个对应的服务实例的缓存给过期掉。
如下图所示,最上面的时注册中心,下面三个是服务实例。
服务实例发生注册、下线、发生故障,注册中心都是可以感知到的,然后就会主动过期读写缓存对应的服务实例。
/**
* 无效化缓存
*
* @param appName 应用程序名称,不能为空
* @param vipAddress 虚拟IP地址,可能为空
* @param secureVipAddress 安全虚拟IP地址,可能为空
*
* 此方法用于在缓存中无效化特定应用程序及其关联的虚拟IP地址
* 如果提供了虚拟IP地址,则仅无效化与这些地址关联的缓存项
* 如果未提供虚拟IP地址,则无效化与应用程序相关的所有缓存项
*/
private void invalidateCache(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) {
this.responseCache.invalidate(appName, vipAddress, secureVipAddress);
}
3:只读缓存
只读缓存 readOnlyCacheMap,有一个定时更新的机制,每隔 30 秒就会更新一次只读缓存中的某些 key(读写缓存 -> 30s -> 只读缓存)。
它其实是遍历自己的所有注册信息,然后和读写缓存进行比对,如果注册信息不一致,则替换为读写缓存的数据。
源码如下,有一个定时调度任务,每隔 30 秒调度一次。
另外当客户端获取注册信息时,也会先读只读缓存,如果只读缓存中没有,则会从读写缓存中找,找到后就放到只读缓存中。
如果读写缓存中没有,则从本地注册表 registry 中加载到读写缓存中,然后将注册表信息返回。
既然这个缓存叫做只读缓存,怎么还能被更新,不应该是不变的吗?
其实这里的不变是相对于客户端来说的,客户端获取注册表信息时,最开始访问的就是只读缓存,类似数据库或 Redis 的主从架构,主负责读写,从负责读。
然后系统内部会把主节点的信息同步给从节点。
4:缓存相关配置
- eureka.server.useReadOnlyResponseCache -> 是否开启只读缓存,默认是true
- eureka.server.responseCacheUpdateIntervalMs -> 默认每隔 30 秒将读写缓存更新的缓存同步到只读缓存
5:缓存带来的问题
三级缓存看似可以带来性能的提升。但是也会引入其他问题,比如缓存不一致问题。
只读缓存每隔 30s 才会刷新一次,和读写缓存会造成数据的不一致,客户端在 30s 内获取的注册表信息是滞后的。
当使用 Eureka 集群时,这种缓存不一致的问题会更明显,不同的节点之间也会出现只读缓存的数据不一致,所以 Eureka 只能保证高可用,并不能保证强一致性,也就是保证了 AP,不保证 CP,另外我们可以选用强一致性的注册中心,比如 Zookeeper、Nacos,这是后续要讲的内容了。
如何解决不一致的问题
- 在服务端,我们可以设置更新只读缓存的时间间隔,默认是 30 秒,缩短一点,比如 15 秒,频率太高,可能对 Eureka 造成性能问题。
- 在服务端,我们也可以考虑关闭从只读缓存读注册表信息,Eureka Client 直接从读写缓存读取。
- 默认情况下,每隔 30 秒从读写缓存将注册信息更新到只读缓存。
- 默认情况下,客户端读取注册表时,先从只读缓存读,如果没有,则从读写缓存中读取,如果还是没有,则从本地注册表 registry 读取。
- 默认情况下,每隔 180 秒定时过期读写缓存。
- 服务实例注册、下线、故障时,会实时过期读写缓存。
六:心跳机制
1:谁发送的心跳请求
然后每个微服务自己会单独发送心跳请求给注册中心
2:多久发送一次
DIscoveryClient 初始化时,会调度一些定时任务。Eureka 初始化了发送心跳请求的线程池 heartbeatExecutor,用来创建发送心跳的线程 HeartbeatThread
this.heartbeatExecutor = new ThreadPoolExecutor(
1,
this.clientConfig.getHeartbeatExecutorThreadPoolSize(),
0L,
TimeUnit.SECONDS,
new SynchronousQueue(),
(new ThreadFactoryBuilder()).setNameFormat("DiscoveryClient-HeartbeatExecutor-%d").setDaemon(true).build()
);
然后将这个线程池用来执行定时调度任务,源码如下所示,在定时任务开始后,延迟 30s 开始执行发送心跳请求,然后每隔 30秒执行一次发送心跳请求。
这里可以看到 new 了一个 HeartbeatThread 线程
3:如何发送心跳请求
HeartbeatThread 线程继承自 Runnable 类,实现了 run 方法,这个里面就会执行发送心跳请求的具体逻辑了。
private class HeartbeatThread implements Runable {
public void run() {
if (renew()) {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMills();
}
}
}
直接进到 renew() 方法里面,核心逻辑就这一行:
eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
调用 EurekaHttpClient 的 sentHeartBeat 方法,将实例信息发送给注册信息。
拼接的请求 URL 示例如下:
http://localhost:8080/v2/apps/order/i-000000-1 <!-put