深入源码分析kubernetes informer机制(二)Reflector


[阅读指南]
这是该系列第二篇
基于kubernetes 1.27 stage版本
为了方便阅读,后续所有代码均省略了错误处理及与关注逻辑无关的部分。


文章目录

  • Reflector是什么
  • 整体结构
  • 工作流程
    • list拉取数据
    • 缓存resync操作
    • watch监听操作
  • 总结

Reflector是什么

reflector在informer中就像是一个对外的窗口,它与api-server建立连接,监听和获取来自api-server的资源变化信息,并把这些信息放进deltaFIFO中,交给下一个环节处理。

整体结构

与api-server进行交互,通过list获取指定的全量资源,watch监听指定的资源变化事件,并将这些事件放入delta FIFO队列中。
结构与交互如下图

// 省略了部分字段,只留下我们关注的
type Reflector struct {
    // name identifies this reflector. By default it will be a file:line if possible.
    name string

    // reflector对象需要监控的资源类型,比如上一节workqueue中的&v1.Pod{}
    expectedType reflect.Type
    
    // deltaFIFO 队列存储对象
    store Store
    
    // 实现list/watch
    listerWatcher ListerWatcher
    
    // 上次更新的资源版本号,用来判断当前的node的资源状况
    lastSyncResourceVersion string
    ......
}

工作流程

reflecter主函数比较简单,循环同步运行ListAndWatch直到收到stop信号。

func (r *Reflector) Run(stopCh <-chan struct{}) {
	wait.BackoffUntil(func() {
		if err := r.ListAndWatch(stopCh); err != nil {
			r.watchErrorHandler(r, err)
		}
	}, r.backoffManager, true, stopCh)
}

ListAndWatch主要做了这几件事:

  1. 通过stream或者chunk方式拉取全量list数据
  2. 开启一个协程进行缓存resync操作。
  3. 循环执行watch监听操作
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	...
    
	fallbackToList := !r.UseWatchList

    // stream式同步
	if r.UseWatchList {
		w, err = r.watchList(stopCh)
    	...
		if err != nil {
            ...
			fallbackToList = true
			w = nil
		}
	}

    // chunk式同步
	if fallbackToList {
		err = r.list(stopCh)
		if err != nil {
			return err
		}
	}

	...
	go r.startResync(stopCh, cancelCh, resyncerrc)
	return r.watch(w, stopCh, resyncerrc)
}

接下来咱一步步来看。

list拉取数据

ListAndWatch拉取全量数据时,出现了两种数据拉取的方式,list /watchstream list /watch

stream list是 kubernetes 1.27 引入的新方案,通过 ENABLE_CLIENT_GO_WATCH_LIST_ALPHA 变量可以启用stream list,默认会使用原有的list/watch。后续会单独开一篇介绍stream list方案,详情可以通过KEP-3157了解

前者在初始化时list拉取全量数据,通过watch更新增量变化。
后者可以通过watch 请求的方式获取list数据,从而减轻大规模集群初始化list数据时的资源消耗。
在建立watch连接时,携带如下两个参数即可告知服务器使用streaming list进行一致性读取。
sendInitialEvents=true
resourceVersionMatch=NotOlderThan

常规的list流程借用这个博主画的时序图来看下。
在这里插入图片描述

缓存resync操作

resync负责定期将本地的缓存重新加入deltaFIFO队列,确保本地缓存与controller的数据一致性。

国内太多博客没了解清楚就介绍这一部分是与api-server交互,进行relist。实际上resync完全没有涉及到服务端的部分,他就是一个本地缓存的同步机制。与服务端的交互使用list/watch已经完全可以确保资源一致性了,基本不怎么需要进行relist操作,并且对于节点非常多的大集群来说,list非常消耗资源,何况是定期relist呢。

关于resync机制的介绍,不在这里展开,详细看下一篇笔记。

watch监听操作

watch的实现非常巧妙,它利用了http的chunk编码传输机制建立长连接,来实现动态的数据监听,可以了解分块传输编码。
同样借用一张时序图来看下watch的流程
在这里插入图片描述

reflector通过Watcher监听api-server端的数据delta事件,并将这些事件放入deltaFIFO中统一处理。

