SpringCloud Alibaba 深入源码 - Nacos 和 Eureka 的区别(健康检测、服务的拉取和订阅)

目录

一、Nacos 和 Eureka 的区别

1.1、以 Nacos 注册流程来解析区别


一、Nacos 和 Eureka 的区别


1.1、以 Nacos 注册流程来解析区别

a)首先,我们的服务启动时。都会把自己的信息提交给注册中心,然后注册中心就会把信息保存下来.

注册的信息实际上就是一个嵌套 Map,结构为 Map<String, Map<String, Service>>,第一层 key 就是 namespace_id,起到环境隔离的作用. value 由是一个嵌套 Map<String, Service>.

第二层的 key 表示 group 分组,key 就是分组名,value 就是分组下的某一个服务,实际上就是一个类,内部又维护了一个  Map<String,Cluster> .

第三层的 key 就是集群的名称,value 就是  Cluster ,也是一个类,包含了集群的具体信息.  

因为一个集群中可能包含多个实例,也就是具体的节点信息(例如实例的IP、Port、健康状态),那么 Cluster 这个类中又维护了 两个 Set<Instance>,分别是临时实例和非临时实例(此处,Eureka 就没有做区分,只有临时实例).

b)那么当服务消费者要去消费时,就可以从注册中心拉取服务信息.  这个过程也被称为“服务发现”.  但是他这个拉去动作不是每次都要做的(压力太大),而是将拉取到的服务信息缓存到一个列表中,这样接下来的一段时间里,就不用去拉去了,而是直接从缓存列表中拿. 

当然这个缓存一直不更新也不行,因此会每隔 30 秒取重新拉取一次(多长时间不用记,都是可以配置的),进行更新.

c)消费者拿到服务列表后,就可以通过 负载均衡(LoadBalancer)从列表中挑选一个发起远程调用就可以了. 

d)截至目前为止,Nacos 和 Eureka 还没什么太大的差别,那差别在哪呢?差别就在于服务提供者的健康检测机制.

e)在 nacos 中,将服务分成了临时实例和非临时实例:

  • 临时实例:当临时实例进行心跳检测的时候,如果心跳不跳了,nacos 就会把它从服务中直接剔除.  (这里的心跳检测机制和 Eureka 是一样的,但非临时实例就不一样了)
  • 非临时实例:nacos 就不会要求你给我发心跳了,而是通过 nacos 主动发请求询问,定时向实例发送请求:“你还活着吗?”,即使真的挂了,nacos 也仅仅只是把它标记为 不健康,不会剔除,而是等待它恢复健康.

而 Eureka 只提供了心跳模式的健康监测,而没有主动检测功能。

主动询问进行健康检测,效率岂不是很低?

对于非临时实例,所有的健康检测任务都不是立即执行的,都会被放入一个阻塞队列中,如下源码

@Override
public void process(HealthCheckTask task) {
    // 获取所有 非临时实例的 集合
    List<Instance> ips = task.getCluster().allIPs(false);

    if (CollectionUtils.isEmpty(ips)) {
        return;
    }

    for (Instance ip : ips) {
		// 封装健康检测信息到 Beat
        Beat beat = new Beat(ip, task);
        // 放入一个阻塞队列中
        taskQueue.add(beat);
        MetricsMonitor.getTcpHealthCheckMonitor().incrementAndGet();
    }
}

可以看出,检测任务不是立即执行,这里也采用了异步指定的策略,会把任务放到线程池中取执行,如下:

public void run() {
    while (true) {
        try {
            // 处理任务
            processTask();
            // ...
        } catch (Throwable e) {
            SRV_LOG.error("[HEALTH-CHECK] error while processing NIO task", e);
        }
    }
}

 通过 processTask 来处理健康检测的任务:

private void processTask() throws Exception {
    // 将任务封装为一个 TaskProcessor,并放入集合
    Collection<Callable<Void>> tasks = new LinkedList<>();
    do {
        Beat beat = taskQueue.poll(CONNECT_TIMEOUT_MS / 2, TimeUnit.MILLISECONDS);
        if (beat == null) {
            return;
        }

        tasks.add(new TaskProcessor(beat));
    } while (taskQueue.size() > 0 && tasks.size() < NIO_THREAD_COUNT * 64);
	// 批量处理集合中的任务
    for (Future<?> f : GlobalExecutor.invokeAllTcpSuperSenseTask(tasks)) {
        f.get();
    }
}

 任务被封装到了TaskProcessor中去执行了,TaskProcessor是一个Callable,其中的call方法:

