k8s源码阅读:Informer源码解析

写在之前

Kubernetes的Informer机制是一种用于监控资源对象变化的机制。它提供了一种简化开发者编写控制器的方式,允许控制器能够及时感知并响应 Kubernetes 集群中资源对象的变化。Informer通过与Kubernetes API服务器进行交互,通过监听API服务器上资源对象的修改事件来实现实时的资源对象状态更新。当一个资源对象被创建、更新或删除时,Informer会收到相应的通知,并在内部维护一个缓存用于存储最新的资源对象状态。同时,Informer还能够为特定类型的资源对象设置过滤器以进行更精细的事件监听。使用Informer机制,开发者可以通过注册事件处理函数来定义对资源对象变化的响应逻辑,从而实现自定义的控制器逻辑。在运行时,Informer会定期向API服务器发起请求以获取最新的资源对象的状态,从而保证缓存中的数据与实际集群的状态保持同步。话不多说,直接进入到源码的解析环节。

1.源码阅读环境搭建

informer的源码是在client-go工程中的,需要从仓库clone完整的代码到本地。首先在GoPath路径下创建文件夹k8s.io,进入到该文件夹下,执行以下步骤。

git clone https://github.com/kubernetes/client-go.git

源码的阅读不依赖环境和操作系统,但是需要提供一个k8s集群的环境,提前将集群的kubeconfig保存在本地,以供后文中调试源码使用。

值得注意的是,笔者看的是master分支的代码,其中的go mod依赖的golang版本是1.21,建议大家选择适合自己的版本分支阅读,否则可能会出现编译报错的情况。

2. 一个informer的简单用例

将下面的代码放到kubernetes_test目录下,以方便调试源码

package kubernetes

import (
   "fmt"
   "k8s.io/apimachinery/pkg/labels"
   "k8s.io/client-go/informers"
   "k8s.io/client-go/kubernetes"
   "k8s.io/client-go/tools/cache"
   "k8s.io/client-go/tools/clientcmd"
   "log"
   "testing"
   "time"

   metav1 "k8s.io/api/apps/v1"
)

func getKubernetesClient(kubeconfig string) (*kubernetes.Clientset, error) {
   // 配置文件路径,根据你的实际情况修改

   // 使用指定的 kubeconfig 文件创建一个 Config 对象
   config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
   if err != nil {
      return nil, err
   }
   //config.Insecure = true

   // 创建客户端集
   clientset, err := kubernetes.NewForConfig(config)
   if err != nil {
      return nil, err
   }

   return clientset, nil
}

// 展示informer的核心用法
func TestInformer(t *testing.T) {
   client, err := getKubernetesClient("XXX这里是集群kubeconfig")
   if err != nil {
      log.Fatal(err.Error())
   }
   // 初始化 informer factory(为了测试方便这里设置每30s重新 List 一次)
   informerFactory := informers.NewSharedInformerFactory(client, time.Second*30)
   // 对 Deployment 监听
   deployInformer := informerFactory.Apps().V1().Deployments()
   // 创建 Informer(相当于注册到工厂中去,这样下面启动的时候就会去 List & Watch 对应的资源)
   informer := deployInformer.Informer()
   // 创建 Lister
   deployLister := deployInformer.Lister()
   // 注册事件处理程序
   informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
      AddFunc:    onAdd,
      UpdateFunc: onUpdate,
      DeleteFunc: onDelete,
   })

   stopper := make(chan struct{})
   defer close(stopper)

   // 启动 informer,List & Watch
   informerFactory.Start(stopper)
   // 等待所有启动的 Informer 的缓存被同步
   informerFactory.WaitForCacheSync(stopper)

   // 从本地缓存中获取 default 中的所有 deployment 列表
   deployments, err := deployLister.Deployments("default").List(labels.Everything())
   if err != nil {
      panic(err)
   }
   for idx, deploy := range deployments {
      fmt.Printf("%d -> %s\n", idx+1, deploy.Name)
   }
   <-stopper

}

func onAdd(obj interface{}) {
   deploy := obj.(*metav1.Deployment)
   fmt.Println("add a deployment:", deploy.Name)
}

func onUpdate(old, new interface{}) {
   oldDeploy := old.(*metav1.Deployment)
   newDeploy := new.(*metav1.Deployment)
   fmt.Println("update deployment:", oldDeploy.Name, newDeploy.Name)
}

func onDelete(obj interface{}) {
   deploy := obj.(*metav1.Deployment)
   fmt.Println("delete a deployment:", deploy.Name)
}

简单mark一下源码分析的链路,建议大家还是一步一步使用编辑器的debug模式来阅读。
在这里插入图片描述

