写在之前
Kubernetes的Informer机制是一种用于监控资源对象变化的机制。它提供了一种简化开发者编写控制器的方式,允许控制器能够及时感知并响应 Kubernetes 集群中资源对象的变化。Informer通过与Kubernetes API服务器进行交互,通过监听API服务器上资源对象的修改事件来实现实时的资源对象状态更新。当一个资源对象被创建、更新或删除时,Informer会收到相应的通知,并在内部维护一个缓存用于存储最新的资源对象状态。同时,Informer还能够为特定类型的资源对象设置过滤器以进行更精细的事件监听。使用Informer机制,开发者可以通过注册事件处理函数来定义对资源对象变化的响应逻辑,从而实现自定义的控制器逻辑。在运行时,Informer会定期向API服务器发起请求以获取最新的资源对象的状态,从而保证缓存中的数据与实际集群的状态保持同步。话不多说,直接进入到源码的解析环节。
1.源码阅读环境搭建
informer的源码是在client-go工程中的,需要从仓库clone完整的代码到本地。首先在GoPath路径下创建文件夹k8s.io,进入到该文件夹下,执行以下步骤。
git clone https://github.com/kubernetes/client-go.git
源码的阅读不依赖环境和操作系统,但是需要提供一个k8s集群的环境,提前将集群的kubeconfig保存在本地,以供后文中调试源码使用。
值得注意的是,笔者看的是master分支的代码,其中的go mod依赖的golang版本是1.21,建议大家选择适合自己的版本分支阅读,否则可能会出现编译报错的情况。
2. 一个informer的简单用例
将下面的代码放到kubernetes_test目录下,以方便调试源码
package kubernetes
import (
"fmt"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"log"
"testing"
"time"
metav1 "k8s.io/api/apps/v1"
)
func getKubernetesClient(kubeconfig string) (*kubernetes.Clientset, error) {
// 配置文件路径,根据你的实际情况修改
// 使用指定的 kubeconfig 文件创建一个 Config 对象
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
return nil, err
}
//config.Insecure = true
// 创建客户端集
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}
return clientset, nil
}
// 展示informer的核心用法
func TestInformer(t *testing.T) {
client, err := getKubernetesClient("XXX这里是集群kubeconfig")
if err != nil {
log.Fatal(err.Error())
}
// 初始化 informer factory(为了测试方便这里设置每30s重新 List 一次)
informerFactory := informers.NewSharedInformerFactory(client, time.Second*30)
// 对 Deployment 监听
deployInformer := informerFactory.Apps().V1().Deployments()
// 创建 Informer(相当于注册到工厂中去,这样下面启动的时候就会去 List & Watch 对应的资源)
informer := deployInformer.Informer()
// 创建 Lister
deployLister := deployInformer.Lister()
// 注册事件处理程序
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: onAdd,
UpdateFunc: onUpdate,
DeleteFunc: onDelete,
})
stopper := make(chan struct{})
defer close(stopper)
// 启动 informer,List & Watch
informerFactory.Start(stopper)
// 等待所有启动的 Informer 的缓存被同步
informerFactory.WaitForCacheSync(stopper)
// 从本地缓存中获取 default 中的所有 deployment 列表
deployments, err := deployLister.Deployments("default").List(labels.Everything())
if err != nil {
panic(err)
}
for idx, deploy := range deployments {
fmt.Printf("%d -> %s\n", idx+1, deploy.Name)
}
<-stopper
}
func onAdd(obj interface{}) {
deploy := obj.(*metav1.Deployment)
fmt.Println("add a deployment:", deploy.Name)
}
func onUpdate(old, new interface{}) {
oldDeploy := old.(*metav1.Deployment)
newDeploy := new.(*metav1.Deployment)
fmt.Println("update deployment:", oldDeploy.Name, newDeploy.Name)
}
func onDelete(obj interface{}) {
deploy := obj.(*metav1.Deployment)
fmt.Println("delete a deployment:", deploy.Name)
}
简单mark一下源码分析的链路,建议大家还是一步一步使用编辑器的debug模式来阅读。
3.2. informers.NewSharedInformerFactory(client, time.Second*30)
初始化一个SharedInformerFactory,传入了一个定时30进行数据同步的配置项。核心代码如下:
// NewSharedInformerFactoryWithOptions constructs a new instance of a SharedInformerFactory with additional options.
func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
factory := &sharedInformerFactory{
client: client,
namespace: v1.NamespaceAll,
defaultResync: defaultResync,
informers: make(map[reflect.Type]cache.SharedIndexInformer),
startedInformers: make(map[reflect.Type]bool),
customResync: make(map[reflect.Type]time.Duration),
}
// Apply all options
for _, opt := range options {
//这一步执行,在我们追踪的链路中,options为空
factory = opt(factory)
}
return factory
}
4.deployInformer := informerFactory.Apps().V1().Deployments()
这一步是deployInformer的初始化方式。我们展开步骤2中的初始化的SharedInformerFactory结构体会发现,下面的属性字段core,policy,apps似乎是将k8s原生的资源分组进行了类别、版本的抽象。类如通过.Apps().V1().Deployments(),其实获取的就是deploymentInformer的一个实例对象:
// Deployments returns a DeploymentInformer.
func (v *version) Deployments() DeploymentInformer {
return &deploymentInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
}
看一下deploymentInformer这个结构体的定义,这里的factory就是在informerFactory := informers.NewSharedInformerFactory(client, time.Second*30)初始化的factory,之所以关注这个factory的实例,是因为在后文中还有提及。
type deploymentInformer struct {
factory internalinterfaces.SharedInformerFactory
tweakListOptions internalinterfaces.TweakListOptionsFunc
namespace string
}
5.informer := deployInformer.Informer()
这里调用了informerfactory结构InformerFor的方法,我们上一节已经分析过了factory的实例是什么了,我们追踪到方法中看一下这个方法是在所做什么
func (f *deploymentInformer) Informer() cache.SharedIndexInformer {
// 注册Deployment的informer到factory中
return f.factory.InformerFor(&appsv1.Deployment{}, f.defaultInformer)
}
func (f *deploymentInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return NewFilteredDeploymentInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}
现在继续追踪informerFor源码,这里看起来是将deployment的cache.SharedIndexInformer注册到factory里的缓存中去。
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
// f.factory.InformerFor(&appsv1.Deployment{}, f.defaultInformer)
f.lock.Lock()
defer f.lock.Unlock()
informerType := reflect.TypeOf(obj)
// 1.f.informers: informers map[reflect.Type]cache.SharedIndexInformer 这里缓存不同k8s资源的informer对象
informer, exists := f.informers[informerType]
if exists {
return informer
}
resyncPeriod, exists := f.customResync[informerType]
if !exists {
resyncPeriod = f.defaultResync
}
// 2.customResync map[reflect.Type]time.Duration 缓存了资源的resync时间
informer = newFunc(f.client, resyncPeriod)
informer.SetTransform(f.transform)
f.informers[informerType] = informer
return informer
}
6.deployLister := deployInformer.Lister()
返回lister实例,这个listener主要是用来从内存中获取资源对象详细信息的客户端,类比于数据库查询客户端。
我们看一下f.Informer().GetIndexer()函数是什么,返回了一个Index结构体的实现类。
func (s *sharedIndexInformer) GetIndexer() Indexer {
return s.indexer
}
// 这个类就是在集群中的类别是deployment的所有object缓存存放的结构体
type Indexer interface {
Store
// Index returns the stored objects whose set of indexed values
// intersects the set of indexed values of the given object, for
// the named index
Index(indexName string, obj interface{}) ([]interface{}, error)
// IndexKeys returns the storage keys of the stored objects whose
// set of indexed values for the named index includes the given
// indexed value
IndexKeys(indexName, indexedValue string) ([]string, error)
// ListIndexFuncValues returns all the indexed values of the given index
ListIndexFuncValues(indexName string) []string
// ByIndex returns the stored objects whose set of indexed values
// for the named index includes the given indexed value
ByIndex(indexName, indexedValue string) ([]interface{}, error)
// GetIndexers return the indexers
GetIndexers() Indexers
// AddIndexers adds more indexers to this store. This supports adding indexes after the store already has items.
AddIndexers(newIndexers Indexers) error
}
7.informer.AddEventHandler
注册事件处理回调函数,下面是核心逻辑实现的具体细节代码。
func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (ResourceEventHandlerRegistration, error) {
.....
//6.1这个是一个事件处理器的生成函数
listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize, s.HasSynced)
if !s.started {
//6.2首次初始化的时候就started是false的,进入到这个逻辑里
return s.processor.addListener(listener), nil
}
.....
}
7.1 newProcessListener
processListener实例化的方法,该实例将消息事件路由到不同的ResourceEventHandler方法中处理。它包含两个 goroutine、两个无缓冲通道和一个无界环形缓冲区区。 add(notification)
函数将给定的通知事件发送到 addCh
。一个 goroutine 运行“pop()”,它使用环形缓冲区中的存储将通知从“addCh”传递到“nextCh”。另一个 goroutine 运行 run(),它接收来自 nextCh 的通知并同步调用适当的处理程序方法。
func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int, hasSynced func() bool) *processorListener {
ret := &processorListener{
// 这里是事件处理channel通道
nextCh: make(chan interface{}),
// 这里是watch到k8s资源变动的接收通道
addCh: make(chan interface{}),
// 这里就是传递的handler事件处理函数
handler: handler,
// 这个结构体是一个线程安全的计数器
syncTracker: &synctrack.SingleFileTracker{UpstreamHasSynced: hasSynced},
// 无界环,个人理解主要是用来在生产、消费者模型中速率不一致时进行缓冲的一个存储
pendingNotifications: *buffer.NewRingGrowing(bufferSize),
requestedResyncPeriod: requestedResyncPeriod,
resyncPeriod: resyncPeriod,
}
// 生成下一次resync的时间并保存下来
ret.determineNextResync(now)
return ret
}
7.2 processor.addListener(listener)
这里的processor是sharedProcessor实例化的结果,实例化的过程放在
defaultInformer这个函数中,在前文informerFor中由提及,我们把这个函数展开,看一下这个processor是怎么实例化的。
func (f *deploymentInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return NewFilteredDeploymentInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}
func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
//初始化listwatch
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.AppsV1().Deployments(namespace).List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.AppsV1().Deployments(namespace).Watch(context.TODO(), options)
},
},
&appsv1.Deployment{},
resyncPeriod,
indexers,
)
}
//中间还有一跳,我们省略了,直接找到process的实例化代码
func NewSharedIndexInformerWithOptions(lw ListerWatcher, exampleObject runtime.Object, options SharedIndexInformerOptions) SharedIndexInformer {
realClock := &clock.RealClock{}
return &sharedIndexInformer{
// indexre的初始化逻辑在这里
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers),
//sharedProcessor初始的位置
processor: &sharedProcessor{clock: realClock},
listerWatcher: lw,
objectType: exampleObject,
objectDescription: options.ObjectDescription,
resyncCheckPeriod: options.ResyncPeriod,
defaultEventHandlerResyncPeriod: options.ResyncPeriod,
clock: realClock,
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
}
}
addListener主要做了两件事情
1)listeners map[*processorListener]bool,是shardProcessor中的一个缓存,主要是保存定义的processorListener
2)如果p.listenersStarted 是true的话,这里会启动两个线程,一个线程运行listener.run,定时从nextCh这个channel中获取事件进行处理
一个线程运行listener.pop,定时将addCh这个channel中的事件转移到nextCh这个channel中去,两者之间是通过一个无界环状结构进行传递的
但是,按照前文的追踪逻辑这里的p.listenersStarted是false, p.wg.Start(listener.run和p.wg.Start(listener.pop)这两个典型的生产-消费者逻辑没有执行。
func (p *sharedProcessor) addListener(listener *processorListener) ResourceEventHandlerRegistration {
p.listenersLock.Lock()
defer p.listenersLock.Unlock()
if p.listeners == nil {
p.listeners = make(map[*processorListener]bool)
}
p.listeners[listener] = true
//这里貌似没进来
if p.listenersStarted {
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
return listener
}
8.informerFactory.Start(stopper)
这一步是启动factory的核心逻辑,前文中主要是进行了一些结构体的定义及初始化,并没有任何执行逻辑存在。戏台子搭好了,这一个步骤应该是核心的执行逻辑了吧。核心代码在这里,主要是把所有注册的informer启动起来。
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
f.lock.Lock()
defer f.lock.Unlock()
if f.shuttingDown {
return
}
// 上文已经提到了,informer主要是保存k8s的资源的类runtimeObject和Informer之间的映射关系的缓存
for informerType, informer := range f.informers {
if !f.startedInformers[informerType] {
f.wg.Add(1)
// We need a new variable in each loop iteration,
// otherwise the goroutine would use the loop variable
// and that keeps changing.
informer := informer
go func() {
defer f.wg.Done()
//这里另起协程把factory中全部的informer启动起来
//8.1 在这里启动indexInformer
informer.Run(stopCh)
}()
f.startedInformers[informerType] = true
}
}
}
8.1 informer.Run(stopCh)
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
....
// s.controller = New(cfg),就是构建controller这个结构体,把shardInformer的start标识修改成true
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
//7.1.1 搞了一个队列
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: s.indexer,
EmitDeltaTypeReplaced: true,
Transformer: s.transform,
})
//1)构建了一个配置项
cfg := &Config{
Queue: fifo,
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
ObjectDescription: s.objectDescription,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false,
ShouldResync: s.processor.shouldResync,
Process: s.HandleDeltas,
WatchErrorHandler: s.watchErrorHandler,
}
//2)创建controller
s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
}()
// Separate stop channel because Processor should be stopped strictly after controller
processorStopCh := make(chan struct{})
var wg wait.Group
defer wg.Wait() // Wait for Processor to stop
defer close(processorStopCh) // Tell Processor to stop
//这个不知道做了什么事情,用到的时候再说
wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
//3)启动controller这里是真正启动listeners的地方
wg.StartWithChannel(processorStopCh, s.processor.run)
defer func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.stopped = true // Don't want any new listeners
}()
//4) 启动controller
s.controller.Run(stopCh)
}
8.1.1 wg.StartWithChannel(processorStopCh, s.processor.run)
这里启动了一个协程用来执行processor.run方法,我们进到这个函数中看一下这个函数做了些什么事情.
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
func() {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
// p.listener还记得是做什么用的吗,就是保存所有的listener的缓存
for listener := range p.listeners {
// 启动listener,逻辑在上文已经提及
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
p.listenersStarted = true
}()
<-stopCh
// 后续都是中止逻辑了
...
}
8.1.2 s.controller.Run(stopCh),controller
1)构建reflector
2)启动reflector
3)启动循环处理逻辑
func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
go func() {
<-stopCh
c.config.Queue.Close()
}()
// 1. 初始化reflect来同步服务器和store的object
r := NewReflectorWithOptions(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
ReflectorOptions{
ResyncPeriod: c.config.FullResyncPeriod,
TypeDescription: c.config.ObjectDescription,
Clock: c.clock,
},
)
r.ShouldResync = c.config.ShouldResync
r.WatchListPageSize = c.config.WatchListPageSize
if c.config.WatchErrorHandler != nil {
r.watchErrorHandler = c.config.WatchErrorHandler
}
c.reflectorMutex.Lock()
// 对reflector赋值
c.reflector = r
c.reflectorMutex.Unlock()
var wg wait.Group
// 2.启动reflector
wg.StartWithChannel(stopCh, r.Run)
// 3.循环处理队列中的元素
wait.Until(c.processLoop, time.Second, stopCh)
wg.Wait()
}
我们来看一下reflector启动后在做些什么事情。记住一句话就好:首先从k8s的apiserver中利用list的方式获取指定资源的全部数据,并塞到deltaqueue中。后续通过watch的方式,监听资源的变动信息并同步到队列中国。
func (r *Reflector) Run(stopCh <-chan struct{}) {
klog.V(3).Infof("Starting reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name)
wait.BackoffUntil(func() {
// 核心逻辑入口
if err := r.ListAndWatch(stopCh); err != nil {
r.watchErrorHandler(r, err)
}
}, r.backoffManager, true, stopCh)
klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name)
}
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
klog.V(3).Infof("Listing and watching %v from %s", r.typeDescription, r.name)
var err error
var w watch.Interface
fallbackToList := !r.UseWatchList
//这里看起来不会落到这个执行逻辑中
if r.UseWatchList {
// 维持本地的资源缓存
w, err = r.watchList(stopCh)
if w == nil && err == nil {
// stopCh was closed
return nil
}
if err != nil {
klog.Warningf("The watchlist request ended with an error, falling back to the standard LIST/WATCH semantics because making progress is better than deadlocking, err = %v", err)
fallbackToList = true
// ensure that we won't accidentally pass some garbage down the watch.
w = nil
}
}
if fallbackToList {
// 直接借用listwatcher的list方法,从服务器全量同步资源数据
err = r.list(stopCh)
if err != nil {
return err
}
}
klog.V(2).Infof("Caches populated for %v from %s", r.typeDescription, r.name)
resyncerrc := make(chan error, 1)
cancelCh := make(chan struct{})
defer close(cancelCh)
//启动一个协程定期同步deltafifo这个结构体,这里埋个坑,有空写deltafifo的时候解释
go r.startResync(stopCh, cancelCh, resyncerrc)
return r.watch(w, stopCh, resyncerrc)
}
err = r.list(stopCh)这个方法我们看一下里面做了什么事情。
func (r *Reflector) list(stopCh <-chan struct{}) error {
.....
// 启动一个线程全量同步资源
go func() {
.....
list, paginatedResult, err = pager.ListWithAlloc(context.Background(), options)
.....
}()
// 解析成指定的结构体
items, err := meta.ExtractListWithAlloc(list)
....
initTrace.Step("Objects extracted")
// 将数据同步到indexer本地存储中
if err := r.syncWith(items, resourceVersion); err != nil {
return fmt.Errorf("unable to sync list result: %v", err)
}
...
}
还有一个函数没有分析,这里的watch函数主要是从listwatch的接口中获取数据变动的事件后,动态维护DeltaFiFO队列
func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc chan error) error {
.....
w, err = r.listerWatcher.Watch(options)
....
err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, nil, r.clock, resyncerrc, stopCh)
.....
}
func watchHandler(start time.Time,
w watch.Interface,
store Store,
expectedType reflect.Type,
expectedGVK *schema.GroupVersionKind,
name string,
expectedTypeName string,
setLastSyncResourceVersion func(string),
exitOnInitialEventsEndBookmark *bool,
clock clock.Clock,
errc chan error,
stopCh <-chan struct{},
) error {
eventCount := 0
if exitOnInitialEventsEndBookmark != nil {
// set it to false just in case somebody
// made it positive
*exitOnInitialEventsEndBookmark = false
}
loop:
for {
select {
case <-stopCh:
return errorStopRequested
case err := <-errc:
return err
//从watch接口中获取事件
case event, ok := <-w.ResultChan():
......
case watch.Added:
err := store.Add(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", name, event.Object, err))
}
case watch.Modified:
err := store.Update(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", name, event.Object, err))
}
case watch.Deleted:
// TODO: Will any consumers need access to the "last known
// state", which is passed in event.Object? If so, may need
// to change this.
err := store.Delete(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", name, event.Object, err))
}
case watch.Bookmark:
// A `Bookmark` means watch has synced here, just update the resourceVersion
if meta.GetAnnotations()["k8s.io/initial-events-end"] == "true" {
if exitOnInitialEventsEndBookmark != nil {
*exitOnInitialEventsEndBookmark = true
}
}
}
我们看一下这个c.processLoop这个循环做了些什么事情。
func (c *controller) processLoop() {
for {
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
if err == ErrFIFOClosed {
return
}
if c.config.RetryOnError {
// This is the safe way to re-enqueue.
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}
围绕着queue,展开了腥风血雨,看起来对于queue中元素的处理落在在c.config.Process这个函数中,我们找一下这个函数在哪里被初始化的
func (s *sharedIndexInformer) HandleDeltas(obj interface{}, isInInitialList bool) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
if deltas, ok := obj.(Deltas); ok {
return processDeltas(s, s.indexer, deltas, isInInitialList)
}
return errors.New("object given as Process argument is not Deltas")
}
func processDeltas(
// Object which receives event notifications from the given deltas
handler ResourceEventHandler,
clientState Store,
deltas Deltas,
isInInitialList bool,
) error {
// from oldest to newest
for _, d := range deltas {
obj := d.Object
switch d.Type {
case Sync, Replaced, Added, Updated:
if old, exists, err := clientState.Get(obj); err == nil && exists {
if err := clientState.Update(obj); err != nil {
return err
}
handler.OnUpdate(old, obj)
} else {
if err := clientState.Add(obj); err != nil {
return err
}
handler.OnAdd(obj, isInInitialList)
}
case Deleted:
if err := clientState.Delete(obj); err != nil {
return err
}
handler.OnDelete(obj)
}
}
return nil
}
clientState.Update(obj)这个方法我们点击去看会发现,他更新的就是我们上文提及结构体的indexer内部缓存。这里的handler就是s *sharedIndexInformer,我们把这个结构体的OnAdd,OnUpdate,OnDelete方法截取出来。
// Conforms to ResourceEventHandler
func (s *sharedIndexInformer) OnAdd(obj interface{}, isInInitialList bool) {
// Invocation of this function is locked under s.blockDeltas, so it is
// save to distribute the notification
s.cacheMutationDetector.AddObject(obj)
s.processor.distribute(addNotification{newObj: obj, isInInitialList: isInInitialList}, false)
}
// Conforms to ResourceEventHandler
func (s *sharedIndexInformer) OnUpdate(old, new interface{}) {
isSync := false
// If is a Sync event, isSync should be true
// If is a Replaced event, isSync is true if resource version is unchanged.
// If RV is unchanged: this is a Sync/Replaced event, so isSync is true
if accessor, err := meta.Accessor(new); err == nil {
if oldAccessor, err := meta.Accessor(old); err == nil {
// Events that didn't change resourceVersion are treated as resync events
// and only propagated to listeners that requested resync
isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
}
}
// Invocation of this function is locked under s.blockDeltas, so it is
// save to distribute the notification
s.cacheMutationDetector.AddObject(new)
s.processor.distribute(updateNotification{oldObj: old, newObj: new}, isSync)
}
// Conforms to ResourceEventHandler
func (s *sharedIndexInformer) OnDelete(old interface{}) {
// Invocation of this function is locked under s.blockDeltas, so it is
// save to distribute the notification
s.processor.distribute(deleteNotification{oldObj: old}, false)
}
其中核心的方法大概就是s.processor.distribute(addNotification{newObj: obj, isInInitialList: isInInitialList}, false),我们看一下这个方法是在做什么。
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for listener, isSyncing := range p.listeners {
switch {
case !sync:
// non-sync messages are delivered to every listener
listener.add(obj)
case isSyncing:
// sync messages are delivered to every syncing listener
listener.add(obj)
default:
// skipping a sync obj for a non-syncing listener
}
}
}
// 还记得processorListener中的两个channel吗,这里就是往addCh中添加元素了
func (p *processorListener) add(notification interface{}) {
if a, ok := notification.(addNotification); ok && a.isInInitialList {
p.syncTracker.Start()
}
p.addCh <- notification
}
9.informerFactory.WaitForCacheSync(stopper)
等待所有启动的 Informer 的缓存被同步,真正的实现方法如下所示
func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {
//这里是map[reflect.Type]cache.SharedIndexInformer 这个结构的一个深度拷贝,没有做额外的逻辑
informers := func() map[reflect.Type]cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
informers := map[reflect.Type]cache.SharedIndexInformer{}
for informerType, informer := range f.informers {
if f.startedInformers[informerType] {
informers[informerType] = informer
}
}
return informers
}()
//这里是判断所有的informer的数据是否被完全同步
res := map[reflect.Type]bool{}
for informType, informer := range informers {
//这是一个同步的方法,只有当每一个informer的queue都已经被完全同步,保存执行结果。如果有一个informer没有同步,就一直卡在则立
res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)
}
return res
}
10.deployments, err := deployLister.Deployments(“default”).List(labels.Everything())
我们追踪一下这个List方法是从远程k8s apiserver获取的还是从本地的indexer存储中获取的。
func ListAllByNamespace(indexer Indexer, namespace string, selector labels.Selector, appendFn AppendFunc) error {
.....
if namespace == metav1.NamespaceAll {
return ListAll(indexer, selector, appendFn)
}
// 结论在这里
items, err := indexer.Index(NamespaceIndex, &metav1.ObjectMeta{Namespace: namespace})
.....
}
写在之后
絮絮叨叨终于理完了informer的全部源码逻辑,笔者记录了自己的阅读源码的过程,期望能在未来温习的时候能够有一个参考,可能之后会更新一些流程图去加深一下记忆吧,但是至少现在,洋洋洒洒的分析确实耗尽了我的耐心。接下来有时间的话,我会继续更新controller-runtime的源码解析。如果大家觉得有什么问题,还请在评论里不吝赐教。如果大家觉得有那么一丝帮助,还请大家帮忙点赞关注,再次拜谢。