@Override
public Void call() {
    // 获取检测任务已经等待的时长
    long waited = System.currentTimeMillis() - beat.getStartTime();
    if (waited > MAX_WAIT_TIME_MILLISECONDS) {
        Loggers.SRV_LOG.warn("beat task waited too long: " + waited + "ms");
    }
	
    SocketChannel channel = null;
    try {
        // 获取实例信息
        Instance instance = beat.getIp();
		// 通过NIO建立TCP连接
        channel = SocketChannel.open();
        channel.configureBlocking(false);
        // only by setting this can we make the socket close event asynchronous
        channel.socket().setSoLinger(false, -1);
        channel.socket().setReuseAddress(true);
        channel.socket().setKeepAlive(true);
        channel.socket().setTcpNoDelay(true);

        Cluster cluster = beat.getTask().getCluster();
        int port = cluster.isUseIPPort4Check() ? instance.getPort() : cluster.getDefCkport();
        channel.connect(new InetSocketAddress(instance.getIp(), port));
		// 注册连接、读取事件
        SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
        key.attach(beat);
        keyMap.put(beat.toString(), new BeatKey(key));

        beat.setStartTime(System.currentTimeMillis());

        GlobalExecutor
            .scheduleTcpSuperSenseTask(new TimeOutTask(key), CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
    } catch (Exception e) {
        beat.finishCheck(false, false, switchDomain.getTcpHealthParams().getMax(),
                         "tcp:error:" + e.getMessage());

        if (channel != null) {
            try {
                channel.close();
            } catch (Exception ignore) {
            }
        }
    }

    return null;
}

这差别就像是亲生儿子和非亲生儿子,亲生儿子我还会去主动关怀一下,诶,你还活着么?而非临时实例,就是你不心跳了,就把你扔了~

f)还有一个差别在于服务消费者,Eureka 采用的是定时拉取(每 30 秒一次),那如果在 30 秒内服务提供者挂了,消费肯定是不知道的,因此 Eureka 这里更新的时效性也比较差.

我们的 微服务 定时拉取的基本逻辑就是先从本地缓存读:

  • 如果本地缓存没有,就通过 Nacos 客户端构造请求去 nacos 服务器中读取.
  • 如果本地缓存有,就开启定时更新功能(就是创建一个定时任务,每隔一段时间去拉取一次),并返回缓存结果.

核心源码如下:

public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {

    NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
    // 由 服务名@@集群名拼接 key
    String key = ServiceInfo.getKey(serviceName, clusters);
    if (failoverReactor.isFailoverSwitch()) {
        return failoverReactor.getService(key);
    }
    // 读取本地服务列表的缓存,缓存是一个Map,格式:Map<String, ServiceInfo>
    ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
    // 判断缓存是否存在
    if (null == serviceObj) {
        // 不存在,创建空ServiceInfo
        serviceObj = new ServiceInfo(serviceName, clusters);
        // 放入缓存
        serviceInfoMap.put(serviceObj.getKey(), serviceObj);
        // 放入待更新的服务列表(updatingMap)中
        updatingMap.put(serviceName, new Object());
        // 立即更新服务列表
        updateServiceNow(serviceName, clusters);
        // 从待更新列表中移除
        updatingMap.remove(serviceName);

    } else if (updatingMap.containsKey(serviceName)) {
        // 缓存中有,但是需要更新
        if (UPDATE_HOLD_INTERVAL > 0) {
            // hold a moment waiting for update finish 等待5秒中,待更新完成
            synchronized (serviceObj) {
                try {
                    serviceObj.wait(UPDATE_HOLD_INTERVAL);
                } catch (InterruptedException e) {
                    NAMING_LOGGER
                        .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                }
            }
        }
    }
    // 开启定时更新服务列表的功能
    scheduleUpdateIfAbsent(serviceName, clusters);
    // 返回缓存中的服务信息
    return serviceInfoMap.get(serviceObj.getKey());
}

定时更新方法如下:

public void updateService(String serviceName, String clusters) throws NacosException {
    ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
    try {
		// 基于ServerProxy发起远程调用,查询服务列表
        String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);

        if (StringUtils.isNotEmpty(result)) {
            // 处理查询结果
            processServiceJson(result);
        }
    } finally {
        if (oldService != null) {
            synchronized (oldService) {
                oldService.notifyAll();
            }
        }
    }
}