3.2. informers.NewSharedInformerFactory(client, time.Second*30)

初始化一个SharedInformerFactory,传入了一个定时30进行数据同步的配置项。核心代码如下:

// NewSharedInformerFactoryWithOptions constructs a new instance of a SharedInformerFactory with additional options.
func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
   factory := &sharedInformerFactory{
      client:           client,
      namespace:        v1.NamespaceAll,
      defaultResync:    defaultResync,
      informers:        make(map[reflect.Type]cache.SharedIndexInformer),
      startedInformers: make(map[reflect.Type]bool),
      customResync:     make(map[reflect.Type]time.Duration),
   }

   // Apply all options
   for _, opt := range options {
       //这一步执行,在我们追踪的链路中,options为空
      factory = opt(factory)
   }

   return factory
}

4.deployInformer := informerFactory.Apps().V1().Deployments()

这一步是deployInformer的初始化方式。我们展开步骤2中的初始化的SharedInformerFactory结构体会发现,下面的属性字段core,policy,apps似乎是将k8s原生的资源分组进行了类别、版本的抽象。类如通过.Apps().V1().Deployments(),其实获取的就是deploymentInformer的一个实例对象:

// Deployments returns a DeploymentInformer.
func (v *version) Deployments() DeploymentInformer {
   return &deploymentInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
}

看一下deploymentInformer这个结构体的定义,这里的factory就是在informerFactory := informers.NewSharedInformerFactory(client, time.Second*30)初始化的factory,之所以关注这个factory的实例,是因为在后文中还有提及。

type deploymentInformer struct {
   factory          internalinterfaces.SharedInformerFactory
   tweakListOptions internalinterfaces.TweakListOptionsFunc
   namespace        string
}

5.informer := deployInformer.Informer()

这里调用了informerfactory结构InformerFor的方法,我们上一节已经分析过了factory的实例是什么了,我们追踪到方法中看一下这个方法是在所做什么

func (f *deploymentInformer) Informer() cache.SharedIndexInformer {
   // 注册Deployment的informer到factory中
   return f.factory.InformerFor(&appsv1.Deployment{}, f.defaultInformer)
}

func (f *deploymentInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
   return NewFilteredDeploymentInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}

现在继续追踪informerFor源码,这里看起来是将deployment的cache.SharedIndexInformer注册到factory里的缓存中去。

func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {

    //  f.factory.InformerFor(&appsv1.Deployment{}, f.defaultInformer)
     f.lock.Lock()
   defer f.lock.Unlock()

   informerType := reflect.TypeOf(obj)
   // 1.f.informers:  informers map[reflect.Type]cache.SharedIndexInformer 这里缓存不同k8s资源的informer对象
   informer, exists := f.informers[informerType]
   if exists {
      return informer
   }

   resyncPeriod, exists := f.customResync[informerType]
   if !exists {
      resyncPeriod = f.defaultResync
   }
   // 2.customResync     map[reflect.Type]time.Duration 缓存了资源的resync时间
   informer = newFunc(f.client, resyncPeriod)
   informer.SetTransform(f.transform)
   f.informers[informerType] = informer

   return informer
}

6.deployLister := deployInformer.Lister()

返回lister实例,这个listener主要是用来从内存中获取资源对象详细信息的客户端,类比于数据库查询客户端。
我们看一下f.Informer().GetIndexer()函数是什么,返回了一个Index结构体的实现类。

func (s *sharedIndexInformer) GetIndexer() Indexer {
   return s.indexer
}
// 这个类就是在集群中的类别是deployment的所有object缓存存放的结构体
type Indexer interface {
   Store
   // Index returns the stored objects whose set of indexed values
   // intersects the set of indexed values of the given object, for
   // the named index
   Index(indexName string, obj interface{}) ([]interface{}, error)
   // IndexKeys returns the storage keys of the stored objects whose
   // set of indexed values for the named index includes the given
   // indexed value
   IndexKeys(indexName, indexedValue string) ([]string, error)
   // ListIndexFuncValues returns all the indexed values of the given index
   ListIndexFuncValues(indexName string) []string
   // ByIndex returns the stored objects whose set of indexed values
   // for the named index includes the given indexed value
   ByIndex(indexName, indexedValue string) ([]interface{}, error)
   // GetIndexers return the indexers
   GetIndexers() Indexers

   // AddIndexers adds more indexers to this store. This supports adding indexes after the store already has items.
   AddIndexers(newIndexers Indexers) error
}

7.informer.AddEventHandler

注册事件处理回调函数,下面是核心逻辑实现的具体细节代码。

