前言
最近做项目,还是K8S的插件监听器(理论上插件都是通过API-Server通信),官方的两种不同写法居然也能出现争议,争议点就是对API-Server请求的耗时,有人说会影响API-Server。实际上通过源码分析,两者有差别,但是差别不大,对API-Server的影响几乎一样。
老式写法
package main
import (
"controller/control"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
)
// main wires a pod informer to a rate-limited work queue and starts a single
// worker controller on top of them. It never returns: after launching the
// controller it blocks forever.
func main() {
	// Load the client configuration from the kubeconfig file.
	restCfg, err := clientcmd.BuildConfigFromFlags("", "xxx/config")
	if err != nil {
		klog.Fatal(err)
	}

	// Build the typed Kubernetes client.
	client, err := kubernetes.NewForConfig(restCfg)
	if err != nil {
		klog.Fatal(err)
	}

	// List/watch pods in every namespace, with no field filtering.
	lw := cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "pods", v1.NamespaceAll, fields.Everything())

	// Keys produced by the event handlers below are buffered here.
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

	// enqueue derives a key for obj with keyFn and queues it; objects for
	// which no key can be derived are skipped. Filtering on object
	// attributes could be added here.
	enqueue := func(keyFn func(obj interface{}) (string, error), obj interface{}) {
		if key, keyErr := keyFn(obj); keyErr == nil {
			queue.Add(key)
		}
	}

	handlers := cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			enqueue(cache.MetaNamespaceKeyFunc, obj)
		},
		UpdateFunc: func(_, newObj interface{}) {
			enqueue(cache.MetaNamespaceKeyFunc, newObj)
		},
		// Deletions use DeletionHandlingMetaNamespaceKeyFunc. Per the
		// original author's note, it inspects the object before building the
		// key because a delete may have been missed by the watch, leaving a
		// stale record (its implementation is not shown in this file).
		DeleteFunc: func(obj interface{}) {
			enqueue(cache.DeletionHandlingMetaNamespaceKeyFunc, obj)
		},
	}

	// Resync period 0 and no extra indexers.
	indexer, informer := cache.NewIndexerInformer(lw, &v1.Pod{}, 0, handlers, cache.Indexers{})

	controller := control.NewController(queue, indexer, informer)

	stop := make(chan struct{})
	defer close(stop)

	// Start the controller with one worker, then block forever.
	go controller.Run(1, stop)
	select {}
}
然后写个Controller代码
package control
import (
"fmt"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"time"
)
type Controller struct {
indexer cache.Indexer // Indexer 的引用
queue workqueue.RateLimitingInterface //workqueue 的引用
informer cache.Controller // Informer 的引用
}
// NewController returns a Controller that holds the given queue, indexer and
// informer. Nothing is started here; call Run to begin processing.
func NewController(queue workqueue.RateLimitingInterface, indexer cache.Indexer, informer cache.Controller) *Controller {
	ctrl := new(Controller)
	ctrl.queue = queue
	ctrl.indexer = indexer
	ctrl.informer = informer
	return ctrl
}
// Run starts the informer, waits for its cache to sync, launches threadiness
// worker goroutines and then blocks until stopCh is closed.
//
// If the cache does not sync (WaitForCacheSync returns false) an error is
// reported through runtime.HandleError and Run returns without starting any
// worker. The queue is shut down when Run returns.
func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
	defer runtime.HandleCrash()
	defer c.queue.ShutDown()
	klog.Info("Starting pod control")

	// Start the informer in the background.
	go c.informer.Run(stopCh)

	if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
		runtime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
		return
	}

	// Each worker is restarted by wait.Until one second after it returns,
	// until stopCh is closed.
	for i := 0; i < threadiness; i++ {
		go wait.Until(c.runWorker, time.Second, stopCh)
	}

	<-stopCh
	klog.Info("Stopping Pod control")
}
// runWorker keeps processing queue items until processNextItem reports that
// no further items should be handled.
func (c *Controller) runWorker() {
	for {
		if !c.processNextItem() {
			return
		}
	}
}
// processNextItem takes one key off the work queue, prints information about
// the object behind it and passes the outcome to handleError.
//
// It returns false once the queue reports shutdown, true otherwise.
func (c *Controller) processNextItem() bool {
	item, quit := c.queue.Get()
	if quit {
		return false
	}
	// Tell the queue this key has been processed once we are finished with it.
	defer c.queue.Done(item)

	syncErr := c.syncToStdout(item.(string))
	c.handleError(syncErr, item)
	return true
}
// syncToStdout looks up the object stored under key in the indexer and prints
// one line describing what it found.
//
// It returns the indexer's lookup error, if any, and nil otherwise.
func (c *Controller) syncToStdout(key string) error {
	obj, exists, err := c.indexer.GetByKey(key)
	if err != nil {
		klog.Errorf("Fetching object with key %s from store failed with %v", key, err)
		return err
	}
	if !exists {
		// Nothing is stored under this key, so obj must not be asserted to
		// *v1.Pod here (that assertion panics); report the key itself.
		fmt.Printf("Pod %s does not exist\n", key)
		return nil
	}
	fmt.Printf("Sync/Add/Update for Pod %s\n", obj.(*v1.Pod).GetName())
	return nil
}
// handleError decides what happens to key after a sync attempt.
//
// On success (err == nil) the key's rate-limiting history is cleared with
// Forget. On failure the key is re-queued with rate limiting for up to
// maxRetries attempts; after that it is dropped from the queue and the error
// is reported through runtime.HandleError.
func (c *Controller) handleError(err error, key interface{}) {
	const maxRetries = 5

	if err == nil {
		c.queue.Forget(key)
		return
	}

	if c.queue.NumRequeues(key) < maxRetries {
		klog.Infof("Error syncing pod %v: %v", key, err)
		c.queue.AddRateLimited(key)
		return
	}

	// Retries exhausted: stop tracking the key and surface the error.
	c.queue.Forget(key)
	runtime.HandleError(err)
	klog.Infof("Dropping pod %q out of the queue: %v", key, err)
}
这种写法的好处是自己处理各个环节(Informer和indexer),那个queue仅仅是队列,只存放key,处理时再用key从cache缓存取数据。下面实际看看创建过程
创建lw的过程
cache.NewListWatchFromClient
// NewListWatchFromClient creates a new ListWatch from the specified client, resource, namespace and field selector.
// It wraps fieldSelector in an options modifier and delegates to NewFilteredListWatchFromClient.
func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch {
// The modifier copies the selector's string form into the options it is given.
optionsModifier := func(options *metav1.ListOptions) {
options.FieldSelector = fieldSelector.String()
}
return NewFilteredListWatchFromClient(c, resource, namespace, optionsModifier)
}
// NewFilteredListWatchFromClient creates a new ListWatch from the specified client, resource, namespace, and option modifier.
// Option modifier is a function takes a ListOptions and modifies the consumed ListOptions. Provide customized modifier function
// to apply modification to ListOptions with a field selector, a label selector, or any other desired options.
//
// Both closures build a GET request scoped to namespace and resource and run it with context.TODO();
// neither request chain below sets a Timeout.
func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *ListWatch {
// listFunc applies the modifier, issues the request and returns the result as a runtime.Object via Get().
listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
optionsModifier(&options)
return c.Get().
Namespace(namespace).
Resource(resource).
VersionedParams(&options, metav1.ParameterCodec).
Do(context.TODO()).
Get()
}
// watchFunc marks the options as a watch before applying the modifier, then opens the watch.
watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
options.Watch = true
optionsModifier(&options)
return c.Get().
Namespace(namespace).
Resource(resource).
VersionedParams(&options, metav1.ParameterCodec).
Watch(context.TODO())
}
return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}
ListAndWatch方法,函数指针,关键是List和Watch的函数,跟新的写法有些许区别
创建Informer
此处默认使用DeletionHandlingMetaNamespaceKeyFunc函数创建key
// NewIndexerInformer returns an Indexer together with the Controller that newInformer builds for it.
// The Indexer is created with DeletionHandlingMetaNamespaceKeyFunc as its key function and the
// supplied indexers; newInformer is called with that Indexer as the store and a nil transformer.
func NewIndexerInformer(
lw ListerWatcher,
objType runtime.Object,
resyncPeriod time.Duration,
h ResourceEventHandler,
indexers Indexers,
) (Indexer, Controller) {
// This will hold the client state, as we know it.
clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)
return clientState, newInformer(lw, objType, resyncPeriod, h, clientState, nil)
}
// newInformer assembles a DeltaFIFO and a Config around lw and returns the Controller built by New.
//
// The Config's Process callback accepts only Deltas values: they are forwarded to processDeltas along
// with the handler h, the store clientState and the transformer. Any other value produces an error.
// The callback shown here takes no lock of its own.
func newInformer(
lw ListerWatcher,
objType runtime.Object,
resyncPeriod time.Duration,
h ResourceEventHandler,
clientState Store,
transformer TransformFunc,
) Controller {
// This will hold incoming changes. Note how we pass clientState in as a
// KeyLister, that way resync operations will result in the correct set
// of update/delete deltas.
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: clientState,
EmitDeltaTypeReplaced: true,
})
cfg := &Config{
Queue: fifo,
ListerWatcher: lw,
ObjectType: objType,
FullResyncPeriod: resyncPeriod,
RetryOnError: false,
Process: func(obj interface{}) error {
if deltas, ok := obj.(Deltas); ok {
return processDeltas(h, clientState, transformer, deltas)
}
return errors.New("object given as Process argument is not Deltas")
},
}
return New(cfg)
}
// New returns a controller that stores a copy of *c (the Config is dereferenced, not shared)
// and uses a real clock.
func New(c *Config) Controller {
ctlr := &controller{
config: *c,
clock: &clock.RealClock{},
}
return ctlr
}
这里注意,消费delta队列的过程 ,这里是没有加锁的,即Process函数指针
另外实际上还是创建controller内置结构体,也是client-go创建的。
新式写法
// Mind the kubeconfig path.
// NOTE(review): Go's file APIs do not expand "~"; confirm this literal path actually resolves.
config, err := clientcmd.BuildConfigFromFlags("", "~/.kube/config")
if err != nil {
log.Fatal(err)
}
// The next two lines are only for capturing traffic; they are not needed in normal use.
// NOTE(review): presumably this disables server certificate verification — do not keep it outside debugging.
config.TLSClientConfig.CAData = nil
config.TLSClientConfig.Insecure = true
clientSet, err := kubernetes.NewForConfig(config)
if err != nil {
log.Fatal(err)
}
// Factory options can be tuned here; the author flags the default resync period (0 here) as a key one.
factory := informers.NewSharedInformerFactoryWithOptions(clientSet, 0, informers.WithNamespace("default"))
// Fetch the pod informer from the factory. Per the author's note, client-go already provides
// informers for many resources, so they can be taken from the factory instead of being created again.
informer := factory.Core().V1().Pods().Informer()
// Event handling: the handler is a callback hook (xxx is a placeholder).
informer.AddEventHandler(xxx)
stopper := make(chan struct{}, 1)
go informer.Run(stopper)
log.Println("----- list and watch pod starting...")
// Block until SIGINT or SIGTERM arrives, then stop the informer by closing stopper.
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
<-sigs
close(stopper)
log.Println("main stopped...")
实际上就是很多过程封装了,比如创建Controller的过程
lw的创建过程
// NewFilteredPodInformer constructs a SharedIndexInformer for Pods in the given namespace.
// Its ListFunc and WatchFunc first apply tweakListOptions when it is non-nil, then call the typed
// client's Pods(namespace).List / Watch with context.TODO().
func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.CoreV1().Pods(namespace).List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
},
},
&corev1.Pod{},
resyncPeriod,
indexers,
)
}
实际上List和Watch是由pods类型实现的,List最后取结果的方式略有区别(老式写法用Get()返回runtime.Object,这里用Into(result)填充PodList)
// List takes label and field selectors, and returns the list of Pods that match those selectors.
// The request timeout is derived from opts.TimeoutSeconds and the response is decoded into a PodList.
func (c *pods) List(ctx context.Context, opts metav1.ListOptions) (result *v1.PodList, err error) {
// A nil TimeoutSeconds leaves timeout at its zero value.
var timeout time.Duration
if opts.TimeoutSeconds != nil {
timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
}
result = &v1.PodList{}
err = c.client.Get().
Namespace(c.ns).
Resource("pods").
VersionedParams(&opts, scheme.ParameterCodec).
Timeout(timeout).
Do(ctx).
Into(result)
return
}
// Watch returns a watch.Interface that watches the requested pods.
// The request timeout is derived from opts.TimeoutSeconds, and opts.Watch is forced to true.
func (c *pods) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
// A nil TimeoutSeconds leaves timeout at its zero value.
var timeout time.Duration
if opts.TimeoutSeconds != nil {
timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
}
opts.Watch = true
return c.client.Get().
Namespace(c.ns).
Resource("pods").
VersionedParams(&opts, scheme.ParameterCodec).
Timeout(timeout).
Watch(ctx)
}
最关键的一点,超时,老式写法是没有超时设置的,超时的重要性不言而喻,推荐使用新写法
indexer的创建
默认使用MetaNamespaceIndexFunc作为namespace索引函数(注意它是索引函数而不是key函数,key仍然由下面的DeletionHandlingMetaNamespaceKeyFunc生成)
// defaultInformer builds the pod informer for f.namespace via NewFilteredPodInformer, registering
// cache.MetaNamespaceIndexFunc under cache.NamespaceIndex and passing along f.tweakListOptions.
func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}
创建Informer的同时创建indexer
// NewSharedIndexInformer creates a sharedIndexInformer for lw and exampleObject.
// Its indexer is built with DeletionHandlingMetaNamespaceKeyFunc as the key function and the supplied
// indexers. defaultEventHandlerResyncPeriod initialises both resyncCheckPeriod and
// defaultEventHandlerResyncPeriod, and one RealClock is shared by the informer and its processor.
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
realClock := &clock.RealClock{}
sharedIndexInformer := &sharedIndexInformer{
processor: &sharedProcessor{clock: realClock},
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
listerWatcher: lw,
objectType: exampleObject,
resyncCheckPeriod: defaultEventHandlerResyncPeriod,
defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
// The mutation detector is named after the example object's Go type.
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
clock: realClock,
}
return sharedIndexInformer
}
// NewIndexer returns an Indexer implemented simply with a map and a lock.
// keyFunc is stored as the cache's key function; indexers are handed to NewThreadSafeStore
// together with an empty Indices value.
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
return &cache{
cacheStorage: NewThreadSafeStore(indexers, Indices{}),
keyFunc: keyFunc,
}
}
从上面的代码看,两种写法创建key的函数相同(都是DeletionHandlingMetaNamespaceKeyFunc),区别在于新式写法多注册了一个namespace索引,其他一模一样;但是新式写法解析delta队列时却加了锁
// HandleDeltas processes one item taken from the delta queue while holding s.blockDeltas.
// obj must be a Deltas value; it is passed to processDeltas with the informer itself as the handler,
// s.indexer as the store and s.transform as the transformer. Any other value produces an error.
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
// The lock is held for the whole call and released on return.
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
if deltas, ok := obj.(Deltas); ok {
return processDeltas(s, s.indexer, s.transform, deltas)
}
return errors.New("object given as Process argument is not Deltas")
}
实际上http请求而言,http response关闭后http的访问就结束了,本地加锁仅仅会影响本地的执行效率,api-server无影响
根源
从代码分析,两种写法没有区别,对API-Server造成的影响仅仅是Http response的解析,老式写法解析后直接返回,新式写法的意思是创建结构体,然后结构体去处理值,并带上了超时时间。
那么为什么API-Server觉得一次请求时间很长呢,比如List的过程(Watch是长轮询,不涉及请求时长),根源在于API-Server在低版本(测试版本1.20.x)分页参数会失效。笔者自己尝试的1.25.4分页是有效的。估计是中间某次提交修复了,笔者在github看到很多关于List的提交优化
还有
1.25.4的API-Server的List过程
func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatch bool, minRequestTimeout time.Duration) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
// For performance tracking purposes. 创建埋点
trace := utiltrace.New("List", traceFields(req)...)
namespace, err := scope.Namer.Namespace(req)
if err != nil {
scope.err(err, w, req)
return
}
// Watches for single objects are routed to this function.
// Treat a name parameter the same as a field selector entry.
hasName := true
_, name, err := scope.Namer.Name(req)
if err != nil {
hasName = false
}
ctx := req.Context()
ctx = request.WithNamespace(ctx, namespace)
outputMediaType, _, err := negotiation.NegotiateOutputMediaType(req, scope.Serializer, scope)
if err != nil {
scope.err(err, w, req)
return
}
opts := metainternalversion.ListOptions{}
if err := metainternalversionscheme.ParameterCodec.DecodeParameters(req.URL.Query(), scope.MetaGroupVersion, &opts); err != nil {
err = errors.NewBadRequest(err.Error())
scope.err(err, w, req)
return
}
if errs := metainternalversionvalidation.ValidateListOptions(&opts); len(errs) > 0 {
err := errors.NewInvalid(schema.GroupKind{Group: metav1.GroupName, Kind: "ListOptions"}, "", errs)
scope.err(err, w, req)
return
}
// transform fields
// TODO: DecodeParametersInto should do this.
if opts.FieldSelector != nil {
fn := func(label, value string) (newLabel, newValue string, err error) {
return scope.Convertor.ConvertFieldLabel(scope.Kind, label, value)
}
if opts.FieldSelector, err = opts.FieldSelector.Transform(fn); err != nil {
// TODO: allow bad request to set field causes based on query parameters
err = errors.NewBadRequest(err.Error())
scope.err(err, w, req)
return
}
}
if hasName {
// metadata.name is the canonical internal name.
// SelectionPredicate will notice that this is a request for
// a single object and optimize the storage query accordingly.
nameSelector := fields.OneTermEqualSelector("metadata.name", name)
// Note that fieldSelector setting explicitly the "metadata.name"
// will result in reaching this branch (as the value of that field
// is propagated to requestInfo as the name parameter.
// That said, the allowed field selectors in this branch are:
// nil, fields.Everything and field selector matching metadata.name
// for our name.
if opts.FieldSelector != nil && !opts.FieldSelector.Empty() {
selectedName, ok := opts.FieldSelector.RequiresExactMatch("metadata.name")
if !ok || name != selectedName {
scope.err(errors.NewBadRequest("fieldSelector metadata.name doesn't match requested name"), w, req)
return
}
} else {
opts.FieldSelector = nameSelector
}
}
if opts.Watch || forceWatch {
if rw == nil {
scope.err(errors.NewMethodNotSupported(scope.Resource.GroupResource(), "watch"), w, req)
return
}
// TODO: Currently we explicitly ignore ?timeout= and use only ?timeoutSeconds=.
timeout := time.Duration(0)
if opts.TimeoutSeconds != nil {
timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
}
if timeout == 0 && minRequestTimeout > 0 {
timeout = time.Duration(float64(minRequestTimeout) * (rand.Float64() + 1.0))
}
klog.V(3).InfoS("Starting watch", "path", req.URL.Path, "resourceVersion", opts.ResourceVersion, "labels", opts.LabelSelector, "fields", opts.FieldSelector, "timeout", timeout)
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
watcher, err := rw.Watch(ctx, &opts)
if err != nil {
scope.err(err, w, req)
return
}
requestInfo, _ := request.RequestInfoFrom(ctx)
metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() {
serveWatch(watcher, scope, outputMediaType, req, w, timeout)
})
return
}
// Log only long List requests (ignore Watch).
defer trace.LogIfLong(500 * time.Millisecond) //超过500ms就埋点打印日志,这个埋点非常好用,建议使用
trace.Step("About to List from storage")
result, err := r.List(ctx, &opts) //API-Server实际上也是去ETCD取数据
if err != nil {
scope.err(err, w, req)
return
}
trace.Step("Listing from storage done")
defer trace.Step("Writing http response done", utiltrace.Field{"count", meta.LenList(result)})
transformResponseObject(ctx, scope, trace, req, w, http.StatusOK, outputMediaType, result)
}
可以看出超过500毫秒就会打印数据,笔者测试差不多500个pod的List就是差不多500毫秒少一点,Client-Go设计默认分页参数就是500条,😅精确设计。
// GetList implements storage.Interface.
func (s *store) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
preparedKey, err := s.prepareKey(key)
if err != nil {
return err
}
recursive := opts.Recursive
resourceVersion := opts.ResourceVersion
match := opts.ResourceVersionMatch
pred := opts.Predicate
trace := utiltrace.New(fmt.Sprintf("List(recursive=%v) etcd3", recursive),
utiltrace.Field{"audit-id", endpointsrequest.GetAuditIDTruncated(ctx)},
utiltrace.Field{"key", key},
utiltrace.Field{"resourceVersion", resourceVersion},
utiltrace.Field{"resourceVersionMatch", match},
utiltrace.Field{"limit", pred.Limit},
utiltrace.Field{"continue", pred.Continue})
defer trace.LogIfLong(500 * time.Millisecond)
listPtr, err := meta.GetItemsPtr(listObj)
if err != nil {
return err
}
v, err := conversion.EnforcePtr(listPtr)
if err != nil || v.Kind() != reflect.Slice {
return fmt.Errorf("need ptr to slice: %v", err)
}
去读取ETCD3的数据,可以试试把k8s的低版本安装上debug试试。分析limit失效的原因,笔者是高版本的K8S,是已经修复版本。自定义的埋点List的代码
package main
import (
"context"
"fmt"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/pager"
"k8s.io/utils/trace"
"time"
)
// TimeNewFilteredPodInformer times a paged pod List across all namespaces
// issued through the typed client (client.CoreV1().Pods(...).List), the path
// used by the shared-informer style.
//
// It returns an error if the list call fails or its result cannot be unpacked.
func TimeNewFilteredPodInformer(client *kubernetes.Clientset) error {
	lw := &cache.ListWatch{
		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
			return client.CoreV1().Pods(v1.NamespaceAll).List(context.TODO(), options)
		},
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
			return client.CoreV1().Pods(v1.NamespaceAll).Watch(context.TODO(), options)
		},
	}
	return timedPodList("TimeNewFilteredPodInformer", lw.List)
}