public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
    throws NacosException {
	// 准备请求参数
    final Map<String, String> params = new HashMap<String, String>(8);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put("clusters", clusters);
    params.put("udpPort", String.valueOf(udpPort));
    params.put("clientIP", NetUtils.localIP());
    params.put("healthyOnly", String.valueOf(healthyOnly));
	// 发起请求,地址与API接口一致
    return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);
}

 而 nacos 这里的消费者不光进行服务的定时拉取,nacos 还会主动进行消息的订阅推送,一旦发现有服务挂了,就立刻推送一条消息给服务消费者,告诉你服务要更新了.

Nacos 具体是通过什么实现消息订阅推送机制呢?

a)首先 PushPeceiver 这个类(我们自己的微服务配置的 Nacos 客户端),会以 UDP 的方式与 Nacos 服务端建立连接,监听 Nacos 服务端推送的服务变更数据.

b)一旦 Nacos 服务列表发生变更,就会发送 UDP 广播给所有的微服务订阅者.

c)当订阅者接收到通知以后,就可以将接收到的服务信息缓存到本地缓存列表.

d)那么之后再拉取服务的时候,会优先从缓存里读取,缓存里有就直接返回缓存,如果没有,再去拉取或者订阅.

PushPeceiver 构造函数如下:

public PushReceiver(HostReactor hostReactor) {
    try {
        this.hostReactor = hostReactor;
        // 创建 UDP客户端
        String udpPort = getPushReceiverUdpPort();
        if (StringUtils.isEmpty(udpPort)) {
            this.udpSocket = new DatagramSocket();
        } else {
            this.udpSocket = new DatagramSocket(new InetSocketAddress(Integer.parseInt(udpPort)));
        }
        // 准备线程池
        this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName("com.alibaba.nacos.naming.push.receiver");
                return thread;
            }
        });
		// 开启线程任务,准备接收变更数据
        this.executorService.execute(this);
    } catch (Exception e) {
        NAMING_LOGGER.error("[NA] init udp socket failed", e);
    }
}

PushReceiver 构造函数中基于线程池来运行任务。这是因为 PushReceiver 本身也是一个Runnable,其中的run方法业务逻辑就是:

@Override
public void run() {
    while (!closed) {
        try {
            // byte[] is initialized with 0 full filled by default
            byte[] buffer = new byte[UDP_MSS];
            DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
			// 接收推送数据
            udpSocket.receive(packet);
			// 解析为json字符串
            String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();
            NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
			// 反序列化为对象
            PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);
            String ack;
            if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
                // 交给 HostReactor去处理
                hostReactor.processServiceJson(pushPacket.data);

                // send ack to server 发送ACK回执,略。。
        } catch (Exception e) {
            if (closed) {
                return;
            }
            NAMING_LOGGER.error("[NA] error while receiving push data", e);
        }
    }
}

通知数据的处理由交给了 HostReactor 的 processServiceJson 方法:

public ServiceInfo processServiceJson(String json) {
    // 解析出ServiceInfo信息
    ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class);
    String serviceKey = serviceInfo.getKey();
    if (serviceKey == null) {
        return null;
    }
    // 查询缓存中的 ServiceInfo
    ServiceInfo oldService = serviceInfoMap.get(serviceKey);

    // 如果缓存存在,则需要校验哪些数据要更新
    boolean changed = false;
    if (oldService != null) {
		// 拉取的数据是否已经过期
        if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {
            NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: "
                               + serviceInfo.getLastRefTime());
        }
        // 放入缓存
        serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
		
        // 中间是缓存与新数据的对比,得到newHosts:新增的实例;remvHosts:待移除的实例;
        // modHosts:需要修改的实例
        if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) {
            // 发布实例变更的事件
            NotifyCenter.publishEvent(new InstancesChangeEvent(
                serviceInfo.getName(), serviceInfo.getGroupName(),
                serviceInfo.getClusters(), serviceInfo.getHosts()));
            DiskCache.write(serviceInfo, cacheDir);
        }

    } else {
        // 本地缓存不存在
        changed = true;
        // 放入缓存
        serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
        // 直接发布实例变更的事件
        NotifyCenter.publishEvent(new InstancesChangeEvent(
            serviceInfo.getName(), serviceInfo.getGroupName(),
            serviceInfo.getClusters(), serviceInfo.getHosts()));
        serviceInfo.setJsonFromServer(json);
        DiskCache.write(serviceInfo, cacheDir);
    }
	// 。。。
    return serviceInfo;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/339110.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

