K8s源码分析(二)-K8s调度队列介绍

本文首发在个人博客上,欢迎来踩!
本次分析参考的K8s版本是

文章目录

  • 调度队列简介
  • 调度队列源代码分析
    • 队列初始化
    • QueuedPodInfo元素介绍
    • ActiveQ源代码介绍
    • UnschedulableQ源代码介绍
    • **BackoffQ**源代码介绍
    • 队列弹出待调度的Pod
    • 队列增加新的待调度的Pod
    • pod调度失败返回队列的处理
    • flushBackoffQCompleted
    • flushUnschedulablePodsLeftover
  • 调度队列总结

调度队列简介

这里是官方对于K8s中调度队列的介绍,很值得一看:Scheduling queue in kube-scheduler。整体的架构如下图所示。

请添加图片描述

简单来说K8s中的调度队列主要有3种:

  • ActiveQ(heap结构):在每个调度周期开始时都会从这里取出一个Pod尝试调度。一开始提交的所有没有指定.spec.nodeName的Pod都会发送到这里,也会接收来自unschedulableQ和BackoffQ刷新来的pod。默认的排序规则是按照优先级进行排列,高优先级的Pod在前面。
  • UnschedulableQ(Map结构):存储调度失败的Pod,以等待资源更新、其他相关Pod调度成功等事件,从而将其的Pod其进行重调度。
  • BackoffQ(heap结构):用来暂时退避的队列,默认的排列规则是按退避时间的长度进行排序,需要退避的时间短的Pod在前面。为了防止Pod频繁的重调度,每个Pod都会记录自己的重调度次数,退避时间随着每次失败的调度尝试呈指数增长,直到达到最大值,例如尝试失败 3 次的 Pod 的目标退避超时设置为 curTime + 2s^3 (8s)。注意有两种情况下Pod会进入到BackoffQ队列中:
    • unscheduleableQ会定时对其中的所有pod进行重调度,那么就需要计算各个pod是否退避了足够的时间,如果没有就放入到BackoffQ中再退避一段时间。
    • 如果一个Pod调度失败时,正好这时又异步地发生了资源变更事件(p.moveRequestCycle **>=** podSchedulingCycle )(schedulingCycle 是当前调度的周期,ActiveQ队列每pop一个pod,就加1,moveRequestCycle是事件发生时schedulingCycle 的值),那么就不会放入UnschedulableQ中,而是会直接放入到BackoffQ中。

调度队列机制有两个在后台运行的定期刷新 go协程,负责将 pod 移动到活动队列,后续也将详细介绍相关代码:

  • **flushUnschedulablePodsLeftover:**每 30 秒运行一次,将 Pod 从UnschedulableQ中移动,以允许未由任何事件移动的不可调度的 Pod 再次重试。
  • **flushBackoffQCompleted:**每1秒运行一次,将BackoffQ中已经回避了足够久的Pod移动到ActiveQ队列中

移动请求(move request)会触发一个事件,该事件负责将 Pod 从UnschedulableQ移动到ActiveQ或BackoffQ。集群中许多事件可以触发移动请求的发生,包括了 Pod、节点、服务、PV、PVC、存储类和 CSI 节点的更改。例如当某些pod被调度时,UnschedulableQ中与其具有亲和性要求而导致之前无法调度的pod就会被移动出去,或者当某个新node加入时,原本因为资源不够导致无法调度的Pod也会被移动出去。

调度队列源代码分析

队列初始化

Scheduler中的调度队列SchedulingQueueinternalqueue.SchedulingQueue类型,该类型的实现在pkg/scheduler/internal/queue/scheduling_queue.go:92,如下。

// SchedulingQueue is an interface for a queue to store pods waiting to be scheduled.
// The interface follows a pattern similar to cache.FIFO and cache.Heap and
// makes it easy to use those data structures as a SchedulingQueue.
type SchedulingQueue interface {
	framework.PodNominator
	Add(pod *v1.Pod) error
	// Activate moves the given pods to activeQ iff they're in unschedulablePods or backoffQ.
	// The passed-in pods are originally compiled from plugins that want to activate Pods,
	// by injecting the pods through a reserved CycleState struct (PodsToActivate).
	Activate(pods map[string]*v1.Pod)
	// AddUnschedulableIfNotPresent adds an unschedulable pod back to scheduling queue.
	// The podSchedulingCycle represents the current scheduling cycle number which can be
	// returned by calling SchedulingCycle().
	AddUnschedulableIfNotPresent(pod *framework.QueuedPodInfo, podSchedulingCycle int64) error
	// SchedulingCycle returns the current number of scheduling cycle which is
	// cached by scheduling queue. Normally, incrementing this number whenever
	// a pod is popped (e.g. called Pop()) is enough.
	SchedulingCycle() int64
	// Pop removes the head of the queue and returns it. It blocks if the
	// queue is empty and waits until a new item is added to the queue.
	Pop() (*framework.QueuedPodInfo, error)
	Update(oldPod, newPod *v1.Pod) error
	Delete(pod *v1.Pod) error
	MoveAllToActiveOrBackoffQueue(event framework.ClusterEvent, preCheck PreEnqueueCheck)
	AssignedPodAdded(pod *v1.Pod)
	AssignedPodUpdated(pod *v1.Pod)
	PendingPods() ([]*v1.Pod, string)
	// Close closes the SchedulingQueue so that the goroutine which is
	// waiting to pop items can exit gracefully.
	Close()
	// Run starts the goroutines managing the queue.
	Run()
}