// TimeNewIndexerInformer times a paged pod List across all namespaces issued
// through cache.NewListWatchFromClient on the REST client, the path used by
// the NewIndexerInformer style.
//
// It returns an error if the list call fails or its result cannot be unpacked.
func TimeNewIndexerInformer(client *kubernetes.Clientset) error {
	lw := cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "pods", v1.NamespaceAll, fields.Everything())
	return timedPodList("TimeNewIndexerInformer", lw.List)
}

// timedPodList runs one paged List through listFn under a trace and prints
// whether the result was paginated and how many items it contained.
//
// traceName labels the trace; listFn performs a single page request. The list
// runs on its own goroutine; a panic raised there is re-raised on the calling
// goroutine. The trace is logged when it takes longer than one millisecond.
func timedPodList(traceName string, listFn func(opts metav1.ListOptions) (runtime.Object, error)) error {
	// NOTE(review): with ResourceVersion "0" the API server may presumably
	// answer from its watch cache and ignore the page limit — confirm against
	// the server version under test before drawing conclusions on paging.
	options := metav1.ListOptions{ResourceVersion: "0"}

	initTrace := trace.New("Reflector ListAndWatch", trace.Field{Key: "name", Value: traceName})
	defer initTrace.LogIfLong(1 * time.Millisecond)

	var (
		list            runtime.Object
		paginatedResult bool
		listErr         error
	)
	listCh := make(chan struct{}, 1)
	panicCh := make(chan interface{}, 1)
	go func() {
		defer func() {
			if r := recover(); r != nil {
				panicCh <- r
			}
		}()
		// Attempt to gather the list in chunks; if the server does not
		// support that, the first request returns the full response.
		listPager := pager.New(pager.SimplePageFunc(listFn))
		list, paginatedResult, listErr = listPager.List(context.Background(), options)
		initTrace.Step("Objects listed: ")
		close(listCh)
	}()
	select {
	case r := <-panicCh:
		panic(r)
	case <-listCh:
	}

	fmt.Println("list END, is pager ", paginatedResult)
	if listErr != nil {
		// Surface the failure instead of continuing with an unusable list.
		return fmt.Errorf("listing pods: %w", listErr)
	}
	initTrace.Step("Resource version extracted")

	items, err := meta.ExtractList(list)
	if err != nil {
		return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
	}
	fmt.Println("list items size is : ", len(items))
	initTrace.Step("Objects extracted")
	return nil
}
trace的包好用,这里使用的k8s的包,实际上sdk基础包也有相似的功能。
// durationIsWithinThreshold reports whether the trace qualifies against its threshold.
// It returns false when the trace has no end time. Otherwise it returns true if no threshold is set,
// the threshold is zero, or the elapsed time (endTime - startTime) is at least the threshold.
func (t *Trace) durationIsWithinThreshold() bool {
if t.endTime == nil { // we don't assume incomplete traces meet the threshold
return false
}
return t.threshold == nil || *t.threshold == 0 || t.endTime.Sub(t.startTime) >= *t.threshold
}
总结
知其然知其所以然,要想知道为什么分页不生效,需要自定义API-Server debug才行,看代码很难看出原因,因为K8S实际上估计设计的时候也考虑过这个。