endpoints控制器源码解析

endpoints controller 的实现原理

本文从源码的角度分析KubeController Attachdetach相关功能的实现。

本篇kubernetes版本为v1.27.3。

kubernetes项目地址: https://github.com/kubernetes/kubernetes

controller命令main入口: cmd/kube-controller-manager/controller-manager.go

controller相关代码目录: pkg/controller
更多文章访问: https://www.cyisme.com

在这里插入图片描述

启动入口

传入 pods, services, endpoints 三种资源的 informer, 实例化 EndpointsController 端点控制器对象.

func startEndpointController(ctx context.Context, controllerCtx ControllerContext) (controller.Interface, bool, error) {
	go endpointcontroller.NewEndpointController(
		controllerCtx.InformerFactory.Core().V1().Pods(),
		controllerCtx.InformerFactory.Core().V1().Services(),
		controllerCtx.InformerFactory.Core().V1().Endpoints(),
	).Run(ctx, int(controllerCtx.ComponentConfig.EndpointController.ConcurrentEndpointSyncs))
	return nil, true, nil
}

实例化 EndpointController 控制器, 内部在 services, pods, endpoints 三种资源的 informer 里, 注册 EventHandler 事件回调处理方法.

func NewEndpointController(podInformer coreinformers.PodInformer, serviceInformer coreinformers.ServiceInformer,
	endpointsInformer coreinformers.EndpointsInformer, client clientset.Interface, endpointUpdatesBatchPeriod time.Duration) *Controller {
	e := &Controller{
		client:           client,
		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "endpoint"),
		workerLoopPeriod: time.Second,
	}

	// 监听 services informer, 并注册事件方法
	serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: e.onServiceUpdate,
		UpdateFunc: func(old, cur interface{}) {
			e.onServiceUpdate(cur)
		},
		DeleteFunc: e.onServiceDelete,
	})
	e.serviceLister = serviceInformer.Lister()
	e.servicesSynced = serviceInformer.Informer().HasSynced

	// 监听 pods informer, 并注册事件方法
	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    e.addPod,
		UpdateFunc: e.updatePod,
		DeleteFunc: e.deletePod,
	})
	e.podLister = podInformer.Lister()
	e.podsSynced = podInformer.Informer().HasSynced

	// 监听 endpoints informer, 并注册事件方法
	endpointsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		DeleteFunc: e.onEndpointsDelete,
	})
	e.endpointsLister = endpointsInformer.Lister()
	e.endpointsSynced = endpointsInformer.Informer().HasSynced
	return e
}

启动控制器

Run() 用来启动控制器, 先同步各个资源的数据到本地, 然后启动 workers 数量的协程去启动 worker. workers 的数量由 --concurrent-endpoint-syncs 启动参数指定, 默认为 5 个.

func (e *Controller) Run(ctx context.Context, workers int) {
	// 等待 pods, services, endpoints 资源同步完成
	if !cache.WaitForNamedCacheSync("endpoint", ctx.Done(), e.podsSynced, e.servicesSynced, e.endpointsSynced) {
		return
	}

	// 启动 workers 数量的协程来处理 endpoints
	for i := 0; i < workers; i++ {
		go wait.UntilWithContext(ctx, e.worker, e.workerLoopPeriod)
	}

	go func() {
		e.checkLeftoverEndpoints()
	}()

	<-ctx.Done()
}

启动 worker 的协程逻辑还是整洁的, 不断的从队列中获取任务, 然后调用 syncService 来同步处理.

func (e *Controller) worker(ctx context.Context) {
	for e.processNextWorkItem(ctx) {
	}
}

func (e *Controller) processNextWorkItem(ctx context.Context) bool {
	// 无任务时, 陷入等待
	eKey, quit := e.queue.Get()
	if quit {
		return false
	}
	defer e.queue.Done(eKey)

	err := e.syncService(ctx, eKey.(string))
	...
	return true
}

infomer eventHandler

实例化 endpointsController 时会对 pod, service, endpoint 资源进行注册 eventHandler 并监听.

下面分析这三种资源对应的 eventHandler 逻辑.

service eventhandler

增删减 serviceSelectorCache 缓存数据, 并把 key 塞入到队列.