上述代码定义了其需要的对队列中的元素添加、删除、更新、获取、运行等方法。而其标准实现PriorityQueue

pkg/scheduler/internal/queue/scheduling_queue.go:145 中,首先查看其需要的变量:

// PriorityQueue implements a scheduling queue.
// The head of PriorityQueue is the highest priority pending pod. This structure
// has two sub queues and a additional data structure, namely: activeQ,
// backoffQ and unschedulablePods.
//   - activeQ holds pods that are being considered for scheduling.
//   - backoffQ holds pods that moved from unschedulablePods and will move to
//     activeQ when their backoff periods complete.
//   - unschedulablePods holds pods that were already attempted for scheduling and
//     are currently determined to be unschedulable.
type PriorityQueue struct {
	*nominator

	stop  chan struct{}
	clock clock.Clock

	// pod initial backoff duration.
	podInitialBackoffDuration time.Duration
	// pod maximum backoff duration.
	podMaxBackoffDuration time.Duration
	// the maximum time a pod can stay in the unschedulablePods.
	podMaxInUnschedulablePodsDuration time.Duration

	cond sync.Cond

	// activeQ is heap structure that scheduler actively looks at to find pods to
	// schedule. Head of heap is the highest priority pod.
	activeQ *heap.Heap
	// podBackoffQ is a heap ordered by backoff expiry. Pods which have completed backoff
	// are popped from this heap before the scheduler looks at activeQ
	podBackoffQ *heap.Heap
	// unschedulablePods holds pods that have been tried and determined unschedulable.
	unschedulablePods *UnschedulablePods
	// schedulingCycle represents sequence number of scheduling cycle and is incremented
	// when a pod is popped.
	schedulingCycle int64
	// moveRequestCycle caches the sequence number of scheduling cycle when we
	// received a move request. Unschedulable pods in and before this scheduling
	// cycle will be put back to activeQueue if we were trying to schedule them
	// when we received move request.
	moveRequestCycle int64

	clusterEventMap map[framework.ClusterEvent]sets.String
	// preEnqueuePluginMap is keyed with profile name, valued with registered preEnqueue plugins.
	preEnqueuePluginMap map[string][]framework.PreEnqueuePlugin

	// closed indicates that the queue is closed.
	// It is mainly used to let Pop() exit its control loop while waiting for an item.
	closed bool

	nsLister listersv1.NamespaceLister

	metricsRecorder metrics.MetricAsyncRecorder
	// pluginMetricsSamplePercent is the percentage of plugin metrics to be sampled.
	pluginMetricsSamplePercent int
}

pkg/scheduler/internal/queue/scheduling_queue.go:291 中给出了生成了该队列的初始化方法

// NewPriorityQueue creates a PriorityQueue object.
func NewPriorityQueue(
	lessFn framework.LessFunc,
	informerFactory informers.SharedInformerFactory,
	opts ...Option,
) *PriorityQueue {
	options := defaultPriorityQueueOptions
	if options.podLister == nil {
		options.podLister = informerFactory.Core().V1().Pods().Lister()
	}
	for _, opt := range opts {
		opt(&options)
	}

	comp := func(podInfo1, podInfo2 interface{}) bool {
		pInfo1 := podInfo1.(*framework.QueuedPodInfo)
		pInfo2 := podInfo2.(*framework.QueuedPodInfo)
		return lessFn(pInfo1, pInfo2)
	}

	pq := &PriorityQueue{
		nominator:                         newPodNominator(options.podLister),
		clock:                             options.clock,
		stop:                              make(chan struct{}),
		podInitialBackoffDuration:         options.podInitialBackoffDuration,
		podMaxBackoffDuration:             options.podMaxBackoffDuration,
		podMaxInUnschedulablePodsDuration: options.podMaxInUnschedulablePodsDuration,
		activeQ:                           heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
		unschedulablePods:                 newUnschedulablePods(metrics.NewUnschedulablePodsRecorder(), metrics.NewGatedPodsRecorder()),
		moveRequestCycle:                  -1,
		clusterEventMap:                   options.clusterEventMap,
		preEnqueuePluginMap:               options.preEnqueuePluginMap,
		metricsRecorder:                   options.metricsRecorder,
		pluginMetricsSamplePercent:        options.pluginMetricsSamplePercent,
	}
	pq.cond.L = &pq.lock
	pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
	pq.nsLister = informerFactory.Core().V1().Namespaces().Lister()

	return pq
}

