ShenYu网关注册中心之Zookeeper注册原理

文章目录

  • 1、客户端注册流程
    • 1.1、读取配置
      • 1.1.1、用于注册的 ZookeeperClientRegisterRepository
      • 1.1.2、用于扫描构建 元数据 和 URI 的 SpringMvcClientEventListener
    • 1.2、扫描注解,注册元数据和URI
      • 1.2.1、构建URI并写入Disruptor
      • 1.2.2、构建元数据并写入Disruptor
      • 1.2.3、Disruptor消费数据并向shenyu-admin注册数据
  • 2、服务端注册流程
    • 2.1、读取配置
      • 2.1.1、用于监听的ShenyuClientServerRegisterRepository
    • 2.2、注册元数据和URI
      • 2.2.1、监听数据变更并写入Disruptor
      • 2.2.2、Disruptor消费数据并持久化

1、客户端注册流程

当客户端启动后,根据相关配置,读取属性信息,然后写入队列。以官方提供的 shenyu-examples-http 为例,开始源码分析。

1.1、读取配置

该例子是一个springboot项目,所以注册的入口往往在自动装配类中。不妨可以先看下项目的pom文件中引入了什么依赖:

<dependencies>
    <dependency>
        <groupId>org.apache.shenyu</groupId>
        <artifactId>shenyu-spring-boot-starter-client-springmvc</artifactId>
        <version>${project.version}</version>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-logging</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

这里面看到就shenyu-spring-boot-starter-client-springmvc是跟ShenYu相关的,所以入口应该就在这个依赖内了,看下这个依赖的项目结构:
在这里插入图片描述

发现就是两个配置类,ShenyuSpringMvcClientInfoRegisterConfiguration由于使用了@Configuration(proxyBeanMethods = false),暂时不用关注,重点关注ShenyuSpringMvcClientConfiguration,它是shenyu客户端http注册配置类。

/**
 * shenyu 客户端http注册配置类
 */
@Configuration
// shenyu客户端通用配置类
@ImportAutoConfiguration(ShenyuClientCommonBeanConfiguration.class)
@ConditionalOnProperty(value = "shenyu.register.enabled", matchIfMissing = true, havingValue = "true")
public class ShenyuSpringMvcClientConfiguration {

    static {
        VersionUtils.checkDuplicate(ShenyuSpringMvcClientConfiguration.class);
    }

    /**
     *
     * 监听并处理http元数据和URI信息的注册
     *
     * @param clientConfig                   客户端注册配置
     * @param shenyuClientRegisterRepository 客户端注册类
     */
    @Bean
    @ConditionalOnMissingBean(ClientRegisterConfiguration.class)
    // 这里的两个参数是由ShenyuClientCommonBeanConfiguration导入的
    public SpringMvcClientEventListener springHttpClientEventListener(final ShenyuClientConfig clientConfig,
                                                                          final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
        return new SpringMvcClientEventListener(clientConfig.getClient().get(RpcTypeEnum.HTTP.getName()), shenyuClientRegisterRepository);
    }
}

通过@Configuration表示这是一个配置类,通过@ImportAutoConfiguration引入ShenyuClientCommonBeanConfiguration配置类。

/**
 * shenyu客户端通用配置类,创建注册中心客户端通用的bean
 */
@Configuration
@ConditionalOnProperty(value = "shenyu.register.enabled", matchIfMissing = true, havingValue = "true")
public class ShenyuClientCommonBeanConfiguration {
    
    /**
     * 根据注册中心配置通过SPI方式创建客户端注册类
     */
    @Bean
    public ShenyuClientRegisterRepository shenyuClientRegisterRepository(final ShenyuRegisterCenterConfig config) {
        return ShenyuClientRegisterRepositoryFactory.newInstance(config);
    }
    
    /**
     * Shenyu 客户端注册中心配置,读取shenyu.register属性配置
     */
    @Bean
    @ConfigurationProperties(prefix = "shenyu.register")
    public ShenyuRegisterCenterConfig shenyuRegisterCenterConfig() {
        return new ShenyuRegisterCenterConfig();
    }
    
    /**
     * Shenyu 客户端配置,读取shenyu.client属性配置
     */
    @Bean
    @ConfigurationProperties(prefix = "shenyu")
    public ShenyuClientConfig shenyuClientConfig() {
        return new ShenyuClientConfig();
    }
}

ShenyuClientCommonBeanConfigurationShenYu客户端的通用配置类,创建了3个通用bean。

  • ShenyuClientRegisterRepository:客户端注册类,用于将客户端接口信息注册到注册中心。
  • ShenyuRegisterCenterConfigShenYu客户端注册中心配置类,读取shenyu.register属性配置。
  • ShenyuClientConfigShenYu客户端配置类,读取shenyu.client属性配置。

1.1.1、用于注册的 ZookeeperClientRegisterRepository

上面生成的ShenyuClientRegisterRepository是用于实现客户端注册的接口,会根据注册中心的配置通过SPI方式创建客户端注册类,每一个注册方式都对应一个实现类。
在这里插入图片描述

目前支持7种注册类型:

  • Http:HttpClientRegisterRepository
  • Apollo:ApolloClientRegisterRepository
  • Zookeeper:ZookeeperClientRegisterRepository
  • Etcd:EtcdClientRegisterRepository
  • Nacos:NacosClientRegisterRepository
  • Consul:ConsulClientRegisterRepository
  • Polaris:PolarisClientRegisterRepository
public final class ShenyuClientRegisterRepositoryFactory {
    
    private static final Map<String, ShenyuClientRegisterRepository> REPOSITORY_MAP = new ConcurrentHashMap<>();
    
    /**
     * 根据注册中心类型实例化注册服务
     */
    public static ShenyuClientRegisterRepository newInstance(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig) {
        if (!REPOSITORY_MAP.containsKey(shenyuRegisterCenterConfig.getRegisterType())) {
            // 通过SPI方式创建客户端注册类
            ShenyuClientRegisterRepository result = ExtensionLoader.getExtensionLoader(ShenyuClientRegisterRepository.class).getJoin(shenyuRegisterCenterConfig.getRegisterType());
            // 初始化对应客户端注册类,比如创建zookeeper client,etcd client,admin平台的token等
            result.init(shenyuRegisterCenterConfig);
            ShenyuClientShutdownHook.set(result, shenyuRegisterCenterConfig.getProps());
            REPOSITORY_MAP.put(shenyuRegisterCenterConfig.getRegisterType(), result);
            return result;
        }
        return REPOSITORY_MAP.get(shenyuRegisterCenterConfig.getRegisterType());
    }
}

加载类型通过registerType指定,也就是我们在配置文件中指定的类型:

shenyu:
  register:
    registerType: zookeeper
    serverLists: http://localhost:2181

这里指定的是zookeeper,所以这里创建的就是ZookeeperClientRegisterRepository
创建对应的注册客户端后,会调用init方法根据shenyu.register下的配置进行初始化:

@Join
public class ZookeeperClientRegisterRepository implements ShenyuClientRegisterRepository {

    @Override
    public void init(final ShenyuRegisterCenterConfig config) {
        Properties props = config.getProps();
        int sessionTimeout = Integer.parseInt(props.getProperty("sessionTimeout", "3000"));
        int connectionTimeout = Integer.parseInt(props.getProperty("connectionTimeout", "3000"));

        int baseSleepTime = Integer.parseInt(props.getProperty("baseSleepTime", "1000"));
        int maxRetries = Integer.parseInt(props.getProperty("maxRetries", "3"));
        int maxSleepTime = Integer.parseInt(props.getProperty("maxSleepTime", String.valueOf(Integer.MAX_VALUE)));

        ZookeeperConfig zkConfig = new ZookeeperConfig(config.getServerLists());
        zkConfig.setBaseSleepTimeMilliseconds(baseSleepTime)
                .setMaxRetries(maxRetries)
                .setMaxSleepTimeMilliseconds(maxSleepTime)
                .setSessionTimeoutMilliseconds(sessionTimeout)
                .setConnectionTimeoutMilliseconds(connectionTimeout);

        String digest = props.getProperty("digest");
        if (!StringUtils.isEmpty(digest)) {
            zkConfig.setDigest(digest);
        }

        // 创建zookeeper客户端
        this.client = new ZookeeperClient(zkConfig);
        this.client.getClient().getConnectionStateListenable().addListener((c, newState) -> {
            if (newState == ConnectionState.RECONNECTED) {
                nodeDataMap.forEach((k, v) -> {
                    if (!client.isExist(k)) {
                        client.createOrUpdate(k, v, CreateMode.EPHEMERAL);
                        LOGGER.info("zookeeper client register uri success: {}", v);
                    }
                });
            }
        });

        // 启动客户端
        client.start();
    }
}

这里主要就是创建zookeeper的客户端,为后面的发送注册数据做准备。其他注册类型的ShenyuClientRegisterRepository也一样,创建各自注册中心的client,连接注册中心,为发送数据做准备。类注解@Join用于SPI的加载。

1.1.2、用于扫描构建 元数据 和 URI 的 SpringMvcClientEventListener

