深入源码分析kubernetes informer机制(四)DeltaFIFO


[阅读指南]
这是该系列第四篇
基于kubernetes 1.27 stage版本
为了方便阅读,后续所有代码均省略了错误处理及与关注逻辑无关的部分。


文章目录

  • client-go中的存储结构
  • DeltaFIFO
    • delta
    • 索引 key
    • queue push操作
      • delta push 去重
    • queue pop操作
  • 总结

client-go中的存储结构

如下图,clinet-go中定义了存储类型接口store,用来提供存储对象的基本能力。

queue继承了store接口,并提供了队列的能力,队列中可以保存需要增删改的存储对象的key,它会取出队头元素,调用PopProcessFunc处理。

queue的实现有两个:FIFOdeltaFIFO

deltaFIFO的不同点在于,deltaFIFO队列中,key对应的不是对象本身,而是对象的delta。

另外deltaFIFO除了通过add、update、delete添加元素,还有两种特殊的方式:replaced和sync。replaced一般发生在资源版本更新时,而sync由resync定时发起。

DeltaFIFO

下面是deltaFIFO数据结构的定义

type DeltaFIFO struct {
    // 并发读写锁
	lock sync.RWMutex
	cond sync.Cond

    // `items` maps a key to a Deltas.
    // 资源对象的key与对应的delta数组,每个数组至少都会有一个delta
	items map[string]Deltas

	// 按照FIFO队列顺序存储key,用来给pop()消费。
    // 该数组不会有重复值,并且所有元素都一定在items中
	queue []string

	// 生成key值的函数,默认是 MetaNamespaceKeyFunc
	keyFunc KeyFunc

    // 本地缓存中已知的所有资源对象的key
	knownObjects KeyListerGetter

	......
}

delta

如前面所说,deltaFIFO中key映射的不是对象本身,是delta数组。

根据Delta数据结构的定义,delta包含了一个资源对象的变更类型及变更的内容。这里的Object不一定是完整的资源数据,大部分场景下只会有变更的部分信息。

type Delta struct {
	Type   DeltaType
	Object interface{}
}

type DeltaType string
const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
	Replaced DeltaType = "Replaced"
	Sync DeltaType = "Sync"
)

举个栗子,本地已经有了一个pod对象,

&Pod{
    Name:      "mypod",
    Namespace: "default",
    Labels:    map[string]string{"app": "web", "version": "0.0.1"},
}

此时mypod的 lable从web变成了app-server,reflector就会创建一个这样的delta对象放入FIFO队列中。

&Delta{
    Type:   "Updated",
    Object: &Pod{
            Name:      "mypod",
            Namespace: "default",
            Labels:    map[string]string{"app": "app-server"},
        },
}

索引 key

deltaFIFO队列中,存储的是delta的key值,通过key值可以在items map中获取到对应的delta对象。

这个key值在初始化FIFO时通过KeyFunction进行定义,使用者没有指定时,都会使用自带的命名函数 MetaNamespaceKeyFunc 进行命名,命名规则是

  • namespace不为空,key为/
  • namespace为空,key为

这里的name是在yaml资源配置中的matadata.name,比如上面的mypod。在同一个资源下,name在所有api version都一定是唯一的。

func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
	if key, ok := obj.(ExplicitKey); ok {
		return string(key), nil
	}
	meta, err := meta.Accessor(obj)

	if len(meta.GetNamespace()) > 0 {
		return meta.GetNamespace() + "/" + meta.GetName(), nil
	}
	return meta.GetName(), nil
}

queue push操作

watcher监控的资源变更时,会调用deltaFIFO中Added、Updated、Deleted、Replaced、Sync方法,最终它们都会通过queueActionLocked 方法往deltaFIFO队列中加入对应类型的delta对象。

queueActionLocked 也就是deltaFIFO的入队操作。