// 在这里向服务端发起watch请求,并接收和处理资源变更事件
func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc chan error) error {
	...
    
	for {
        ...

        // w == nil表示使用常规的list/watch方式,streaming 方式会创建特殊的watcher
		if w == nil {
			timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
			options := metav1.ListOptions{
                // 上次同步的资源版本,也就是本地的资源版本。以此来获取增量的数据
				ResourceVersion: r.LastSyncResourceVersion(),
                // watch 超时时间,长时间没有接受任务事件的watcher会被关掉,避免长时间挂起。
				TimeoutSeconds: &timeoutSeconds,
                // watch书签,避免watch重启时请求api-server导致的消耗。
				AllowWatchBookmarks: true,
			}
        	// 创建一个watch对象,监听api-server的资源变更事件,将接收到的事件丢进resultChan中
			w, err = r.listerWatcher.Watch(options)
        	...
		}

        // 将resultChan中的取出放入FIFO 队列
		err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, nil, r.clock, resyncerrc, stopCh)
		
        // 失败重试逻辑
        ...
		
	}
}

建立连接的逻辑在这一行
w, err = r.listerWatcher.Watch(options)

还是用上一篇workqueue来看看这个Watch实例的实现。
从Watch函数一路往上追溯,可以看到先是与server建立了http连接,再通过watch标记建立了watch连接,创建stream watcher对象,并拉起一个协程去处理监听到的事件信息。

  • 此后所有监听的delta事件都会经过receive协程进入到resultChan中。
// reflector调用的watch函数
func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) {
	return lw.WatchFunc(options)
}

// watchFunc函数的定义
func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *ListWatch {
	...
	watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
		options.Watch = true // 向服务端请求chunk连接
		optionsModifier(&options)
		return c.Get().
			Namespace(namespace).
			Resource(resource).
			VersionedParams(&options, metav1.ParameterCodec).
			Watch(context.TODO()) // 这里调用了getter的watch函数
                                    // getter是controller初始化时建立的http客户端: clientset.CoreV1().RESTClient()
	}
	return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}


func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
    ...	
    
	url := r.URL().String()
	for {
		req, err := r.newHTTPRequest(ctx)

		resp, err := client.Do(req)
		if err == nil && resp.StatusCode == http.StatusOK {
			return r.newStreamWatcher(resp)
		}
		...
	}
}

func NewStreamWatcher(d Decoder, r Reporter) *StreamWatcher {
	sw := &StreamWatcher{
		source:   d,
		reporter: r,
		result: make(chan Event),
		done: make(chan struct{}),
	}
	go sw.receive() // 处理消息事件的协程
	return sw
}

// 解析接收到的事件,并放到resultChan中等待后续处理。
func (sw *StreamWatcher) receive() {
	for {
        // 解析数据
		action, obj, err := sw.source.Decode()
		select {
		case <-sw.done:
			return
        // 将事件发送到resultChan
		case sw.result <- Event{
			Type:   action,
			Object: obj,
		}:
		}
	}
}
  • 进入resultChan的事件,由watchHandler取出再分类添加到FIFO队列中。
func watchHandler(start time.Time,
	w watch.Interface,	// watch实例
	store Store, // 存储对象 比如delta FIFO queue
	...
) error {
	...
loop:
	for {
		select {
		case <-stopCh:
			return errorStopRequested
		case err := <-errc:
			return err

        // 从ResultChan中取出变更事件,并放进队列中,比如delta FIFO队列中
		case event, ok := <-w.ResultChan():
			// 省略了一些资源过滤和错误处理
            ...

            // 解析监听到的事件数据
			meta, err := meta.Accessor(event.Object)
			if err != nil {
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))
				continue
			}

            // 解析资源事件的版本
			resourceVersion := meta.GetResourceVersion()
			switch event.Type {
			case watch.Added: 
				err := store.Add(event.Object) // 往队列添加add delta事件
            	... // err handle
			case watch.Modified: 
				err := store.Update(event.Object) // 往队列添加update delta事件
				... // err handle
			case watch.Deleted:	
				err := store.Delete(event.Object) // 往队列添加delete delta事件,在此之前会判断事件对应的资源对象是否存在
				... // err handle
			case watch.Bookmark:
            	...
			default:
				... // err handle
			}
            
            // 更新resourceVersion版本号,下一轮watch就不会再收到重复的更新事件
			setLastSyncResourceVersion(resourceVersion)
			if rvu, ok := store.(ResourceVersionUpdater); ok {
				rvu.UpdateResourceVersion(resourceVersion)
			}
            
			...
		}
	}

    ...
	return nil
}