回到一开始的ShenyuSpringMvcClientConfiguration配置类:

/**
 * shenyu 客户端http注册配置类
 */
@Configuration
// shenyu客户端通用配置类
@ImportAutoConfiguration(ShenyuClientCommonBeanConfiguration.class)
@ConditionalOnProperty(value = "shenyu.register.enabled", matchIfMissing = true, havingValue = "true")
public class ShenyuSpringMvcClientConfiguration {

    static {
        VersionUtils.checkDuplicate(ShenyuSpringMvcClientConfiguration.class);
    }

    /**
     *
     * 监听并处理http元数据和URI信息的注册
     *
     * @param clientConfig                   客户端注册配置
     * @param shenyuClientRegisterRepository 客户端注册类
     */
    @Bean
    @ConditionalOnMissingBean(ClientRegisterConfiguration.class)
    // 这里的两个参数是由ShenyuClientCommonBeanConfiguration导入的
    public SpringMvcClientEventListener springHttpClientEventListener(final ShenyuClientConfig clientConfig,
                                                                          final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
        return new SpringMvcClientEventListener(clientConfig.getClient().get(RpcTypeEnum.HTTP.getName()), shenyuClientRegisterRepository);
    }
}

创建了SpringMvcClientEventListener,负责客户端 元数据URI 数据的构建和注册。SpringMvcClientEventListener继承了AbstractContextRefreshedEventListener,而AbstractContextRefreshedEventListener是一个抽象类,它实现了ApplicationListener接口,并重写了onApplicationEvent()方法,当有Spring事件发生后,该方法会执行。每一种后端服务RPC调用协议都对应了一个监听类。
在这里插入图片描述

public class SpringMvcClientEventListener extends AbstractContextRefreshedEventListener<Object, ShenyuSpringMvcClient> {

    public SpringMvcClientEventListener(final PropertiesConfig clientConfig,
                                        final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
        super(clientConfig, shenyuClientRegisterRepository);
        // client配置
        Properties props = clientConfig.getProps();
        // 是否是全部接口都注册
        this.isFull = Boolean.parseBoolean(props.getProperty(ShenyuClientConstants.IS_FULL, Boolean.FALSE.toString()));
        // http协议
        this.protocol = props.getProperty(ShenyuClientConstants.PROTOCOL, ShenyuClientConstants.HTTP);
        this.addPrefixed = Boolean.parseBoolean(props.getProperty(ShenyuClientConstants.ADD_PREFIXED,
                Boolean.FALSE.toString()));
        mappingAnnotation.add(ShenyuSpringMvcClient.class);
        mappingAnnotation.add(RequestMapping.class);
    }

    // ...

}

SpringMvcClientEventListener的构造函数主要就是调用父类AbstractContextRefreshedEventListener的构造函数,传入客户端配置和客户端注册类,客户端配置指shenyu.client.http下的配置:

shenyu:
  client:
     http:
       props:
         contextPath: /http
         appName: http-appName
         port: 8189
         isFull: false
public abstract class AbstractContextRefreshedEventListener<T, A extends Annotation> implements ApplicationListener<ContextRefreshedEvent> {

    protected static final String PATH_SEPARATOR = "/";

    // Disruptor 发布器
    private final ShenyuClientRegisterEventPublisher publisher = ShenyuClientRegisterEventPublisher.getInstance();
	// ...
    
    public AbstractContextRefreshedEventListener(final PropertiesConfig clientConfig,
                                                 final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
        // 读取 shenyu.client.http 配置信息
        Properties props = clientConfig.getProps();
        this.appName = props.getProperty(ShenyuClientConstants.APP_NAME);
        this.contextPath = Optional.ofNullable(props.getProperty(ShenyuClientConstants.CONTEXT_PATH)).map(UriUtils::repairData).orElse("");
        if (StringUtils.isBlank(appName) && StringUtils.isBlank(contextPath)) {
            String errorMsg = "client register param must config the appName or contextPath";
            LOG.error(errorMsg);
            throw new ShenyuClientIllegalArgumentException(errorMsg);
        }
        this.ipAndPort = props.getProperty(ShenyuClientConstants.IP_PORT);
        this.host = props.getProperty(ShenyuClientConstants.HOST);
        this.port = props.getProperty(ShenyuClientConstants.PORT);
        // 开始事件发布,启动 Disruptor
        publisher.start(shenyuClientRegisterRepository);
    }   

}

取出相关配置信息后,就启动 Disruptor 队列,ShenyuClientRegisterEventPublisher可以看作是一个生产者,用来向队列发送数据,

public class ShenyuClientRegisterEventPublisher {
    
    private static final ShenyuClientRegisterEventPublisher INSTANCE = new ShenyuClientRegisterEventPublisher();
    
    private DisruptorProviderManage<DataTypeParent> providerManage;
 
    public static ShenyuClientRegisterEventPublisher getInstance() {
        return INSTANCE;
    }
    
    /**
     * Start.
     *
     * @param shenyuClientRegisterRepository shenyuClientRegisterRepository
     */
    public void start(final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
        // 注册任务工厂类,用于创建注册的任务,客户端使用的是RegisterClientExecutorFactory, 
        // 而在服务端(shenyu-admin)用于处理注册任务的是RegisterServerConsumerExecutor,
        // 都是用于消费Disruptor数据的任务
        RegisterClientExecutorFactory factory = new RegisterClientExecutorFactory();
        // 添加元数据订阅器
        factory.addSubscribers(new ShenyuClientMetadataExecutorSubscriber(shenyuClientRegisterRepository));
        // 添加URI订阅器
        factory.addSubscribers(new ShenyuClientURIExecutorSubscriber(shenyuClientRegisterRepository));
        // 添加ApiDoc订阅器
        factory.addSubscribers(new ShenyuClientApiDocExecutorSubscriber(shenyuClientRegisterRepository));
        providerManage = new DisruptorProviderManage<>(factory);
        // 启动Disruptor队列,并创建消费者
        providerManage.startup();
    }
    
    /**
     * 发布事件,向Disruptor队列发数据
     *
     * @param data the data
     */
    public void publishEvent(final DataTypeParent data) {
        DisruptorProvider<DataTypeParent> provider = providerManage.getProvider();
        provider.onData(data);
    }
}

start方法主要是为队列添加订阅器,会由消费者接收到信息后调用这些订阅器。然后启动启动Disruptor队列,并创建消费者。

public class DisruptorProviderManage<T> {

    public void startup() {
        this.startup(false);
    }
    
    public void startup(final boolean isOrderly) {
        OrderlyExecutor executor = new OrderlyExecutor(isOrderly, consumerSize, consumerSize, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                DisruptorThreadFactory.create("shenyu_disruptor_consumer_", false), new ThreadPoolExecutor.AbortPolicy());
        int newConsumerSize = this.consumerSize;
        EventFactory<DataEvent<T>> eventFactory;
        if (isOrderly) {
            newConsumerSize = 1;
            eventFactory = new OrderlyDisruptorEventFactory<>();
        } else {
            eventFactory = new DisruptorEventFactory<>();
        }
        Disruptor<DataEvent<T>> disruptor = new Disruptor<>(eventFactory,
                size,
                DisruptorThreadFactory.create("shenyu_disruptor_provider_" + consumerFactory.fixName(), false),
                ProducerType.MULTI,
                new BlockingWaitStrategy());
        // 创建消费者
        @SuppressWarnings("all")
        QueueConsumer<T>[] consumers = new QueueConsumer[newConsumerSize];
        for (int i = 0; i < newConsumerSize; i++) {
            consumers[i] = new QueueConsumer<>(executor, consumerFactory);
        }
        // 设置消费者
        disruptor.handleEventsWithWorkerPool(consumers);
        disruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler());
        // 真正调用disruptor的api启动
        disruptor.start();
        RingBuffer<DataEvent<T>> ringBuffer = disruptor.getRingBuffer();
        // disruptor的生产者
        provider = new DisruptorProvider<>(ringBuffer, disruptor, isOrderly);
    }  

}

这里就是准备Disruptor队列的一些逻辑,就不细讲了,其中QueueConsumerDisruptor的消费者,后面就是由它接收数据。

1.2、扫描注解,注册元数据和URI

上面说到SpringMvcClientEventListener继承了AbstractContextRefreshedEventListener,而AbstractContextRefreshedEventListener实现了ApplicationListener接口,并重写了onApplicationEvent()方法,当有Spring事件发生后,该方法会执行。

// 当有上下文刷新事件ContextRefreshedEvent发生时,该方法会执行,算是客户端的执行入口吧
@Override
public void onApplicationEvent(@NonNull final ContextRefreshedEvent event) {
    context = event.getApplicationContext();
    // 获取客户端的接口类,比如http就是Controller类,dubbo就是@DubboService类,由子类实现
    Map<String, T> beans = getBeans(context);
    if (MapUtils.isEmpty(beans)) {
        return;
    }
    // 保证只注册一次
    if (!registered.compareAndSet(false, true)) {
        return;
    }
    // 构建URI并写入Disruptor,由子类实现
    publisher.publishEvent(buildURIRegisterDTO(context, beans));
    // 构建元数据并写入Disruptor
    beans.forEach(this::handle);
    Map<String, Object> apiModules = context.getBeansWithAnnotation(ApiModule.class);
    apiModules.forEach((k, v) -> handleApiDoc(v, beans));
}