func (e *Controller) onServiceUpdate(obj interface{}) {
	key, err := controller.KeyFunc(obj)
	if err != nil {
		return
	}

	_ = e.serviceSelectorCache.Update(key, obj.(*v1.Service).Spec.Selector)
	e.queue.Add(key)
}

func (e *Controller) onServiceDelete(obj interface{}) {
	key, err := controller.KeyFunc(obj)
	if err != nil {
		return
	}

	e.serviceSelectorCache.Delete(key)
	e.queue.Add(key)
}

serviceSelectorCache 用来管理各个 service 的 labels.Selector 缓存, 后面 pod eventHandler 里通过该 cache 获取 pod labels 关联的 services 列表.

type ServiceSelectorCache struct {
	lock  sync.RWMutex
	cache map[string]labels.Selector
}

func (sc *ServiceSelectorCache) Get(key string) (labels.Selector, bool) {
	...
}

func (sc *ServiceSelectorCache) Update(key string, rawSelector map[string]string) labels.Selector {
	...
}

func (sc *ServiceSelectorCache) Delete(key string) {
	...
}

// 遍历并查找适配该pod labels 的 services 集合
func (sc *ServiceSelectorCache) GetPodServiceMemberships(serviceLister v1listers.ServiceLister, pod *v1.Pod) (sets.String, error) {
	set := sets.String{}
	services, err := serviceLister.Services(pod.Namespace).List(labels.Everything())
	if err != nil {
		return set, err
	}

	var selector labels.Selector
	for _, service := range services {
		if service.Spec.Selector == nil {
			continue
		}
		key, err := controller.KeyFunc(service)
		if err != nil {
			return nil, err
		}
		if v, ok := sc.Get(key); ok {
			selector = v
		} else {
			selector = sc.Update(key, service.Spec.Selector)
		}

		if selector.Matches(labels.Set(pod.Labels)) {
			set.Insert(key)
		}
	}
	return set, nil
}
pod eventHandler

addPod 需要通过 pod 获取关联的 services 对象列表, 然后遍历 servcies 列表把每个 service 都通过延迟入队的方法入队. 不同的 service 通过标签可以关联同一个 pod.

像 updatePod 和 deletePod 逻辑相似, 不再复述.

func (e *Controller) addPod(obj interface{}) {
	pod := obj.(*v1.Pod)
	services, err := e.serviceSelectorCache.GetPodServiceMemberships(e.serviceLister, pod)
	if err != nil {
		return
	}
	for key := range services {
		e.queue.AddAfter(key, e.endpointUpdatesBatchPeriod)
	}
}

func (e *Controller) updatePod(old, cur interface{}) {
	services := endpointutil.GetServicesToUpdateOnPodChange(e.serviceLister, e.serviceSelectorCache, old, cur)
	for key := range services {
		e.queue.AddAfter(key, e.endpointUpdatesBatchPeriod)
	}
}

func (e *Controller) deletePod(obj interface{}) {
	pod := endpointutil.GetPodFromDeleteAction(obj)
	if pod != nil {
		e.addPod(pod)
	}
}
endpoints eventHandler

只在 endpointsInformer 里注册了 DeleteFunc 处理方法.

func (e *Controller) onEndpointsDelete(obj interface{}) {
	key, err := controller.KeyFunc(obj)
	if err != nil {
		return
	}
	e.queue.Add(key)
}

核心处理函数 syncService

主处理流程

syncService 是 endpoints controller 控制里最核心的方法, 逻辑相对其他 controller 控制器还是要简单不少.

  1. 获取 service 对象, 如果没找到该对象, 则尝试向 apiserver 发起删除 endpoints 请求.
  2. 根据 labels 获取 service 对应的 pods 对象
  3. 定义 subset 子集对象, 遍历 pods 列表生成 EndpointSubset 对象, 并合并到 subset 里.
  4. 尝试获取 endpoints 对象, 没有则执行创建 endpoints 操作, 有则执行更新操作.
