时间:2022-12-06 来源:www.pcxitongcheng.com 作者:电脑系统城
从主函数找到 Run 函数,代码较长,这里精简了一下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
func Run(c *config.CompletedConfig, stopCh <- chan struct {}) error { // configz 模块,在kube-scheduler分析中已经了解 if cfgz, err := configz. New (ConfigzName); err == nil { cfgz.Set(c.ComponentConfig) } else { klog.Errorf( "unable to register configz: %v" , err) } // 健康监测与http服务,跳过 var checks []healthz.HealthChecker var unsecuredMux *mux.PathRecorderMux run := func (ctx context.Context) { rootClientBuilder := controller.SimpleControllerClientBuilder{ ClientConfig: c.Kubeconfig, } // client认证相关 var clientBuilder controller.ControllerClientBuilder // 创建controller的上下文context controllerContext, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, ctx.Done()) if err != nil { klog.Fatalf( "error building controller context: %v" , err) } saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController if err := StartControllers(controllerContext, saTokenControllerInitFunc, NewControllerInitializers(controllerContext.LoopMode), unsecuredMux); err != nil { klog.Fatalf( "error starting controllers: %v" , err) } // 这里的 InformerFactory 和我们在kube-scheduler中看的 SharedInformerFactory 基本一致 controllerContext.InformerFactory.Start(controllerContext.Stop) controllerContext.ObjectOrMetadataInformerFactory.Start(controllerContext.Stop) close (controllerContext.InformersStarted) select {} } // 是否进行选举 if !c.ComponentConfig.Generic.LeaderElection.LeaderElect { run(context.TODO()) panic ( "unreachable" ) } // 拼接出一个全局唯一的id id, err := os.Hostname() if err != nil { return err } id = id + "_" + string (uuid.NewUUID()) rl, err := resourcelock. 
New (c.ComponentConfig.Generic.LeaderElection.ResourceLock, c.ComponentConfig.Generic.LeaderElection.ResourceNamespace, c.ComponentConfig.Generic.LeaderElection.ResourceName, c.LeaderElectionClient.CoreV1(), c.LeaderElectionClient.CoordinationV1(), resourcelock.ResourceLockConfig{ Identity: id, EventRecorder: c.EventRecorder, }) if err != nil { klog.Fatalf( "error creating lock: %v" , err) } // 正常情况下都是阻塞在RunOrDie这个函数中,不停地进行选举相关的工作 leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{ Lock: rl, LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration, RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration, RetryPeriod: c.ComponentConfig.Generic.LeaderElection.RetryPeriod.Duration, Callbacks: leaderelection.LeaderCallbacks{ // 开始成为Leader的时候,调用run函数 OnStartedLeading: run, OnStoppedLeading: func () { klog.Fatalf( "leaderelection lost" ) }, }, WatchDog: electionChecker, Name: "kube-controller-manager" , }) panic ( "unreachable" ) } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map [ string ]InitFunc, unsecuredMux *mux.PathRecorderMux) error { // 关键性的循环,启动每个controllers,key为控制器名字,value为初始化函数 for controllerName, initFn := range controllers { // 是否允许启动 if !ctx.IsControllerEnabled(controllerName) { klog.Warningf( "%q is disabled" , controllerName) continue } time.Sleep(wait.Jitter(ctx.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter)) klog.V( 1 ).Infof( "Starting %q" , controllerName) // 调用init函数进行启动 debugHandler, started, err := initFn(ctx) if err != nil { klog.Errorf( "Error starting %q" , controllerName) return err } if !started { klog.Warningf( "Skipping %q" , controllerName) continue } // 注册对应controller到debug的url中 if debugHandler != nil && unsecuredMux != nil { basePath := "/debug/controllers/" + controllerName unsecuredMux.UnlistedHandle(basePath, http.StripPrefix(basePath, debugHandler)) unsecuredMux.UnlistedHandlePrefix(basePath+ "/" , http.StripPrefix(basePath, debugHandler)) } klog.Infof( "Started %q" , controllerName) } return nil } // 我们再去传入controller的函数去看看,对应的controller有哪些,这里有我们很多常见的概念,不一一细讲 func NewControllerInitializers(loopMode ControllerLoopMode) map [ string ]InitFunc { controllers := map [ string ]InitFunc{} controllers[ "endpoint" ] = startEndpointController controllers[ "endpointslice" ] = startEndpointSliceController controllers[ "endpointslicemirroring" ] = startEndpointSliceMirroringController controllers[ "replicationcontroller" ] = startReplicationController controllers[ "podgc" ] = startPodGCController controllers[ "resourcequota" ] = startResourceQuotaController controllers[ "namespace" ] = startNamespaceController controllers[ "serviceaccount" ] = startServiceAccountController controllers[ "garbagecollector" ] = startGarbageCollectorController controllers[ "daemonset" ] = startDaemonSetController controllers[ "job" ] = startJobController controllers[ "deployment" ] = startDeploymentController 
controllers[ "replicaset" ] = startReplicaSetController controllers[ "horizontalpodautoscaling" ] = startHPAController controllers[ "disruption" ] = startDisruptionController controllers[ "statefulset" ] = startStatefulSetController controllers[ "cronjob" ] = startCronJobController controllers[ "csrsigning" ] = startCSRSigningController controllers[ "csrapproving" ] = startCSRApprovingController controllers[ "csrcleaner" ] = startCSRCleanerController controllers[ "ttl" ] = startTTLController controllers[ "bootstrapsigner" ] = startBootstrapSignerController controllers[ "tokencleaner" ] = startTokenCleanerController controllers[ "nodeipam" ] = startNodeIpamController controllers[ "nodelifecycle" ] = startNodeLifecycleController if loopMode == IncludeCloudLoops { controllers[ "service" ] = startServiceController controllers[ "route" ] = startRouteController controllers[ "cloud-node-lifecycle" ] = startCloudNodeLifecycleController } controllers[ "persistentvolume-binder" ] = startPersistentVolumeBinderController controllers[ "attachdetach" ] = startAttachDetachController controllers[ "persistentvolume-expander" ] = startVolumeExpandController controllers[ "clusterrole-aggregation" ] = startClusterRoleAggregrationController controllers[ "pvc-protection" ] = startPVCProtectionController controllers[ "pv-protection" ] = startPVProtectionController controllers[ "ttl-after-finished" ] = startTTLAfterFinishedController controllers[ "root-ca-cert-publisher" ] = startRootCACertPublisher controllers[ "ephemeral-volume" ] = startEphemeralVolumeController return controllers } |
由于我们的示例是创建一个nginx的pod,涉及到kube-controller-manager的内容很少。
但是,为了加深大家对 kube-controller-manager 的认识,我们引入一个新的概念 - ReplicaSet,下面是官方说明:
A ReplicaSet’s purpose is to maintain a stable set of replica Pods running at any given time. As such, it is often used to guarantee the availability of a specified number of identical Pods.
ReplicaSet 的目的是维护一组在任何时候都处于运行状态的 Pod 副本的稳定集合。 因此,它通常用来保证给定数量的、完全相同的 Pod 的可用性。
简单来说,ReplicaSet 就是用来生成指定个数的Pod
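ReplicaSet 控制器的实现代码在 pkg/controller/replicaset/replica_set.go(启动函数 startReplicaSetController 位于 cmd/kube-controller-manager/app/apps.go)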
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
func startReplicaSetController(ctx ControllerContext) (http.Handler, bool , error ) { if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps" , Version: "v1" , Resource: "replicasets" }] { return nil , false , nil } // 用goroutine异步运行,包含了 ReplicaSet和Pod 的两个Informer // 这一点很好理解:我们是要控制ReplicaSet声明的数量和运行的Pod数量一致,需要同时观察者两种资源 go replicaset.NewReplicaSetController( ctx.InformerFactory.Apps().V1().ReplicaSets(), ctx.InformerFactory.Core().V1().Pods(), ctx.ClientBuilder.ClientOrDie( "replicaset-controller" ), replicaset.BurstReplicas, ).Run( int (ctx.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs), ctx.Stop) return nil , true , nil } // 运行函数 func (rsc *ReplicaSetController) Run(workers int , stopCh <- chan struct {}) { defer utilruntime.HandleCrash() defer rsc.queue.ShutDown() controllerName := strings.ToLower(rsc.Kind) klog.Infof( "Starting %v controller" , controllerName) defer klog.Infof( "Shutting down %v controller" , controllerName) if !cache.WaitForNamedCacheSync(rsc.Kind, stopCh, rsc.podListerSynced, rsc.rsListerSynced) { return } for i := 0 ; i < workers; i++ { // 工作的函数 go wait.Until(rsc.worker, time.Second, stopCh) } <-stopCh } func (rsc *ReplicaSetController) worker() { // 继续查找实现 for rsc.processNextWorkItem() { } } func (rsc *ReplicaSetController) processNextWorkItem() bool { // 这里也有个queue的概念,可以类比kube-scheduler中的实现 // 不同的是,这里的queue是 workqueue.RateLimitingInterface ,也就是限制速率的,具体实现今天不细看 // 获取元素 key, quit := rsc.queue.Get() if quit { return false } defer rsc.queue.Done(key) // 处理对应的元素 err := rsc.syncHandler(key.( string )) if err == nil { rsc.queue.Forget(key) return true } utilruntime.HandleError(fmt.Errorf( "sync %q failed with %v" , key, err)) rsc.queue.AddRateLimited(key) return true } // 再回过头,去查看syncHandler的具体实现 func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset. 
Interface , burstReplicas int , gvk schema.GroupVersionKind, metricOwnerName, queueName string , podControl controller.PodControlInterface) *ReplicaSetController { rsc.syncHandler = rsc.syncReplicaSet return rsc } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
func (rsc *ReplicaSetController) syncReplicaSet(key string ) error { startTime := time.Now() defer func () { klog.V( 4 ).Infof( "Finished syncing %v %q (%v)" , rsc.Kind, key, time.Since(startTime)) }() // 从key中拆分出 namespace 和 name namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { return err } // 根据name,从 Lister 获取对应的 ReplicaSets 信息 rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name) if errors.IsNotFound(err) { klog.V( 4 ).Infof( "%v %v has been deleted" , rsc.Kind, key) rsc.expectations.DeleteExpectations(key) return nil } if err != nil { return err } rsNeedsSync := rsc.expectations.SatisfiedExpectations(key) // 获取 selector (k8s 是根据selector中的label来匹配 ReplicaSets 和 Pod 的) selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector) if err != nil { utilruntime.HandleError(fmt.Errorf( "error converting pod selector to selector: %v" , err)) return nil } // 根据namespace和labels获取所有的pod allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything()) if err != nil { return err } // 过滤无效的pod filteredPods := controller.FilterActivePods(allPods) // 根据selector再过滤pod filteredPods, err = rsc.claimPods(rs, selector, filteredPods) if err != nil { return err } var manageReplicasErr error if rsNeedsSync && rs.DeletionTimestamp == nil { // 管理 ReplicaSet,下面详细分析 manageReplicasErr = rsc.manageReplicas(filteredPods, rs) } rs = rs.DeepCopy() newStatus := calculateStatus(rs, filteredPods, manageReplicasErr) // 更新状态 updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus) if err != nil { return err } if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 && updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) && updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) { rsc.queue.AddAfter(key, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second) } return manageReplicasErr } // 我们再一起看看,当Pod数量和ReplicaSet中声明的不同时,是怎么工作的 func (rsc *ReplicaSetController) 
manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error { // diff = 当前pod数 - 期望pod数 diff := len (filteredPods) - int (*(rs.Spec.Replicas)) rsKey, err := controller.KeyFunc(rs) if err != nil { utilruntime.HandleError(fmt.Errorf( "couldn't get key for %v %#v: %v" , rsc.Kind, rs, err)) return nil } // diff小于0,表示需要扩容,即新增Pod if diff < 0 { // 具体的实现暂时不细看 // diff 大于0,即需要缩容 } else if diff > 0 { } return nil } |
站在前人的肩膀上,向前辈致敬,Respect!
2024-07-07
myeclipse怎么导入tomcat教程2024-07-07
myeclipse如何启动tomcat2024-07-07
myeclipse如何绑定tomcat上线了一个小的预约程序,配置通过Nginx进行访问入口,默认的日志是没有请求时间的,因此需要配置一下,将每一次的请求的访问响应时间记录出来,备查与优化使用....
2023-03-17