获取客户端服务的接口类,由具体的子类实现,http就是Controller类,这里对应的子类就是SpringMvcClientEventListener

@Override
protected Map<String, Object> getBeans(final ApplicationContext context) {
    // Filter out
    if (Boolean.TRUE.equals(isFull)) {
        // isFull=true,表示代理整个服务,就不需要注解扫描了,
        // 直接构建元数据和URI,写入Disruptor
        getPublisher().publishEvent(MetaDataRegisterDTO.builder()
                .contextPath(getContextPath())
                .addPrefixed(addPrefixed)
                .appName(getAppName())
                .path(PathUtils.decoratorPathWithSlash(getContextPath()))
                .rpcType(RpcTypeEnum.HTTP.getName())
                .enabled(true)
                .ruleName(getContextPath())
                .build());
        LOG.info("init spring mvc client success with isFull mode");
        // 构建URI
        publisher.publishEvent(buildURIRegisterDTO(context, Collections.emptyMap()));
        return Collections.emptyMap();
    }
    // 否则获取@Controller注解的bean
    return context.getBeansWithAnnotation(Controller.class);
}

这里会判断配置文件中的shenyu.client.http.props.isFull,如果是true,则直接构建一个元数据URI,写入到Disruptor中,然后返回一个空集合,后续的逻辑就没执行了。如果是false,则从spring容器中获取带@Controller注解的bean返回。

1.2.1、构建URI并写入Disruptor

构建一个URI数据写入到Disruptor,这个也是由子类实现的:

// 构建URI
@Override
protected URIRegisterDTO buildURIRegisterDTO(final ApplicationContext context,
                                             final Map<String, Object> beans) {
    try {
        return URIRegisterDTO.builder()
                .contextPath(getContextPath()) // shneyu得contextPath
                .appName(getAppName()) // appName
                .protocol(protocol) // 服务协议
                .host(super.getHost()) // 服务host
                .port(Integer.valueOf(getPort())) // 服务端口
                .rpcType(RpcTypeEnum.HTTP.getName()) // rpc类型
                .eventType(EventType.REGISTER) // 事件类型
                .build();
    } catch (ShenyuException e) {
        throw new ShenyuException(e.getMessage() + "please config ${shenyu.client.http.props.port} in xml/yml !");
    }
}

可以看出来URI跟接口类没有关系,一个后端服务实例生成一个URI。

1.2.2、构建元数据并写入Disruptor

之后遍历每个接口构建元数据beans.forEach(this::handle)

/**
 * 构建元数据并写入Disruptor
 */
protected void handle(final String beanName, final T bean) {
    Class<?> clazz = getCorrectedClass(bean);
    // 获取当前bean的对应shenyu客户端的注解,比如http是@ShenyuSpringMvcClient, 
    // dubbo是@ShenyuDubboClient
    final A beanShenyuClient = AnnotatedElementUtils.findMergedAnnotation(clazz, getAnnotationType());
    // 获取bean对应的path(类上注解的路径),由子类实现
    final String superPath = buildApiSuperPath(clazz, beanShenyuClient);
    // 如果有shenyu客户端注解并且path中包含*,则表示要注册整个类的方法,只需要构建一个类元数据
    if (Objects.nonNull(beanShenyuClient) && superPath.contains("*")) {
        // 由具体的子类构建类元数据写入Disruptor
        handleClass(clazz, bean, beanShenyuClient, superPath);
        return;
    }
    // 类上没有shenyu客户端注解(类上没有注解,但方法上有注解,也是可以注册的),
    // 或者有注解但是path没有包含*,则就要遍历每个方法,为每个需要注册的方法构建方法元数据
    final Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(clazz);
    for (Method method : methods) {
        // 由具体子类构建方法元数据写入Disruptor,并将每个method对应的元数据对象缓存在当前类里
        handleMethod(bean, clazz, beanShenyuClient, method, superPath);
    }
}

protected void handleClass(final Class<?> clazz,
                           final T bean,
                           @NonNull final A beanShenyuClient,
                           final String superPath) {
    publisher.publishEvent(buildMetaDataDTO(bean, beanShenyuClient, pathJoin(contextPath, superPath), clazz, null));
}

protected void handleMethod(final T bean,
                            final Class<?> clazz,
                            @Nullable final A beanShenyuClient,
                            final Method method,
                            final String superPath) {
    // 如果方法上有Shenyu客户端注解,就表示该方法需要注册
    A methodShenyuClient = AnnotatedElementUtils.findMergedAnnotation(method, getAnnotationType());
    if (Objects.nonNull(methodShenyuClient)) {
        final MetaDataRegisterDTO metaData = buildMetaDataDTO(bean, methodShenyuClient,
                buildApiPath(method, superPath, methodShenyuClient), clazz, method);
        publisher.publishEvent(metaData);
        metaDataMap.put(method, metaData);
    }
}
// 获取接口对应路径,如果shenyu注解上没有,就用@RequestMapping上的路径,
// 但是这个只支持第一个路径
@Override
protected String buildApiSuperPath(final Class<?> clazz, @Nullable final ShenyuSpringMvcClient beanShenyuClient) {
    if (Objects.nonNull(beanShenyuClient) && StringUtils.isNotBlank(beanShenyuClient.path())) {
        return beanShenyuClient.path();
    }
    RequestMapping requestMapping = AnnotationUtils.findAnnotation(clazz, RequestMapping.class);
    // Only the first path is supported temporarily
    if (Objects.nonNull(requestMapping) && ArrayUtils.isNotEmpty(requestMapping.path()) && StringUtils.isNotBlank(requestMapping.path()[0])) {
        return requestMapping.path()[0];
    }
    return "";
}

// springmvc接口上需要有 ShenyuSpringMvcClient 注解,
// 并且包含RequestMapping注解(表示是一个接口),才进行注册
protected void handleMethod(final Object bean, final Class<?> clazz,
                            @Nullable final ShenyuSpringMvcClient beanShenyuClient,
                            final Method method, final String superPath) {
    final RequestMapping requestMapping = AnnotatedElementUtils.findMergedAnnotation(method, RequestMapping.class);
    ShenyuSpringMvcClient methodShenyuClient = AnnotatedElementUtils.findMergedAnnotation(method, ShenyuSpringMvcClient.class);
    methodShenyuClient = Objects.isNull(methodShenyuClient) ? beanShenyuClient : methodShenyuClient;
    // 如果有 ShenyuSpringMvcClient 注解并且包含RequestMapping注解(表示是一个接口),则进行注册
    if (Objects.nonNull(methodShenyuClient) && Objects.nonNull(requestMapping)) {
        // 构建元数据
        final MetaDataRegisterDTO metaData = buildMetaDataDTO(bean, methodShenyuClient,
                // 构建path = contextPath + 类上的路径 + 方法上的路径                                        
                buildApiPath(method, superPath, methodShenyuClient), clazz, method);
        // 发布元数据
        getPublisher().publishEvent(metaData);
        getMetaDataMap().put(method, metaData);
    }
}

// path = contextPath + 类上的路径 + 方法上的路径,
// 如果@ShenyuSpringMvcClient注解上的路径不为空,则方法上的路径=@ShenyuSpringMvcClient上的value,
// 否则,方法上的路径=@RequestMapping上的value
@Override
protected String buildApiPath(final Method method, final String superPath,
                              @NonNull final ShenyuSpringMvcClient methodShenyuClient) {
    String contextPath = getContextPath();
    if (StringUtils.isNotBlank(methodShenyuClient.path())) {
        return pathJoin(contextPath, superPath, methodShenyuClient.path());
    }
    final String path = getPathByMethod(method);
    if (StringUtils.isNotBlank(path)) {
        return pathJoin(contextPath, superPath, path);
    }
    return pathJoin(contextPath, superPath);
}

1.2.3、Disruptor消费数据并向shenyu-admin注册数据

上面启动Disruptor的时候说到QueueConsumer实现了WorkHandler接口,是Disruptor的消费者,消费逻辑就在它的onEvent方法中:

public class QueueConsumer<T> implements WorkHandler<DataEvent<T>> {
    
    private final OrderlyExecutor executor;
    
    private final QueueConsumerFactory<T> factory;
  
    public QueueConsumer(final OrderlyExecutor executor, final QueueConsumerFactory<T> factory) {
        this.executor = executor;
        this.factory = factory;
    }
    
    @Override
    public void onEvent(final DataEvent<T> t) {
        if (Objects.nonNull(t)) {
            // 根据事件类型使用不同的线程池
            ThreadPoolExecutor executor = orderly(t);
            // 通过工厂创建队列消费任务 RegisterClientConsumerExecutor
            QueueConsumerExecutor<T> queueConsumerExecutor = factory.create();
            // 为消费任务设置数据
            queueConsumerExecutor.setData(t.getData());
            t.setData(null);
            // 放在线程池中执行 消费任务
            executor.execute(queueConsumerExecutor);
        }
    }

