时间:2022-12-06来源:www.pcxitongcheng.com作者:电脑系统城
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
func (c *controller) processLoop() { for { // Pop出Object元素 obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) if err != nil { if err == ErrFIFOClosed { return } if c.config.RetryOnError { // 重新进队列 c.config.Queue.AddIfNotPresent(obj) } } } } // 去查看Pop的具体实现 点进Pop 找到fifo.go func (f *FIFO) Pop(process PopProcessFunc) ( interface {}, error ) { f.lock.Lock() defer f.lock.Unlock() for { // 调用process去处理item,然后返回 item, ok := f.items[id] delete (f.items, id) err := process(item) return item, err } } // 然后去查一下 PopProcessFunc 的定义,在创建controller前 share_informer.go的Run()里面 cfg := &Config{ Process: s.HandleDeltas, } func (s *sharedIndexInformer) HandleDeltas(obj interface {}) error { s.blockDeltas.Lock() defer s.blockDeltas.Unlock() for _, d := range obj.(Deltas) { switch d. Type { // 增、改、替换、同步 case Sync, Replaced, Added, Updated: s.cacheMutationDetector.AddObject(d.Object) // 先去indexer查询 if old, exists, err := s.indexer.Get(d.Object); err == nil && exists { // 如果数据已经存在,就执行Update逻辑 if err := s.indexer.Update(d.Object); err != nil { return err } isSync := false switch { case d. Type == Sync: isSync = true case d. Type == Replaced: if accessor, err := meta.Accessor(d.Object); err == nil { isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion() } } } // 分发Update事件 s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync) } else { // 没查到数据,就执行Add操作 if err := s.indexer.Add(d.Object); err != nil { return err } // 分发 Add 事件 s.processor.distribute(addNotification{newObj: d.Object}, false ) } // 删除 case Deleted: // 去indexer删除 if err := s.indexer. Delete (d.Object); err != nil { return err } // 分发 delete 事件 s.processor.distribute(deleteNotification{oldObj: d.Object}, false ) } } return nil } |
Index
的定义为资源的本地存储,保持与etcd中的资源信息一致。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
// 我们去看看Index是怎么创建的 func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer { realClock := &clock.RealClock{} sharedIndexInformer := &sharedIndexInformer{ processor: &sharedProcessor{clock: realClock}, // indexer 的初始化 indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers), listerWatcher: lw, objectType: exampleObject, resyncCheckPeriod: defaultEventHandlerResyncPeriod, defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod, cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf( "%T" , exampleObject)), clock: realClock, } return sharedIndexInformer } // 生成一个map和func组合而成的Indexer func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer { return &cache{ cacheStorage: NewThreadSafeStore(indexers, Indices{}), keyFunc: keyFunc, } // ThreadSafeStore的底层是一个并发安全的map,具体实现我们暂不考虑 func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore { return &threadSafeMap{ items: map [ string ] interface {}{}, indexers: indexers, indices: indices, } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
// 在上面的Process代码中,我们看到了将数据存储到Indexer后,调用了一个分发的函数 s.processor.distribute() // 分发process的创建 func NewSharedIndexInformer() SharedIndexInformer { sharedIndexInformer := &sharedIndexInformer{ processor: &sharedProcessor{clock: realClock}, } return sharedIndexInformer } // sharedProcessor的结构 type sharedProcessor struct { listenersStarted bool // 读写锁 listenersLock sync.RWMutex // 普通监听列表 listeners []*processorListener // 同步监听列表 syncingListeners []*processorListener clock clock.Clock wg wait.Group } // 查看distribute函数 func (p *sharedProcessor) distribute(obj interface {}, sync bool ) { p.listenersLock.RLock() defer p.listenersLock.RUnlock() // 将object分发到 同步监听 或者 普通监听 的列表 if sync { for _, listener := range p.syncingListeners { listener.add(obj) } } else { for _, listener := range p.listeners { listener.add(obj) } } } // 这个add的操作是利用了channel func (p *processorListener) add(notification interface {}) { p.addCh <- notification } |
在前面,我们了解了Pod调度算法的注册和Informer机制来监听kube-apiserver上的资源变化,这一次,我们就将两者串联起来,看看在kube-scheduler中,Informer监听到资源变化后,如何用调度算法将pod进行调度。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
// 在setup()中找到scheduler // 在运行 kube-scheduler 的初期,我们创建了一个Scheduler的数据结构,回头再看看有什么和pod调度算法相关的 type Scheduler struct { SchedulerCache internalcache.Cache Algorithm core.ScheduleAlgorithm // 获取下一个需要调度的Pod NextPod func () *framework.QueuedPodInfo Error func (*framework.QueuedPodInfo, error ) StopEverything <- chan struct {} // 等待调度的Pod队列,我们重点看看这个队列是什么 SchedulingQueue internalqueue.SchedulingQueue Profiles profile. Map scheduledPodsHasSynced func () bool client clientset. Interface } // Scheduler的实例化函数 在最新的版本中少了create这一层 直接是进行里面的逻辑 func New (){ var sched *Scheduler switch { // 从 Provider 创建 case source.Provider != nil : sc, err := configurator.createFromProvider(*source.Provider) sched = sc // 从文件或者ConfigMap中创建 case source.Policy != nil : sc, err := configurator.createFromConfig(*policy) sched = sc default : return nil , fmt.Errorf( "unsupported algorithm source: %v" , source) } } // 两个创建方式,底层都是调用的 create 函数 func (c *Configurator) createFromProvider(providerName string ) (*Scheduler, error ) { return c.create() } func (c *Configurator) createFromConfig(policy schedulerapi.Policy) (*Scheduler, error ){ return c.create() } func (c *Configurator) create() (*Scheduler, error ) { // 实例化 podQueue podQueue := internalqueue.NewSchedulingQueue( lessFn, internalqueue.WithPodInitialBackoffDuration(time.Duration(c.podInitialBackoffSeconds)*time.Second), internalqueue.WithPodMaxBackoffDuration(time.Duration(c.podMaxBackoffSeconds)*time.Second), internalqueue.WithPodNominator(nominator), ) return &Scheduler{ SchedulerCache: c.schedulerCache, Algorithm: algo, Profiles: profiles, // NextPod 函数依赖于 podQueue NextPod: internalqueue.MakeNextPodFunc(podQueue), Error : MakeDefaultErrorFunc(c.client, c.informerFactory.Core().V1().Pods().Lister(), podQueue, c.schedulerCache), StopEverything: c.StopEverything, // 调度队列被赋值为podQueue SchedulingQueue: podQueue, }, nil } // 再看看这个调度队列的初始化函数,点进去podQueue,从命名可以看到是一个优先队列,它的实现细节暂不细看 // 结合实际情况思考下,pod会有重要程度的区分,所以调度的顺序需要考虑优先级的 func NewSchedulingQueue(lessFn framework.LessFunc, opts ...Option) SchedulingQueue { return NewPriorityQueue(lessFn, opts...) } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
// 在上面实例化Scheduler后,有个注册事件 Handler 的函数:addAllEventHandlers(sched, informerFactory, podInformer) informer接到消息之后触发对应的Handler func addAllEventHandlers( sched *Scheduler, informerFactory informers.SharedInformerFactory, podInformer coreinformers.PodInformer, ) { /* 函数前后有很多注册的Handler,但是和未调度pod添加到队列相关的,只有这个 */ podInformer.Informer().AddEventHandler( cache.FilteringResourceEventHandler{ // 定义过滤函数:必须为未调度的pod FilterFunc: func (obj interface {}) bool { switch t := obj.( type ) { case *v1.Pod: return !assignedPod(t) && responsibleForPod(t, sched.Profiles) case cache.DeletedFinalStateUnknown: if pod, ok := t.Obj.(*v1.Pod); ok { return !assignedPod(pod) && responsibleForPod(pod, sched.Profiles) } utilruntime.HandleError(fmt.Errorf( "unable to convert object %T to *v1.Pod in %T" , obj, sched)) return false default : utilruntime.HandleError(fmt.Errorf( "unable to handle object in %T: %T" , sched, obj)) return false } }, // 增改删三个操作对应的Handler,操作到对应的Queue Handler: cache.ResourceEventHandlerFuncs{ AddFunc: sched.addPodToSchedulingQueue, UpdateFunc: sched.updatePodInSchedulingQueue, DeleteFunc: sched.deletePodFromSchedulingQueue, }, }, ) } // 牢记我们第一阶段要分析的对象:create nginx pod,所以进入这个add的操作,对应加入到队列 func (sched *Scheduler) addPodToSchedulingQueue(obj interface {}) { pod := obj.(*v1.Pod) klog.V( 3 ).Infof( "add event for unscheduled pod %s/%s" , pod.Namespace, pod.Name) // 加入到队列 if err := sched.SchedulingQueue.Add(pod); err != nil { utilruntime.HandleError(fmt.Errorf( "unable to queue %T: %v" , obj, err)) } } // 在实例化Scheduler的地方 // 入队操作我们清楚了,那出队呢?我们回过头去看看上面定义的NextPod的方法实现 func MakeNextPodFunc(queue SchedulingQueue) func () *framework.QueuedPodInfo { return func () *framework.QueuedPodInfo { // 从队列中弹出 podInfo, err := queue.Pop() if err == nil { klog.V( 4 ).Infof( "About to try and schedule pod %v/%v" , podInfo.Pod.Namespace, podInfo.Pod.Name) return podInfo } klog.Errorf( "Error while retrieving next pod from scheduling queue: %v" , err) return nil } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
// 了解入队和出队操作后,我们看一下Scheduler运行的过程 func (sched *Scheduler) Run(ctx context.Context) { if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) { return } sched.SchedulingQueue.Run() // 调度一个pod对象 wait.UntilWithContext(ctx, sched.scheduleOne, 0 ) sched.SchedulingQueue. Close () } // 接下来scheduleOne方法代码很长,我们一步一步来看 func (sched *Scheduler) scheduleOne(ctx context.Context) { // podInfo 就是从队列中获取到的pod对象 podInfo := sched.NextPod() // 检查pod的有效性 if podInfo == nil || podInfo.Pod == nil { return } pod := podInfo.Pod // 根据定义的 pod.Spec.SchedulerName 查到对应的profile prof, err := sched.profileForPod(pod) if err != nil { klog. Error (err) return } // 可以跳过调度的情况,一般pod进不来 if sched.skipPodSchedule(prof, pod) { return } // 调用调度算法,获取结果 scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, prof, state, pod) if err != nil { /* 出现调度失败的情况: 这个时候可能会触发抢占preempt,抢占是一套复杂的逻辑,后面我们专门会讲 目前假设各类资源充足,能正常调度 */ } metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start)) // assumePod 是假设这个Pod按照前面的调度算法分配后,进行验证 assumedPodInfo := podInfo.DeepCopy() assumedPod := assumedPodInfo.Pod // SuggestedHost 为建议的分配的Host err = sched.assume(assumedPod, scheduleResult.SuggestedHost) if err != nil { // 失败就重新分配,不考虑这种情况 } // 运行相关插件的代码先跳过 比如一些抢占插件 // 异步绑定pod go func () { // 有一系列的检查工作 // 真正做绑定的动作 err := sched.bind(bindingCycleCtx, prof, assumedPod, scheduleResult.SuggestedHost, state) if err != nil { // 错误处理,清除状态并重试 } else { // 打印结果,调试时将log level调整到2以上 if klog.V( 2 ).Enabled() { klog.InfoS( "Successfully bound pod to node" , "pod" , klog.KObj(pod), "node" , scheduleResult.SuggestedHost, "evaluatedNodes" , scheduleResult.EvaluatedNodes, "feasibleNodes" , scheduleResult.FeasibleNodes) } // metrics中记录相关的监控指标 metrics.PodScheduled(prof.Name, metrics.SinceInSeconds(start)) metrics.PodSchedulingAttempts.Observe( float64 (podInfo.Attempts)) metrics.PodSchedulingDuration.WithLabelValues(getAttemptsLabel(podInfo)).Observe(metrics.SinceInSeconds(podInfo.InitialAttemptTimestamp)) // 运行绑定后的插件 prof.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) } }() } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
// 调用算法下的Schedule func New (){ scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, prof, state, pod) } func (c *Configurator) create() (*Scheduler, error ) { algo := core.NewGenericScheduler( c.schedulerCache, c.nodeInfoSnapshot, extenders, c.informerFactory.Core().V1().PersistentVolumeClaims().Lister(), c.disablePreemption, c.percentageOfNodesToScore, ) return &Scheduler{ Algorithm: algo, }, nil } // genericScheduler 的 Schedule 的实现 func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error ) { // 对 pod 进行 pvc 的信息检查 if err := podPassesBasicChecks(pod, g.pvcLister); err != nil { return result, err } // 对当前的信息做一个快照 if err := g.snapshot(); err != nil { return result, err } // Node 节点数量为0,表示无可用节点 if g.nodeInfoSnapshot.NumNodes() == 0 { return result, ErrNoNodesAvailable } // Predict阶段:找到所有满足调度条件的节点feasibleNodes,不满足的就直接过滤 feasibleNodes, filteredNodesStatuses, err := g.findNodesThatFitPod(ctx, prof, state, pod) // 没有可用节点直接报错 if len (feasibleNodes) == 0 { return result, &FitError{ Pod: pod, NumAllNodes: g.nodeInfoSnapshot.NumNodes(), FilteredNodesStatuses: filteredNodesStatuses, } } // 只有一个节点就直接选用 if len (feasibleNodes) == 1 { return ScheduleResult{ SuggestedHost: feasibleNodes[ 0 ].Name, EvaluatedNodes: 1 + len (filteredNodesStatuses), FeasibleNodes: 1 , }, nil } // Priority阶段:通过打分,找到一个分数最高、也就是最优的节点 priorityList, err := g.prioritizeNodes(ctx, prof, state, pod, feasibleNodes) host, err := g.selectHost(priorityList) return ScheduleResult{ SuggestedHost: host, EvaluatedNodes: len (feasibleNodes) + len (filteredNodesStatuses), FeasibleNodes: len (feasibleNodes), }, err } /* Predict 和 Priority 是选择调度节点的两个关键性步骤, 它的底层调用了各种algorithm算法。我们暂时不细看。 以我们前面讲到过的 NodeName 算法为例,节点必须与 NodeName 匹配,它是属于Predict阶段的。 在新版本中 这部分算法的实现放到了extenders,逻辑是一样的 */ |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
func (sched *Scheduler) assume(assumed *v1.Pod, host string ) error { // 将 host 填入到 pod spec字段的nodename,假定分配到对应的节点上 assumed.Spec.NodeName = host // 调用 SchedulerCache 下的 AssumePod if err := sched.SchedulerCache.AssumePod(assumed); err != nil { klog.Errorf( "scheduler cache AssumePod failed: %v" , err) return err } if sched.SchedulingQueue != nil { sched.SchedulingQueue.DeleteNominatedPodIfExists(assumed) } return nil } // 回头去找 SchedulerCache 初始化的地方 func (c *Configurator) create() (*Scheduler, error ) { return &Scheduler{ SchedulerCache: c.schedulerCache, }, nil } func New () (*Scheduler, error ) { // 这里就是初始化的实例 schedulerCache schedulerCache := internalcache. New ( 30 *time.Second, stopEverything) configurator := &Configurator{ schedulerCache: schedulerCache, } } // 看看AssumePod做了什么 func (cache *schedulerCache) AssumePod(pod *v1.Pod) error { // 获取 pod 的 uid key, err := framework.GetPodKey(pod) if err != nil { return err } // 加锁操作,保证并发情况下的一致性 cache.mu.Lock() defer cache.mu.Unlock() // 根据 uid 找不到 pod 当前的状态 看看被调度了没有 if _, ok := cache.podStates[key]; ok { return fmt.Errorf( "pod %v is in the cache, so can't be assumed" , key) } // 把 Assume Pod 的信息放到对应 Node 节点中 cache.addPod(pod) // 把 pod 状态设置为 Assume 成功 ps := &podState{ pod: pod, } cache.podStates[key] = ps cache.assumedPods[key] = true return nil } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
func (sched *Scheduler) bind(ctx context.Context, prof *profile.Profile, assumed *v1.Pod, targetNode string , state *framework.CycleState) (err error ) { start := time.Now() // 把 assumed 的 pod 信息保存下来 defer func () { sched.finishBinding(prof, assumed, targetNode, start, err) }() // 阶段1: 运行扩展绑定进行验证,如果已经绑定报错 bound, err := sched.extendersBinding(assumed, targetNode) if bound { return err } // 阶段2:运行绑定插件验证状态 bindStatus := prof.RunBindPlugins(ctx, state, assumed, targetNode) if bindStatus.IsSuccess() { return nil } if bindStatus.Code() == framework. Error { return bindStatus.AsError() } return fmt.Errorf( "bind status: %s, %v" , bindStatus.Code(). String (), bindStatus.Message()) } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
// 这块的代码我不做细致的逐层分析了,大家根据兴趣自行探索 func (b DefaultBinder) Bind(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string ) *framework.Status { klog.V( 3 ).Infof( "Attempting to bind %v/%v to %v" , p.Namespace, p.Name, nodeName) binding := &v1.Binding{ ObjectMeta: metav1.ObjectMeta{Namespace: p.Namespace, Name: p.Name, UID: p.UID}, Target: v1.ObjectReference{Kind: "Node" , Name: nodeName}, } // ClientSet就是访问kube-apiserver的客户端,将数据更新上去 err := b.handle.ClientSet().CoreV1().Pods(binding.Namespace).Bind(ctx, binding, metav1.CreateOptions{}) if err != nil { return framework.NewStatus(framework. Error , err. Error ()) } return nil } |
站在前人的肩膀上,向前辈致敬,Respect!
Informer
依赖于 Reflector
模块,它有个组件为 xxxInformer,如 podInformer
Informer
包含了一个连接到kube-apiserver
的client
,通过List
和Watch
接口查询资源变更情况检测到资源发生变化后,通过Controller
将数据放入队列DeltaFIFOQueue
里,生产阶段完成
在DeltaFIFOQueue
的另一端,有消费者在不停地处理资源变化的事件,处理逻辑主要分2步
distribute
将object分发到同步监听或者普通监听的列表,然后被对应的handler处理
SchedulingQueue
异步工作的单个pod的调度主要分为3个步骤:
Assume
这个Pod被调度到对应的Node,保存到cache,加锁保证一致性。Bind
绑定成功后,将数据通过client向kube-apiserver发送,更新etcd
以上就是Kubernetes Informer数据存储Index与Pod分配流程解析的详细内容
2024-07-07
myeclipse怎么导入tomcat教程2024-07-07
myeclipse如何启动tomcat2024-07-07
myeclipse如何绑定tomcat上线了一个小的预约程序,配置通过Nginx进行访问入口,默认的日志是没有请求时间的,因此需要配置一下,将每一次的请求的访问响应时间记录出来,备查与优化使用....
2023-03-17