可以看到其包含了许多我们上面介绍的概念,包括activeQunschedulablePodspodBackoffQschedulingCyclemoveRequestCycle

QueuedPodInfo元素介绍

这里也多次出现了QueuedPodInfo这个关键的数据结构,它是Pod中的基础元素,在此进行介绍,其定义在pkg/scheduler/framework/types.go:98 中,包括了PodInfo、添加时间、尝试次数等

// QueuedPodInfo is a Pod wrapper with additional information related to
// the pod's status in the scheduling queue, such as the timestamp when
// it's added to the queue.
type QueuedPodInfo struct {
	*PodInfo
	// The time pod added to the scheduling queue.
	Timestamp time.Time
	// Number of schedule attempts before successfully scheduled.
	// It's used to record the # attempts metric.
	Attempts int
	// The time when the pod is added to the queue for the first time. The pod may be added
	// back to the queue multiple times before it's successfully scheduled.
	// It shouldn't be updated once initialized. It's used to record the e2e scheduling
	// latency for a pod.
	InitialAttemptTimestamp time.Time
	// If a Pod failed in a scheduling cycle, record the plugin names it failed by.
	UnschedulablePlugins sets.String
	// Whether the Pod is scheduling gated (by PreEnqueuePlugins) or not.
	Gated bool
}

PodInfo的定义在pkg/scheduler/framework/types.go:131

// PodInfo is a wrapper to a Pod with additional pre-computed information to
// accelerate processing. This information is typically immutable (e.g., pre-processed
// inter-pod affinity selectors).
type PodInfo struct {
	Pod                        *v1.Pod
	RequiredAffinityTerms      []AffinityTerm
	RequiredAntiAffinityTerms  []AffinityTerm
	PreferredAffinityTerms     []WeightedAffinityTerm
	PreferredAntiAffinityTerms []WeightedAffinityTerm
}

Pod的定义在staging/src/k8s.io/api/core/v1/types.go:4202

// Pod is a collection of containers that can run on a host. This resource is created
// by clients and scheduled onto hosts.
type Pod struct {
	metav1.TypeMeta `json:",inline"`
	// Standard object's metadata.
	// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#metadata
	// +optional
	metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`

	// Specification of the desired behavior of the pod.
	// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#spec-and-status
	// +optional
	Spec PodSpec `json:"spec,omitempty" protobuf:"bytes,2,opt,name=spec"`

	// Most recently observed status of the pod.
	// This data may not be up to date.
	// Populated by the system.
	// Read-only.
	// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#spec-and-status
	// +optional
	Status PodStatus `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"`
}

ActiveQ源代码介绍

从初始化代码中可以看到ActiveQ是一个heap,其相关定义在pkg/scheduler/internal/heap/heap.go

// Heap is a producer/consumer queue that implements a heap data structure.
// It can be used to implement priority queues and similar data structures.
type Heap struct {
	// data stores objects and has a queue that keeps their ordering according
	// to the heap invariant.
	data *data
	// metricRecorder updates the counter when elements of a heap get added or
	// removed, and it does nothing if it's nil
	metricRecorder metrics.MetricRecorder
}
// data is an internal struct that implements the standard heap interface
// and keeps the data stored in the heap.
type data struct {
	// items is a map from key of the objects to the objects and their index.
	// We depend on the property that items in the map are in the queue and vice versa.
	items map[string]*heapItem
	// queue implements a heap data structure and keeps the order of elements
	// according to the heap invariant. The queue keeps the keys of objects stored
	// in "items".
	queue []string

	// keyFunc is used to make the key used for queued item insertion and retrieval, and
	// should be deterministic.
	keyFunc KeyFunc
	// lessFunc is used to compare two objects in the heap.
	lessFunc lessFunc
}

可以看到他是用queue实现了一个heap。

ActiveQ的默认排序代码在pkg/scheduler/framework/plugins/queuesort/priority_sort.go:42中,即按优先级进行排序,如果优先级相同就提交时间的早晚进行排序。