    // ...
}

QueueConsumerExecutor是实现了Runnable的消费任务,它有两个实现:

  • RegisterClientConsumerExecutor:客户端消费者任务
  • RegisterServerConsumerExecutor:服务端消费者任务

从名字也可以看出,RegisterClientConsumerExecutor负责处理客户端任务,shenyu客户端将元数据URI写入disruptor后由这个消费者任务来消费数据,执行实际向注册中心注册的操作。RegisterServerConsumerExecutor负责处理服务端(shenyu-admin)任务,服务端从注册中心监听到元数据URI后写入disruptor,然后由RegisterServerConsumerExecutor任务来消费数据,处理数据入库操作和发布事件。
RegisterClientConsumerExecutor的消费逻辑:

public final class RegisterClientConsumerExecutor<T extends DataTypeParent> extends QueueConsumerExecutor<T> {
    
    private final Map<DataType, ExecutorTypeSubscriber<T>> subscribers;
    
    private RegisterClientConsumerExecutor(final Map<DataType, ExecutorTypeSubscriber<T>> executorSubscriberMap) {
        this.subscribers = new EnumMap<>(executorSubscriberMap);
    }

    @Override
    public void run() {
        // 获取数据
        final T data = getData();
        // 根据数据类型获取对应的处理器进行处理,即在disruptor启动的时候添加的订阅器
        subscribers.get(data.getType()).executor(Lists.newArrayList(data));
    }

    // ... 
}

根据不同的数据类型使用不同的订阅器执行器去执行,这些订阅器是在disruptor启动的时候设置的。目前注册的数据类型有3种,元数据URIAPI文档

public enum DataType {
    
    /**
     * Meta data data type enum.
     */
    META_DATA,
    
    /**
     * Uri data type enum.
     */
    URI,

    /**
     * Api doc type enum.
     */
    API_DOC,
}

所以相对应的订阅器也分为3类,分别处理元数据,URI和API文档。在客户端和服务端分别有两个,所以一共是6个。
在这里插入图片描述

元数据处理

public class ShenyuClientMetadataExecutorSubscriber implements ExecutorTypeSubscriber<MetaDataRegisterDTO> {
    
    private final ShenyuClientRegisterRepository shenyuClientRegisterRepository;
    
    // ...

    /**
     * 遍历元数据,对数据注册到注册中心
     */
    @Override
    public void executor(final Collection<MetaDataRegisterDTO> metaDataRegisterDTOList) {
        for (MetaDataRegisterDTO metaDataRegisterDTO : metaDataRegisterDTOList) {
            // 调用响应注册中心的客户端注册类注册元数据
            shenyuClientRegisterRepository.persistInterface(metaDataRegisterDTO);
        }
    }
}

遍历数据,然后又将数据委托给ShenyuClientRegisterRepository执行。ShenyuClientRegisterRepository是在一开始读取配置的时候就创建了,是客户端注册类,用来将数据发送到注册中心的类,不同的注册方式有不同的实现类,该示例使用zookeeper方式注册(shenyu.register.registerType=zookeeper)的实现类是ZookeeperClientRegisterRepository

@Override
public void persistInterface(final MetaDataRegisterDTO metadata) {
    // 后端服务rpc类型
    String rpcType = metadata.getRpcType();
    // contextPath = StringUtils.isEmpty(contextPath) ? appName : contextPath
    String contextPath = ContextPathUtils.buildRealNode(metadata.getContextPath(), metadata.getAppName());
    // 注册元数据
    registerMetadata(rpcType, contextPath, metadata);
}

private void registerMetadata(final String rpcType,
                              final String contextPath,
                              final MetaDataRegisterDTO metadata) {
    // 构建元数据节点名称
    String metadataNodeName = buildMetadataNodeName(metadata);
    // 构建元数据的整体父路径 /shenyu/register/metadata/${rpcType}/${contextPath}
    String metaDataPath = RegisterPathConstants.buildMetaDataParentPath(rpcType, contextPath);
    // 当前元数据在zookeeper中的实际路径,上面两个拼起来
    String realNode = RegisterPathConstants.buildRealNode(metaDataPath, metadataNodeName);
    // 防止同一个元数据添加多次
    synchronized (metadataSet) {
        if (metadataSet.contains(realNode)) {
            return;
        }
        metadataSet.add(realNode);
    }
    // 使用客户端类往zookeeper添加数据,元数据是永久节点
    client.createOrUpdate(realNode, metadata, CreateMode.PERSISTENT);
    LOGGER.info("{} zookeeper client register metadata success: {}", rpcType, metadata);
}

clientshenyuzookeeper操作接口的封装

public void createOrUpdate(final String key, final Object value, final CreateMode mode) {
    if (value != null) {
        // 元数据以json字符串形式存储
        String val = GsonUtils.getInstance().toJson(value);
        createOrUpdate(key, val, mode);
    } else {
        createOrUpdate(key, "", mode);
    }
}

public void createOrUpdate(final String key, final String value, final CreateMode mode) {
    String val = StringUtils.isEmpty(value) ? "" : value;
    try {
        // 使用Curator的API往zookeeper保存数据
        client.create().orSetData().creatingParentsIfNeeded().withMode(mode).forPath(key, val.getBytes(StandardCharsets.UTF_8));
    } catch (Exception e) {
        throw new ShenyuException(e);
    }
}

注册方式比较简单,将元数据的json文本设置到zookeeper对应的路径节点中。

URI处理

public class ShenyuClientURIExecutorSubscriber implements ExecutorTypeSubscriber<URIRegisterDTO> {
 
    @Override
    public void executor(final Collection<URIRegisterDTO> dataList) {
        for (URIRegisterDTO uriRegisterDTO : dataList) {
            Stopwatch stopwatch = Stopwatch.createStarted();
            // 这里的逻辑是为了探测客户端是否已经启动
            while (true) {
                try (Socket ignored = new Socket(uriRegisterDTO.getHost(), uriRegisterDTO.getPort())) {
                    break;
                } catch (IOException e) {
                    long sleepTime = 1000;
                    // maybe the port is delay exposed
                    if (stopwatch.elapsed(TimeUnit.SECONDS) > 5) {
                        LOG.error("host:{}, port:{} connection failed, will retry",
                                uriRegisterDTO.getHost(), uriRegisterDTO.getPort());
                        // If the connection fails for a long time, Increase sleep time
                        if (stopwatch.elapsed(TimeUnit.SECONDS) > 180) {
                            sleepTime = 10000;
                        }
                    }
                    try {
                        TimeUnit.MILLISECONDS.sleep(sleepTime);
                    } catch (InterruptedException ex) {
                        LOG.error("interrupted when sleep", ex);
                    }
                }
            }
            ShenyuClientShutdownHook.delayOtherHooks();
            // 向注册中心注册URI数据
            shenyuClientRegisterRepository.persistURI(uriRegisterDTO);
            // 优雅停机
            ShutdownHookManager.get().addShutdownHook(new Thread(() -> {
                final URIRegisterDTO offlineDTO = new URIRegisterDTO();
                BeanUtils.copyProperties(uriRegisterDTO, offlineDTO);
                offlineDTO.setEventType(EventType.OFFLINE);
                shenyuClientRegisterRepository.offline(offlineDTO);
            }), 2);
        }
    }
}

URI注册逻辑跟元数据的一样

@Override
public void persistURI(final URIRegisterDTO registerDTO) {
    // 后端服务rpc类型
    String rpcType = registerDTO.getRpcType();
    // contextPath = StringUtils.isEmpty(contextPath) ? appName : contextPath
    String contextPath = ContextPathUtils.buildRealNode(registerDTO.getContextPath(), registerDTO.getAppName());
    // 注册URI
    registerURI(rpcType, contextPath, registerDTO);
    LOGGER.info("{} zookeeper client register uri success: {}", rpcType, registerDTO);
}

private synchronized void registerURI(final String rpcType, final String contextPath, final URIRegisterDTO registerDTO) {
    // uri节点名称 ${ip:port}
    String uriNodeName = buildURINodeName(registerDTO);
    // uri父路径 /shenyu/register/uri/{rpcType}/${contextPath}
    String uriPath = RegisterPathConstants.buildURIParentPath(rpcType, contextPath);
    // uri的完整路径,上面两个拼起来
    String realNode = RegisterPathConstants.buildRealNode(uriPath, uriNodeName);
    // uri节点数据
    String nodeData = GsonUtils.getInstance().toJson(registerDTO);
    nodeDataMap.put(realNode, nodeData);
    // 往zookeeper设置uri数据,uri节点是临时节点
    client.createOrUpdate(realNode, nodeData, CreateMode.EPHEMERAL);
}

分析到这里就将客户端的注册逻辑分析完了,通过读取自定义的注解信息构造元数据URI,将数据发到Disruptor队列,然后从队列中消费数据,将数据写到Zookeeper节点中。Zookeeper存储结构如下:

