spring eureka集群相关问题

一、集群节点信息如何更新?

EurekaServer节点启动的时候,DefaultEurekaServerContext.init()方法调用PeerEurekaNodes.start()方法,start方法中resolvePeerUrls()会从配置文件读取serviceUrl属性值获得集群最新节点信息,通过updatePeerEurekaNodes()方法将最新节点信息更新到PeerEurekaNodes类的属性peerEurekaNodes和peerEurekaNodeUrls中。

PeerEurekaNodes.start()还提供了一个更新节点信息的定时任务,每隔PeerEurekaNodesUpdateIntervalMs (默认10min) 时间执行一次。

针对上述这个过程,需要注意的是(网上很多文章表述有误):peerEurekaNodesUpdateIntervalMs这个参数并不是集群节点注册信息的同步间隔时间。为什么这么说呢?我们可以看上图中的定时任务peersUpdateTask做了什么事情即可,其实只要看updatePeerEurekaNodes(resolvePeerUrls())做了什么即可。

通过源码跟踪:resolvePeerUrls()方法默认即执行了EndpointUtils.getServiceUrlsFromConfig(),即从配置文件读取服务节点urls信息。updatePeerEurekaNodes()方法则把获取得到的urls与之前urls信息进行对比,该新增的放到新增list,该删除的放到删除list,如果既没有新增的节点也没有待删除的节点,则返回不做任何处理。下面是代码:

针对要删除的节点,处理方式是从节点列表移除,同时调用该节点的shutDown方法;针对要新增的节点,则加入到节点列表中,并创建新的节点,调用方法createPeerEurekaNode(String peerEurekaNodeUrl);创建节点最终调用了构造方法:

new PeerEurekaNode(registry, targetHost, peerEurekaNodeUrl, replicationClient, serverConfig);

上述构造方法会从服务其他节点获取注册服务信息同步到新创建的服务节点中。

那么接下来就要问第二个问题了。

二、注册服务信息如何在集群节点中同步?

注册服务信息在集群节点中的同步有两种场景:一种是新增服务节点如何从其他服务节点同步服务注册信息;另一种是稳定的集群中,各节点之间是如何同步服务注册信息。

我们先来看第一种场景,新增节点如何从其他服务节点同步服务注册信息。

(1)Eureka Server节点启动时的服务同步

Eureka Server启动时,EurekaServerBootStrap调用PeerAwareInstanceRegistryImpl.syncUp(),同步注册服务信息。

// Copy registry from neighboring eureka node
int registryCount = this.registry.syncUp();
this.registry.openForTraffic(this.applicationInfoManager, registryCount);

syncUp()进行注册服务信息的同步:

    /**
     * Populates the registry information from a peer eureka node. This
     * operation fails over to other nodes until the list is exhausted if the
     * communication fails.
     */
    @Override
    public int syncUp() {
        // Copy entire entry from neighboring DS node
        int count = 0;
 
        for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
            if (i > 0) {
                try {
                    Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
                } catch (InterruptedException e) {
                    logger.warn("Interrupted during registry transfer..");
                    break;
                }
            }
            Applications apps = eurekaClient.getApplications();
            for (Application app : apps.getRegisteredApplications()) {
                for (InstanceInfo instance : app.getInstances()) {
                    try {
                        if (isRegisterable(instance)) {
                            register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                            count++;
                        }
                    } catch (Throwable t) {
                        logger.error("During DS init copy", t);
                    }
                }
            }
        }
        return count;
    }

首先会从EurekaClient.getApplications获取所有的注册服务信息,然后调用register()进行服务注册。

如果同步失败,则会sleep serverConfig.getRegistrySyncRetryWaitMs()后,再次进行同步。

同步完成后,调用PeerAwareInstanceRegistryImpl.openForTraffic()方法,进行自我保护阀值的计算:

   public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
        // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
        this.expectedNumberOfClientsSendingRenews = count;
        updateRenewsPerMinThreshold();
        logger.info("Got {} instances from neighboring DS node", count);
        logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
        this.startupTime = System.currentTimeMillis();
        if (count > 0) {
            this.peerInstancesTransferEmptyOnStartup = false;
        }
        DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
        boolean isAws = Name.Amazon == selfName;
        if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
            logger.info("Priming AWS connections for all replicas..");
            primeAwsReplicas(applicationInfoManager);
        }
        logger.info("Changing status to UP");
        applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
        super.postInit();
    }

