一、什么是 LoadBalancer?
LoadBalancer(负载均衡器) 是一种网络设备或软件机制,用于分发传入的网络流量负载(请求)到多个后端目标服务器上,从而实现系统资源的均衡利用和提高系统的可用性和新能。
1.1 负载均衡分类
负载均衡分为服务器端负载均衡和客户端负载均衡
1. 服务器端负载均衡指的是存放在服务器端的负载均衡器,例如 Nginx、HAProxy、F5 等。
2. 客户端负载均衡指的是嵌套在客户端的负载均衡器,例如 Ribbon、Spring Cloud LoadBalancer。
1.2 常见的负载均衡策略
-
询轮:按照顺序将每个新的请求分发给后端服务器,依次循环。这是一种最简单的负载均衡策略,适用于后端服务器的性能相近,且每个请求的处理时间大致相同的情况。
-
随机选择:随机选择一个后端服务器来处理每个新的请求。这种策略适用于后端服务器性能相似,且每个请求的处理时间相近的情况,但不保证请求的分发是均匀的。
-
最少连接:最少连接策略将请求分发给当前连接数最少的后端服务器。这可以确保负载均衡在后端服务器的连接负载上均衡。但需要维护连接计数。
-
IP 哈希:IP 哈希策略使用客户端的 IP 地址来计算哈希值,然后将请求发送到与哈希值对应的后端服务器。这种策略可用于确保来自同一客户端的请求都被发送到同一台后端服务器,适用于需要会话保持的情况。
-
加权轮询:加权轮询给每个后端服务器分配一个权重值,然后按照权重值比例来分发请求。这可以用来处理后端服务器性能不均衡的情况,将更多的请求发给性能更高的服务器。
-
加权随机选择:加权随机选择与加权轮询类似,但是按照权重值来随机选择后端服务器。这也可以用来处理后端服务器性能不均衡的情况,但是分发更随机。
-
最短响应时间 :最短响应时间策略会测量每个后端服务器的响应时间,并将请求发送到响应时间最短的服务器。这中策略可以确保客户端获得最快的响应,适用于要求低延迟的应用。
二、默认负载均衡策略
Spring Cloud LoadBalancer 负载均衡策略默认的是轮询,这一点可以通过 Spring Cloud LoadBalancer 的配置类 LoadBalancerClientConfiguration 中发现,它的部分源码如下:
@ConditionalOnDiscoveryEnabled
public class LoadBalancerClientConfiguration {
private static final int REACTIVE_SERVICE_INSTANCE_SUPPLIER_ORDER = 193827465;
public LoadBalancerClientConfiguration() {
}
@Bean
@ConditionalOnMissingBean
public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(Environment environment, LoadBalancerClientFactory loadBalancerClientFactory) {
String name = environment.getProperty("loadbalancer.client.name");
return new RoundRobinLoadBalancer(loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);
}
//...
}
RoundRobinLoadBalancer 核心实现源码:
private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {
if (instances.isEmpty()) {
if (log.isWarnEnabled()) {
log.warn("No servers available for service: " + this.serviceId);
}
return new EmptyResponse();
} else if (instances.size() == 1) {
return new DefaultResponse((ServiceInstance)instances.get(0));
} else {
//当有多个实例时,& Integer.MAX_VALUE 摒弃负数,确保下标为正数
int pos = this.position.incrementAndGet() & Integer.MAX_VALUE;
//取模进行轮询
ServiceInstance instance = (ServiceInstance)instances.get(pos % instances.size());
return new DefaultResponse(instance);
}
}
三、随机负载均衡策略
代码实现
设置局部负载均衡器:
//balancer-service:注册实例名称
//RandomLoadBalanceConfig.class 自定义负载均衡器类名
@LoadBalancerClient(name = "balancer-service",configuration = RandomLoadBalanceConfig.class)
public class RandomLoadBalanceConfig {
@Bean
public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(Environment environment, LoadBalancerClientFactory loadBalancerClientFactory) {
String name = environment.getProperty("loadbalancer.client.name");
return new RandomLoadBalancer(loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);
}
}
设置全局负载均衡器:
@SpringBootApplication
@EnableFeignClients
@LoadBalancerClients(defaultConfiguration= RandomLoadBalanceConfig.class)
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
}
四、设置Nacos权重负载均衡器
Nacos 中支持两种负载均衡器,一种是权重负载均衡器,另一种是第三方 CMDB(地域就近访问)标签负载均衡器,我们可以将 Spring Cloud Loadbalancer 直接配置为 Nacos 的负载均衡器,它默认就是权重负载均衡策略。
它的配置有以下两步:
1. 创建 Nacos 负载均衡器
@LoadBalancerClient(name = "balancer-service",configuration = NacosLoadBalanceConfig.class)
public class NacosLoadBalanceConfig {
@Resource
private NacosDiscoveryProperties nacosDiscoveryProperties;
@Bean
public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(Environment environment, LoadBalancerClientFactory loadBalancerClientFactory) {
String name = environment.getProperty("loadbalancer.client.name");
return new NacosLoadBalancer(loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name,nacosDiscoveryProperties);
}
}
五、创建自定义负载均衡器
5.1 创建自定义负载均衡器
自定义负载均衡器只需要参考官方负载均衡器写就可
public class CustomizeLoadBalance implements ReactorServiceInstanceLoadBalancer {
private static final Logger log = LoggerFactory.getLogger(NacosLoadBalancer.class);
private final String serviceId;
private ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;
private final NacosDiscoveryProperties nacosDiscoveryProperties;
private static final String IPV4_REGEX = "((2(5[0-5]|[0-4]\\d))|[0-1]?\\d{1,2})(.((2(5[0-5]|[0-4]\\d))|[0-1]?\\d{1,2})){3}";
private static final String IPV6_KEY = "IPv6";
public static String ipv6;
@Autowired
private InetIPv6Utils inetIPv6Utils;
@PostConstruct
public void init() {
String ip = this.nacosDiscoveryProperties.getIp();
if (StringUtils.isNotEmpty(ip)) {
ipv6 = Pattern.matches("((2(5[0-5]|[0-4]\\d))|[0-1]?\\d{1,2})(.((2(5[0-5]|[0-4]\\d))|[0-1]?\\d{1,2})){3}", ip) ? (String)this.nacosDiscoveryProperties.getMetadata().get("IPv6") : ip;
} else {
ipv6 = this.inetIPv6Utils.findIPv6Address();
}
}
private List<ServiceInstance> filterInstanceByIpType(List<ServiceInstance> instances) {
if (StringUtils.isNotEmpty(ipv6)) {
List<ServiceInstance> ipv6InstanceList = new ArrayList();
Iterator var3 = instances.iterator();
while(var3.hasNext()) {
ServiceInstance instance = (ServiceInstance)var3.next();
if (Pattern.matches("((2(5[0-5]|[0-4]\\d))|[0-1]?\\d{1,2})(.((2(5[0-5]|[0-4]\\d))|[0-1]?\\d{1,2})){3}", instance.getHost())) {
if (StringUtils.isNotEmpty((CharSequence)instance.getMetadata().get("IPv6"))) {
ipv6InstanceList.add(instance);
}
} else {
ipv6InstanceList.add(instance);
}
}
if (ipv6InstanceList.size() == 0) {
return (List)instances.stream().filter((instancex) -> {
return Pattern.matches("((2(5[0-5]|[0-4]\\d))|[0-1]?\\d{1,2})(.((2(5[0-5]|[0-4]\\d))|[0-1]?\\d{1,2})){3}", instancex.getHost());
}).collect(Collectors.toList());
} else {
return ipv6InstanceList;
}
} else {
return (List)instances.stream().filter((instancex) -> {
return Pattern.matches("((2(5[0-5]|[0-4]\\d))|[0-1]?\\d{1,2})(.((2(5[0-5]|[0-4]\\d))|[0-1]?\\d{1,2})){3}", instancex.getHost());
}).collect(Collectors.toList());
}
}
public CustomizeLoadBalance(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider, String serviceId, NacosDiscoveryProperties nacosDiscoveryProperties) {
this.serviceId = serviceId;
this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
this.nacosDiscoveryProperties = nacosDiscoveryProperties;
}
public Mono<Response<ServiceInstance>> choose(Request request) {
ServiceInstanceListSupplier supplier = (ServiceInstanceListSupplier)this.serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new);
return supplier.get(request).next().map(this::getInstanceResponse);
}
private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) {
if (serviceInstances.isEmpty()) {
log.warn("No servers available for service: " + this.serviceId);
return new EmptyResponse();
} else {
try {
String clusterName = this.nacosDiscoveryProperties.getClusterName();
List<ServiceInstance> instancesToChoose = serviceInstances;
if (StringUtils.isNotBlank(clusterName)) {
List<ServiceInstance> sameClusterInstances = (List)serviceInstances.stream().filter((serviceInstance) -> {
String cluster = (String)serviceInstance.getMetadata().get("nacos.cluster");
return StringUtils.equals(cluster, clusterName);
}).collect(Collectors.toList());
if (!CollectionUtils.isEmpty(sameClusterInstances)) {
instancesToChoose = sameClusterInstances;
}
} else {
log.warn("A cross-cluster call occurs,name = {}, clusterName = {}, instance = {}", new Object[]{this.serviceId, clusterName, serviceInstances});
}
//获取request 对象
ServletRequestAttributes attributes= (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
HttpServletRequest request= attributes.getRequest();
String ipAddress=request.getRemoteAddr();
System.out.println(ipAddress);
int hash=ipAddress.hashCode();
//自定义负载据衡器策略,通过ip hash后,取模获取下标
int index=(hash&Integer.MAX_VALUE)%serviceInstances.size();
System.out.println(index);
//获取服务实例
ServiceInstance instance=serviceInstances.get(index);
return new DefaultResponse(instance);
} catch (Exception var5) {
log.warn("NacosLoadBalancer error", var5);
return null;
}
}
}
}
5.2 封装并设置自定义负载均衡器
@LoadBalancerClient(name = "balancer-service" ,configuration = CoustomizeHashLoadBalanceConfig.class)
public class CoustomizeHashLoadBalanceConfig {
@Resource
private NacosDiscoveryProperties nacosDiscoveryProperties;
@Bean
public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(Environment environment, LoadBalancerClientFactory loadBalancerClientFactory) {
String name = environment.getProperty("loadbalancer.client.name");
return new CustomizeLoadBalance(loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name,nacosDiscoveryProperties);
}
}
六、缓存
Spring Cloud LoadBalancer 在获取实例时有两种选择:
- 即时获取:每次从注册中心得到最新的健康实例,效果好、开销大
- 缓存服务列表:每次得到服务列表之后,缓存一段时间,这样既能保证性能,同时也能兼容一定的及时性。
Spring Cloud LoadBalancer 中默认开启了缓存服务列表的功能。
Spring Cloud LoadBalancer 默认缓存的重要特性有两项:
- 缓存的过期时间为 35s
- 缓存保存个数为 256 个
spring:
cloud:
loadbalancer:
cache:
ttl: 10
capacity: 128
#不开启缓存
enabled:false
注意:尽管在不开启缓存对于开发和测试很有用,但其效率远低于将缓存开启,因此建议在生产环境中始终启用缓存
七、执行原理
OpenFeign 底层是通过 HTTP 客户端对象 RestTemplate 实现接口请求的,而负载均衡器的作用只是在请求客户端发送请求之前,得到一个服务的地址给到 RestTemplate 对象,而 Spring Cloud LoadBalancer 的整体类图如下:
通过查看 Spring Cloud LoadBalancer 源码可以发现,@LoadBalancer 注解由 spring-cloud-commons 实现,查看实现逻辑我们发现 spring-cloud-commons 存在自动配置类 LoadBalancerAutoConfiguration,当满足条件时,将自动创建 LoadBalancerInterceptor 并注入到RestTemplate 中,部分源码如下:
@Configuration(
proxyBeanMethods = false
)
@Conditional({RetryMissingOrDisabledCondition.class})
static class LoadBalancerInterceptorConfig {
LoadBalancerInterceptorConfig() {
}
@Bean
public LoadBalancerInterceptor loadBalancerInterceptor(LoadBalancerClient loadBalancerClient, LoadBalancerRequestFactory requestFactory) {
return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
}
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {
return (restTemplate) -> {
List<ClientHttpRequestInterceptor> list = new ArrayList(restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
};
}
}
LoadBalancerInterceptor 又实现了 ClientHttpRequestInterceptor 接口,并实现 Intercept 方法,用于实现负载均衡的拦截处理,实现源码:
public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
private LoadBalancerClient loadBalancer;
private LoadBalancerRequestFactory requestFactory;
public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {
this.loadBalancer = loadBalancer;
this.requestFactory = requestFactory;
}
public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
}
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) throws IOException {
URI originalUri = request.getURI();
String serviceName = originalUri.getHost();
Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
return (ClientHttpResponse)this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
}
}
其中有一个 LoadBalancerClient 负载均衡客户端,用于进行负载均衡逻辑,从服务列表中选择出一个服务进行地址进行调用,默认实现为 BlockingLoadBalancerClient,源码如下:
public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
String hint = this.getHint(serviceId);
LoadBalancerRequestAdapter<T, TimedRequestContext> lbRequest = new LoadBalancerRequestAdapter(request, this.buildRequestContext(request, hint));
Set<LoadBalancerLifecycle> supportedLifecycleProcessors = this.getSupportedLifecycleProcessors(serviceId);
supportedLifecycleProcessors.forEach((lifecycle) -> {
lifecycle.onStart(lbRequest);
});
ServiceInstance serviceInstance = this.choose(serviceId, lbRequest);
if (serviceInstance == null) {
supportedLifecycleProcessors.forEach((lifecycle) -> {
lifecycle.onComplete(new CompletionContext(Status.DISCARD, lbRequest, new EmptyResponse()));
});
throw new IllegalStateException("No instances available for " + serviceId);
} else {
return this.execute(serviceId, serviceInstance, lbRequest);
}
}
public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {
DefaultResponse defaultResponse = new DefaultResponse(serviceInstance);
Set<LoadBalancerLifecycle> supportedLifecycleProcessors = this.getSupportedLifecycleProcessors(serviceId);
Request lbRequest = request instanceof Request ? (Request)request : new DefaultRequest();
supportedLifecycleProcessors.forEach((lifecycle) -> {
lifecycle.onStartRequest(lbRequest, new DefaultResponse(serviceInstance));
});
try {
T response = request.apply(serviceInstance);
Object clientResponse = this.getClientResponse(response);
supportedLifecycleProcessors.forEach((lifecycle) -> {
lifecycle.onComplete(new CompletionContext(Status.SUCCESS, lbRequest, defaultResponse, clientResponse));
});
return response;
} catch (IOException var9) {
supportedLifecycleProcessors.forEach((lifecycle) -> {
lifecycle.onComplete(new CompletionContext(Status.FAILED, var9, lbRequest, defaultResponse));
});
throw var9;
} catch (Exception var10) {
supportedLifecycleProcessors.forEach((lifecycle) -> {
lifecycle.onComplete(new CompletionContext(Status.FAILED, var10, lbRequest, defaultResponse));
});
ReflectionUtils.rethrowRuntimeException(var10);
return null;
}
}
public ServiceInstance choose(String serviceId) {
return this.choose(serviceId, ReactiveLoadBalancer.REQUEST);
}
//通过不同的负载均衡器选择不同的服务
public <T> ServiceInstance choose(String serviceId, Request<T> request) {
//获取负载均衡器
ReactiveLoadBalancer<ServiceInstance> loadBalancer = this.loadBalancerClientFactory.getInstance(serviceId);
if (loadBalancer == null) {
return null;
} else {
//根据负载均衡器得到一个请求实例
Response<ServiceInstance> loadBalancerResponse = (Response)Mono.from(loadBalancer.choose(request)).block();
return loadBalancerResponse == null ? null : (ServiceInstance)loadBalancerResponse.getServer();
}
}
小结:OpenFeign 通过拦截器调用 RestTemplate 实现和服务的交互,RestTemplate 通过HttpClient 进行请求,而在交互之前通过调用 Spring Cloud LoadBalancer中的LoadBalancerClient中的choose 方法 ,根据服务 id 和负载均衡策略得到某个服务地址,再进行调用。