和一般的入队不同的是,新加入的delta不是直接加入到队尾,队列queue数组中保存的是delta的key。所以入队的操作是这样的

  1. 获取delta对应的key值(还记得keyfunc吗,又是它)
  2. 如果delta所属的资源key已经在队列中,直接将delta添加到key对应到deltas数组末尾。更新已存在的资源delta并不会影响他的key在队列中的位置。
  3. 如果delta所属的资源key不在队列中,就将key添加到队列末尾,并在items中关联key和delta
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
	id, err := f.KeyOf(obj)

	// 自定义的转换函数。可以在delta事件被处理之前完成一些预处理
    // 常见的用法是用来过滤一些处理程序不关注的资源对象、以及处理数据格式等
	if f.transformer != nil {
		obj, err = f.transformer(obj)
	}

    // 将新的delta放入资源key对应的delta数组末尾
    // 如果原本的key不存在,就是创建了一个新的数组,并将新的delta放入其中
	oldDeltas := f.items[id]
	newDeltas := append(oldDeltas, Delta{actionType, obj})

    // 对delta数组中的delta去重
	newDeltas = dedupDeltas(newDeltas)

    // 判断key是否已经在队列中,并且更新key对应的delta数组
	if len(newDeltas) > 0 {
		if _, exists := f.items[id]; !exists {
			f.queue = append(f.queue, id)
		}
		f.items[id] = newDeltas
		f.cond.Broadcast()
    }
    
	return nil
}

delta push 去重

上一节提到,delta进行push操作时,会对加入的delta进行去重。去重逻辑目前只针对两个delete类型的delta有效:当delta数组中倒数第一个和第二个delta都是delete类型时,将会去掉其中一个

func dedupDeltas(deltas Deltas) Deltas {
	n := len(deltas)
	if n < 2 {
		return deltas
	}
	a := &deltas[n-1]
	b := &deltas[n-2]
	if out := isDup(a, b); out != nil {
		deltas[n-2] = *out
		return deltas[:n-1]
	}
	return deltas
}

// 判断a、b两个delta是否重复
// 目前暂时只有两个delete类型的delta会被判定为重复。
func isDup(a, b *Delta) *Delta {
	if out := isDeletionDup(a, b); out != nil {
		return out
	}
	return nil
}

// 判定两个delta是否都是deleted类型
func isDeletionDup(a, b *Delta) *Delta {
	if b.Type != Deleted || a.Type != Deleted {
		return nil
	}
    
	if _, ok := b.Object.(DeletedFinalStateUnknown); ok {
		return a
	}
	return b
}

举个小小的例子来回顾一下delta push操作。假设queue中有3个pod对象,对应了不同的变更事件,如下所示。

此时watcher监听到资源发生变化:

  1. pod2收到了updated事件
  2. pod1收到了deleted事件
  3. pod3收到了deleted事件

于是,三个delta入队成功后的队列图如下

pod1已有一个deleted事件,再次收到deleted后,经过dedupDeltas去重,最终只保留一个deleted。

pod3虽然有两个deleted事件,但是他们并不是连续的事件,不会被去重

queue pop操作

deltaFIFO出队的操作和普通的队列出队类似,从队头取出一个资源对象key,并删除items中key对应的deltas数组。

pop出队时,会调用传参PopProcessFunc对出队元素进行处理。

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for {
		for len(f.queue) == 0 {
            // 队列为空时阻塞
			if f.closed {
				return nil, ErrFIFOClosed
			}
			f.cond.Wait()
		}

        // 取出队首的资源对象key
		id := f.queue[0]
		f.queue = f.queue[1:]

        // 获取key对应的deltas数组
		item, ok := f.items[id]

        // 执行pop处理函数,处理delta事件,如果处理失败了,资源对象会被重新加入到队列中。
        // 但是如果队列中存在相同的对象,资源对象会被丢弃。
		err := process(item, isInInitialList)
		if e, ok := err.(ErrRequeue); ok {
			f.addIfNotPresent(id, item)
			err = e.Err
		}

		return item, err
	}
}

这里一开始有个小疑问,如果资源的delta处理失败了,并且队列中又出现了同样的资源key,这部分delta数据不就丢失了吗?

但是仔细看出队入队公用一个锁,pop处理对象时不会有新的对象入队,所以理论上不会出现在addIfNotPresent时,key是persent的情况。而deltaFIFO入队的逻辑,也不会存在一个队列有两个相同的key的情况,所以不会有丢失的问题,addIfNotPresent应该只是加多一层保障。如果理解有问题,欢迎大佬们指正。

回顾一下pop的调用方processLoop,调用pop时传入PopProcessFunc(c.config.Process))。

系列第一篇介绍informer时提到过,c.config.Process最终调用的是processDeltas函数,它包含了数据同步到存储,以及调用注册的用户函数两个操作。

func (c *controller) processLoop() {
	for {
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		if err != nil {
			...
		}
	}
}