func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (ResourceEventHandlerRegistration, error) {
  .....

   //6.1这个是一个事件处理器的生成函数
   listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize, s.HasSynced)

   if !s.started {
      //6.2首次初始化的时候就started是false的,进入到这个逻辑里
      return s.processor.addListener(listener), nil
   }

   .....
}

7.1 newProcessListener

processListener实例化的方法,该实例将消息事件路由到不同的ResourceEventHandler方法中处理。它包含两个 goroutine、两个无缓冲通道和一个无界环形缓冲区区。 add(notification) 函数将给定的通知事件发送到 addCh。一个 goroutine 运行“pop()”,它使用环形缓冲区中的存储将通知从“addCh”传递到“nextCh”。另一个 goroutine 运行 run(),它接收来自 nextCh 的通知并同步调用适当的处理程序方法。

func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int, hasSynced func() bool) *processorListener {
   ret := &processorListener{
       // 这里是事件处理channel通道
      nextCh:                make(chan interface{}),
      // 这里是watch到k8s资源变动的接收通道
      addCh:                 make(chan interface{}),
      // 这里就是传递的handler事件处理函数
      handler:               handler,
      // 这个结构体是一个线程安全的计数器
      syncTracker:           &synctrack.SingleFileTracker{UpstreamHasSynced: hasSynced},
      // 无界环,个人理解主要是用来在生产、消费者模型中速率不一致时进行缓冲的一个存储
      pendingNotifications:  *buffer.NewRingGrowing(bufferSize),
      requestedResyncPeriod: requestedResyncPeriod,
      resyncPeriod:          resyncPeriod,
   }

// 生成下一次resync的时间并保存下来
   ret.determineNextResync(now)

   return ret
}

7.2 processor.addListener(listener)

这里的processor是sharedProcessor实例化的结果,实例化的过程放在
defaultInformer这个函数中,在前文informerFor中由提及,我们把这个函数展开,看一下这个processor是怎么实例化的。

func (f *deploymentInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
   return NewFilteredDeploymentInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}
func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
   return cache.NewSharedIndexInformer(
      //初始化listwatch
      &cache.ListWatch{
         ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
            if tweakListOptions != nil {
               tweakListOptions(&options)
            }
            return client.AppsV1().Deployments(namespace).List(context.TODO(), options)
         },
         WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
            if tweakListOptions != nil {
               tweakListOptions(&options)
            }
            return client.AppsV1().Deployments(namespace).Watch(context.TODO(), options)
         },
      },
      &appsv1.Deployment{},
      resyncPeriod,
      indexers,
   )
}
//中间还有一跳,我们省略了,直接找到process的实例化代码
func NewSharedIndexInformerWithOptions(lw ListerWatcher, exampleObject runtime.Object, options SharedIndexInformerOptions) SharedIndexInformer {
   realClock := &clock.RealClock{}

   return &sharedIndexInformer{
      // indexre的初始化逻辑在这里
      indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers),
      //sharedProcessor初始的位置
      processor:                       &sharedProcessor{clock: realClock},
      listerWatcher:                   lw,
      objectType:                      exampleObject,
      objectDescription:               options.ObjectDescription,
      resyncCheckPeriod:               options.ResyncPeriod,
      defaultEventHandlerResyncPeriod: options.ResyncPeriod,
      clock:                           realClock,
      cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
   }
}

addListener主要做了两件事情
1)listeners map[*processorListener]bool,是shardProcessor中的一个缓存,主要是保存定义的processorListener
2)如果p.listenersStarted 是true的话,这里会启动两个线程,一个线程运行listener.run,定时从nextCh这个channel中获取事件进行处理
一个线程运行listener.pop,定时将addCh这个channel中的事件转移到nextCh这个channel中去,两者之间是通过一个无界环状结构进行传递的
但是,按照前文的追踪逻辑这里的p.listenersStarted是false, p.wg.Start(listener.run和p.wg.Start(listener.pop)这两个典型的生产-消费者逻辑没有执行。

func (p *sharedProcessor) addListener(listener *processorListener) ResourceEventHandlerRegistration {
   p.listenersLock.Lock()
   defer p.listenersLock.Unlock()

   if p.listeners == nil {
      p.listeners = make(map[*processorListener]bool)
   }

   p.listeners[listener] = true
   //这里貌似没进来
   if p.listenersStarted {
      p.wg.Start(listener.run)
      p.wg.Start(listener.pop)
   }

   return listener
}

8.informerFactory.Start(stopper)

这一步是启动factory的核心逻辑,前文中主要是进行了一些结构体的定义及初始化,并没有任何执行逻辑存在。戏台子搭好了,这一个步骤应该是核心的执行逻辑了吧。核心代码在这里,主要是把所有注册的informer启动起来。

func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
   f.lock.Lock()
   defer f.lock.Unlock()

   if f.shuttingDown {
      return
   }

// 上文已经提到了,informer主要是保存k8s的资源的类runtimeObject和Informer之间的映射关系的缓存
   for informerType, informer := range f.informers {
      if !f.startedInformers[informerType] {
         f.wg.Add(1)
         // We need a new variable in each loop iteration,
         // otherwise the goroutine would use the loop variable
         // and that keeps changing.
         informer := informer
         go func() {
            defer f.wg.Done()
            //这里另起协程把factory中全部的informer启动起来
            //8.1 在这里启动indexInformer
            informer.Run(stopCh)
         }()
         f.startedInformers[informerType] = true
      }
   }
}