总结

用一个图来回顾下reflector各个模块的关系~
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/83503.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

爬虫逆向实战(七)--猿人学第十六题

一、数据接口分析 主页地址&#xff1a;猿人学第十六题 1、抓包 通过抓包可以发现数据接口是api/match/16 2、判断是否有加密参数 请求参数是否加密&#xff1f; 通过查看“载荷”模块可以看出m是加密参数 请求头是否加密&#xff1f; 无响应是否加密&#xff1f; 无cook…

云聊天项目测试

前言 以下将对云聊天项目编写测试用例以及主要功能的自动化测试。 1. 测试用例的编写 2. 自动化测试 以下进行部分自动化测试用例的执行&#xff0c;检验项目功能是否符合预期。 2.1 登录功能测试 测试代码&#xff1a; 输入非法用户名或密码逻辑相似&#xff0c;不重复描…

安防监控视频汇聚平台EasyCVR视频平台调用iframe地址无法播放的问题解决方案

安防监控视频汇聚平台EasyCVR基于云边端一体化架构&#xff0c;具有强大的数据接入、处理及分发能力&#xff0c;可提供视频监控直播、云端录像、视频云存储、视频集中存储、视频存储磁盘阵列、录像检索与回看、智能告警、平台级联、云台控制、语音对讲、AI算法中台智能分析无缝…

冷冻冷藏自动化立体库|HEGERLS四向穿梭车助力打造冷链智能仓储新力量

随着中国仓储物流整体规模和低温产品消费需求的稳步增长&#xff0c;冷链市场应用潜力不断释放。而在实际运行中&#xff0c;由于冷库容量不足、基础设施落后、管理机制欠缺等原因&#xff0c;经常出现“断链”现象&#xff0c;严重威胁到产品质量和消费者安全。 河北沃克金属…

React Native expo项目修改应用程序名称

https://expo.dev/accounts/xutongbao/projects npm install --global eas-cli && \eas init --id e32cf2c0-da5b-4a65-814a-4958d58f0ca7 eas init --id e32cf2c0-da5b-4a65-814a-4958d58f0ca7 app.config.js: export default {name: 学习,slug: learn-gpt,owner: x…

Comparable和Comparator区别

Comparable和Comparator接口都是实现集合中元素的比较、排序的&#xff0c;众所周知&#xff0c;诸如Integer&#xff0c;double等基本数据类型&#xff0c;java可以对他们进行比较&#xff0c;而对于类的比较&#xff0c;需要人工定义比较用到的字段比较逻辑。总体来讲&#x…

【微服务技术二】Feign、Gateway(路由、过滤器、跨域)的初步认知

微服务技术二 五、Feign远程调用Feign替代RestTemplate自定义Feign配置方式一&#xff1a;配置文件方式&#xff1a;方式二&#xff1a;java代码方式 Feign性能优化Feign的最佳实践* 六、Gateway网关搭建网关服务路由断言工厂Route Predicate Factory路由过滤器 GatewayFilter默…

负载均衡下的 WebShell 连接

目录 负载均衡简介负载均衡的分类网络通信分类 负载均衡下的 WebShell 连接场景描述难点介绍解决方法**Plan A** **关掉其中一台机器**&#xff08;作死&#xff09;**Plan B** **执行前先判断要不要执行****Plan C** 在Web 层做一次 HTTP 流量转发 &#xff08;重点&#xff0…

排名前 6 位的数学编程语言

0 说明 任何对数学感兴趣或计划学习数学的人&#xff0c;都应该至少对编程语言有一定的流利程度。您不仅会更有就业能力&#xff0c;还可以更深入地理解和探索数学。那么你应该学习什么语言呢&#xff1f; 1.python 对于任何正在学习数学的人来说&#xff0c;Python都是一门很棒…