shenyu
   ├──regsiter
   ├    ├──metadata
   ├    ├     ├──${rpcType}
   ├    ├     ├      ├────${contextPath}
   ├    ├     ├               ├──${ruleName} : save metadata data of MetaDataRegisterDTO
   ├    ├──uri
   ├    ├     ├──${rpcType}
   ├    ├     ├      ├────${contextPath}
   ├    ├     ├               ├──${ip:port} : save uri data of URIRegisterDTO
   ├    ├     ├               ├──${ip:port}

2、服务端注册流程

2.1、读取配置

客户端是将数据注册到注册中心上,所以服务端(shenyu-admin)自然也是要从注册中心中监听数据的。注册中心配置类是RegisterCenterConfiguration,我们先看这个配置类:

/**
 * 注册中心配置类
 */
@Configuration
public class RegisterCenterConfiguration {

    /**
     * 读取shenyu.register配置
     */
    @Bean
    @ConfigurationProperties(prefix = "shenyu.register")
    public ShenyuRegisterCenterConfig shenyuRegisterCenterConfig() {
        return new ShenyuRegisterCenterConfig();
    }
    
    /**
     * 创建用于服务端的注册类,从注册中心中监听数据,然后将数据写入Disruptor队列中
     */
    @Bean(destroyMethod = "close")
    public ShenyuClientServerRegisterRepository shenyuClientServerRegisterRepository(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig,
                                                                               final List<ShenyuClientRegisterService> shenyuClientRegisterService) {
        // 从配置中获取注册类型
        String registerType = shenyuRegisterCenterConfig.getRegisterType();
        // 根据注册类型通过SPI方式创建对应的ShenyuClientServerRegisterRepository
        ShenyuClientServerRegisterRepository registerRepository = ExtensionLoader.getExtensionLoader(ShenyuClientServerRegisterRepository.class).getJoin(registerType);
        // 创建Disruptor发布者
        RegisterClientServerDisruptorPublisher publisher = RegisterClientServerDisruptorPublisher.getInstance();
        // 每种客户端类型(rpc类型)的处理类
        Map<String, ShenyuClientRegisterService> registerServiceMap = shenyuClientRegisterService.stream().collect(Collectors.toMap(ShenyuClientRegisterService::rpcType, Function.identity()));
        // 启动Disruptor,添加元数据和URI的订阅器
        publisher.start(registerServiceMap);
        // 初始化注册中心
        registerRepository.init(publisher, shenyuRegisterCenterConfig);
        return registerRepository;
    }
}

该配置类创建了2个bean:

  • ShenyuRegisterCenterConfigshenyu-admin注册中心配置,读取shenyu.register属性配置。
  • ShenyuClientServerRegisterRepository:服务端注册类,用于从注册中心中监听数据,然后将数据写入Disruptor队列中。

这里的创建Disruptor发布者,启动Disruptor等逻辑跟在客户端那边的一样,只是类是服务端这边的,就不再分析了。

2.1.1、用于监听的ShenyuClientServerRegisterRepository

上面生成的ShenyuClientServerRegisterRepository是用于实现服务端注册的接口,会根据注册中心的配置通过SPI方式创建注册类,每一个注册方式都对应一个实现类。

目前支持7种注册类型:

  • Http:ShenyuClientHttpRegistryController
  • Apollo:ApolloClientServerRegisterRepository
  • Zookeeper:ZookeeperClientServerRegisterRepository
  • Etcd:EtcdClientServerRegisterRepository
  • Nacos:NacosClientServerRegisterRepository
  • Consul:ConsulClientServerRegisterRepository
  • Polaris:PolarisClientServerRegisterRepository
    加载类型通过registerType指定,也就是我们在配置文件中指定的类型:
shenyu:
  register:
    registerType: zookeeper
    serverLists: 127.0.0.1:2181

服务端的注册类型必须跟客户端的注册类型一致,这样服务端才可以监听到注册信息。这里要指定的是zookeeper,所以这里创建的就是ZookeeperClientServerRegisterRepository
初始化ZookeeperClientServerRegisterRepository时会对zookeeper进行监听

@Join
public class ZookeeperClientServerRegisterRepository implements ShenyuClientServerRegisterRepository {

    private ShenyuClientServerRegisterPublisher publisher;

    private ZookeeperClient client;

    @Override
    public void init(final ShenyuClientServerRegisterPublisher publisher, final ShenyuRegisterCenterConfig config) {
        this.init(config);
        this.publisher = publisher;

        Properties props = config.getProps();
        int sessionTimeout = Integer.parseInt(props.getProperty("sessionTimeout", "3000"));
        int connectionTimeout = Integer.parseInt(props.getProperty("connectionTimeout", "3000"));

        int baseSleepTime = Integer.parseInt(props.getProperty("baseSleepTime", "1000"));
        int maxRetries = Integer.parseInt(props.getProperty("maxRetries", "3"));
        int maxSleepTime = Integer.parseInt(props.getProperty("maxSleepTime", String.valueOf(Integer.MAX_VALUE)));

        ZookeeperConfig zkConfig = new ZookeeperConfig(config.getServerLists());
        zkConfig.setBaseSleepTimeMilliseconds(baseSleepTime)
        .setMaxRetries(maxRetries)
        .setMaxSleepTimeMilliseconds(maxSleepTime)
        .setSessionTimeoutMilliseconds(sessionTimeout)
        .setConnectionTimeoutMilliseconds(connectionTimeout);

        String digest = props.getProperty("digest");
        if (!StringUtils.isEmpty(digest)) {
            zkConfig.setDigest(digest);
        }

        // 创建zookeeper客户端
        this.client = new ZookeeperClient(zkConfig);

        // 启动客户端
        client.start();

        // 初始化订阅
        initSubscribe();
    }

    private void initSubscribe() {
        // 订阅元数据节点,由于是按rpc类型分类的,所以需要分别监听这几个rpc节点
        RpcTypeEnum.acquireSupportMetadatas().forEach(rpcTypeEnum -> subscribeMetaData(rpcTypeEnum.getName()));
        // 订阅URI节点,由于是按rpc类型分类的,所以需要分别监听这几个rpc节点
        RpcTypeEnum.acquireSupportURIs().forEach(rpcTypeEnum -> subscribeURI(rpcTypeEnum.getName()));
    }

    // 订阅URI
    private void subscribeURI(final String rpcType) {
        // /shenyu/register/uri/${rpcType}
        String contextPathParent = RegisterPathConstants.buildURIContextPathParent(rpcType);
        // 添加监听
        client.addCache(contextPathParent, new URICacheListener());
    }

    // 订阅元数据节点
    private void subscribeMetaData(final String rpcType) {
        // /shenyu/register/metadata/${rpcType}
        String contextPathParent = RegisterPathConstants.buildMetaDataContextPathParent(rpcType);
        // 添加监听
        client.addCache(contextPathParent, new MetadataCacheListener());
    }

    // ...
}

2.2、注册元数据和URI

2.2.1、监听数据变更并写入Disruptor

前面分析到服务端启动初始化的时候,会对zookeeper节点进行监听,zookeeper节点结构如下:

shenyu
   ├──regsiter
   ├    ├──metadata
   ├    ├     ├──${rpcType}
   ├    ├     ├      ├────${contextPath}
   ├    ├     ├               ├──${ruleName} : save metadata data of MetaDataRegisterDTO
   ├    ├──uri
   ├    ├     ├──${rpcType}
   ├    ├     ├      ├────${contextPath}
   ├    ├     ├               ├──${ip:port} : save uri data of URIRegisterDTO
   ├    ├     ├               ├──${ip:port}

每一个rpcType节点都会由一个监听器,当它下面的节点变更的时候,会接收到变更的信息。

abstract static class AbstractRegisterListener implements TreeCacheListener {
    @Override
    public final void childEvent(final CuratorFramework client, final TreeCacheEvent event) {
        ChildData childData = event.getData();
        if (null == childData) {
            return;
        }
        // 变更的路径
        String path = childData.getPath();
        if (Strings.isNullOrEmpty(path)) {
            return;
        }
        event(event.getType(), path, childData);
    }

    protected abstract void event(TreeCacheEvent.Type type, String path, ChildData data);
}

// 元数据注册监听
class MetadataCacheListener extends AbstractRegisterListener {
    @Override
    public void event(final TreeCacheEvent.Type type, final String path, final ChildData data) {
        // 如果不是"/shenyu/register"开头的路径,则略过
        if (!path.contains(RegisterPathConstants.ROOT_PATH)) {
            return;
        }
        Optional.ofNullable(data)
                .ifPresent(e -> {
                    String str = new String(data.getData(), StandardCharsets.UTF_8);
                    // 往disruptor写入元数据
                    publishMetadata(str);
                    LOGGER.info("zookeeper register metadata success: {}", str);
                });
    }
}

// URI注册和下线监听
class URICacheListener extends AbstractRegisterListener {