8.1 informer.Run(stopCh)

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
  ....

   // s.controller = New(cfg),就是构建controller这个结构体,把shardInformer的start标识修改成true
   func() {
      s.startedLock.Lock()
      defer s.startedLock.Unlock()

      //7.1.1 搞了一个队列
      fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
         KnownObjects:          s.indexer,
         EmitDeltaTypeReplaced: true,
         Transformer: s.transform,
      })

      //1)构建了一个配置项
      cfg := &Config{
         Queue:             fifo,
         ListerWatcher:     s.listerWatcher,
         ObjectType:        s.objectType,
         ObjectDescription: s.objectDescription,
         FullResyncPeriod:  s.resyncCheckPeriod,
         RetryOnError:      false,
         ShouldResync:      s.processor.shouldResync,

         Process:           s.HandleDeltas,
         WatchErrorHandler: s.watchErrorHandler,
      }

//2)创建controller
      s.controller = New(cfg)
      s.controller.(*controller).clock = s.clock
      s.started = true
   }()

   // Separate stop channel because Processor should be stopped strictly after controller
   processorStopCh := make(chan struct{})
   var wg wait.Group
   defer wg.Wait()              // Wait for Processor to stop
   defer close(processorStopCh) // Tell Processor to stop
   //这个不知道做了什么事情,用到的时候再说
   wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
   //3)启动controller这里是真正启动listeners的地方
   wg.StartWithChannel(processorStopCh, s.processor.run)

   defer func() {
      s.startedLock.Lock()
      defer s.startedLock.Unlock()
      s.stopped = true // Don't want any new listeners
   }()
   //4) 启动controller
   s.controller.Run(stopCh)
}

8.1.1 wg.StartWithChannel(processorStopCh, s.processor.run)

这里启动了一个协程用来执行processor.run方法,我们进到这个函数中看一下这个函数做了些什么事情.

func (p *sharedProcessor) run(stopCh <-chan struct{}) {
   func() {
      p.listenersLock.RLock()
      defer p.listenersLock.RUnlock()
      // p.listener还记得是做什么用的吗,就是保存所有的listener的缓存
      for listener := range p.listeners {
          // 启动listener,逻辑在上文已经提及
         p.wg.Start(listener.run)
         p.wg.Start(listener.pop)
      }
      p.listenersStarted = true
   }()
   <-stopCh

   // 后续都是中止逻辑了
 ...
}

8.1.2 s.controller.Run(stopCh),controller

1)构建reflector
2)启动reflector
3)启动循环处理逻辑

func (c *controller) Run(stopCh <-chan struct{}) {
   defer utilruntime.HandleCrash()
   go func() {
      <-stopCh
      c.config.Queue.Close()
   }()
   // 1. 初始化reflect来同步服务器和store的object
   r := NewReflectorWithOptions(
      c.config.ListerWatcher,
      c.config.ObjectType,
      c.config.Queue,
      ReflectorOptions{
         ResyncPeriod:    c.config.FullResyncPeriod,
         TypeDescription: c.config.ObjectDescription,
         Clock:           c.clock,
      },
   )
   r.ShouldResync = c.config.ShouldResync
   r.WatchListPageSize = c.config.WatchListPageSize
   if c.config.WatchErrorHandler != nil {
      r.watchErrorHandler = c.config.WatchErrorHandler
   }

   c.reflectorMutex.Lock()
   // 对reflector赋值
   c.reflector = r
   c.reflectorMutex.Unlock()

   var wg wait.Group

   // 2.启动reflector
   wg.StartWithChannel(stopCh, r.Run)
   // 3.循环处理队列中的元素
   wait.Until(c.processLoop, time.Second, stopCh)
   wg.Wait()
}

我们来看一下reflector启动后在做些什么事情。记住一句话就好:首先从k8s的apiserver中利用list的方式获取指定资源的全部数据,并塞到deltaqueue中。后续通过watch的方式,监听资源的变动信息并同步到队列中国。


