Java在SpringCloud中自定义Gateway负载均衡策略

Java在SpringCloud中自定义Gateway负载均衡策略

一、前言

spring-cloud-starter-netflix-ribbon已经不再更新了,最新版本是2.2.10.RELEASE,最后更新时间是2021年11月18日,详细信息可以看maven官方仓库:org.springframework.cloud/spring-cloud-starter-netflix-ribbon,SpringCloud官方推荐使用spring-cloud-starter-loadbalancer进行负载均衡。

背景:大文件上传做切片文件上传;

流程:将切片文件上传到服务器,然后进行合并任务,合并完成之后上传到对象存储;现在服务搞成多节点以后,网关默认走轮循,但是相同的服务在不同的机器上,这样就会导致切片文件散落在不同的服务器上,会导致文件合并失败;所以根据一个标识去自定义gateway对应服务的负载均衡策略,可以解决这个问题;

我的版本如下:

<spring-boot.version>2.7.3</spring-boot.version>
        <spring-cloud.version>2021.0.4</spring-cloud.version>
        <spring-cloud-alibaba.version>2021.0.4.0</spring-cloud-alibaba.version>

二、参考默认实现

springCloud原生默认的负载均衡策略是这个类:

org.springframework.cloud.loadbalancer.core.RoundRobinLoadBalancer

我们参考这个类实现自己的负载均衡策略即可,RoundRobinLoadBalancer实现了ReactorServiceInstanceLoadBalancer这个接口,实现了choose这个方法,如下图:

在choose方法中调用了processInstanceResponse方法,processInstanceResponse方法中调用了getInstanceResponse方法,所以我们我们可以复制RoundRobinLoadBalancer整个类,只修改getInstanceResponse这个方法里的内容就可以实现自定义负载均衡策略。

三、实现代码

原理:根据请求头当中设备的唯一标识传递到下游,唯一标识做哈希取余,可以指定对应的服务器节点,需要的服务设置自定义负载策略,不需要的服务设置默认的轮循机制即可

package com.wondertek.gateway.loadBalancer;

import cn.hutool.core.util.ObjectUtil;
import com.wondertek.web.exception.enums.HttpRequestHeaderEnum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
@Slf4j
@Component
public class RequestFilter implements GlobalFilter, Ordered {
    @Override
    public int getOrder() {
        // 应该小于LoadBalancerClientFilter的顺序值
        return Ordered.HIGHEST_PRECEDENCE;
    }
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        ServerHttpRequest request = exchange.getRequest();
        String clientDeviceUniqueCode = request.getHeaders().getFirst(HttpRequestHeaderEnum.CLIENT_DEVICE_UNIQUE_CODE.getCode());
        // 存入Reactor上下文
        String resultCode = clientDeviceUniqueCode;
        return chain.filter(exchange)
                .contextWrite(context -> {
                    if (ObjectUtil.isNotEmpty(resultCode)) {
                        log.info("开始将request中的唯一标识封装到上下游中:{}", resultCode);
                        return context.put("identification", resultCode);
                    } else {
                        // 或者根据需求进行其他处理
                        return context;
                    }
                });
    }
}

package com.wondertek.gateway.loadBalancer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.DefaultResponse;
import org.springframework.cloud.client.loadbalancer.EmptyResponse;
import org.springframework.cloud.client.loadbalancer.Request;
import org.springframework.cloud.client.loadbalancer.Response;
import org.springframework.cloud.loadbalancer.core.NoopServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
import org.springframework.cloud.loadbalancer.core.SelectedInstanceCallback;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import reactor.core.publisher.Mono;

import java.util.*;
import java.util.concurrent.ThreadLocalRandom;

@Slf4j
public class ClientDeviceUniqueCodeInstanceLoadBalancer implements ReactorServiceInstanceLoadBalancer {

    private final String serviceId;
    private ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;


    public ClientDeviceUniqueCodeInstanceLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider, String serviceId) {
        this.serviceId = serviceId;
        this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
    }

    @Override
    public Mono<Response<ServiceInstance>> choose(Request request) {
        //在 choose 方法中,使用 deferContextual 方法来访问上下文并提取客户端标识。这里的 getOrDefault 方法尝试从上下文中获取一个键为 "identification" 的值,如果不存在则返回 "default-identification"
        return Mono.deferContextual(contextView -> {
            String identification = contextView.getOrDefault("identification", "14d58a1ba286f087d9736249ec785314");
            log.info("上下游获取到的identification的值为:{}", identification);
            ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
                    .getIfAvailable(NoopServiceInstanceListSupplier::new);
            return supplier.get(request).next()
                    .map(serviceInstances -> processInstanceResponse(supplier, serviceInstances, identification));
        });
    }

    private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier, List<ServiceInstance> serviceInstances, String identification) {
        Response<ServiceInstance> serviceInstanceResponse;
        if (Objects.isNull(identification)) {
            serviceInstanceResponse = this.getInstanceResponse(serviceInstances, null);
        } else {
            serviceInstanceResponse = this.getIpInstanceResponse(serviceInstances, identification);
        }
        if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
            ((SelectedInstanceCallback) supplier).selectedServiceInstance((ServiceInstance) serviceInstanceResponse.getServer());
        }
        return serviceInstanceResponse;
    }

    private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances, String identification) {
        if (instances.isEmpty()) {
            if (log.isWarnEnabled()) {
                log.warn("No servers available for service: " + this.serviceId);
            }
            return new EmptyResponse();
        } else {
            int index = ThreadLocalRandom.current().nextInt(instances.size());
            ServiceInstance instance = (ServiceInstance) instances.get(index);
            return new DefaultResponse(instance);
        }
    }

    private Response<ServiceInstance> getIpInstanceResponse(List<ServiceInstance> instances, String identification) {
        if (instances.isEmpty()) {
            log.warn("No servers available for service: " + this.serviceId);
            return new EmptyResponse();
        } else if (instances.size() == 1) {
            log.info("只有一个服务实例,直接返回这个实例");
            return new DefaultResponse(instances.get(0));
        } else {
            //创建一个新的列表以避免在原始列表上排序,避免了修改共享状态可能带来的线程安全问题
            List<ServiceInstance> sortedInstances = new ArrayList<>(instances);
            // 现在对新列表进行排序,保持原始列表的顺序不变
            Collections.sort(sortedInstances, Comparator.comparing(ServiceInstance::getHost));
            //log.info("获取到的实例个数的值为:{}", sortedInstances.size());
            sortedInstances.forEach(instance -> log.info("排序后的实例: {},{}", instance.getHost(), instance.getPort()));
            //log.info("多个服务实例,使用客户端 identification 地址的哈希值来选择服务实例");
            // 使用排序后的列表来找到实例
            int ipHashCode = Math.abs(identification.hashCode());
            //log.info("identificationHashCode的值为:{}", ipHashCode);
            int instanceIndex = ipHashCode % sortedInstances.size();
            //log.info("instanceIndex的值为:{}", instanceIndex);
            ServiceInstance instanceToReturn = sortedInstances.get(instanceIndex);
            //log.info("instanceToReturn.getUri()的值为:{}", instanceToReturn.getUri());
            log.info("自定义identification负载机制,Client identification: {} is routed to instance: {}:{}", identification, instanceToReturn.getHost(), instanceToReturn.getPort());
            return new DefaultResponse(instanceToReturn);
        }
    }

}

package com.wondertek.gateway.loadBalancer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.DefaultResponse;
import org.springframework.cloud.client.loadbalancer.EmptyResponse;
import org.springframework.cloud.client.loadbalancer.Request;
import org.springframework.cloud.client.loadbalancer.Response;
import org.springframework.cloud.loadbalancer.core.NoopServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
import org.springframework.cloud.loadbalancer.core.SelectedInstanceCallback;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import reactor.core.publisher.Mono;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
public class DefaultInstanceLoadBalancer implements ReactorServiceInstanceLoadBalancer {

    private final String serviceId;
    private ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;
    final AtomicInteger position;