// Less is the function used by the activeQ heap algorithm to sort pods.
// It sorts pods based on their priority. When priorities are equal, it uses
// PodQueueInfo.timestamp.
func (pl *PrioritySort) Less(pInfo1, pInfo2 *framework.QueuedPodInfo) bool {
	p1 := corev1helpers.PodPriority(pInfo1.Pod)
	p2 := corev1helpers.PodPriority(pInfo2.Pod)
	return (p1 > p2) || (p1 == p2 && pInfo1.Timestamp.Before(pInfo2.Timestamp))
}

UnschedulableQ源代码介绍

UnschedulableQ进行初始化的具体代码在pkg/scheduler/internal/queue/scheduling_queue.go:998

// newUnschedulablePods initializes a new object of UnschedulablePods.
func newUnschedulablePods(unschedulableRecorder, gatedRecorder metrics.MetricRecorder) *UnschedulablePods {
	return &UnschedulablePods{
		podInfoMap:            make(map[string]*framework.QueuedPodInfo),
		keyFunc:               util.GetPodFullName,
		unschedulableRecorder: unschedulableRecorder,
		gatedRecorder:         gatedRecorder,
	}
}

其具体的定义代码在pkg/scheduler/internal/queue/scheduling_queue.go:939 ,

// UnschedulablePods holds pods that cannot be scheduled. This data structure
// is used to implement unschedulablePods.
type UnschedulablePods struct {
	// podInfoMap is a map key by a pod's full-name and the value is a pointer to the QueuedPodInfo.
	podInfoMap map[string]*framework.QueuedPodInfo
	keyFunc    func(*v1.Pod) string
	// unschedulableRecorder/gatedRecorder updates the counter when elements of an unschedulablePodsMap
	// get added or removed, and it does nothing if it's nil.
	unschedulableRecorder, gatedRecorder metrics.MetricRecorder
}

可以看到他没有进行heap的包装,而是直接采用Map结构进行保存。

BackoffQ源代码介绍

BackoffQ也是一个heap,与ActiveQ不同的一点在于排序函数不同,其排序函数的定义在pkg/scheduler/internal/queue/scheduling_queue.go:888

func (p *PriorityQueue) podsCompareBackoffCompleted(podInfo1, podInfo2 interface{}) bool {
	pInfo1 := podInfo1.(*framework.QueuedPodInfo)
	pInfo2 := podInfo2.(*framework.QueuedPodInfo)
	bo1 := p.getBackoffTime(pInfo1)
	bo2 := p.getBackoffTime(pInfo2)
	return bo1.Before(bo2)
}

getBackoffTime的定义在pkg/scheduler/internal/queue/scheduling_queue.go:911中,即计算完成避让的时间

// getBackoffTime returns the time that podInfo completes backoff
func (p *PriorityQueue) getBackoffTime(podInfo *framework.QueuedPodInfo) time.Time {
	duration := p.calculateBackoffDuration(podInfo)
	backoffTime := podInfo.Timestamp.Add(duration)
	return backoffTime
}

可以看到队列排序时会将完成避让最早的pod放在前面。

然后再看其是如何计算避让时间的,在pkg/scheduler/internal/queue/scheduling_queue.go

// calculateBackoffDuration is a helper function for calculating the backoffDuration
// based on the number of attempts the pod has made.
func (p *PriorityQueue) calculateBackoffDuration(podInfo *framework.QueuedPodInfo) time.Duration {
	duration := p.podInitialBackoffDuration
	for i := 1; i < podInfo.Attempts; i++ {
		// Use subtraction instead of addition or multiplication to avoid overflow.
		if duration > p.podMaxBackoffDuration-duration {
			return p.podMaxBackoffDuration
		}
		duration += duration
	}
	return duration
}

其计算可以理解为初次为p.podInitialBackoffDuration,每次需要的避让时间都是前一次的两倍,如果计算得到的避让时间大于p.podMaxBackoffDuration/2 ,就将避让时间设置为p.podMaxBackoffDuration

队列弹出待调度的Pod

其代码在pkg/scheduler/internal/queue/scheduling_queue.go:593

// Pop removes the head of the active queue and returns it. It blocks if the
// activeQ is empty and waits until a new item is added to the queue. It
// increments scheduling cycle when a pod is popped.
func (p *PriorityQueue) Pop() (*framework.QueuedPodInfo, error) {
	p.lock.Lock()
	defer p.lock.Unlock()
	for p.activeQ.Len() == 0 {
		// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
		// When Close() is called, the p.closed is set and the condition is broadcast,
		// which causes this loop to continue and return from the Pop().
		if p.closed {
			return nil, fmt.Errorf(queueClosed)
		}
		p.cond.Wait()
	}
	obj, err := p.activeQ.Pop()
	if err != nil {
		return nil, err
	}
	pInfo := obj.(*framework.QueuedPodInfo)
	pInfo.Attempts++
	p.schedulingCycle++
	return pInfo, nil
}