func (r *Reflector) Run(stopCh <-chan struct{}) {
   klog.V(3).Infof("Starting reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name)
   wait.BackoffUntil(func() {
   // 核心逻辑入口
      if err := r.ListAndWatch(stopCh); err != nil {
         r.watchErrorHandler(r, err)
      }
   }, r.backoffManager, true, stopCh)
   klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name)
}

func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
   klog.V(3).Infof("Listing and watching %v from %s", r.typeDescription, r.name)
   var err error
   var w watch.Interface
   fallbackToList := !r.UseWatchList

   //这里看起来不会落到这个执行逻辑中
   if r.UseWatchList {
       // 维持本地的资源缓存
      w, err = r.watchList(stopCh)
      if w == nil && err == nil {
         // stopCh was closed
         return nil
      }
      if err != nil {
         klog.Warningf("The watchlist request ended with an error, falling back to the standard LIST/WATCH semantics because making progress is better than deadlocking, err = %v", err)
         fallbackToList = true
         // ensure that we won't accidentally pass some garbage down the watch.
         w = nil
      }
   }

   if fallbackToList {
       // 直接借用listwatcher的list方法,从服务器全量同步资源数据
      err = r.list(stopCh)
      if err != nil {
         return err
      }
   }

   klog.V(2).Infof("Caches populated for %v from %s", r.typeDescription, r.name)

   resyncerrc := make(chan error, 1)
   cancelCh := make(chan struct{})
   defer close(cancelCh)
   //启动一个协程定期同步deltafifo这个结构体,这里埋个坑,有空写deltafifo的时候解释
   go r.startResync(stopCh, cancelCh, resyncerrc)
   return r.watch(w, stopCh, resyncerrc)
}

err = r.list(stopCh)这个方法我们看一下里面做了什么事情。

func (r *Reflector) list(stopCh <-chan struct{}) error {
   .....
   // 启动一个线程全量同步资源
   go func() {
      .....
      list, paginatedResult, err = pager.ListWithAlloc(context.Background(), options)
     .....
   }()
   // 解析成指定的结构体
   items, err := meta.ExtractListWithAlloc(list) 
   ....
   initTrace.Step("Objects extracted")
   // 将数据同步到indexer本地存储中
   if err := r.syncWith(items, resourceVersion); err != nil {
      return fmt.Errorf("unable to sync list result: %v", err)
   }
   ...
}

还有一个函数没有分析,这里的watch函数主要是从listwatch的接口中获取数据变动的事件后,动态维护DeltaFiFO队列

func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc chan error) error {
   
       .....
         w, err = r.listerWatcher.Watch(options)
       ....
      err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, nil, r.clock, resyncerrc, stopCh)
       .....
}