opencv009 滤波器01(卷积)

图像卷积操作&#xff08;convolution&#xff09;&#xff0c;或称为核操作&#xff08;kernel&#xff09;&#xff0c;是进行图像处理的一种常用手段&#xff0c; 图像卷积操作的目的是利用像素点和其邻域像素之前的空间关系&#xff0c;通过加权求和的操作&#xff0c;实现…

【UEFI基础】EDK网络框架(DNS4)

DNS4 DNS4协议说明 IP地址是一串数据&#xff0c;不便记忆。一般用户在使用TCP/IP协议进行通信时也不使用IP地址&#xff0c;而是使用英文和点号组成的字符串&#xff0c;两者的转换通过DNS&#xff08;Domain Name System&#xff09;来完成。 DNS也有v4和v6版本&#xff0…

kubeadm 安装k8s集群后,master节点notready问题解决方案

使用kubeadm 安装k8s集群后&#xff0c;加载calico cni 网络组件后&#xff0c;master节点notready问题 表现为&#xff1a; 使用命令查看日志&#xff1a;journalctl -f -u kubelet 报错如下&#xff1a; Failed to start ContainerManager failed to initialize top level…

vue3中Fragment特性的一个bug,需要留意的注意事项

vue3中的Fragment 模版碎片特性是什么&#xff0c;简单的理解就是template模板代码不在像vue2中那样必须在根节点在包裹一层节点了。 vue2写法 <template><div><h1>标题</h1><p>正文内容</p></div> </template>vue3写法 &l…

【RT-DETR有效改进】Google | EfficientNetV2一种超轻量又高效的网络 (轻量化网络)

前言 大家好&#xff0c;我是Snu77&#xff0c;这里是RT-DETR有效涨点专栏。 本专栏的内容为根据ultralytics版本的RT-DETR进行改进&#xff0c;内容持续更新&#xff0c;每周更新文章数量3-10篇。 专栏以ResNet18、ResNet50为基础修改版本&#xff0c;同时修改内容也支持Re…

SpikingJelly笔记之IFLIF神经元

文章目录 前言一、脉冲神经元二、IF神经元1、神经元模型2、神经元仿真 三、LIF神经元1、神经元模型2、神经元仿真 总结 前言 记录整合发放(integrate-and-fire, IF)神经元与漏电整合发放(leaky integrate-and-fire, LIF)神经元模型&#xff0c;以及在SpikingJelly中的实现方法…

中期国际1.18黄金市场分析:零售销售强劲增长,美联储降息可能性大幅降低!

金价在周四下跌&#xff0c;其中一个主要原因是美国国债收益率的持续上升。此外&#xff0c;强劲的美国零售销售报告也对金价造成了影响&#xff0c;该报告显示零售销售额大幅上涨&#xff0c;超出预期值&#xff0c;这使得美联储3月份降息的可能性大幅降低。 12月份的消费者价…

Spring Boot 集成 API 文档 - Swagger、Knife4J、Smart-Doc

文章目录 1.OpenAPI 规范2.Swagger: 接口管理的利器3.Swagger 与 SpringFox&#xff1a;理念与实现4.Swagger 与 Knife4J&#xff1a;增强与创新5.案例&#xff1a;Spring Boot 整合 Swagger35.1 引入 Swagger3 依赖包5.2 优化路径匹配策略兼容 SpringFox5.3 配置 Swagger5.4 S…

国产操作系统:VirtualBox安装openKylin-1.0.1虚拟机并配置网络

国产操作系统&#xff1a;VirtualBox安装openKylin-1.0.1虚拟机并配置网络 openKylin 操作系统目前适配支持X86、ARM、RISC-V三个架构的个人电脑、平板电脑及教育开发板&#xff0c;可以满足绝大多数个人用户及开发者的使用需求。适用于在VirtualBox平台上安装openKylin-1.0.1…

不同开发语言在进程、线程和协程的设计差异