// 数据处理函数
func processDeltas(
	handler ResourceEventHandler,
	clientState Store,
	deltas Deltas,
	isInInitialList bool,
) error {
	// from oldest to newest
	for _, d := range deltas {
		obj := d.Object

        // 区分事件类型进行处理
		switch d.Type {
		case Sync, Replaced, Added, Updated:
            // 同步存储数据
			if old, exists, err := clientState.Get(obj); err == nil && exists {
				if err := clientState.Update(obj); err != nil {
					return err
				}
                // 回调用户函数
				handler.OnUpdate(old, obj)
			} else {
                // 同步存储数据
				if err := clientState.Add(obj); err != nil {
					return err
				}
                // 回调用户函数
				handler.OnAdd(obj, isInInitialList)
			}
		case Deleted:
            // 同步存储数据
			if err := clientState.Delete(obj); err != nil {
				return err
			}
            // 回调用户函数
			handler.OnDelete(obj)
		}
	}
	return nil
}

总结

还是用上一节的例子,小结回顾一下整体的流程

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/79561.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

CCLINK转MODBUS-TCP网关cclink通讯接线图 终端电阻

大家好&#xff0c;今天我们要聊的是生产管理系统中的CCLINK和MODBUS-TCP协议&#xff0c;它们的不同使得数据互通比较困难&#xff0c;但捷米JM-CCLK-TCP网关的出现改变了这一切。 1捷米JM-CCLK-TCP是一款自主研发的CCLINK从站功能的通讯网关&#xff0c;它的主要功能是将各种…

PS丢失d3dcompiler_47.dll文件怎么办(附详细修复方法)

我们在安装PS等软件的时候&#xff0c;有可能安装完之后出现以下问题&#xff08;特别是win10或者win11系统&#xff09; 错误&#xff1a; 打开PS的时候出现这个错误&#xff1a;无法启动此程序&#xff0c;因为计算机中丢失D3DCOMPILER_47.dll。尝试重新安装该程序以解决此问…

[Go版]算法通关村第十一关青铜——理解位运算的规则

目录 数字在计算机中的表示&#xff1a;机器数、真值对机器数进一步细化&#xff1a;原码、反码、补码为何会有原码、反码和补码为何计算机中的按位运算使用的是补码&#xff1f;位运算规则与、或、异或和取反移位运算移位运算与乘除法的关系位运算常用技巧⭐️ 操作某个位的数…

20W IP网络吸顶喇叭 POE供电吸顶喇叭

SV-29852T 20W IP网络吸顶喇叭产品简介 产品用途&#xff1a; ◆室内豪华型吸顶喇叭一体化网络音频解码扬声器&#xff0c;用于广播分区音频解码、声音还原作用 ◆应用场地如火车站、地铁、教堂、工厂、仓库、公园停车场等&#xff1b;室内使用效果均佳。 产品特点&#xff…

linux——MongoDB服务

目录 一、MongoDB概述 相关概念 特性 应用场景 二、目录结构 三、默认数据库 admin local config 四、数据库操作 库操作 文档操作 五、MongoDB数据库备份 一、备份 mongodump mongoexport 二、恢复 mongorestore mongoimport ​编辑 一、MongoDB概述 MongoDB是…

04-微信小程序常用组件-基础组件

04-微信小程序常用组件-基础组件 文章目录 基础内容icon 图标案例代码 text 文本案例代码 progress 进度条案例代码 微信小程序包含了六大组件&#xff1a; 视图容器、 基础内容、 导航、 表单、 互动和 导航。这些组件可以通过WXML和WXSS进行布局和样式设置&#xff0c…

数据库相关面试题

巩固基础&#xff0c;砥砺前行 。 只有不断重复&#xff0c;才能做到超越自己。 能坚持把简单的事情做到极致&#xff0c;也是不容易的。 mysql怎么优化 : MySQL的优化可以从以下几个方面入手&#xff1a; 数据库设计优化&#xff1a;合理设计表结构&#xff0c;选择合适的数…

Windows下问题定位

1、内存相关知识点&#xff1b; 1&#xff09;windows下32位进程&#xff0c;用户态为2G内存&#xff0c;内核态也为2G内存&#xff1b;却别于linux操作系统&#xff1b; 备注&#xff1a;可以通过命令行与管理员权限&#xff0c;启动3G的用户态空间&#xff0c;但是部…

数据结构的树存储结构