func watchHandler(start time.Time,
   w watch.Interface,
   store Store,
   expectedType reflect.Type,
   expectedGVK *schema.GroupVersionKind,
   name string,
   expectedTypeName string,
   setLastSyncResourceVersion func(string),
   exitOnInitialEventsEndBookmark *bool,
   clock clock.Clock,
   errc chan error,
   stopCh <-chan struct{},
) error {
   eventCount := 0
   if exitOnInitialEventsEndBookmark != nil {
      // set it to false just in case somebody
      // made it positive
      *exitOnInitialEventsEndBookmark = false
   }

loop:
   for {
      select {
      case <-stopCh:
         return errorStopRequested
      case err := <-errc:
         return err
         //从watch接口中获取事件
      case event, ok := <-w.ResultChan():
        ......
         case watch.Added:
            err := store.Add(event.Object)
            if err != nil {
               utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", name, event.Object, err))
            }
         case watch.Modified:
            err := store.Update(event.Object)
            if err != nil {
               utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", name, event.Object, err))
            }
         case watch.Deleted:
            // TODO: Will any consumers need access to the "last known
            // state", which is passed in event.Object? If so, may need
            // to change this.
            err := store.Delete(event.Object)
            if err != nil {
               utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", name, event.Object, err))
            }
         case watch.Bookmark:
            // A `Bookmark` means watch has synced here, just update the resourceVersion
            if meta.GetAnnotations()["k8s.io/initial-events-end"] == "true" {
               if exitOnInitialEventsEndBookmark != nil {
                  *exitOnInitialEventsEndBookmark = true
               }
            }
       
}

我们看一下这个c.processLoop这个循环做了些什么事情。

func (c *controller) processLoop() {
   for {
      obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
      if err != nil {
         if err == ErrFIFOClosed {
            return
         }
         if c.config.RetryOnError {
            // This is the safe way to re-enqueue.
            c.config.Queue.AddIfNotPresent(obj)
         }
      }
   }
}

围绕着queue,展开了腥风血雨,看起来对于queue中元素的处理落在在c.config.Process这个函数中,我们找一下这个函数在哪里被初始化的

func (s *sharedIndexInformer) HandleDeltas(obj interface{}, isInInitialList bool) error {
   s.blockDeltas.Lock()
   defer s.blockDeltas.Unlock()

   if deltas, ok := obj.(Deltas); ok {
      return processDeltas(s, s.indexer, deltas, isInInitialList)
   }
   return errors.New("object given as Process argument is not Deltas")
}


func processDeltas(
   // Object which receives event notifications from the given deltas
   handler ResourceEventHandler,
   clientState Store,
   deltas Deltas,
   isInInitialList bool,
) error {
   // from oldest to newest
   for _, d := range deltas {
      obj := d.Object

      switch d.Type {
      case Sync, Replaced, Added, Updated:
         if old, exists, err := clientState.Get(obj); err == nil && exists {
            if err := clientState.Update(obj); err != nil {
               return err
            }
            handler.OnUpdate(old, obj)
         } else {
            if err := clientState.Add(obj); err != nil {
               return err
            }
            handler.OnAdd(obj, isInInitialList)
         }
      case Deleted:
         if err := clientState.Delete(obj); err != nil {
            return err
         }
         handler.OnDelete(obj)
      }
   }
   return nil
}

clientState.Update(obj)这个方法我们点击去看会发现,他更新的就是我们上文提及结构体的indexer内部缓存。这里的handler就是s *sharedIndexInformer,我们把这个结构体的OnAdd,OnUpdate,OnDelete方法截取出来。

// Conforms to ResourceEventHandler
func (s *sharedIndexInformer) OnAdd(obj interface{}, isInInitialList bool) {
   // Invocation of this function is locked under s.blockDeltas, so it is
   // save to distribute the notification
   s.cacheMutationDetector.AddObject(obj)
   s.processor.distribute(addNotification{newObj: obj, isInInitialList: isInInitialList}, false)
}

// Conforms to ResourceEventHandler
func (s *sharedIndexInformer) OnUpdate(old, new interface{}) {
   isSync := false

   // If is a Sync event, isSync should be true
   // If is a Replaced event, isSync is true if resource version is unchanged.
   // If RV is unchanged: this is a Sync/Replaced event, so isSync is true

   if accessor, err := meta.Accessor(new); err == nil {
      if oldAccessor, err := meta.Accessor(old); err == nil {
         // Events that didn't change resourceVersion are treated as resync events
         // and only propagated to listeners that requested resync
         isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
      }
   }

   // Invocation of this function is locked under s.blockDeltas, so it is
   // save to distribute the notification
   s.cacheMutationDetector.AddObject(new)
   s.processor.distribute(updateNotification{oldObj: old, newObj: new}, isSync)
}

// Conforms to ResourceEventHandler
func (s *sharedIndexInformer) OnDelete(old interface{}) {
   // Invocation of this function is locked under s.blockDeltas, so it is
   // save to distribute the notification
   s.processor.distribute(deleteNotification{oldObj: old}, false)
}

其中核心的方法大概就是s.processor.distribute(addNotification{newObj: obj, isInInitialList: isInInitialList}, false),我们看一下这个方法是在做什么。

func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
   p.listenersLock.RLock()
   defer p.listenersLock.RUnlock()

   for listener, isSyncing := range p.listeners {
      switch {
      case !sync:
         // non-sync messages are delivered to every listener
         listener.add(obj)
      case isSyncing:
         // sync messages are delivered to every syncing listener
         listener.add(obj)
      default:
         // skipping a sync obj for a non-syncing listener
      }
   }
}
// 还记得processorListener中的两个channel吗,这里就是往addCh中添加元素了
func (p *processorListener) add(notification interface{}) {
   if a, ok := notification.(addNotification); ok && a.isInInitialList {
      p.syncTracker.Start()
   }
   p.addCh <- notification
}

9.informerFactory.WaitForCacheSync(stopper)

等待所有启动的 Informer 的缓存被同步,真正的实现方法如下所示

func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {
   //这里是map[reflect.Type]cache.SharedIndexInformer 这个结构的一个深度拷贝,没有做额外的逻辑
   informers := func() map[reflect.Type]cache.SharedIndexInformer {
      f.lock.Lock()
      defer f.lock.Unlock()

      informers := map[reflect.Type]cache.SharedIndexInformer{}
      for informerType, informer := range f.informers {
         if f.startedInformers[informerType] {
            informers[informerType] = informer
         }
      }
      return informers
   }()

   //这里是判断所有的informer的数据是否被完全同步
   res := map[reflect.Type]bool{}
   for informType, informer := range informers {
      //这是一个同步的方法,只有当每一个informer的queue都已经被完全同步,保存执行结果。如果有一个informer没有同步,就一直卡在则立
      res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)
   }
   return res
}