至此,Eureka Server启动时的节点复制就进行完了。

备注

最大同步重试次数=serverConfig.getRegistrySyncRetries(),默认5次。

同步失败后每次同步的间隔时间=serverConfig.getRegistrySyncRetryWaitMs(),默认30s。

如果启动时同步服务注册信息失败,一段时间不对外提供服务注册功能,waitTimeInMsWhenSyncEmpty,默认5min。

第二种场景:

(2)Eureka Server接收到Register、renew、cancel时的服务同步

处理Register、renew、cancel同步时比较复杂,包括好几个队列的转换处理以及异常处理。这里算是个简图吧,省略了队列的转换和异常处理。

Eureka Server接收到Register、renew或cancel事件后,执行向其他节点同步:

 /**
     * Replicates all eureka actions to peer eureka nodes except for replication
     * traffic to this node.
     *
     */
    private void replicateToPeers(Action action, String appName, String id,
                                  InstanceInfo info /* optional */,
                                  InstanceStatus newStatus /* optional */, boolean isReplication) {
        Stopwatch tracer = action.getTimer().start();
        try {
            if (isReplication) {
                numberOfReplicationsLastMin.increment();
            }
            // If it is a replication already, do not replicate again as this will create a poison replication
            if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                return;
            }
 
            for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                // If the url represents this host, do not replicate to yourself.
                if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                    continue;
                }
                replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
            }
        } finally {
            tracer.stop();
        }
    }
    /**
     * Replicates all instance changes to peer eureka nodes except for
     * replication traffic to this node.
     *
     */
    private void replicateInstanceActionsToPeers(Action action, String appName,
                                                 String id, InstanceInfo info, InstanceStatus newStatus,
                                                 PeerEurekaNode node) {
        try {
            InstanceInfo infoFromRegistry = null;
            CurrentRequestVersion.set(Version.V2);
            switch (action) {
                case Cancel:
                    node.cancel(appName, id);
                    break;
                case Heartbeat:
                    InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                    break;
                case Register:
                    node.register(info);
                    break;
                case StatusUpdate:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                    break;
                case DeleteStatusOverride:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.deleteStatusOverride(appName, id, infoFromRegistry);
                    break;
            }
        } catch (Throwable t) {
            logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
        }
    }

以Register为例(逻辑是一样的),Eureka Server循环遍历其他节点,然后调用node.register(info):

/**
     * Sends the registration information of {@link InstanceInfo} receiving by
     * this node to the peer node represented by this class.
     *
     * @param info
     *            the instance information {@link InstanceInfo} of any instance
     *            that is send to this instance.
     * @throws Exception
     */
    public void register(final InstanceInfo info) throws Exception {
        long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
        batchingDispatcher.process(
                taskId("register", info),
                new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
                    public EurekaHttpResponse<Void> execute() {
                        return replicationClient.register(info);
                    }
                },
                expiryTime
        );
    }

这里执行TaskDispatcher的process,实际底层转到了AcceptorExecutor.process():

acceptorExecutor.process(id, task, expiryTime);

然后acceptorExecutor里会开启内部线程AcceptorRunner进行处理,处理逻辑有点复杂(包括队列的转换和异常处理),最终将处理好的结果放到BlockingQueue> batchWorkQueue中。

TaskExecutors中会在内部开启WokerRunnable线程组(ThreadGroup),循环poll batchWorkQueue队列中的Task,然后调用InstanceReplicationTask的execute方法,将register事件推送到其他Eureka Server节点。

public void run() {
     try {
                while (!isShutdown.get()) {
                    List<TaskHolder<ID, T>> holders = getWork();
                    metrics.registerExpiryTimes(holders);
 
                    List<T> tasks = getTasksOf(holders);
                    ProcessingResult result = processor.process(tasks);
                    switch (result) {
                        case Success:
                            break;
                        case Congestion:
                        case TransientError:
                            taskDispatcher.reprocess(holders, result);
                            break;
                        case PermanentError:
                            logger.warn("Discarding {} tasks of {} due to permanent error", holders.size(), workerName);
                    }
                    metrics.registerTaskResult(result, tasks.size());
                }
            } catch (InterruptedException e) {
                // Ignore
            } catch (Throwable e) {
                // Safe-guard, so we never exit this loop in an uncontrolled way.
                logger.warn("Discovery WorkerThread error", e);
            }
        }