不同开发语言在进程、线程和协程的设计差异 1. 进程、线程和协程上的差异1.1 进程、线程、协程的定义1.2 进程、线程、协程的差异1.3 进程、线程、协程的内存成本1.4 进程、线程、协程的切换成本 2. 线程、协程之间的通信和协作方式2.1 python如何实现线程通信&#xff1f;2.2 …

智能小程序多语言适配指南

i18n 配置 启用多语言配置&#xff0c;需开启项目配置&#xff08;project.tuya.json&#xff09;中的 {"i18n": true} 选项。多语言的配置内容存放在小程序开发者平台 多语言管理。 本章节的多语言仅适用于智能小程序。如果您开发的是面板小程序&#xff0c;请查阅…

华南理工大学数字信号处理实验实验二源码(薛y老师)

一、实验目的 ▪ 综合运用数字信号处理的理论知识进行信号分析并利用MATLAB作为编程工具进行计算机实现&#xff0c;从而加 深对所学知识的理解&#xff0c;建立概念。 ▪ 掌握数字信号处理的基本概念、基本理论和基本方法。 ▪ 学会用MATLAB对信号进行分析和处理。 ▪ 用F…

QCustomPlot开源库使用

1.简介 QCustomPlot是用于绘图和数据可视化的Qt C 小部件。它没有进一步的依赖关系&#xff0c;并且有据可查。该绘图库专注于制作美观&#xff0c;出版质量的2D绘图&#xff0c;图形和图表&#xff0c;以及为实时可视化应用程序提供高性能。看一下“ 设置”和“ 基本绘图”教…

【系统调用IO】open、close、read、write、lseek

目录 3 系统调用IO3.1 文件描述符3.1.1 FILE结构体3.2.2 文件描述符 3.3 open、close、read、write、lseek3.3.1 文件权限3.3.2 open3.3.3 close3.3.4 read3.3.5 write3.3.6 lseek3.3.7 代码示例 文件io和标准io的区别 橙色 3 系统调用IO 3.1 文件描述符 3.1.1 FILE结构体 …

链表|数据结构|C语言深入学习

什么是链表 离散&#xff0c;就是“分离的、散开的” 链表是什么样子的&#xff1a; 有限个节点离散分配 彼此间通过指针相连 除了首尾节点&#xff0c;每个节点都只有一个前驱节点和一个后继节点 首节点没有前驱结点&#xff0c;尾节点没有后继节点 基本概念术语&#xf…

2023年12月青少年机器人技术等级考试(二级)理论综合试卷

2023年12月青少年机器人技术等级考试&#xff08;二级&#xff09;理论综合试卷 选择题 第 1 题 单选题 下图中&#xff0c;能够将圆周运动转化为往复摆动的是&#xff1f;&#xff08; &#xff09; A. B. C. D. 第 2 题 单选题 如图&#xff0c;该机械结构可实现的运动…

idea远程服务调试

1. 配置idea远程服务调试 这里以 idea 新 ui 为例&#xff0c;首先点击上面的 debug 旁边的三个小圆点&#xff0c;然后在弹出的框框中选择 “Edit”&#xff0c;如下图所示。 然后进入到打开的界面后&#xff0c;点击左上角的 “” 进行添加&#xff0c;找到 “Remote JVM De…

HTML 入门手册(二)

目录 10-表单 11-input标签 11.1文本框 (text) 11.2密码框 (password) 11.3单选按钮 (radio) 11.4复选框 (checkbox) 11.5普通按钮 11.6提交按钮 (submit) 11.7重置按钮 (reset) 11.8隐藏域 (hidden) 11.9文件上传 (file) 11.10数字输入 (number) 11.11日期输入 (…

有效网络安全意识的正确策略

员工在保护组织资产方面发挥着重要作用。随着威胁形势的不断变化&#xff0c;网络安全意识培训是创建良好安全文化的重要组成部分。 为什么要进行网络安全意识培训&#xff1f; 2022 年&#xff0c; 81% 的组织遭受恶意软件、网络钓鱼和密码攻击&#xff0c;主要针对用户。 …

SpringBoot 异常报告器解析

介绍 SpringBootExceptionReporter用于捕获和处理启动期间的异常&#xff0c;例如应用程序上下文的初始化失败。我们业务中的异常处理一般使用拦截器进行拦截处理业务异常。 异常报告流程解析 框架内实现 reportException实现 FailureAnalyzer介绍 analyze逻辑 FailureAnalys…