10.deployments, err := deployLister.Deployments(“default”).List(labels.Everything())

我们追踪一下这个List方法是从远程k8s apiserver获取的还是从本地的indexer存储中获取的。

func ListAllByNamespace(indexer Indexer, namespace string, selector labels.Selector, appendFn AppendFunc) error {
   .....
   if namespace == metav1.NamespaceAll {
      return ListAll(indexer, selector, appendFn)
   }

   // 结论在这里
   items, err := indexer.Index(NamespaceIndex, &metav1.ObjectMeta{Namespace: namespace})
  .....

  
}

写在之后

絮絮叨叨终于理完了informer的全部源码逻辑,笔者记录了自己的阅读源码的过程,期望能在未来温习的时候能够有一个参考,可能之后会更新一些流程图去加深一下记忆吧,但是至少现在,洋洋洒洒的分析确实耗尽了我的耐心。接下来有时间的话,我会继续更新controller-runtime的源码解析。如果大家觉得有什么问题,还请在评论里不吝赐教。如果大家觉得有那么一丝帮助,还请大家帮忙点赞关注,再次拜谢。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/325786.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

计算机毕业设计 | SpringBoot宠物店管理系统(附源码)

1&#xff0c;绪论 项目背景 我国已经成为世界第二大经济体&#xff0c;经济实力高速发展以及百姓生活水平的普遍提高&#xff0c;不断地要求企业提供更加多元化的娱乐方式&#xff0c;更加快速和方便的服务&#xff0c;因此对宠物行业也提出了更加严格的要求&#xff0c;如管…

layabox_2d游戏A*寻路实践

使用工具 Red Blob Games 效果 项目地址 LayaAStar2D: Laya2.0引擎2D游戏使用AStar实践。

C++设计模式-- 2.代理模式 和 外观模式

文章目录 代理模式外观模式角色和职责代码演示一&#xff1a;代码演示二&#xff1a;外观模式适用场景 代理模式 代理模式的定义&#xff1a;为其他对象提供一种代理以控制对这个对象的访问。在某些情况下&#xff0c;一个对象不适合 或不能直接引用另一个对象&#xff0c;而代…

<蓝桥杯软件赛>零基础备赛20周--第13周--DFS剪枝

报名明年4月蓝桥杯软件赛的同学们&#xff0c;如果你是大一零基础&#xff0c;目前懵懂中&#xff0c;不知该怎么办&#xff0c;可以看看本博客系列&#xff1a;备赛20周合集 20周的完整安排请点击&#xff1a;20周计划 每周发1个博客&#xff0c;共20周。 在QQ群上答疑&#x…

docker安装nacos+mysql+配置网络

一、配置网络 为什么要配置网络&#xff1f;因为 Nacos 内要连接MySQL数据库的&#xff0c;我的 MySQL 数据库也是用 Docker启动的&#xff0c;所以2个容器间要通信是需要配置他们使用相同的网络。这个操作要在启动Nacos容器之前。 注意&#xff1a;这里配置的网络只在镜像内部…

zabbix6.4设置网络设备端口流量P95

P95概念&#xff1a; p95函数写法&#xff1a; 需要监控P95的设备如下&#xff1a; 先找到原来的端口接收发送速率的监控项&#xff1a; 可以看到他们归属于自动发现规则&#xff1a;端口表UP 找到自动发现规则&#xff1a; 点击创建监控项原型&#xff1a; 公式如下&#xff…

基于LLM大模型的信息提取指南

信息提取&#xff08;information Extraction&#xff09;是从文本或文档集合中自动检索与特定主题相关的特定信息的过程。 这通常涉及自然语言处理技术的使用。 使用自然语言处理来提取信息通常会导致构建复杂的逻辑&#xff0c;这些逻辑有时非常具体并且不能很好地概括。 好…

Linux例行性工作 at和crontab命令

1&#xff0c;例行性工作 例行性工作 —— 在某一时刻&#xff0c;必须要做的事情 —— 定时任务 &#xff08;比如&#xff1a;闹钟&#xff09; 例行性工作分为两种&#xff1a;“单一的例行性工作 at”和“循环的例行性工作 crontab” 2&#xff0c;单一执行的例行性工作 …

C 程序运行机制

1.编辑 编写C语言源程序代码&#xff0c;源程序文件以“.c”作为扩展名。 2.编译 将C语言源程序转换为目标程序(或目标文件)。如果程序没有错误&#xff0c;没有任何提示&#xff0c;就会生成一个扩展名为“.obj”的二进制文件。C语言中的每条可执行语句经过编译后最终都将被…