可以看到如果activeQ中没有需要调度的Pod了那么就会使用p.cond.Wait来进行等待,否则就冲activeQ中Pop一个元素QueuedPodInfo,同时这个QueuedPodInfo 的Attempts会+1,整个队列中的schedulingCycle也会增加。

队列增加新的待调度的Pod

其代码在pkg/scheduler/internal/queue/scheduling_queue.go:398

// Add adds a pod to the active queue. It should be called only when a new pod
// is added so there is no chance the pod is already in active/unschedulable/backoff queues
func (p *PriorityQueue) Add(pod *v1.Pod) error {
	p.lock.Lock()
	defer p.lock.Unlock()

	pInfo := p.newQueuedPodInfo(pod)
	gated := pInfo.Gated
	if added, err := p.addToActiveQ(pInfo); !added {
		return err
	}
	if p.unschedulablePods.get(pod) != nil {
		klog.ErrorS(nil, "Error: pod is already in the unschedulable queue", "pod", klog.KObj(pod))
		p.unschedulablePods.delete(pod, gated)
	}
	// Delete pod from backoffQ if it is backing off
	if err := p.podBackoffQ.Delete(pInfo); err == nil {
		klog.ErrorS(nil, "Error: pod is already in the podBackoff queue", "pod", klog.KObj(pod))
	}
	klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", PodAdd, "queue", activeQName)
	metrics.SchedulerQueueIncomingPods.WithLabelValues("active", PodAdd).Inc()
	p.addNominatedPodUnlocked(pInfo.PodInfo, nil)
	p.cond.Broadcast()

	return nil
}

主要是将要加入的pod转化为QueuedPodInfo类型,然后添加到activeQ队列中,还需要检查其他队列中是否有这个pod,如果有就删除,同时做一些日志相关记录,然后还会调用p.cond.Broadcast()来解除上述提到的p.cond.Wait 的等待。

pod调度失败返回队列的处理

当Pod调度失败后,会调用来AddUnschedulableIfNotPresent函数来进行处理,其代码位置在pkg/scheduler/internal/queue/scheduling_queue.go 中。

// AddUnschedulableIfNotPresent inserts a pod that cannot be scheduled into
// the queue, unless it is already in the queue. Normally, PriorityQueue puts
// unschedulable pods in `unschedulablePods`. But if there has been a recent move
// request, then the pod is put in `podBackoffQ`.
func (p *PriorityQueue) AddUnschedulableIfNotPresent(pInfo *framework.QueuedPodInfo, podSchedulingCycle int64) error {
	p.lock.Lock()
	defer p.lock.Unlock()
	pod := pInfo.Pod
	if p.unschedulablePods.get(pod) != nil {
		return fmt.Errorf("Pod %v is already present in unschedulable queue", klog.KObj(pod))
	}

	if _, exists, _ := p.activeQ.Get(pInfo); exists {
		return fmt.Errorf("Pod %v is already present in the active queue", klog.KObj(pod))
	}
	if _, exists, _ := p.podBackoffQ.Get(pInfo); exists {
		return fmt.Errorf("Pod %v is already present in the backoff queue", klog.KObj(pod))
	}

	// Refresh the timestamp since the pod is re-added.
	pInfo.Timestamp = p.clock.Now()

	// If a move request has been received, move it to the BackoffQ, otherwise move
	// it to unschedulablePods.
	for plugin := range pInfo.UnschedulablePlugins {
		metrics.UnschedulableReason(plugin, pInfo.Pod.Spec.SchedulerName).Inc()
	}
	if p.moveRequestCycle >= podSchedulingCycle {
		if err := p.podBackoffQ.Add(pInfo); err != nil {
			return fmt.Errorf("error adding pod %v to the backoff queue: %v", klog.KObj(pod), err)
		}
		klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", ScheduleAttemptFailure, "queue", backoffQName)
		metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", ScheduleAttemptFailure).Inc()
	} else {
		p.unschedulablePods.addOrUpdate(pInfo)
		klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", ScheduleAttemptFailure, "queue", unschedulablePods)
		metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", ScheduleAttemptFailure).Inc()

	}

	p.addNominatedPodUnlocked(pInfo.PodInfo, nil)
	return nil
}