    public DefaultInstanceLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider, String serviceId, AtomicInteger position) {
        this.serviceId = serviceId;
        this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
        this.position = position;
    }
    @Override
    public Mono<Response<ServiceInstance>> choose(Request request) {
        ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
                .getIfAvailable(NoopServiceInstanceListSupplier::new);
        return supplier.get(request).next()
                .map(serviceInstances -> processInstanceResponse(supplier, serviceInstances));
    }

    private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier,
                                                              List<ServiceInstance> serviceInstances) {
        Response<ServiceInstance> serviceInstanceResponse = getInstanceResponse(serviceInstances);
        if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
            ((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());
        }
        return serviceInstanceResponse;
    }

    private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {
        if (instances.isEmpty()) {
            if (log.isWarnEnabled()) {
                log.warn("No servers available for service: " + serviceId);
            }
            return new EmptyResponse();
        }
        //创建一个新的列表以避免在原始列表上排序,避免了修改共享状态可能带来的线程安全问题
        List<ServiceInstance> sortedInstances = new ArrayList<>(instances);
        // 现在对新列表进行排序,保持原始列表的顺序不变
        Collections.sort(sortedInstances, Comparator.comparing(ServiceInstance::getHost));
        //log.info("获取到的实例个数的值为:{}", sortedInstances.size());
        sortedInstances.forEach(instance -> log.info("排序后的实例: {},{}", instance.getHost(), instance.getPort()));
        int pos = Math.abs(this.position.incrementAndGet());
        //log.info("默认轮循机制,pos递加后的值为:{}", pos);
        int positionIndex = pos % instances.size();
        //log.info("取余后的positionIndex的值为:{}", positionIndex);
        ServiceInstance instance = instances.get(positionIndex);
        //log.info("instance.getUri()的值为:{}", instance.getUri());
        log.info("默认轮循机制,routed to instance: {}:{}",instance.getHost(), instance.getPort());
        return new DefaultResponse(instance);
    }

}

package com.wondertek.gateway.loadBalancer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.loadbalancer.annotation.LoadBalancerClient;
import org.springframework.cloud.loadbalancer.annotation.LoadBalancerClients;
import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;

import java.util.concurrent.atomic.AtomicInteger;

@Configuration
//单台服务
//@LoadBalancerClient(name = "oms-api", configuration = CustomLoadBalancerConfig.class)
//多台服务
@LoadBalancerClients({
        @LoadBalancerClient(name = "oms-api", configuration = CustomLoadBalancerConfig.class),
        @LoadBalancerClient(name = "unity-api", configuration = CustomLoadBalancerConfig.class),
        @LoadBalancerClient(name = "cloud-api", configuration = CustomLoadBalancerConfig.class),
        @LoadBalancerClient(name = "open-api", configuration = CustomLoadBalancerConfig.class),
        @LoadBalancerClient(name = "server-api", configuration = CustomLoadBalancerConfig.class),
        @LoadBalancerClient(name = "center-service", configuration = CustomLoadBalancerConfig.class),
})
@Slf4j
public class CustomLoadBalancerConfig {
    // 定义一个Bean来提供AtomicInteger的实例
    @Bean
    public AtomicInteger positionTracker() {
        // 这将在应用上下文中只初始化一次
        return new AtomicInteger(0);
    }

    //自定义优先级负载均衡器
    @Bean
    public ReactorServiceInstanceLoadBalancer customPriorityLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider,
                                                                     Environment environment,AtomicInteger positionTracker) {
        String serviceId = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
        //目的为解决文件上传切片文件分散上传的问题
        if ("oms-api".equals(serviceId)||"unity-api".equals(serviceId)||"cloud-api".equals(serviceId)){
            //log.info("服务名称:serviceId:{},走自定义clientDeviceUniqueCode负载模式", serviceId);
            return new ClientDeviceUniqueCodeInstanceLoadBalancer(serviceInstanceListSupplierProvider, serviceId);
        }
        //log.info("服务名称:serviceId:{},走默认负载模式", serviceId);
        return new DefaultInstanceLoadBalancer(serviceInstanceListSupplierProvider, serviceId,positionTracker);
    }
}

【SpringCloud系列】开发环境下重写Loadbalancer实现自定义负载均衡

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/274719.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

uniapp 输入手机号并且正则校验