Go语言中的HTTP请求发送

在Go语言中&#xff0c;发送HTTP请求是一种常见的网络操作。Go语言的net/http包提供了强大的API&#xff0c;使开发者能够轻松地构建HTTP请求并处理响应。 下面我们将详细介绍如何使用Go语言发送HTTP请求&#xff0c;包括设置请求参数、处理响应状态码和头部信息、发送JSON数据…

中湖盐——健康盐,盐中贵族

祁连山古老的盐湖,有一个水晶女和玉莹郎的传说,他们来到凡间做盐为生,做出的盐像水晶玉莹一样纯净。直到有一天,始皇帝传旨修长城,玉莹和水晶不忍百姓疾苦,用相思之泪化为了水晶般的白盐;血肉精华冶炼成萤石般的盐根,造福一方。 雪水溶积汇入的盐湖,水天倒映,美不胜收,中湖盐坚…

Zero-Shot Learning—A Comprehensive Evaluation of the Good, the Bad and the Ugly

目录 背景知识why zero-shot learning?广义零样本学习设置 1 INTRODUCTION1.1 zero-shot learning——methods1.2 zero-shot learning——datasets1.3 zero-shot learning——evaluation protocol 2 RELATED WORK2.1 早期工作2.1.1 Attribute-based classification for zero-s…

STC8H8K蓝牙智能巡线小车——3.按键开关状态获取

电路分析 引脚为P37开关未按下时&#xff0c;P37是高电平开关按下时&#xff0c;GND导通&#xff0c;P37是低电平 编程思路 Driver目录中添加KEY.h文件&#xff0c;应包含引脚定义、开关GPIO实例化函数、开关状态获取函数以及当按下和未按下时执行不同的函数&#xff08;函数…

多级缓存架构(三)OpenResty Lua缓存

文章目录 一、nginx服务二、OpenResty服务1. 服务块定义2. 配置修改3. Lua程序编写4. 总结 三、运行四、测试五、高可用集群1. openresty2. tomcat 通过本文章&#xff0c;可以完成多级缓存架构中的Lua缓存。 一、nginx服务 在docker/docker-compose.yml中添加nginx服务块。…

全光谱护眼灯有哪些?寒假护眼台灯推荐

全光谱指的是包含了整个可见光谱范围以及部分红外和紫外光的光线。通常的白炽灯或荧光灯只能发出有限范围内的光波&#xff0c;而全光谱台灯通过使用多种类型的LED灯或荧光灯管来产生更广泛的光谱。这样的光谱更接近自然光&#xff0c;能够提供更真实的颜色还原和更好的照明效果…

Spring Cloud微服务基础入门

文章目录 发现宝藏前言环境准备创建第一个微服务1. 创建Spring Boot项目2. 创建微服务模块3. 编写微服务代码4. 创建一个简单的REST控制器 运行微服务 总结好书推荐 发现宝藏 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不…

懒得玩游戏--帮我做数独

目录 简介自动解数独思路核心思路输入解析打印 完整代码 简介 最近玩上了一款类似于数独的微信小程序游戏&#xff0c;名字叫数独趣味闯关&#xff0c;过了数独的关卡之后会给拼图&#xff0c;玩了几关之后摸清套路了就有点累了&#xff0c;但是还想集齐拼图&#xff0c;所以就…

为什么使用 atan2(sin(z), cos(z)) 进行角度归一化?

文章目录 为什么使用 atan2(sin(z), cos(z)) 进行归一化&#xff1f;为什么归一化后的角度等于原始角度&#xff1f; atan2 方法返回 -π 到 π 之间的值&#xff0c;代表点 (x, y) 相对于正X轴的偏移角度。这个角度是逆时针测量的&#xff0c;以弧度为单位。关于 atan2 函数为…

【第十四课】并查集(acwing-836合并集合 / 做题思路 /c++代码)

目录 错误思路(但能骗分emm)--邻接矩阵(可以跳过) 思路 存在的问题 代码如下 并查集 思路 代码如下 一些解释 错误思路(但能骗分emm)--邻接矩阵(可以跳过) 思路 刚看到这道题我自己做的时候&#xff0c;因为之前学的trie树的时候意识到使用二维数组的含义&#xff0c;…

群发邮件的免费软件?做外贸用什么邮箱好?

群发邮件的免费软件有哪些&#xff1f;好用的邮件群发软件&#xff1f; 在数字时代&#xff0c;邮件已成为人们沟通的主要方式之一。有时候&#xff0c;我们需要给大量的联系人发送信息&#xff0c;这时候&#xff0c;群发邮件就显得格外重要。接下来蜂邮就来探讨一下那些值得…