    @Override
    public void event(final TreeCacheEvent.Type type, final String path, final ChildData data) {
        // 不是叶子节点,即不是URI节点,则略过
        if (data.getData() == null || data.getData().length == 0) {
            return;
        }
        // 将节点的数据转为URI对象
        URIRegisterDTO uriRegisterDTO = GsonUtils.getInstance()
                .fromJson(new String(data.getData()), URIRegisterDTO.class);
        if (uriRegisterDTO == null) {
            return;
        }
        switch (type) {
            case NODE_ADDED: // 服务注册
                uriRegisterDTO.setEventType(EventType.REGISTER);
                // 发布URI
                publishRegisterURI(Arrays.asList(uriRegisterDTO));
                break;
            case NODE_REMOVED: // 服务下线
                uriRegisterDTO.setEventType(EventType.OFFLINE);
                // 往disruptor写入URI
                publishRegisterURI(Arrays.asList(uriRegisterDTO));
                break;
            default:
                break;
        }
    }
}

监听到元数据URI变更后都是直接写入disruptor队列。

2.2.2、Disruptor消费数据并持久化

QueueConsumer实现了WorkHandler接口,是Disruptor的消费者,消费逻辑就在它的onEvent方法中:

public class QueueConsumer<T> implements WorkHandler<DataEvent<T>> {
    
    private final OrderlyExecutor executor;
    
    private final QueueConsumerFactory<T> factory;
    
    /**
     * Instantiates a new Queue consumer.
     *
     * @param executor the executor
     * @param factory  the factory
     */
    public QueueConsumer(final OrderlyExecutor executor, final QueueConsumerFactory<T> factory) {
        this.executor = executor;
        this.factory = factory;
    }
    
    @Override
    public void onEvent(final DataEvent<T> t) {
        if (Objects.nonNull(t)) {
            // 根据事件类型使用不同的线程池
            ThreadPoolExecutor executor = orderly(t);
            // 通过工厂创建队列消费任务 RegisterServerConsumerExecutor
            QueueConsumerExecutor<T> queueConsumerExecutor = factory.create();
            // 为消费任务设置数据
            queueConsumerExecutor.setData(t.getData());
            t.setData(null);
            // 放在线程池中执行 消费任务
            executor.execute(queueConsumerExecutor);
        }
    }

    // ...
}

分析客户端注册流程的时候说到RegisterServerConsumerExecutor是服务端消费者任务,处理数据入库操作和发布事件。
RegisterServerConsumerExecutor消费逻辑:

public final class RegisterServerConsumerExecutor extends QueueConsumerExecutor<Collection<DataTypeParent>> {

    // 每种数据类型的订阅器执行器
    private final Map<DataType, ExecutorSubscriber<DataTypeParent>> subscribers;
    
    private RegisterServerConsumerExecutor(final Map<DataType, ExecutorTypeSubscriber<DataTypeParent>> executorSubscriberMap) {
        this.subscribers = new HashMap<>(executorSubscriberMap);
    }
    
    @Override
    public void run() {
        Collection<DataTypeParent> results = getData()
                .stream()
                .filter(this::isValidData)
                .collect(Collectors.toList());
        if (CollectionUtils.isEmpty(results)) {
            return;
        }
        // 选择对应的数据类型的订阅器执行器去执行
        selectExecutor(results).executor(results);
    }
    
    private ExecutorSubscriber<DataTypeParent> selectExecutor(final Collection<DataTypeParent> list) {
        final Optional<DataTypeParent> first = list.stream().findFirst();
        return subscribers.get(first.orElseThrow(() -> new RuntimeException("the data type is not found")).getType());
    }
    
    // ...
}

根据不同的数据类型使用不同的订阅器执行器去执行,这些订阅器是在disruptor启动的时候设置的。
服务端的订阅器有3个,分别为MetadataExecutorSubscriberURIRegisterExecutorSubscriberApiDocExecutorSubscriber,分别处理元数据URIAPI文档

元数据的处理

public class MetadataExecutorSubscriber implements ExecutorTypeSubscriber<MetaDataRegisterDTO> {

    // 每种客户端类型的注册服务
    private final Map<String, ShenyuClientRegisterService> shenyuClientRegisterService;

    public MetadataExecutorSubscriber(final Map<String, ShenyuClientRegisterService> shenyuClientRegisterService) {
        this.shenyuClientRegisterService = shenyuClientRegisterService;
    }

    @Override
    public DataType getType() {
        return DataType.META_DATA;
    }

    @Override
    public void executor(final Collection<MetaDataRegisterDTO> metaDataRegisterDTOList) {
        // 遍历元数据
        metaDataRegisterDTOList.forEach(meta -> {
            // 根据客户端类型
            Optional.ofNullable(this.shenyuClientRegisterService.get(meta.getRpcType()))
                    .ifPresent(shenyuClientRegisterService -> {
                        // 加锁,保证数据顺序执行,防止并发
                        synchronized (shenyuClientRegisterService) {
                            // 处理数据
                            shenyuClientRegisterService.register(meta);
                        }
                    });
        });
    }
}

ShenyuClientRegisterService是注册方法接口,它有多个实现类:
在这里插入图片描述

  • AbstractContextPathRegisterService:抽象类,处理部分公共逻辑;
  • AbstractShenyuClientRegisterServiceImpl::抽象类,处理部分公共逻辑;
  • ShenyuClientRegisterDivideServiceImpl:divide类,处理http注册类型;
  • ShenyuClientRegisterDubboServiceImpl:dubbo类,处理dubbo注册类型;
  • ShenyuClientRegisterGrpcServiceImpl:gRPC类,处理gRPC注册类型;
  • ShenyuClientRegisterBrpcServiceImpl:bRPC类,处理bRPC注册类型;
  • ShenyuClientRegisterMotanServiceImpl:Motan类,处理Motan注册类型;
  • ShenyuClientRegisterSofaServiceImpl:Sofa类,处理Sofa注册类型;
  • ShenyuClientRegisterSpringCloudServiceImpl:SpringCloud类,处理SpringCloud注册类型;
  • ShenyuClientRegisterTarsServiceImpl:Tars类,处理Tars注册类型;
  • ShenyuClientRegisterWebSocketServiceImpl:Websocket类,处理Websocket注册类型;

每一种rpc类型都对应一个注册处理类,本实例后端服务是http接口,所以是使用ShenyuClientRegisterDivideServiceImpl来处理。

public abstract class AbstractShenyuClientRegisterServiceImpl extends FallbackShenyuClientRegisterService implements ShenyuClientRegisterService {

    @Resource
    private ApplicationEventPublisher eventPublisher;

    // 这几个就是操作数据库的service
    @Resource
    private SelectorService selectorService;

    @Resource
    private MetaDataService metaDataService;

    @Resource
    private RuleService ruleService;

    @Override
    public String register(final MetaDataRegisterDTO dto) {
        
        // 1、注册选择器(可以认为一个服务就是一个选择器)
        
        // 选择器执行逻辑,默认情况是空的,需要在控制台另外手动配置
        // 子类实现
        String selectorHandler = selectorHandler(dto);
        // 持久化选择器并发布选择器变更事件(不存在的时候)ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE
        String selectorId = selectorService.registerDefault(dto, PluginNameAdapter.rpcTypeAdapter(rpcType()), selectorHandler);
        
        // 2、注册规则(可以认为一个元数据就是一个规则,根据path判断是否同一个)
        
        // 规则处理逻辑
        // 子类实现,,都是直接创建一个各自rpc类型的默认逻辑
        String ruleHandler = ruleHandler();
        // 构建规则DTO
        RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);
        // 持久化规则并发布规则变更事件(不存在的时候)ConfigGroupEnum.RULE, DataEventTypeEnum.UPDATE
        ruleService.registerDefault(ruleDTO);
        
        // 3、注册元数据,并发布元数据变更事件(已存在,发布元数据更新事件,不存在,发布元数据创建事件)
       
        // 子类实现
        registerMetadata(dto);
        
        // 4、注册contextPath(只有http,springCloud,webSocket类型才有)
        String contextPath = dto.getContextPath();
        if (StringUtils.isNotEmpty(contextPath)) {
            registerContextPath(dto);
        }
        return ShenyuResultMessage.SUCCESS;
    }

}

整个注册处理逻辑可以分为4步:

  1. 注册选择器,构建选择器,默认情况下一个服务就是一个选择器。之后将选择器插入数据库并发布选择器变更事件。
@Override
public String registerDefault(final MetaDataRegisterDTO dto, final String pluginName, final String selectorHandler) {
    // 以contextPath或appName作为选择器名称
    String contextPath = ContextPathUtils.buildContextPath(dto.getContextPath(), dto.getAppName());
    // 根据选择器名和插件名从数据库中查询选择器
    SelectorDO selectorDO = findByNameAndPluginName(contextPath, pluginName);
    // 如果还不存在,就创建一个选择器插入数据库
    if (Objects.isNull(selectorDO)) {
        // 构建选择器DTO
        SelectorDTO selectorDTO = SelectorUtil.buildSelectorDTO(contextPath, pluginMapper.selectByName(pluginName).getId());
        selectorDTO.setHandle(selectorHandler);
        // 注册选择器并发布事件 ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE
        return registerDefault(selectorDTO);
    }
    return selectorDO.getId();
}
  1. 注册规则,可以认为一个元数据就是一个规则,根据path判断是否同一个。