func (e *Controller) syncService(ctx context.Context, key string) error {
	// 通过 key 拆解 namespace 和 service name 字段.
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}

	// 获取 service 对象
	service, err := e.serviceLister.Services(namespace).Get(name)
	if err != nil {
		// 其他错误直接抛出异常
		if !errors.IsNotFound(err) {
			return err
		}

		// 向 apiserver 发起删除 service 的 endpoints 的请求
		err = e.client.CoreV1().Endpoints(namespace).Delete(ctx, name, metav1.DeleteOptions{})
		if err != nil && !errors.IsNotFound(err) {
			return err
		}
		return nil
	}

	// 如果 .spec.selector 为空, 则没有关联对象, 可直接返回.
	if service.Spec.Selector == nil {
		return nil
	}

	// 获取 service 对应 labels 的 pods 对象集合.
	pods, err := e.podLister.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelectorPreValidated())
	if err != nil {
		return err
	}

	subsets := []v1.EndpointSubset{}
	var totalReadyEps int
	var totalNotReadyEps int

	for _, pod := range pods {
		// 如果 pod 终止状态, 或 podIP 为空, 或已经标记删除, 则直接跳过该 pod. 
		if !endpointutil.ShouldPodBeInEndpoints(pod, service.Spec.PublishNotReadyAddresses) {
			continue
		}

		// 实例化 v1.EndpointAddress 对象
		ep, err := podToEndpointAddressForService(service, pod)
		if err != nil {
			continue
		}

		epa := *ep
		if len(service.Spec.Ports) == 0 {
			// 在 headless 模式下, 构建一个 v1.EndpointSubset 对象, append 到 subsets 子集里.
			if service.Spec.ClusterIP == api.ClusterIPNone {
				subsets, totalReadyEps, totalNotReadyEps = addEndpointSubset(subsets, pod, epa, nil, service.Spec.PublishNotReadyAddresses)
			}
		} else {
			for i := range service.Spec.Ports {
				servicePort := &service.Spec.Ports[i]
				portNum, err := podutil.FindPort(pod, servicePort)
				// 创建一个 endpoint port 对象
				epp := endpointPortFromServicePort(servicePort, portNum)

				// 创建 v1.EndpointSubset 对象, 并 append subsets 子集里.
				// 返回 readyEps 就绪的 pod 数量和 notReadyEps 没就绪的 pod 数量
				var readyEps, notReadyEps int
				subsets, readyEps, notReadyEps = addEndpointSubset(subsets, pod, epa, epp, service.Spec.PublishNotReadyAddresses)
				// 累加就绪的数量
				totalReadyEps = totalReadyEps + readyEps
				// 累加还没就绪的数量
				totalNotReadyEps = totalNotReadyEps + notReadyEps
			}
		}
	}
	subsets = endpoints.RepackSubsets(subsets)

	// 从 informer 里尝试获取当前的 endpoints 对象
	currentEndpoints, err := e.endpointsLister.Endpoints(service.Namespace).Get(service.Name)
	if err != nil {
		if errors.IsNotFound(err) {
			// 如果没有则实例化一个 endpoints 对象
			currentEndpoints = &v1.Endpoints{
				ObjectMeta: metav1.ObjectMeta{
					Name:   service.Name,
					Labels: service.Labels,
				},
			}
		}
	}

	// 如果版本为空, 则需要创建
	createEndpoints := len(currentEndpoints.ResourceVersion) == 0

	newEndpoints.Subsets = subsets
	newEndpoints.Labels = service.Labels

	if createEndpoints {
		// 如果没创建过, 则创建 endpoints 对象
		_, err = e.client.CoreV1().Endpoints(service.Namespace).Create(ctx, newEndpoints, metav1.CreateOptions{})
	} else {
		// 已经存在 ep 对象, 则需要更新
		_, err = e.client.CoreV1().Endpoints(service.Namespace).Update(ctx, newEndpoints, metav1.UpdateOptions{})
	}
	...
	return nil
}
哪些 pod 放到 endpoints 里 ?

ShouldPodBeInEndpoints 方法里定义了哪些 pod 会被放到 subset 里, 也就是会被放到 endpoints 集合里, 只有为 true 才会处理.