register、renew、cancel的处理逻辑一样,只不过调用的同步接口不同罢了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/345987.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

微软 AD 介绍 | 安全建议 | 防护

介绍&#xff1a; 什么是Active Directory&#xff08;AD&#xff09;&#xff1f; Active Directory 是由 微软开发的目录服务&#xff0c;用于存储和管理网络中的资源&#xff0c;如计算机、用户、组和其他网络对象。允许组织管理员轻松地管理和验证网络中的用户和计算机。 …

天津大数据培训班推荐,数据分析过程的常见错误

大数据”是近年来IT行业的热词&#xff0c;目前已经广泛应用在各个行业。大数据&#xff0c;又称海量信息&#xff0c;特点是数据量大、种类多、实时性强、数据蕴藏的价值大。大数据是对大量、动态、能持续的数据&#xff0c;通过运用分析、挖掘和整理&#xff0c;实现数据信息…

项目一:踏上Java开发之旅

文章目录 一、实战概述二、实战步骤任务1&#xff1a;安装配置JDK并开发第一个Java程序步骤一&#xff1a;安装JDK步骤二&#xff1a;配置JDK环境变量步骤三&#xff1a;开发第一个Java程序 课堂练习任务1、打印个人信息任务2、打印直角三角形任务3、打印一颗爱心任务4、打印史…

【jetson笔记】解决vscode远程调试qt.qpa.xcb: could not connect to display报错

配置x11转发 jetson远程安装x11转发 安装Xming Xming下载 安装完成后打开安装目录C:\Program Files (x86)\Xming 用记事本打开X0.hosts文件&#xff0c;添加jetson IP地址 后续IP改变需要重新修改配置文件 localhost 192.168.107.57打开Xlaunch Win菜单搜索Xlaundch打开 一…

论文阅读:Vary-toy论文阅读笔记

目录 引言整体结构图方法介绍训练vision vocabulary阶段PDF数据目标检测数据 训练Vary-toy阶段Vary-toy结构数据集情况 引言 论文&#xff1a;Small Language Model Meets with Reinforced Vision Vocabulary Paper | Github | Demo 说来也巧&#xff0c;之前在写论文阅读&…

Linux启动级别和密码问题文件

1、linux启动级别 如果安装的linux默认带的图形化界面&#xff0c;默认的运行级别为5 graphical.target 因为图形化太耗费资源了&#xff0c;想每次启动的时候&#xff0c;更改它的默认允许级别为命令行&#xff08;文本&#xff09; cat /etc/inittab 修改为命令行 多用户…

Springboot项目启动报错:Command line is too long问题解决

启动项目报错:Error running ‘xxxxxxxx’: Command line is too long. Shorten command line for ‘xxxxxxxx’ or also for Application default configuration 方法一 点击提示中的&#xff1a;default&#xff1a;然后在弹出窗口中选择&#xff1a;JAR xxxx xxx&#xff0…

Django、Flask 与 Javascirpt 之间传值与数据转换

常见问题&#xff1a;JavaScript 如何处理Django、Flask传递的数据库数据 Django 、Flask从数据库读出的数据通常保存为&#xff1a;对象列表、字典列表&#xff0c;或 tuple列表形式 # 用object_list 对象列表表示数据库记录 [<Article: id1,title星际穿越影评,body作为一…

Docker安装常用软件集合

大家好&#xff0c;我是豆豆&#xff0c;今天为大家带来了docker安装常用软件&#xff0c;全是干货&#xff0c;没有多余废话&#xff0c;大家点赞收藏吧&#xff0c;以防备用。 1.linux安装docker 环境安装&#xff1a; yum -y install gcc-c 第一步&#xff1a;安装必要的…

Linux命令大全(超详细版)

一 ~ 四章 【点击此处查看】 五、shell 编程 5.1、shell 概述 5.1.1 shell 是什么 Shell是一个命令行解释器&#xff0c;它为用户提供了一个向Linux内核发送请求以便运行程序的界面系统级程序&#xff0c;用户可以用Shell来启动、挂起、停止甚至是编写一些程序。 Shell还是…