vue所有UI库通用)tree-select 下拉多选(设置 maxTagPlaceholder 隐藏 tag 时显示的内容,支持鼠标悬浮展示更多

如果可以实现记得点赞分享&#xff0c;谢谢老铁&#xff5e; 1.需求描述 引用的下拉树形结构支持多选&#xff0c;限制选中tag的个数&#xff0c;且超过制定个数&#xff0c;鼠标悬浮展示更多已选中。 2.先看下效果图 3.实现思路 首先根据API文档&#xff0c;先设置maxTagC…

LVS-DR+keepalived实现高可用负载群集

VRRP 通信原理&#xff1a; VRRP就是虚拟路由冗余协议&#xff0c;它的出现就是为了解决静态路由的单点故障。 VRRP是通过一种竞选的一种协议机制&#xff0c;来将路由交给某台VRRP路由。 VRRP用IP多播的方式&#xff08;多播地址224.0.0.18&#xff09;来实现高可用的通信&…

Redis中的有序集合

前言 本文着重介绍Redis中的有序集合的底层实现中的跳表 有序集合 Sorted Set Redis中的Sorted Set 是一个有序的无重复值的集合&#xff0c;他底层是使用压缩列表和跳表实现的&#xff0c;和Java中的HashMap底层数据结构&#xff08;1.8&#xff09;链表红黑树异曲同工之妙…

记一次mysql8 在linux上安装全过程

参照MYSQL官网官方文档安装 1、mysql官网 mysql官网 2、直接进入文档页 找到安装文档 3、找到自己系统对应的安装文档&#xff0c;选合适的安装方式&#xff0c;我这里使用的是YUM方式 a、开始安装之前需要替换yum仓库 具体步骤如下 b、将下载的文件上传至自己的服务器 如下…

什么是原型(prototype)和原型链(prototype chain)?如何继承一个对象的属性和方法?

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ 原型&#xff08;Prototype&#xff09;和原型链&#xff08;Prototype Chain&#xff09;⭐ 原型&#xff08;Prototype&#xff09;⭐ 原型链&#xff08;Prototype Chain&#xff09;⭐ 继承属性和方法⭐ 写在最后 ⭐ 专栏简介 前端入…

shell脚本之循环语句

循环语句 循环含义 将某代码段重复运行多次&#xff0c;通常有进入循环的条件和退出循环的条件 for循环语句 一般知道循环次数使用for循环 第一类 格式1&#xff1a; for名称 in 取值次数;do;done; 格式2&#xff1a; for 名称 in {取值列表} do done# 打印20次 for i i…

从 Ansible Galaxy 使用角色

从 Ansible Galaxy 使用角色 根据下列要求&#xff0c;创建一个名为 /home/curtis/ansible/roles.yml 的 playbook &#xff1a; playbook 中包含一个 play&#xff0c; 该 play 在 balancers 主机组中的主机上运行并将使用 balancer 角色。 此角色配置一项服务&#xff0c;以…

十、flume的安装

1.解压 2.改名 3.修改权限 4.编辑环境变量并source export FLUME_HOME/usr/local/flume export PATH$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$HIVE_HOME/bin:$HBASE_HOME/bin:$SQOOP_HOME/bin:$PIG_HOME/bin:$FLUME_HOME/bin 5.配置 6.查看版本 7.启动Hadoo…

LLM提示词工程和提示词工程师Prompting and prompt engineering

你输入模型的文本被称为提示&#xff0c;生成文本的行为被称为推断&#xff0c;输出文本被称为完成。用于提示的文本或可用的内存的全部量被称为上下文窗口。尽管这里的示例显示模型表现良好&#xff0c;但你经常会遇到模型在第一次尝试时无法产生你想要的结果的情况。你可能需…

31.Netty源码之客户端启动流程

highlight: arduino-light 客户端启动主要流程 如果看了服务器端的启动流程&#xff0c;这里简单看下就可以了。 java package io.netty.server; ​ import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import …

SHELL 基础 显示字符颜色, 修改历史命令,Linux里的命令 执行顺序

echo 打印命令 &#xff1a; 显示字符串 &#xff1a; [rootserver ~]# echo this is SHELL language this is SHELL language [rootserver ~]# echo this is SHELL language this is SHELL language [rootserver ~]# echo "this is SHELL language" this is SH…