func ShouldPodBeInEndpoints(pod *v1.Pod, includeTerminating bool) bool {
	// "Terminal" describes when a Pod is complete (in a succeeded or failed phase).
	// This is distinct from the "Terminating" condition which represents when a Pod
	// is being terminated (metadata.deletionTimestamp is non nil).
	if podutil.IsPodTerminal(pod) {
		return false
	}

	if len(pod.Status.PodIP) == 0 && len(pod.Status.PodIPs) == 0 {
		return false
	}

	if !includeTerminating && pod.DeletionTimestamp != nil {
		return false
	}

	return true
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/276288.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

10分钟带你了解分布式系统的补偿机制

我们知道&#xff0c;应用系统在分布式的情况下&#xff0c;在通信时会有着一个显著的问题&#xff0c;即一个业务流程往往需要组合一组服务&#xff0c;且单单一次通信可能会经过 DNS 服务&#xff0c;网卡、交换机、路由器、负载均衡等设备&#xff0c;而这些服务于设备都不一…

在微服务中如何实现全链路的金丝雀发布?

目录 1. 什么金丝雀发布&#xff1f;它有什么用&#xff1f; 2.如何实现全链路的金丝雀发布 2.1 负载均衡模块 2.2 网关模块 2.3 服务模块 2.3.1 注册为灰色服务实例 2.3.2 设置负载均衡器 2.3.3 传递灰度发布标签 2.4 其他代码 2.4.1 其他业务代码 2.4.2 pom.xml 关…

出现频率高达80%的软件测试常见面试题合集(内附详细答案)

最近看到网上流传着各种面试经验及面试题&#xff0c;往往都是一大堆技术题目贴上去&#xff0c;但是没有答案。 为此我业余时间整理了这份软件测试基础常见的面试题及详细答案&#xff0c;望各路大牛发现不对的地方不吝赐教&#xff0c;留言即可。 01 软件测试理论部分 1.1…

SpingBoot的项目实战--模拟电商【1.首页搭建】

&#x1f973;&#x1f973;Welcome Huihuis Code World ! !&#x1f973;&#x1f973; 接下来看看由辉辉所写的关于SpringBoot电商项目的相关操作吧 目录 &#x1f973;&#x1f973;Welcome Huihuis Code World ! !&#x1f973;&#x1f973; 一.项目背景及技术点运用 …

你知道继电保护测试仪的价格是多少吗?

继电保护测试仪是电气设备检测中经常使用的检测仪器。它能准确、快速地检测到每个继电保护装置的一些潜在故障和问题&#xff0c;帮助电力检测工人锁定问题点&#xff0c;使继电保护装置能够正常工作&#xff0c;保护电力需求。继电保护测试仪贵吗&#xff1f;哪些因素影响价格…

链表:如何利用“假头,新指针,双指针”解决链表问题

Java学习面试指南&#xff1a;https://javaxiaobear.cn 链表是一种线性数据结构&#xff0c;其中的每个元素实际上是一个单独的对象&#xff0c;而所有对象都通过每个元素中的引用字段链接在一起。 链表是一种物理存储单元上非连续、非顺序的存储结构&#xff0c;其物理结构不能…

C# Winform教程(二):基础窗口程序

1、介绍 winform应用程序是一种智能客户端技术&#xff0c;我们可以使用winform应用程序帮助我们获得信息或者传输信息等。 2、常用属性 Name&#xff1a;在后台要获得前台的控件对象&#xff0c;需要使用Name属性。 Visible&#xff1a;指示一个控件是否可见、 Enable&…

基于动态窗口的航线规划

MATLAB2016b可以运行 % ------------------------------------------------------------------------- % File : DWA 算法 % Discription : Mobile Robot Motion Planning with Dynamic Window Approach % Author :Yuncheng Jiang % License : Modified BSD Software License A…

MySQL按月分片

一、按照月分片 使用场景为按照自然月来分片,每个自然月为一个分片,但是一年有12个月,是不是要有12个数据节点才行呢?并不是。例如我现在只有三个分片数据库,这样就可以1月在第一个数据分片中,2月在第二个数据分片中,3月在第三个数据分片中,当来到4月的时候,就会重新开…

echarts中给图表X轴和Y轴加单位以及给tooltip(提示框)增加单位

左边没有单位&#xff0c;右图是增加单位的效果。 1.x轴y轴设置单位 增加单位不管是x轴还是y轴都可以设置name字段&#xff0c;设置完name后效果是红色箭头效果。如果想要蓝色箭头效果可以使用x轴y轴的都有的 axisLabel 属性里面有formatter配置项&#xff0c;formatter支持字…

Python【json模块常用函数】

json模块常用函数 json模块是Python标准库中的一个内置模块&#xff0c;用于处理JSON&#xff08;JavaScript Object Notation&#xff09;格式的数据。它提供了一组函数来解析、序列化和操作JSON数据。 下面是json模块中常用的几个函数&#xff1a; .loads() 用于将JSON字…

YOLOv5-Lite 树莓派4B 15帧教程

【前言】 由于v5Lite仓库遗漏了不少历史问题&#xff0c;最大的问题是毕业后卷起来了&#xff0c;找不到时间更新。 上面是这篇博客的背景&#xff0c;那么先说下结论&#xff0c;使用 v5lite-e 模型&#xff0c;在 树莓派4B&#xff08;4G内存&#xff09; 上&#xff0c;有三…

C#高级 02异步编程

基础知识 1.什么是异步任务 包含了异步任务的各种状态的一个引用类型 1)正在运行、完成、结果、报错等 2)另有ValueTask值类型版本对于异步任务的抽象 1)开启异步任务后&#xff0c;当前线程并不会阻塞&#xff0c;而是可以去做其他事情 2)异步任务&#xff08;默认&#xff…