这里首先检查了其他队列中是否含有该pod,如果有就返回错误,然后比较moveRequestCyclepodSchedulingCycle ,如果p.moveRequestCycle >= podSchedulingCycle 那就说明在刚刚调度这个pod的时候集群发生了变化,可能现在可以成功调度这个pod了,将其转入backoffQ中,不然就正常加入unschedulableQ中。

flushBackoffQCompleted

在队列运行时会初始化两个go协程,来分别不停检查backoffQunschedulableQ,以及时将相关的Pod移出。代码在pkg/scheduler/internal/queue/scheduling_queue.go:333

// Run starts the goroutine to pump from podBackoffQ to activeQ
func (p *PriorityQueue) Run() {
	go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
	go wait.Until(p.flushUnschedulablePodsLeftover, 30*time.Second, p.stop)
}

对于flushBackoffQCompleted即是每1s运行一次,直到接收到p.stop信息。对flushBackoffQCompleted 函数的具体定义在pkg/scheduler/internal/queue/scheduling_queue.go:537中,如下

// flushBackoffQCompleted Moves all pods from backoffQ which have completed backoff in to activeQ
func (p *PriorityQueue) flushBackoffQCompleted() {
	p.lock.Lock()
	defer p.lock.Unlock()
	activated := false
	for {
		rawPodInfo := p.podBackoffQ.Peek()
		if rawPodInfo == nil {
			break
		}
		pInfo := rawPodInfo.(*framework.QueuedPodInfo)
		pod := pInfo.Pod
		if p.isPodBackingoff(pInfo) {
			break
		}
		_, err := p.podBackoffQ.Pop()
		if err != nil {
			klog.ErrorS(err, "Unable to pop pod from backoff queue despite backoff completion", "pod", klog.KObj(pod))
			break
		}
		if err := p.activeQ.Add(pInfo); err != nil {
			klog.ErrorS(err, "Error adding pod to the active queue", "pod", klog.KObj(pInfo.Pod))
		} else {
			klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", BackoffComplete, "queue", activeQName)
			metrics.SchedulerQueueIncomingPods.WithLabelValues("active", BackoffComplete).Inc()
			activated = true
		}
	}

	if activated {
		p.cond.Broadcast()
	}
}

其主要内容就是从backOffQ的首个元素开始查看,检查器是否已经过了避让时间,如果过了就将其放入到activeQ队列中,直到首个元素没有达到避让时间或者队列为空。

flushUnschedulablePodsLeftover

flushUnschedulablePodsLeftover每30s运行一次,这部分的代码在pkg/scheduler/internal/queue/scheduling_queue.go:572中,如下

// flushUnschedulablePodsLeftover moves pods which stay in unschedulablePods
// longer than podMaxInUnschedulablePodsDuration to backoffQ or activeQ.
func (p *PriorityQueue) flushUnschedulablePodsLeftover() {
	p.lock.Lock()
	defer p.lock.Unlock()

	var podsToMove []*framework.QueuedPodInfo
	currentTime := p.clock.Now()
	for _, pInfo := range p.unschedulablePods.podInfoMap {
		lastScheduleTime := pInfo.Timestamp
		if currentTime.Sub(lastScheduleTime) > p.podMaxInUnschedulablePodsDuration {
			podsToMove = append(podsToMove, pInfo)
		}
	}

	if len(podsToMove) > 0 {
		p.movePodsToActiveOrBackoffQueue(podsToMove, UnschedulableTimeout)
	}
}

可以看到其主要作用是遍历所有的pod,如果其在unschedulableQ中呆的时间如果超过了最大的p.podMaxInUnschedulablePodsDuration时间,就会将其移出去,至于是移动到activeQ中还是移动到backoffQ中,取决于movePodsToActiveOrBackoffQueue函数,在pkg/scheduler/internal/queue/scheduling_queue.go:771中,如下

// NOTE: this function assumes lock has been acquired in caller
func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework.QueuedPodInfo, event framework.ClusterEvent) {
	activated := false
	for _, pInfo := range podInfoList {
		// If the event doesn't help making the Pod schedulable, continue.
		// Note: we don't run the check if pInfo.UnschedulablePlugins is nil, which denotes
		// either there is some abnormal error, or scheduling the pod failed by plugins other than PreFilter, Filter and Permit.
		// In that case, it's desired to move it anyways.
		if len(pInfo.UnschedulablePlugins) != 0 && !p.podMatchesEvent(pInfo, event) {
			continue
		}
		pod := pInfo.Pod
		if p.isPodBackingoff(pInfo) {
			if err := p.podBackoffQ.Add(pInfo); err != nil {
				klog.ErrorS(err, "Error adding pod to the backoff queue", "pod", klog.KObj(pod))
			} else {
				klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", backoffQName)
				metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event.Label).Inc()
				p.unschedulablePods.delete(pod, pInfo.Gated)
			}
		} else {
			gated := pInfo.Gated
			if added, _ := p.addToActiveQ(pInfo); added {
				klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", activeQName)
				activated = true
				metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event.Label).Inc()
				p.unschedulablePods.delete(pod, gated)
			}
		}
	}
	p.moveRequestCycle = p.schedulingCycle
	if activated {
		p.cond.Broadcast()
	}
}