使用Python的pygame库实现迷宫游戏

使用Python的pygame库实现迷宫游戏 关于Python中pygame游戏模块的安装使用可见 https://blog.csdn.net/cnds123/article/details/119514520 先给出效果图&#xff1a; 这个游戏能自动生成迷宫布局。 在这个游戏中&#xff0c;玩家将使用键盘箭头键来移动&#xff0c;并且目标…

Sourcetree 更新git账号密码 |Sourcetree 删除git账号密码 |Sourcetree 添加git账号密码

使用Sourcetree 第一次提交代码到git或者从git拉取代码&#xff0c;有可能因为账号的问题不成功。如果提示无法连接等问题&#xff0c;大概率是账号的问题&#xff0c;这时候你就要检查Sourcetree 上的账号密码是否正确。 1.打开Sourcetree&#xff0c;打开设置&#xff0c; …

【小呆的力学笔记】弹塑性力学的初步认知三:广义胡克定律

文章目录 1.7* 广义胡克定律1.8* 广义胡克定律几种形式 1.7* 广义胡克定律 当材料处于弹性状态时&#xff0c;材料的应变和应力呈现线性关系。比如一根杆受拉伸力F作用&#xff0c;轴向会有伸长&#xff0c;同时横向会缩小&#xff0c;如下图所示。 那么有 σ x F A , ε x…

flask_apscheduler源码分析

前言 遵循flask框架的标准的库&#xff0c;称为flask扩展&#xff0c;flask_apscheduler模块就是一个flask扩展&#xff0c;它使用了flask编程上下文&#xff0c;同时内部完全依赖apscheduler。 我近期使用flask_apscheduler遇到了一个所有job全部死亡的bug。现象&#xff1a;j…

编译PCL Qt程序

使用PCL的qt程序时&#xff0c;提示不是用QVTK编译的&#xff0c;所以需要在编译VTK时打开Qt的编译选项&#xff08;由于CMakeList比较复杂&#xff0c;使用CMakeGui进行配置&#xff0c;PCL同理&#xff09;&#xff0c;编译VTK完成后&#xff0c;编译PCL也需要配置Qt支持&…

数字图像处理(实践篇)二十八 使用OpenCV Python中的K-means对图像进行颜色量化处理

目录 1 颜色量化 2 实践 在某些时候,不可避免的某些设备只能生成有限数量的颜色。因此需要执行颜色量化。选择使用cv2.kmeans()函数对颜色量化应用k-means聚类。 1 颜色量化 使用K-means聚类在图像中实现颜色量化的步骤如下: ① 导入依赖库

css文本溢出处理

<!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><meta name"viewport" content"widthdevice-width, initial-scale1.0" /><title>文本溢出处理</title><style>.sing-…

基于FPGA的OFDM基带发射机的设计与实现

文章目录 前言一、OFDM描述二、本系统的实现参照 1.IEEE 802.11a协议主要参数2.不同调制方式与速率 3. IFFT映射关系4. IEEE 802.11a物理层规范5. PPDU帧格式三、设计与实现 1.扰码2.卷积编码与删余3.数据交织4.符号调制5.导频插入6.IFFT变换 7.循环前缀&加窗8.训练序列生成…

HCIA——26E-mall、MIME、POP3、IMAP、电子邮件系统的组成结构、电子邮件的发送,接收过程、MIME 与SMTP 的关系

学习目标&#xff1a; 计算机网络 1.掌握计算机网络的基本概念、基本原理和基本方法。 2.掌握计算机网络的体系结构和典型网络协议&#xff0c;了解典型网络设备的组成和特点&#xff0c;理解典型网络设备的工作原理。 3.能够运用计算机网络的基本概念、基本原理和基本方法进行…

沃通服务器密码机(WTHSM)

概述 沃通服务器密码机&#xff08;WTHSM&#xff09;由沃通CA自主设计开发&#xff0c;严格遵照国密局颁布技术规范&#xff0c;获得国密局颁发《商用密码产品认证证书》&#xff0c;是一款多安全功能、高稳定性、可扩展和快速部署的软硬件集成化安全设备&#xff0c;为应用提…