构建规则:

private RuleDTO buildRpcDefaultRuleDTO(final String selectorId, final MetaDataRegisterDTO metaDataDTO, final String ruleHandler) {
    return buildRuleDTO(selectorId, ruleHandler, metaDataDTO.getRuleName(), metaDataDTO.getPath());
}

private RuleDTO buildRuleDTO(final String selectorId, final String ruleHandler, final String ruleName, final String path) {
    // 构建规则DTO
    RuleDTO ruleDTO = RuleDTO.builder()
            .selectorId(selectorId)
            .name(ruleName)
            .matchMode(MatchModeEnum.AND.getCode())
            .enabled(Boolean.TRUE)
            .loged(Boolean.TRUE)
            .matchRestful(Boolean.FALSE)
            .sort(1)
            .handle(ruleHandler)
            .build();

    // 将{xxx}替换成**
    String conditionPath = this.rewritePath(path);
    RuleConditionDTO ruleConditionDTO = RuleConditionDTO.builder()
            .paramType(ParamTypeEnum.URI.getName())
            .paramName("/")
            .paramValue(conditionPath)
            .build();
    // 设置规则条件
    if (conditionPath.endsWith(AdminConstants.URI_SLASH_SUFFIX)) {
        ruleConditionDTO.setOperator(OperatorEnum.STARTS_WITH.getAlias());
    } else if (conditionPath.endsWith(AdminConstants.URI_SUFFIX)) {
        ruleConditionDTO.setOperator(OperatorEnum.PATH_PATTERN.getAlias());
    } else if (conditionPath.indexOf("*") > 1) {
        ruleConditionDTO.setOperator(OperatorEnum.MATCH.getAlias());
    } else {
        ruleConditionDTO.setOperator(OperatorEnum.EQ.getAlias());
    }
    ruleDTO.setRuleConditions(Collections.singletonList(ruleConditionDTO));
    return ruleDTO;
}

保存规则:

@Override
public String registerDefault(final RuleDTO ruleDTO) {
    // 选择器下已经存在同名的规则,则直接返回,什么也不干
    if (Objects.nonNull(ruleMapper.findBySelectorIdAndName(ruleDTO.getSelectorId(), ruleDTO.getName()))) {
        return "";
    }
    RuleDO ruleDO = RuleDO.buildRuleDO(ruleDTO);
    if (StringUtils.isEmpty(ruleDTO.getId())) {
        // 插入规则
        ruleMapper.insertSelective(ruleDO);
        // 插入规则条件
        addCondition(ruleDO, ruleDTO.getRuleConditions());
    }
    // 发布规则变更事件 ConfigGroupEnum.RULE, DataEventTypeEnum.UPDATE
    ruleEventPublisher.onRegister(ruleDO, ruleDTO.getRuleConditions());
    return ruleDO.getId();
}

具体的规则设计建议去看官方文档。

  1. 注册元数据,直接将注册上来的元数据保存
@Override
protected void registerMetadata(final MetaDataRegisterDTO dto) {
    if (dto.isRegisterMetaData()) {
        MetaDataService metaDataService = getMetaDataService();
        // 根据路径查询元数据时候已存在
        MetaDataDO exist = metaDataService.findByPath(dto.getPath());
        // 已存在,就更新,发布元数据更新事件,不存在,就插入,发布元数据创建事件(用于同步网关)
        metaDataService.saveOrUpdateMetaData(exist, dto);
    }
}
  1. 注册ContextPath,只有httpspringCloudwebSocket类型才有。处理的逻辑在AbstractContextPathRegisterService中。
public abstract class AbstractContextPathRegisterService extends AbstractShenyuClientRegisterServiceImpl {
    
    @Override
    public void registerContextPath(final MetaDataRegisterDTO dto) {
        // 持久化contextPath插件下的选择器并发布选择器变更事件
        String contextPathSelectorId = getSelectorService().registerDefault(dto, PluginEnum.CONTEXT_PATH.getName(), "");
        // 创建规则处理逻辑
        ContextMappingRuleHandle handle = new ContextMappingRuleHandle();
        handle.setContextPath(PathUtils.decoratorContextPath(dto.getContextPath()));
        handle.setAddPrefixed(dto.getAddPrefixed());
        // 注册contextPath插件默认的规则,contextPath就是规则名,并发布规则变更事件(用于同步网关)
        getRuleService().registerDefault(buildContextPathDefaultRuleDTO(contextPathSelectorId, dto, handle.toJson()));
    }
}

URI的处理
URI数据是由URIRegisterExecutorSubscriber订阅器处理:

public class URIRegisterExecutorSubscriber implements ExecutorTypeSubscriber<URIRegisterDTO> {
    
    @Override
    public void executor(final Collection<URIRegisterDTO> dataList) {
        if (CollectionUtils.isEmpty(dataList)) {
            return;
        }
        // 根据rpc类型分类
        final Map<String, List<URIRegisterDTO>> groupByRpcType = dataList.stream()
                .filter(data -> StringUtils.isNotBlank(data.getRpcType()))
                .collect(Collectors.groupingBy(URIRegisterDTO::getRpcType));
        for (Map.Entry<String, List<URIRegisterDTO>> entry : groupByRpcType.entrySet()) {
            // 根据不同rpc类型使用对应的shenyuClientRegisterService处理
            final String rpcType = entry.getKey();
            Optional.ofNullable(shenyuClientRegisterService.get(rpcType))
                    .ifPresent(service -> {
                        final List<URIRegisterDTO> list = entry.getValue();
                        // 再以contextPath/appName分类
                        Map<String, List<URIRegisterDTO>> listMap = buildData(list);
                        listMap.forEach((selectorName, uriList) -> {
                            final List<URIRegisterDTO> register = new LinkedList<>();
                            final List<URIRegisterDTO> offline = new LinkedList<>();
                            for (URIRegisterDTO d : uriList) {
                                final EventType eventType = d.getEventType();
                                // 判断是注册类型还是下线类型(服务实例启动和下线)
                                if (Objects.isNull(eventType) || EventType.REGISTER.equals(eventType)) {
                                    // eventType is null, should be old versions
                                    register.add(d);
                                } else if (EventType.OFFLINE.equals(eventType)) {
                                    offline.add(d);
                                }
                            }
                            if (CollectionUtils.isNotEmpty(register)) {
                                // 注册URI
                                service.registerURI(selectorName, register);
                            }
                            if (CollectionUtils.isNotEmpty(offline)) {
                                // 下线URI
                                service.offline(selectorName, offline);
                            }
                        });
                    });
        }
    }
    
    private Map<String, List<URIRegisterDTO>> buildData(final Collection<URIRegisterDTO> dataList) {
        Map<String, List<URIRegisterDTO>> resultMap = new HashMap<>(8);
        for (URIRegisterDTO dto : dataList) {
            String contextPath = dto.getContextPath();
            String key = StringUtils.isNotEmpty(contextPath) ? contextPath : dto.getAppName();
            if (StringUtils.isNotEmpty(key)) {
                if (resultMap.containsKey(key)) {
                    List<URIRegisterDTO> existList = resultMap.get(key);
                    existList.add(dto);
                    resultMap.put(key, existList);
                } else {
                    resultMap.put(key, Lists.newArrayList(dto));
                }
            }
        }
        return resultMap;
    }
}

调到FallbackShenyuClientRegisterServiceregisterURI()方法

@Override
public String registerURI(final String selectorName, final List<URIRegisterDTO> uriList) {
    String result;
    String key = key(selectorName);
    try {
        this.removeFallBack(key);
    	// 注册uri
        result = this.doRegisterURI(selectorName, uriList);
        logger.info("Register success: {},{}", selectorName, uriList);
    } catch (Exception ex) {
        logger.warn("Register exception: cause:{}", ex.getMessage());
        result = "";
        this.addFallback(key, new FallbackHolder(selectorName, uriList));
    }
    return result;
}
FallbackShenyuClientRegisterService是用来异常处理的,然后调用doRegisterURI()做真正处理。
@Override
public String doRegisterURI(final String selectorName, final List<URIRegisterDTO> uriList) {
    if (CollectionUtils.isEmpty(uriList)) {
        return "";
    }
    // 查询对应的选择器
    SelectorDO selectorDO = selectorService.findByNameAndPluginName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));
    if (Objects.isNull(selectorDO)) {
        throw new ShenyuException("doRegister Failed to execute,wait to retry.");
    }
  
    // 过滤port或host为空的URI
    List<URIRegisterDTO> validUriList = uriList.stream().filter(dto -> Objects.nonNull(dto.getPort()) && StringUtils.isNotBlank(dto.getHost())).collect(Collectors.toList());
    // 由URI构建处理选择器中的handler信息,更新选择器中的handler
    // 应该就是相当于添加上服务实例信息
    String handler = buildHandle(validUriList, selectorDO);
    if (handler != null) {
        selectorDO.setHandle(handler);
        SelectorData selectorData = selectorService.buildByName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));
        selectorData.setHandle(handler);
        // 更新数据库
        selectorService.updateSelective(selectorDO);
        // 发布选择器变更事件(用于同步给网关)
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData)));
    }
    return ShenyuResultMessage.SUCCESS;
}