注意这个函数不仅仅是在flushUnschedulablePodsLeftover中被调用,还会在处理其他移动请求时触发,只不过这里的移动请求是UnschedulableTimeout ,判断到底是如何移动也很容易从代码中看出,如果已经到达了避让时间,就加入到activeQ中,如果没有就加入到backoffQ中,注意到如果有移动进activeQ中,也是需要执行p.cond.Broadcast(),同时注意到这里更新了moveRequestCycleschedulingCycle,这也是其统一更新moveRequestCycle 的地方。

调度队列总结

考虑到调度队列的细节,我们可以用下图来对其进行归纳回顾。

请添加图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/612849.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C++:week3:数据结构与算法

文章目录 (十一) 常用数据结构1.动态数组(1)模型(2).h与.c(3)实现 2.链表(1)模型(2)分类(3)基本操作(API)(4)实现(5)链表常见面试题(6)空间与时间 3.栈(1)模型(2)基本操作(3)实现(4)栈的应用 4.队列(1)模型(2)基本操作(API)(3)实现(4)队列的应用 5.哈希表(1)哈希表的提出原因(2…

支付宝小程序如何去除页面下拉回弹

描述&#xff1a;支付宝小程序页面下拉时会产生回弹&#xff0c;如果页面上有拖拽功能&#xff0c;会有影响 解决方法&#xff1a; 页面xx.config.js中设置&#xff1a;allowsBounceVertical: “NO” 官方文档&#xff1a;https://opensupport.alipay.com/support/FAQ/7110b5d…

什么是Jetpack

Jetpack Jetpack 是一套组件库、工具&#xff0c;可帮助开发人员遵循最佳做法&#xff0c;减少样板代码并编写可在 Android 版本和设备上一致工作的代码&#xff0c;以便开发人员可以专注于他们关心的代码 组成 主要包含四部分&#xff1a;架构&#xff08;Architecture&…

手写一个SPI FLASH 读写擦除控制器(未完)

文章目录 flash读写数据的特点1. 扇擦除SE&#xff08;Sector Erase&#xff09;1.1 flash_se 模块设计1.1.1 信号连接示意图&#xff1a;1.1.2 SE状态机1.1.3 波形图设计&#xff1a;1.1.4 代码 2. 页写PP(Page Program)2.1 flash_pp模块设计2.1.1 信号连接示意图&#xff1a;…

社交媒体数据恢复:快手、快手极速版

一、备份快手数据 登录快手账号&#xff1a;首先&#xff0c;打开快手APP&#xff0c;登录您的快手账号。 进入设置&#xff1a;在快手首页点击右上角的三条横线图标&#xff0c;进入设置页面。 数据备份&#xff1a;在设置页面找到“备份与恢复”选项&#xff0c;点击进入。…

【机器学习】 人工智能和机器学习辅助决策在空战中的未来选择

&#x1f680;传送门 &#x1f680;文章引言&#x1f512;技术层面&#x1f4d5;作战结构&#x1f308;替代决策选项&#x1f3ac;选项 1&#xff1a;超级战争&#xff08;Hyperwar&#xff09;&#x1f320;选项 2&#xff1a;超越OODA&#x1f302;选项 3&#xff1a;阻止其他…

【漏洞复现】用友U8-Cloud XChangeServlet XXE漏洞

0x01 产品简介 用友U8Cloud是用友推出的新一代云ERP,主要聚焦成长型、创新型企业,提供企业级云ERP整体解决方案。 0x02 漏洞概述 用友U8 cloud /service/XChangeServlet接口存在XXE漏洞,未授权的攻击者可通过此漏洞获取数据库敏感信息,从而盗取服务器数据,造成服务器信…

API接口开发实现一键智能化自动抓取电商平台热销商品数据支持高并发免费接入示例

为了实现一键智能化自动抓取电商平台热销商品数据支持高并发免费接入&#xff0c;你可以使用Python编程语言和相关库&#xff08;如requests、BeautifulSoup等&#xff09;来开发一个API接口&#xff0c;也可以使用封装好的api接口获取&#xff0c;注册一个api账号获取key和sec…

代码审计平台sonarqube的安装及使用

docker搭建代码审计平台sonarqube 一、代码审计关注的质量指标二、静态分析技术分类三、使用sonarqube的目的四、sonarqube流程五、docker快速搭建sonarqube六、sonarqube scanner的安装和使用七、sonarqube对maven项目进行分析八、sonarqube分析报告解析九、代码扫描规则定制十…

使用 AI Assistant for Observability 和组织的运行手册增强 SRE 故障排除

作者&#xff1a;Almudena Sanz Oliv, Katrin Freihofner, Tom Grabowski 通过本指南&#xff0c;你的 SRE 团队可以实现增强的警报修复和事件管理。 可观测性 AI 助手可帮助用户使用自然语言界面探索和分析可观测性数据&#xff0c;利用自动函数调用来请求、分析和可视化数据…

【二叉树算法题记录】二叉树的所有路径,路径总和——回溯

目录 257. 二叉树的所有路径题目描述题目分析cpp代码 112. 路径总和题目描述题目分析cpp代码 257. 二叉树的所有路径 题目描述 给你一个二叉树的根节点root &#xff0c;按任意顺序&#xff0c;返回所有从根节点到叶子节点的路径。 题目分析 其实从根节点往下走&#xff0c…

VM虚拟机安装调试(步骤如下图)

VM虚拟机安装调试 随着一顿安装操作&#xff0c;还有enter键敲下&#xff0c;出现如下界面。

从木匠到编程匠的传承

我的父亲在1906年出生于宁波北仑西岙村。年轻时他在老家从木匠学徒做起&#xff0c;学到了一手好技艺。 宁波乡下的老式雕花木床分为三弯、五弯、七弯等三种档次&#xff0c;其中七弯的做工最复杂。父亲说&#xff0c;他是会做七弯木床的。 上世纪三十年代&#xff0c;父亲举…

Spring Cloud Gateway 11种断言工厂

系列文章目录 文章目录 系列文章目录前言前言 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站,这篇文章男女通用,看懂了就去分享给你的码吧。 Spring Cloud Gateway路由匹配是Spring WebFlux基础功能的一部分,在Spri…

商场综合体能源监管平台,实现能源高效管理

商场作为大型综合体建筑&#xff0c;其能源消耗一直是备受关注的问题。为了有效管理商场能耗&#xff0c;提高商场能源效率&#xff0c;商场综合体能源监管平台应运而生。 商场综合体能源监管平台可通过软硬件一起进行节能监管&#xff0c;硬件设备包括各种传感器、监测仪表和…

设置多用户远程登录windows server2016服务器

1、远程登录windows server 2016 运行—>mstsc—>远程IP地址—>用户和密码 2、远程windows服务器设置多用户策略 运行—>gpedit.msc->计算机配置—管理模板—windows组件—远程桌面服务—远程桌面会话主机----连接,如下图所示: 1、《限制连接的数量》设置为…

前端Vue uView 组件<u-search> 自定义右侧搜索按钮样式

前言 uView 文档的效果不是ui设计的样式 需要重新编辑 原效果 ui设计效果 解决方案 设置里说明的需要传一个样式对象 这个对象 需要写在 script 标签里面 这里需要遵循驼峰命名 比如font-size 改为 fontSize lineHeight和textAlign为水平锤子居中效果 searchStyle: {ba…

手机一键换ip地址到湖南

怎么把手机ip改成湖南&#xff1f;想要手机一键换ip地址到湖南&#xff0c;可以试试使用虎观代理app&#xff0c;一键操作&#xff0c;简单又方便。你会惊讶地发现&#xff0c;更改手机IP至湖南竟然如此便捷。 选用APP优势&#xff1a; ★IP资源丰富 已经在国内各大省份提供服…

Python作业三:扫描目录文件,发送到指定邮箱

问&#xff1a; 作业任务&#xff1a;编写python代码&#xff0c;扫描指定的目录下的所有文件&#xff0c;将这些扫描的文本内容邮件发送到指定邮箱(如&#xff1a;自己的qq邮箱) 发送邮箱&#xff1a;yagmail 以 163 邮箱为例&#xff0c;在编码之前&#xff0c;我们需要开…

C++ 多态 - 下

目录 1. 多态的原理 1.1. 虚函数表 1.2. 多态原理 1.3. 静态绑定和动态绑定 1.3.1. 运行时决议 1.3.2. 编译时决议 1.4. 为什么基类的对象调用虚函数不能构成多态 2. 单继承中的虚函数表 2.1. 同类型对象的虚表 2.2. 单继承的对象的虚表 2.2.1. 内存窗口查看 2.2.2…