数据结构的树存储结构 之前介绍的所有的数据结构都是线性存储结构。本章所介绍的树结构是一种非线性存储结构&#xff0c;存储的是具有“一对多”关系的数据元素的集合。 (A) (B) 图 1 树的示例 图 …

同伦问题与同伦算法

同伦问题 据我所知&#xff0c;这篇博客是CSDN上少数几篇讲同伦算法的博客之一考虑同伦算法的目的 扩大初值选取范围解决非线性代数方程组的全部解计算问题 同伦算法中的基本概念 考虑求的解人为地引入参数t,构造一个函数族使得 同时假设的解已知&#xff0c;从出发可以求解对…

3 Python的数据类型

概述 在上一节&#xff0c;我们介绍了Python的基础语法&#xff0c;包括&#xff1a;编码格式、标识符、关键字、注释、多行、空行、缩进、引号、输入输出、import、运算符、条件控制、循环等内容。Python是一种动态类型的编程语言&#xff0c;这意味着当你创建一个变量时&…

星际争霸之小霸王之小蜜蜂(三)--重构模块

目录 前言 一、为什么要重构模块 二、创建game_functions 三、创建update_screen() 四、修改alien_invasion模块 五、课后思考 总结 前言 前两天我们已经成功创建了窗口&#xff0c;并将小蜜蜂放在窗口的最下方中间位置&#xff0c;本来以为今天将学习控制小蜜蜂&#xff0c;结…

<CodeGeeX>基于大模型的全能AI编程助手

CodeGeex官网 智谱AI官网 CodeGeex是由清华大学 KEG 实验室和智谱 AI 公司于2023共同训练的代码生成模型 CodeGeeX 开发的AI助手。它基于深度学习技术&#xff0c;能够针对用户的问题和要求提供适当的答复和支持。CodeGeex的功能包括代码生成、自动添加注释、代码翻译以及智能问…

php base64转图片保存本地

调用函数 public function base64(){$img $this->request->param(img);$img …

Android开发之性能优化:过渡绘制解决方案

1. 过渡绘制 屏幕上某一像素点在一帧中被重复绘制多次&#xff0c;就是过渡绘制。 下图中多个卡片跌在一起&#xff0c;但是只有第一个卡片是完全可见的。背后的卡片只有部分可见。但是Android系统在绘制时会将下层的卡片进行绘制&#xff0c;接着再将上层的卡片进行绘制。但其…

前端跨域问题解决方法

跨域是WEB浏览器专有的同源限制访问策略。(后台接口调用和postman等工具会出现) 跨源资源共享&#xff08;CORS&#xff0c;或通俗地译为跨域资源共享&#xff09;是一种基于 HTTP 头的机制&#xff0c;该机制通过允许服务器标示除了它自己以外的其他源&#xff08;域、协议或端…

【eNSP】交换机(vlan和vlan间通信)

【eNSP】交换机&#xff08;vlan和vlan间通信&#xff09; 原理术语过程 实验根据图片连接模块配置设备名称和IP地址配置交换机交换机链路指定sw1配置sw2配置 设置网关交换机互联实验设置查看设置结果 ospf配置 原理 HUB集线器&#xff1a;它的作用可以简单的理解为将一些机器…

智能工厂:适应不断变化的制造世界

制造业已经从过去传统的装配线工艺流程中走了很长一段路。随着技术的进步和工业 4.0 的兴起&#xff0c;制造业正在迅速发展&#xff0c;以满足现代世界不断变化的需求。近年来出现的一个关键概念就是“智能工厂”。在这篇文章中&#xff0c;我们将探讨什么是智能工厂、它是如何…

利用Opencv实现人像迁移

前言&#xff1a; Hello大家好&#xff0c;我是Dream。 今天来学习一下如何使用Opencv实现人像迁移&#xff0c;欢迎大家一起参与探讨交流~ 本文目录&#xff1a; 一、实验要求二、实验环境三、实验原理及操作1.照片准备2.图像增强3.实现美颜功能4.背景虚化5.图像二值化处理6.人…

【Vue3】Vue3 UI 框架 | Element Plus —— 创建并优化表单

安装 # NPM $ npm install element-plus --save // 或者&#xff08;下载慢切换国内镜像&#xff09; $ npm install element-plus -S// 可以选择性安装 less npm install less less-loader -D // 可以选择性配置 自动联想src目录Element Plus 的引入和注入 main.ts import…