总结就是admin拿到URI数据后,更新选择器中的handler信息,然后写入到数据库,最后发布事件。
更新的就是这里的信息:
在这里插入图片描述

至此,服务端注册流程也就分析完了,主要通过对外提供的接口,接受客户端的注册信息,然后写入到Disruptor队列,再从中消费数据,根据接收到的元数据和URI数据更新admin的选择器、规则、元数据和选择器的handler。
在这里插入图片描述

参考资料:
官方博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/246052.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

防火墙在网络安全中的作用有什么?部署模式有什么?

防火墙技术是通过有机结合各类用于安全管理与筛选的软件和硬件设备&#xff0c;帮助计算机网络于其内、外网之间构建一道相对隔绝的保护屏障&#xff0c;以保护用户资料与信息安全性的一种技术。 防火墙在网络安全中的作用主要有以下几个方面&#xff1a; 1.保护网络安全——…

设计模式——适配器模式(结构型)

引言 适配器模式是一种结构型设计模式&#xff0c; 它能使接口不兼容的对象能够相互合作。 问题 假如你正在开发一款股票市场监测程序&#xff0c; 它会从不同来源下载 XML 格式的股票数据&#xff0c; 然后向用户呈现出美观的图表。 在开发过程中&#xff0c; 你决定在程序…

Process On在线绘制流程图

目录 一.ProcessOn 1.1.介绍 1.2.直接网上使用 二.绘制门诊流程图 三.绘制住院流程图 四.绘制药库采购入库流程图 五.绘制OA会议流程图 今天就到这里了哦!!!希望能帮到你哦&#xff01;&#xff01;&#xff01; 一.ProcessOn 1.1.介绍 ProcessOn&#xff08;流程&#…

使用podman管理容器

目录 1.安装及配置podman 2.镜像的命名 3.对镜像重新做标签 4.删除镜像 5.查看镜像的层结构 6.导出和导入镜像 7.创建容器 8.创建一个简单的容器 9.容器的生命周期 10.创建临时容器 11.指定容器中运行的命令 12.创建容器时使用变量 对于初学者来说&#xff0c;不太容易理…

机器学习算法---分类

当然&#xff0c;让我为您提供更详细的机器学习算法介绍&#xff0c;重点在于每种算法的原理、优缺点&#xff0c;并在注意事项中特别提到它们对非平衡数据和高维稀疏数据的适应性。 1. 决策树&#xff08;Decision Trees&#xff09; 原理&#xff1a; 决策树通过学习简单的…

【动态规划】路径问题_不同路径_C++

题目链接&#xff1a;leetcode不同路径 目录 题目解析&#xff1a; 算法原理 1.状态表示 2.状态转移方程 3.初始化 4.填表顺序 5.返回值 编写代码 题目解析&#xff1a; 题目让我们求总共有多少条不同的路径可到达右下角&#xff1b; 由题可得&#xff1a; 机器人位于…

C# 字符串格式化

写在前面 在日常编程中&#xff0c;经常需要对字符串进行格式化操作&#xff0c;以便呈现为不同的格式&#xff0c;满足各种各样的显示需求&#xff0c;C#的字符串格式化参数是非常丰富的&#xff0c;这里做个简单的列举&#xff0c;以供后续参考和延伸。 代码实现 var curr…

WPF 显示PDF、PDF转成图片

1.NuGet 安装 O2S.Components.PDFView4NET.WPF 2.添加组件 工具箱中&#xff0c;空白处 右键&#xff0c;选择项 WPF组件 界面&#xff0c;选择NuGet安装库对面路径下的 O2S.Components.PDFView4NET.WPF.dll 3.引入组件命名空间&#xff0c;并使用 <Windowxmlns"htt…

关于ctf反序列化题的一些见解([MRCTF2020]Ezpop以及[NISACTF 2022]babyserialize)

这里对php反序列化做简单了解 在PHP中&#xff0c;序列化用于存储或传递 PHP 的值的过程中&#xff0c;同时不丢失其类型和结构。 serialize&#xff08;&#xff09; 函数序列化对象后&#xff0c;可以很方便的将它传递给其他需要它的地方&#xff0c;且其类型和结构不会改变…

nvm--node版本管理详细安装和使用教程

1&#xff09;nvm是什么? nvm全英文也叫node.js version management&#xff0c;是一个nodejs的版本管理工具。nodejs是项目开发时所需要的代码库&#xff0c;nvm是nodejs版本管理工具&#xff0c;npm是nodejs包管理工具&#xff1b;nodejs能够使得javascript能够脱离浏览器运…

PyCharm连接远程服务器

要求&#xff1a;PyCharm专业版才支持远程服务 一、创建远程连接 先建立本地与远程服务器之间的SSH连接 1、配置连接 2、建立SSH连接&#xff0c;选择文件传输协议 SFTP 3、设置服务器名&#xff08;可以随意命名&#xff09; 4、配置 SSH连接 点击 172.18.1.202 配置…

Python爬取旅游网站热门景点信息的技术性文章

目录 一、引言 二、准备工作 三、爬取热门景点信息 1、分析网页结构 2、发送HTTP请求 3、解析HTML文档 4、提取所需信息 5、保存数据到文件或数据库 四、优化爬虫程序性能和效率 五、异常处理与日志记录 1、异常处理 2、日志记录 六、安全性与合法性考虑 七、总结…

【EI会议征稿】第五届机械仪表与自动化国际学术会议(ICMIA 2024)

第五届机械仪表与自动化国际学术会议&#xff08;ICMIA 2024&#xff09; The 5th International Conference on Mechanical Instrumentation and Automation 2024年第五届机械仪表与自动化国际学术会议&#xff08;ICMIA 2024&#xff09;定于2024年4月5-7日在中国武汉隆重…

Node.js 的适用场景

目录 前言 适用场景 1. 实时应用 用法 代码 理解 代码示例 理解 3. 微服务架构 用法 代码示例 理解 总结 前言 Node.js 是一个基于 Chrome V8 引擎的 JavaScript 运行环境&#xff0c;它使得 JavaScript 可以脱离浏览器运行在服务器端。Node.js 的出现极大地扩展…

flink源码分析之功能组件(五)-高可用组件

简介 本系列是flink源码分析的第二个系列,上一个《flink源码分析之集群与资源》分析集群与资源,本系列分析功能组件,kubeclient,rpc,心跳,高可用,slotpool,rest,metrics,future。 本文解释高可用组件,包括两项服务,主节点选举和主节点变更通知* 高可用服务常见有两…

23.Java程序设计--基于SSM框架的移动端家庭客栈管理系统的设计与实现

第一章&#xff1a;引言 1.1 背景 客栈业务背景移动端应用需求增长趋势 1.2 研究动机 移动端管理系统的需求SSM框架的选择和优势 1.3 研究目的与意义 提高家庭客栈管理效率移动端解决方案的创新 第二章&#xff1a;相关技术和理论综述 2.1 SSM框架简介 Spring框架Spri…

翻译: ChatGPT Token消耗粗略计算英文就是除以四分之三

在这个视频中&#xff0c;我想带你快速浏览一些例子&#xff0c;以建立对在软件应用中使用大型语言模型的实际成本的直观感受。让我们来看看。这是一些示例价格&#xff0c;用于从不同的大型语言模型获取提示和回应&#xff0c;这些模型对开发者可用。即&#xff0c;如果你在你…

基于vue实现的疫情数据可视化分析及预测系统-计算机毕业设计推荐django

目 录 摘 要 I ABSTRACT II 目 录 II 第1章 绪论 1 1.1背景及意义 1 1.2 国内外研究概况 1 1.3 研究的内容 1 第2章 相关技术 3 2.1 nodejs简介 4 2.2 express框架介绍 6 2.4 MySQL数据库 4 第3章 系统分析 5 3.1 需求分析 5 3.2 系统可行性分析 5 3.2.1技术可行性&#xff1a;…

flink中如何把DB大表的配置数据加载到内存中对数据流进行增强处理

背景 在处理flink的数据流时&#xff0c;比如处理商品流时&#xff0c;一般我们从kafka中只拿到了商品id&#xff0c;此时我们需要把商品的其他配置信息比如品牌品类等也拿到&#xff0c;此时就需要关联上外部配置表来达到丰富数据流的目的&#xff0c;如果外部配置表很大&…

gitlab下载安装

1.下载 官网rpm包 gitlab/gitlab-ce - Results in gitlab/gitlab-ce 国内镜像 Index of /gitlab-ce/yum/ | 清华大学开源软件镜像站 | Tsinghua Open Source Mirror 2.安装 rpm -ivh gitlab-ce-16.4.3-ce.0.el7.x86_64.rpm 3.配置 vim /etc/gitlab/gitlab.rb 将 externa…