1.<input input“onInput” :value“phoneNum” type“number” maxlength“11”/> 3. method里面写 onInput(e){ this.phoneNum e.detail.value }, 4.调用接口时候校验正则 if (!/^1[3456789]\d{9}$/.test(this.phoneNum)) {uni.showToast({title: 请输入正确的手机号…

K8S网络类型

k8s的网络类型 k8s的通信模式 1 pod内部之间容器与容器之间的通信&#xff0c;在同一个pod中容器是共享资源和网络&#xff0c;使用同一个网络命名空间&#xff0c;可以直接通信 2 同一个node节点之内&#xff0c;不同pod之间的通信&#xff0c;每个pod都有一个全局的真实ip地…

Qt学习:Qt的意义安装Qt

Qt 的简介 QT 是一个跨平台的 C图形用户界面应用程序框架。它为程序开发者提供图形界面所需的所有功能。它是完全面向对象的&#xff0c;很容易扩展&#xff0c;并且允许真正地组件编程。 支持平台 xP 、 Vista、Win7、win8、win2008、win10Windows . Unix/Linux: Ubuntu 等…

gin框架使用系列之六——自定义中间件

系列目录 《gin框架使用系列之一——快速启动和url分组》《gin框架使用系列之二——uri占位符和占位符变量的获取》《gin框架使用系列之三——获取表单数据》《gin框架使用系列之四——json和protobuf的渲染》《gin框架使用系列之五——表单校验》 一、gin中间件概述 gin中将…

【项目】玩具租赁博客测试报告

目录 一、项目背景 二、项目功能 三、功能测试 一、项目背景 玩具租赁系统采用前后端分离的方法来实现&#xff0c;同时使用了数据库来存储相关的数据&#xff0c;同时将其部署到云服务器上。前端主要有十五个页面构成&#xff1a;用户注册、管理员注册、登录页、用户和管理…

LabVIEW利用视觉引导机开发器人精准抓取

LabVIEW利用视觉引导机开发器人精准抓取 本项目利用单目视觉技术指导多关节机器人精确抓取三维物体的技术。通过改进传统的相机标定方法&#xff0c;结合LabVIEW平台的Vision Development和Vision Builder forAutomated Inspection组件&#xff0c;优化了摄像系统的标定过程&a…

matlab列优先与高维矩阵重构

由于matlab在列化a(:)以及reshape(a)等操作中是列优先的&#xff0c;所以要重构出新的高维度矩阵&#xff0c;通常要把reshape和permute结合起来使用。 先到 http://caffe.berkeleyvision.org/ 下载 训练好的model bvlc_reference_caffenet.caffemodel; 更多caffe使用也请参看…

操作教程|MeterSphere UI测试+VNC:简单、快捷地查看UI测试实时执行详情

编者注&#xff1a;本文为CSDN博主hxe116的原创文章。 原文链接为&#xff1a;https://blog.csdn.net/hxe116/article/details/134714960?spm1001.2014.3001.5502 作为一款一站式的开源持续测试平台&#xff0c;MeterSphere涵盖了测试跟踪、接口测试、UI测试和性能测试等功能…

java浅拷贝BeanUtils.copyProperties引发的RPC异常 | 京东物流技术团队

背景 近期参与了一个攻坚项目&#xff0c;前期因为其他流程原因&#xff0c;测试时间已经耽搁了好几天了&#xff0c;本以为已经解决了卡点&#xff0c;后续流程应该顺顺利利的&#xff0c;没想到 人在地铁上&#xff0c;bug从咚咚来~ 没有任何修改的服务接口&#xff0c;抛出…

python实现一维傅里叶变换——冈萨雷斯数字图像处理

原理 傅立叶变换&#xff0c;表示能将满足一定条件的某个函数表示成三角函数&#xff08;正弦和/或余弦函数&#xff09;或者它们的积分的线性组合。在不同的研究领域&#xff0c;傅立叶变换具有多种不同的变体形式&#xff0c;如连续傅立叶变换和离散傅立叶变换。最初傅立叶分…

NAS上使用Docker搭建Wiki.js构建云知识库

文章目录 NAS上使用Docker搭建Wiki.js、PostgreSQL和Nginx云知识库前置条件步骤1&#xff1a;获取wikijs的镜像步骤2&#xff1a;配置容器参数2.1 端口设置2.2 挂载设置2.3 环境变量设置&#xff08;配置数据库&#xff09; 步骤3. 启动界面3.1 切换语言3.2 GIT 配置3.3 用户和…

Flask 日志

flask 日志 代码源码源自编程浪子flask点餐小程序代码 记录用户访问日志 和 错误日志 这段代码是一个基于Flask框架的日志服务类&#xff0c;用于 记录用户访问日志 和 错误日志。代码中定义了一个名为LogService的类&#xff0c;其中包含了两个静态方法&#xff1a;addAcc…

小程序开发平台源码系统:创建专属你的小程序 海量模板任你选择 带完整的代码包以及搭建教程

小程序作为一种轻量级的应用程序&#xff0c;已经深入到人们的日常生活之中。今天来给大家分享一个小程序开发平台源码系统&#xff0c;轻松搭建小程序&#xff0c;还有完整的代码包以及搭建教程。 以下是部分代码示例&#xff1a; 系统特色功能一览&#xff1a; 1.海量模板任…

STM32 IIC开发学习

1IIC总线时序图 ① 起始信号 当 SCL 为高电平期间&#xff0c;SDA 由高到低的跳变。起始信号是一种电平跳变时序信号&#xff0c;而不是 一个电平信号。该信号由主机发出&#xff0c;在起始信号产生后&#xff0c;总线就会处于被占用状态&#xff0c;准备数据 传输。 ② 停止信…

ios环境搭建_xcode安装及运行源码

目录 1 xcode 介绍 2 xcode 下载 3 xocde 运行ios源码 1 xcode 介绍 Xcode 是运行在操作系统Mac OS X上的集成开发工具&#xff08;IDE&#xff09;&#xff0c;由Apple Inc开发。Xcode是开发 macOS 和 iOS 应用程序的最快捷的方式。Xcode 具有统一的用户界面设计&#xff0…

矿泉水硝酸盐和溴酸盐超标解决工艺

在当今社会&#xff0c;人们对健康和优质生活的追求不断提升&#xff0c;使得瓶装饮用水的安全问题受到了广泛关注。溴酸盐和硝酸盐作为自然水体中常见的物质&#xff0c;若在矿泉水中含量过高&#xff0c;可能会对消费者的健康构成潜在威胁。因此&#xff0c;探究有效去除矿泉…

【OpenCV】告别人工目检:深度学习技术引领工业品缺陷检测新时代

目录 前言 机器视觉 缺陷检测 工业上常见缺陷检测方法 内容简介 作者简介 目录 读者对象 如何阅读本书 获取方式 前言 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。 点击跳转到网站 机器视觉…

WPF项目创建HTTP WEB服务,不使用IIS业务 WPF桌面程序WebApi WPF 集成WebApi C# 创建HTTP Web API服务

在C# WPF应用程序中直接创建HTTP服务或WebAPI服务有以下优点&#xff1a; 自托管服务&#xff1a; 简化部署&#xff1a;无需依赖外部服务器或IIS&#xff08;Internet Information Services&#xff09;&#xff0c;可以直接在应用程序内部启动和运行Web服务。 集成紧密&…

如何在无公网IP环境使用Windows远程桌面Ubuntu

文章目录 一、 同个局域网内远程桌面Ubuntu二、使用Windows远程桌面连接三、公网环境系统远程桌面Ubuntu1. 注册cpolar账号并安装2. 创建隧道&#xff0c;映射3389端口3. Windows远程桌面Ubuntu 四、 配置固定公网地址远程Ubuntu1. 保留固定TCP地址2. 配置固定的TCP地址3. 使用…

「Kafka」入门篇

「Kafka」入门篇 基础架构 Kafka 快速入门 集群规划 集群部署 官方下载地址&#xff1a;http://kafka.apache.org/downloads.html 解压安装包&#xff1a; [atguiguhadoop102 software]$ tar -zxvf kafka_2.12-3.0.0.tgz -C /opt/module/修改解压后的文件名称&#xff1a; [a…