两张图片沿着斜对角线合并成一张图片

在图像融合领域&#xff0c;论文中的对比算法可视化&#xff0c;需要将红外图像和可见光图像沿着斜对角线合并成一张图片。 红外与可见光图像举例&#xff1a; 然后做出这样的效果&#xff1a; 用Python的PIL库&#xff0c;将两张图片沿着斜对角线合并成一张图片。 from PIL …

【Python基础篇】【19.异常处理】(附案例,源码)

异常处理 异常处理常见异常elsefinallyraise获取异常信息sys.exc_info()traceback 处理异常基本原则assert断点调试两种方式Debugger窗口各图标的含义1.Show Execution Point &#xff08;Alt F10&#xff09;2.Step Over&#xff08;F8&#xff09;3.Step Into &#xff08;F…

GBASE南大通用常用错误代码

错误代码为 GBASE南大通用Server 返回给应用的错误编号&#xff0c;用于唯一的标识一个错误。错误码在 GBaseErrorCode 枚举中定义。 下表仅提供通过 GBASE南大通用数据库返回给应用的常用错误码及错误描述的参考&#xff0c; 具体错误码请参考 GBase 数据库相关手册。

活动回顾 (下) | 机器学习系统趋势研判,大咖金句汇总

作者&#xff1a;三羊、李宝珠、李玮栋、Yudi、xixi 编辑&#xff1a;李宝珠 在大模型时代的浪潮中&#xff0c;机器学习系统正经历着前所未有的变革。模型规模的急剧膨胀&#xff0c;让我们见证了 AI 能力的巨大提升&#xff0c;然而这种提升不仅为各个领域带来了新的机遇&…

汇编语言学习中的Dosbox自动配置方法

学到期末才发现可以自动配置 一、先找到dosbox的下载/安装路径 二、打开其下的Dosbox *.**(这里是版本号) Options.bat 三、在其打开的文件的最下面输入你经常打开dosbox要输入的内容 例如&#xff1a; mount c e:\masm c:

UEFI模拟环境搭建——windows+EDKII

目录 0 说明 1 安装软件 1.1 VS2019的安装 1.2 Python的安装 1.3 IASL的安装 1.4 NASM的安装 1.5 git的下载 2 EDKII的下载 3 配置环境 0 说明 个人感觉UEFI的环境搭建非常复杂&#xff0c;在经过很长一段折磨后&#xff0c;终于还是搭建成功&#xff0c;写下来记录一…

MS761比较器可兼容MAX9030

MS761/762 是一款低噪声&#xff0c;低输入失调电压的高精度比较器&#xff0c;输入失调电压室温下典型值为 200μV&#xff0c;整个温度范围内最大为 1mV。可兼容MAX9030。MS761 有关断脚可以关闭整个器件&#xff0c;减小电流消耗。 MS761/762 具有 CMOS 输入及推挽输出&…