Eureka是一个服务发现与注册组件,它包含服务端和客户端,服务端管理服务的注册信息,客户端简化服务实例与服务端的交互。我们结合源码来分析下eureka组件的实现原理,内容分为上下两章,第一章分析eureka的服务注册,第二章分析eureka的心跳机制,本章节是第一章。
参考源码:<spring-cloud.version>Hoxton.SR9</spring-cloud.version>
往期系列:
【SpringBoot】SpringBoot源码解析第一章 SpringBoot的构造方法-CSDN博客
【SpringBoot】SpringBoot源码解析第二章 SpringBoot的run方法-CSDN博客
【SpringBoot】SpringBoot源码解析第三章 SpringBoot的自动化配置-CSDN博客
【SpringBoot】SpringBoot源码解析第四章 SpringBoot的bean接口-CSDN博客
【SpringBoot】SpringBoot源码解析第五章 SpringBoot的beanDefinition收集过程-CSDN博客
【SpringBoot】SpringBoot源码解析第六章 SpringBoot的getBean方法-CSDN博客
【SpringBoot】SpringBoot源码解析第七章 SpringBoot的感悟-CSDN博客
1、注册服务
1.1 服务端接收注册信息
spring-cloud-netflix-eureka-server依赖包下有一个spring.factories文件,文件内容如下
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration
根据springboot自动配置的原理可知,EurekaServerAutoConfiguration会被标记成了一个自动配置类。EurekaServerAutoConfiguration配置类中有一个jerseyApplication方法,这个方法会收集指定包下被Path或Provider注解标记的类的beanDefinition,这些类可以看作是Controller
// 扫描包路径
private static final String[] EUREKA_PACKAGES =
new String[]{"com.netflix.discovery", "com.netflix.eureka"};
// 收集包下指定类的beanDefinition,放入application对象
@Bean
public Application jerseyApplication(Environment environment, ResourceLoader resourceLoader) {
ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(false, environment);
// 收集的对象要求被Path或Provider注解标记
provider.addIncludeFilter(new AnnotationTypeFilter(Path.class));
provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class));
String[] var5 = EUREKA_PACKAGES;
int var6 = var5.length;
for(int var7 = 0; var7 < var6; ++var7) {
String basePackage = var5[var7];
// 扫描包路径,收集beanDefinition
Set<BeanDefinition> beans = provider.findCandidateComponents(basePackage);
Iterator var10 = beans.iterator();
while(var10.hasNext()) {
BeanDefinition bd = (BeanDefinition)var10.next();
Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(), resourceLoader.getClassLoader());
classes.add(cls);
}
}
...
return rc;
}
// 获取到application,将beanDefinition置入servlet容器
@Bean
public FilterRegistrationBean<?> jerseyFilterRegistration(Application eurekaJerseyApp) {
FilterRegistrationBean<Filter> bean = new FilterRegistrationBean();
bean.setFilter(new ServletContainer(eurekaJerseyApp));
bean.setOrder(Integer.MAX_VALUE);
bean.setUrlPatterns(Collections.singletonList("/eureka/*"));
return bean;
}
收集的beanDefinition会通过jerseyFilterRegistration方法放入servlet容器,这样接收请求时就能通过url映射给指定的bean来处理请求
com.netflix.eureka包下被扫描的类如下:
ApplicationResource类是Controller中的一员,它有一个addInstance方法,这个方法就是服务端响应服务注册的方法
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info, @HeaderParam("x-netflix-discovery-replication") String isReplication) {
...
// 执行注册
this.registry.register(info, "true".equals(isReplication));
return Response.status(204).build();
}
调用链:
-> ApplicationResource.addInstance
-> InstanceRegistry.register
-> PeerAwareInstanceRegistryImpl.register
-> AbstractInstanceRegistry.register
服务端使用currentHashMap来存储服务的信息,服务端响应注册的过程较为简单
// 用currentHashMap存储服务信息
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
= new ConcurrentHashMap();
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication)
{
...
Map<String, Lease<InstanceInfo>> gMap = (Map)this.registry.get(registrant.getAppName());
Lease<InstanceInfo> lease = new Lease(registrant, leaseDuration);
if (existingLease != null) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
// 将服务信息放入map中
((Map)gMap).put(registrant.getId(), lease);
...
}
1.2 客户端发送注册信息
1.2.1 client客户端
spring-cloud-netflix-eureka-client依赖包下也有一个spring.factories文件,文件内容如下
...
org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration
...
EurekaClientAutoConfiguration被标记成自动配置类,它里面有一个创建EurekaClient类对象的bean方法,看类的名称我们知道这是一个客户端
@Bean(
destroyMethod = "shutdown"
)
@ConditionalOnMissingBean(
value = {EurekaClient.class},
search = SearchStrategy.CURRENT
)
@org.springframework.cloud.context.config.annotation.RefreshScope
@Lazy
public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config, EurekaInstanceConfig instance, @Autowired(required = false) HealthCheckHandler healthCheckHandler) {
ApplicationInfoManager appManager;
if (AopUtils.isAopProxy(manager)) {
appManager = (ApplicationInfoManager)ProxyUtils.getTargetObject(manager);
} else {
appManager = manager;
}
// 创建客户端
CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager, config, this.optionalArgs, this.context);
cloudEurekaClient.registerHealthCheck(healthCheckHandler);
return cloudEurekaClient;
}
调用链:
-> EurekaAutoServiceRegistration.eurekaClient
-> new CloudEurekaClient(appManager, config, this.optionalArgs, this.context);
-> CloudEurekaClient.super(applicationInfoManager, config, args);
-> DiscoveryClient.DiscoveryClient
... 构造方法重载
-> DiscoveryClient.DiscoveryClient
-> initScheduledTasks
跟踪EurekaClient类的构造方法找到DiscoveryClient类,DiscoveryClient类的构造方法调用了initScheduledTasks方法,初始化了一个定时任务
private void initScheduledTasks() {
...
// 添加状态变更监听器
this.statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
public String getId() {
return "statusChangeListener";
}
public void notify(StatusChangeEvent statusChangeEvent) {
if (statusChangeEvent.getStatus() == InstanceStatus.DOWN) {
DiscoveryClient.logger.error("Saw local status change event {}", statusChangeEvent);
} else {
DiscoveryClient.logger.info("Saw local status change event {}", statusChangeEvent);
}
// 监听器被通知后调用onDemandUpdate方法
DiscoveryClient.this.instanceInfoReplicator.onDemandUpdate();
}
};
...
}
定时任务内添加了一个状态修改监听器,监听器调用notify方法时会回调onDemandUpdate方法,追踪这个回调方法
调用链:
-> InstanceInfoReplicator.onDemandUpdate
-> InstanceInfoReplicator.this.run
-> this.discoveryClient.register
-> this.eurekaTransport.registrationClient.register(this.instanceInfo)
-> AbstractJerseyEurekaHttpClient.register
进入到AbstractJerseyEurekaHttpClient类的register方法
public EurekaHttpResponse<Void> register(InstanceInfo info) {
String urlPath = "apps/" + info.getAppName();
ClientResponse response = null;
EurekaHttpResponse var5;
try {
// 向注册中心发送http请求
WebResource.Builder resourceBuilder = this.jerseyClient
.resource(this.serviceUrl)
.path(urlPath)
.getRequestBuilder();
this.addExtraHeaders(resourceBuilder);
response = (ClientResponse)((WebResource.Builder)((WebResource.Builder)((WebResource.Builder)resourceBuilder.header("Accept-Encoding", "gzip")).type(MediaType.APPLICATION_JSON_TYPE)).accept(new String[]{"application/json"})).post(ClientResponse.class, info);
var5 = EurekaHttpResponse.anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", new Object[]{this.serviceUrl, urlPath, info.getId(), response == null ? "N/A" : response.getStatus()});
}
if (response != null) {
response.close();
}
}
return var5;
}
注册方法写得很直白了:客户端拿到注册中心地址,然后携带服务元数据,发送请求完成注册。不过还有一个问题,之前我们提到定时任务内初始化了一个监听器,这个监听器只有被通知了才会执行后续的注册方法,那么监听器是如何被通知的?它的触发时机又在何时?
1.2.2 监听器
EurekaClientAutoConfiguration配置类还有一个创建EurekaAutoServiceRegistration类的bean方法
// 创建服务注册客户端
@Bean
@ConditionalOnBean({AutoServiceRegistrationProperties.class})
@ConditionalOnProperty(
value = {"spring.cloud.service-registry.auto-registration.enabled"},
matchIfMissing = true
)
public EurekaAutoServiceRegistration eurekaAutoServiceRegistration(ApplicationContext context, EurekaServiceRegistry registry, EurekaRegistration registration) {
return new EurekaAutoServiceRegistration(context, registry, registration);
}
EurekaAutoServiceRegistration类实现了SmartLifecycle接口。当spring容器加载完所有bean后会调用SmartLifeCycle接口实现类的start方法,start方法调用EurekaServiceRegistry类的regiser方法
public class EurekaAutoServiceRegistration implements
AutoServiceRegistration,
SmartLifecycle,
Ordered,
SmartApplicationListener
{
public void start() {
if (this.port.get() != 0) {
if (this.registration.getNonSecurePort() == 0) {
this.registration.setNonSecurePort(this.port.get());
}
if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) {
this.registration.setSecurePort(this.port.get());
}
}
if (!this.running.get() && this.registration.getNonSecurePort() > 0) {
// 调用EurekaServiceRegistry的regiser
this.serviceRegistry.register(this.registration);
this.context.publishEvent(new InstanceRegisteredEvent(this,
this.registration.getInstanceConfig()));
this.running.set(true);
}
}
}
EurekaServiceRegistry类的regiser方法会设置实例的状态。进入ApplicationInfoManager类的setInstanceStatus方法
// 设置实例状态
reg.getApplicationInfoManager().setInstanceStatus(
reg.getInstanceConfig().getInitialStatus());
setInstanceStatus方法触发了一个状态修改事件,并且通知了监听器
public synchronized void setInstanceStatus(InstanceInfo.InstanceStatus status) {
InstanceInfo.InstanceStatus next = this.instanceStatusMapper.map(status);
if (next != null) {
InstanceInfo.InstanceStatus prev = this.instanceInfo.setStatus(next);
if (prev != null) {
Iterator var4 = this.listeners.values().iterator();
while(var4.hasNext()) {
StatusChangeListener listener = (StatusChangeListener)var4.next();
try {
// 通知监听器
listener.notify(new StatusChangeEvent(prev, next));
} catch (Exception var7) {
logger.warn("failed to notify listener: {}", listener.getId(), var7);
}
}
}
}
}
这里的监听器和上面提到的状态修改监听器其实是同一个监听器,在调用EurekaAutoServiceRegistration对象的start方法后,监听器会收到通知然后调用客户端的register方法,这就是发送注册服务请求的执行时机
2、拉取服务
2.1 初次拉取
客户端第一次拉取服务和DiscoveryClient类的构造方法有关,详情如下:
@Inject
DiscoveryClient(...){
...
// 调用fetchRegistry方法,拉取服务
boolean primaryFetchRegistryResult = this.fetchRegistry(false);
if (!primaryFetchRegistryResult) {
logger.info("Initial registry fetch from primary servers failed");
}
...
}
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
...
// 调用getAndStoreFullRegistry方法,拉取全部服务
this.getAndStoreFullRegistry();
...
}
private void getAndStoreFullRegistry() throws Throwable {
...
long currentUpdateGeneration = this.fetchRegistryGeneration.get();
// 启动时会打印这行日志
logger.info("Getting all instance registry info from the eureka server");
Applications apps = null;
// 发送http请求
EurekaHttpResponse<Applications> httpResponse = this.clientConfig.getRegistryRefreshSingleVipAddress() == null ? this.eurekaTransport.queryClient.getApplications((String[])this.remoteRegionsRef.get()) : this.eurekaTransport.queryClient.getVip(this.clientConfig.getRegistryRefreshSingleVipAddress(), (String[])this.remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
apps = (Applications)httpResponse.getEntity();
}
...
}
2.2 定时拉取
为了保证服务信息真实可信,客户端会定时拉取远程注册列表更新本地数据。提到到定时任务,自然的联想到DiscoveryClient类的initScheduledTasks方法(1.2.1的内容)
private void initScheduledTasks() {
int renewalIntervalInSecs;
int expBackOffBound;
if (this.clientConfig.shouldFetchRegistry()) {
renewalIntervalInSecs = this.clientConfig.getRegistryFetchIntervalSeconds();
expBackOffBound = this.clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
// 定时刷新本地服务列表任务,具体任务在CacheRefreshThread内
this.cacheRefreshTask = new TimedSupervisorTask("cacheRefresh", this.scheduler, this.cacheRefreshExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new
// 执行任务的线程
CacheRefreshThread());
this.scheduler.schedule(this.cacheRefreshTask, (long)renewalIntervalInSecs, TimeUnit.SECONDS);
}
}
class CacheRefreshThread implements Runnable {
CacheRefreshThread() {
}
public void run() {
// 刷新服务列表
DiscoveryClient.this.refreshRegistry();
}
}
@VisibleForTesting
void refreshRegistry() {
...
// 获取服务列表
boolean success = this.fetchRegistry(remoteRegionsModified);
...
}
3、总结
eureka服务端启动后通过自动配置加载com.netflix.eureka包下的处理器,处理器会响应注册、拉取、剔除服务等http请求
eureka客户端启动后会发送注册请求,并定时更新服务列表