Overview

In the previous article we covered kubelet parameter initialization; next we analyze the actual startup logic of the kubelet service.

The overall flow is roughly as follows:

  1. Set global feature gates
  2. Validate the kubelet parameters
  3. Register the current configuration with the /configz endpoint
  4. Check whether the kubelet is starting in standalone mode
  5. Check whether kubeDeps is nil, and initialize it if so
  6. Obtain the host name, used to initialize the event recorder
  7. In standalone mode, set all API clients to nil
  8. Initialize the authentication/authorization interface
  9. Initialize the cgroup roots
  10. Initialize cAdvisor
  11. Initialize the event recorder, used to report kubelet events
  12. Initialize the container manager
  13. Check whether the kubelet is running as root
  14. Set the OOM score for the kubelet process
  15. Initialize the container runtime
  16. Start the kubelet
  17. If dynamic kubelet config is enabled, watch for configuration changes
  18. Start the /healthz endpoint
  19. Notify the init process that the kubelet has started

We will walk through these steps one by one. Some of them (the container runtime startup flow and the kubelet startup flow) are only touched on here and will be analyzed in later articles.

Some general topics (cAdvisor, the /configz and /healthz endpoints, OOM scores, etc.) are also left to later articles.

Function call

if err := Run(kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate, stopCh); err != nil {
    klog.Fatal(err)
}

func Run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate, stopCh <-chan struct{}) error {
    // To help debugging, immediately log version
    klog.Infof("Version: %+v", version.Get())
    if err := initForOS(s.KubeletFlags.WindowsService, s.KubeletFlags.WindowsPriorityClass); err != nil {
        return fmt.Errorf("failed OS init: %v", err)
    }
    if err := run(s, kubeDeps, featureGate, stopCh); err != nil {
        return fmt.Errorf("failed to run Kubelet: %v", err)
    }
    return nil
}

The actual startup logic lives in the run function:

func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate, stopCh <-chan struct{}) (err error) {
    // Set global feature gates based on the value on the initial KubeletServer
    err = utilfeature.DefaultMutableFeatureGate.SetFromMap(s.KubeletConfiguration.FeatureGates)
    if err != nil {
        return err
    }
    // validate the initial KubeletServer (we set feature gates first, because this validation depends on feature gates)
    if err := options.ValidateKubeletServer(s); err != nil {
        return err
    }

    // Obtain Kubelet Lock File
    if s.ExitOnLockContention && s.LockFilePath == "" {
        return errors.New("cannot exit on lock file contention: no lock file specified")
    }
    done := make(chan struct{})
    if s.LockFilePath != "" {
        klog.Infof("acquiring file lock on %q", s.LockFilePath)
        if err := flock.Acquire(s.LockFilePath); err != nil {
            return fmt.Errorf("unable to acquire file lock on %q: %v", s.LockFilePath, err)
        }
        if s.ExitOnLockContention {
            klog.Infof("watching for inotify events for: %v", s.LockFilePath)
            if err := watchForLockfileContention(s.LockFilePath, done); err != nil {
                return err
            }
        }
    }

    // Register current configuration with /configz endpoint
    err = initConfigz(&s.KubeletConfiguration)
    if err != nil {
        klog.Errorf("unable to register KubeletConfiguration with configz, error: %v", err)
    }

    if len(s.ShowHiddenMetricsForVersion) > 0 {
        metrics.SetShowHidden()
    }

    // About to get clients and such, detect standaloneMode
    standaloneMode := true
    if len(s.KubeConfig) > 0 {
        standaloneMode = false
    }

    if kubeDeps == nil {
        kubeDeps, err = UnsecuredDependencies(s, featureGate)
        if err != nil {
            return err
        }
    }

    if kubeDeps.Cloud == nil {
        if !cloudprovider.IsExternal(s.CloudProvider) {
            cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
            if err != nil {
                return err
            }
            if cloud == nil {
                klog.V(2).Infof("No cloud provider specified: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)
            } else {
                klog.V(2).Infof("Successfully initialized cloud provider: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)
            }
            kubeDeps.Cloud = cloud
        }
    }

    hostName, err := nodeutil.GetHostname(s.HostnameOverride)
    if err != nil {
        return err
    }
    nodeName, err := getNodeName(kubeDeps.Cloud, hostName)
    if err != nil {
        return err
    }

    // if in standalone mode, indicate as much by setting all clients to nil
    switch {
    case standaloneMode:
        kubeDeps.KubeClient = nil
        kubeDeps.EventClient = nil
        kubeDeps.HeartbeatClient = nil
        klog.Warningf("standalone mode, no API client")

    case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil:
        clientConfig, closeAllConns, err := buildKubeletClientConfig(s, nodeName)
        if err != nil {
            return err
        }
        if closeAllConns == nil {
            return errors.New("closeAllConns must be a valid function other than nil")
        }
        kubeDeps.OnHeartbeatFailure = closeAllConns

        kubeDeps.KubeClient, err = clientset.NewForConfig(clientConfig)
        if err != nil {
            return fmt.Errorf("failed to initialize kubelet client: %v", err)
        }

        // make a separate client for events
        eventClientConfig := *clientConfig
        eventClientConfig.QPS = float32(s.EventRecordQPS)
        eventClientConfig.Burst = int(s.EventBurst)
        kubeDeps.EventClient, err = v1core.NewForConfig(&eventClientConfig)
        if err != nil {
            return fmt.Errorf("failed to initialize kubelet event client: %v", err)
        }

        // make a separate client for heartbeat with throttling disabled and a timeout attached
        heartbeatClientConfig := *clientConfig
        heartbeatClientConfig.Timeout = s.KubeletConfiguration.NodeStatusUpdateFrequency.Duration
        // The timeout is the minimum of the lease duration and status update frequency
        leaseTimeout := time.Duration(s.KubeletConfiguration.NodeLeaseDurationSeconds) * time.Second
        if heartbeatClientConfig.Timeout > leaseTimeout {
            heartbeatClientConfig.Timeout = leaseTimeout
        }

        heartbeatClientConfig.QPS = float32(-1)
        kubeDeps.HeartbeatClient, err = clientset.NewForConfig(&heartbeatClientConfig)
        if err != nil {
            return fmt.Errorf("failed to initialize kubelet heartbeat client: %v", err)
        }
    }

    if kubeDeps.Auth == nil {
        auth, runAuthenticatorCAReload, err := BuildAuth(nodeName, kubeDeps.KubeClient, s.KubeletConfiguration)
        if err != nil {
            return err
        }
        kubeDeps.Auth = auth
        runAuthenticatorCAReload(stopCh)
    }

    var cgroupRoots []string

    cgroupRoots = append(cgroupRoots, cm.NodeAllocatableRoot(s.CgroupRoot, s.CgroupDriver))
    kubeletCgroup, err := cm.GetKubeletContainer(s.KubeletCgroups)
    if err != nil {
        klog.Warningf("failed to get the kubelet's cgroup: %v.  Kubelet system container metrics may be missing.", err)
    } else if kubeletCgroup != "" {
        cgroupRoots = append(cgroupRoots, kubeletCgroup)
    }

    runtimeCgroup, err := cm.GetRuntimeContainer(s.ContainerRuntime, s.RuntimeCgroups)
    if err != nil {
        klog.Warningf("failed to get the container runtime's cgroup: %v. Runtime system container metrics may be missing.", err)
    } else if runtimeCgroup != "" {
        // RuntimeCgroups is optional, so ignore if it isn't specified
        cgroupRoots = append(cgroupRoots, runtimeCgroup)
    }

    if s.SystemCgroups != "" {
        // SystemCgroups is optional, so ignore if it isn't specified
        cgroupRoots = append(cgroupRoots, s.SystemCgroups)
    }

    if kubeDeps.CAdvisorInterface == nil {
        imageFsInfoProvider := cadvisor.NewImageFsInfoProvider(s.ContainerRuntime, s.RemoteRuntimeEndpoint)
        kubeDeps.CAdvisorInterface, err = cadvisor.New(imageFsInfoProvider, s.RootDirectory, cgroupRoots, cadvisor.UsingLegacyCadvisorStats(s.ContainerRuntime, s.RemoteRuntimeEndpoint))
        if err != nil {
            return err
        }
    }

    // Setup event recorder if required.
    makeEventRecorder(kubeDeps, nodeName)

    if kubeDeps.ContainerManager == nil {
        if s.CgroupsPerQOS && s.CgroupRoot == "" {
            klog.Info("--cgroups-per-qos enabled, but --cgroup-root was not specified.  defaulting to /")
            s.CgroupRoot = "/"
        }

        var reservedSystemCPUs cpuset.CPUSet
        var errParse error
        if s.ReservedSystemCPUs != "" {
            reservedSystemCPUs, errParse = cpuset.Parse(s.ReservedSystemCPUs)
            if errParse != nil {
                // invalid cpu list is provided, set reservedSystemCPUs to empty, so it won't overwrite kubeReserved/systemReserved
                klog.Infof("Invalid ReservedSystemCPUs \"%s\"", s.ReservedSystemCPUs)
                return errParse
            }
            // is it safe do use CAdvisor here ??
            machineInfo, err := kubeDeps.CAdvisorInterface.MachineInfo()
            if err != nil {
                // if can't use CAdvisor here, fall back to non-explicit cpu list behavor
                klog.Warning("Failed to get MachineInfo, set reservedSystemCPUs to empty")
                reservedSystemCPUs = cpuset.NewCPUSet()
            } else {
                reservedList := reservedSystemCPUs.ToSlice()
                first := reservedList[0]
                last := reservedList[len(reservedList)-1]
                if first < 0 || last >= machineInfo.NumCores {
                    // the specified cpuset is outside of the range of what the machine has
                    klog.Infof("Invalid cpuset specified by --reserved-cpus")
                    return fmt.Errorf("Invalid cpuset %q specified by --reserved-cpus", s.ReservedSystemCPUs)
                }
            }
        } else {
            reservedSystemCPUs = cpuset.NewCPUSet()
        }

        if reservedSystemCPUs.Size() > 0 {
            // at cmd option valication phase it is tested either --system-reserved-cgroup or --kube-reserved-cgroup is specified, so overwrite should be ok
            klog.Infof("Option --reserved-cpus is specified, it will overwrite the cpu setting in KubeReserved=\"%v\", SystemReserved=\"%v\".", s.KubeReserved, s.SystemReserved)
            if s.KubeReserved != nil {
                delete(s.KubeReserved, "cpu")
            }
            if s.SystemReserved == nil {
                s.SystemReserved = make(map[string]string)
            }
            s.SystemReserved["cpu"] = strconv.Itoa(reservedSystemCPUs.Size())
            klog.Infof("After cpu setting is overwritten, KubeReserved=\"%v\", SystemReserved=\"%v\"", s.KubeReserved, s.SystemReserved)
        }
        kubeReserved, err := parseResourceList(s.KubeReserved)
        if err != nil {
            return err
        }
        systemReserved, err := parseResourceList(s.SystemReserved)
        if err != nil {
            return err
        }
        var hardEvictionThresholds []evictionapi.Threshold
        // If the user requested to ignore eviction thresholds, then do not set valid values for hardEvictionThresholds here.
        if !s.ExperimentalNodeAllocatableIgnoreEvictionThreshold {
            hardEvictionThresholds, err = eviction.ParseThresholdConfig([]string{}, s.EvictionHard, nil, nil, nil)
            if err != nil {
                return err
            }
        }
        experimentalQOSReserved, err := cm.ParseQOSReserved(s.QOSReserved)
        if err != nil {
            return err
        }

        devicePluginEnabled := utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins)

        kubeDeps.ContainerManager, err = cm.NewContainerManager(
            kubeDeps.Mounter,
            kubeDeps.CAdvisorInterface,
            cm.NodeConfig{
                RuntimeCgroupsName:    s.RuntimeCgroups,
                SystemCgroupsName:     s.SystemCgroups,
                KubeletCgroupsName:    s.KubeletCgroups,
                ContainerRuntime:      s.ContainerRuntime,
                CgroupsPerQOS:         s.CgroupsPerQOS,
                CgroupRoot:            s.CgroupRoot,
                CgroupDriver:          s.CgroupDriver,
                KubeletRootDir:        s.RootDirectory,
                ProtectKernelDefaults: s.ProtectKernelDefaults,
                NodeAllocatableConfig: cm.NodeAllocatableConfig{
                    KubeReservedCgroupName:   s.KubeReservedCgroup,
                    SystemReservedCgroupName: s.SystemReservedCgroup,
                    EnforceNodeAllocatable:   sets.NewString(s.EnforceNodeAllocatable...),
                    KubeReserved:             kubeReserved,
                    SystemReserved:           systemReserved,
                    ReservedSystemCPUs:       reservedSystemCPUs,
                    HardEvictionThresholds:   hardEvictionThresholds,
                },
                QOSReserved:                           *experimentalQOSReserved,
                ExperimentalCPUManagerPolicy:          s.CPUManagerPolicy,
                ExperimentalCPUManagerReconcilePeriod: s.CPUManagerReconcilePeriod.Duration,
                ExperimentalPodPidsLimit:              s.PodPidsLimit,
                EnforceCPULimits:                      s.CPUCFSQuota,
                CPUCFSQuotaPeriod:                     s.CPUCFSQuotaPeriod.Duration,
                ExperimentalTopologyManagerPolicy:     s.TopologyManagerPolicy,
            },
            s.FailSwapOn,
            devicePluginEnabled,
            kubeDeps.Recorder)

        if err != nil {
            return err
        }
    }

    if err := checkPermissions(); err != nil {
        klog.Error(err)
    }

    utilruntime.ReallyCrash = s.ReallyCrashForTesting

    // TODO(vmarmol): Do this through container config.
    oomAdjuster := kubeDeps.OOMAdjuster
    if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
        klog.Warning(err)
    }

    err = kubelet.PreInitRuntimeService(&s.KubeletConfiguration,
        kubeDeps, &s.ContainerRuntimeOptions,
        s.ContainerRuntime,
        s.RuntimeCgroups,
        s.RemoteRuntimeEndpoint,
        s.RemoteImageEndpoint,
        s.NonMasqueradeCIDR)
    if err != nil {
        return err
    }

    if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
        return err
    }

    // If the kubelet config controller is available, and dynamic config is enabled, start the config and status sync loops
    if utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) && len(s.DynamicConfigDir.Value()) > 0 &&
        kubeDeps.KubeletConfigController != nil && !standaloneMode && !s.RunOnce {
        if err := kubeDeps.KubeletConfigController.StartSync(kubeDeps.KubeClient, kubeDeps.EventClient, string(nodeName)); err != nil {
            return err
        }
    }

    if s.HealthzPort > 0 {
        mux := http.NewServeMux()
        healthz.InstallHandler(mux)
        go wait.Until(func() {
            err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), mux)
            if err != nil {
                klog.Errorf("Starting healthz server failed: %v", err)
            }
        }, 5*time.Second, wait.NeverStop)
    }

    if s.RunOnce {
        return nil
    }

    // If systemd is used, notify it that we have started
    go daemon.SdNotify(false, "READY=1")

    select {
    case <-done:
        break
    case <-stopCh:
        break
    }

    return nil
}

The code is long, so let's analyze it step by step.

1. Set global feature gates

err = utilfeature.DefaultMutableFeatureGate.SetFromMap(s.KubeletConfiguration.FeatureGates)
if err != nil {
    return err
}
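
For reference, DefaultMutableFeatureGate.SetFromMap consumes a map[string]bool keyed by gate name. A minimal sketch of the shape of that map; the gate names below are just examples (they also appear later in this article), and in practice the values come from featureGates in the kubelet configuration file:

// Hypothetical example of the map that SetFromMap consumes.
gates := map[string]bool{
    "DevicePlugins":        true,
    "DynamicKubeletConfig": true,
}
if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(gates); err != nil {
    return err
}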

2. Validate the kubelet parameters

if err := options.ValidateKubeletServer(s); err != nil {
    return err
}

Validation covers the configuration flags and the feature gates.

3. Register the current configuration with the /configz endpoint

err = initConfigz(&s.KubeletConfiguration)
if err != nil {
    klog.Errorf("unable to register KubeletConfiguration with configz, error: %v", err)
}
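
initConfigz is a thin wrapper around the component-base configz registry: it registers a "kubeletconfig" entry and stores the (versioned) KubeletConfiguration in it so that GET /configz can serve it later. A rough sketch under that assumption; the conversion helper name is hypothetical:

// Rough sketch of initConfigz using the k8s.io/component-base/configz package.
func initConfigz(kc *kubeletconfig.KubeletConfiguration) error {
    cz, err := configz.New("kubeletconfig")
    if err != nil {
        return err
    }
    // Convert the internal config to the versioned API type before exposing it.
    versioned, err := kubeletConfigInternalToVersioned(kc) // helper name is an assumption
    if err != nil {
        return err
    }
    cz.Set(versioned)
    return nil
}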

4. Check whether the kubelet is starting in standalone mode

In this mode the kubelet does not talk to the API server; it is mainly used for debugging the kubelet.

standaloneMode := true
if len(s.KubeConfig) > 0 {
    standaloneMode = false
}

5. Check whether kubeDeps is nil, and initialize it if so

As mentioned earlier, kubeDeps has normally already been initialized before Run is called. kubeDeps is a collection of interfaces for interacting with runtime resources (networking, volumes, the container runtime, and so on).

if kubeDeps == nil {
    kubeDeps, err = UnsecuredDependencies(s, featureGate)
    if err != nil {
        return err
    }
}

if kubeDeps.Cloud == nil {
    if !cloudprovider.IsExternal(s.CloudProvider) {
        cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
        if err != nil {
            return err
        }
        if cloud == nil {
            klog.V(2).Infof("No cloud provider specified: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)
        } else {
            klog.V(2).Infof("Successfully initialized cloud provider: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)
        }
        kubeDeps.Cloud = cloud
    }
}

6. Obtain the host name

It is used later to initialize the event recorder. The resolution rules are as follows (a sketch of getNodeName is shown right after the code below):

  • If --cloud-provider is specified, the node name is obtained from the cloud provider.
  • If --cloud-provider is not specified but --hostname-override is, the value of --hostname-override is used as the host name.
  • If neither --cloud-provider nor --hostname-override is specified, the node's hostname is used.
    hostName, err := nodeutil.GetHostname(s.HostnameOverride)
    if err != nil {
      return err
    }
    nodeName, err := getNodeName(kubeDeps.Cloud, hostName)
    if err != nil {
      return err
    }
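
getNodeName itself is small: with no cloud provider it simply uses the hostname, otherwise it asks the cloud provider's Instances interface for the node name. A rough sketch of that logic (error messages are approximations):

// Rough sketch of getNodeName: prefer the cloud provider's notion of the node
// name, fall back to the hostname when no cloud provider is configured.
func getNodeName(cloud cloudprovider.Interface, hostname string) (types.NodeName, error) {
    if cloud == nil {
        return types.NodeName(hostname), nil
    }
    instances, ok := cloud.Instances()
    if !ok {
        return "", fmt.Errorf("failed to get instances from cloud provider")
    }
    nodeName, err := instances.CurrentNodeName(context.TODO(), hostname)
    if err != nil {
        return "", fmt.Errorf("error fetching current node name from cloud provider: %v", err)
    }
    return nodeName, nil
}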
    

7. In standalone mode, set all API clients to nil; otherwise build the kube, event, and heartbeat clients

switch {
case standaloneMode:
    kubeDeps.KubeClient = nil
    kubeDeps.EventClient = nil
    kubeDeps.HeartbeatClient = nil
    klog.Warningf("standalone mode, no API client")

case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil:
    clientConfig, closeAllConns, err := buildKubeletClientConfig(s, nodeName)
    if err != nil {
        return err
    }
    if closeAllConns == nil {
        return errors.New("closeAllConns must be a valid function other than nil")
    }
    kubeDeps.OnHeartbeatFailure = closeAllConns

    kubeDeps.KubeClient, err = clientset.NewForConfig(clientConfig)
    if err != nil {
        return fmt.Errorf("failed to initialize kubelet client: %v", err)
    }

    // make a separate client for events
    eventClientConfig := *clientConfig
    eventClientConfig.QPS = float32(s.EventRecordQPS)
    eventClientConfig.Burst = int(s.EventBurst)
    kubeDeps.EventClient, err = v1core.NewForConfig(&eventClientConfig)
    if err != nil {
        return fmt.Errorf("failed to initialize kubelet event client: %v", err)
    }

    // make a separate client for heartbeat with throttling disabled and a timeout attached
    heartbeatClientConfig := *clientConfig
    heartbeatClientConfig.Timeout = s.KubeletConfiguration.NodeStatusUpdateFrequency.Duration
    // The timeout is the minimum of the lease duration and status update frequency
    leaseTimeout := time.Duration(s.KubeletConfiguration.NodeLeaseDurationSeconds) * time.Second
    if heartbeatClientConfig.Timeout > leaseTimeout {
        heartbeatClientConfig.Timeout = leaseTimeout
    }

    heartbeatClientConfig.QPS = float32(-1)
    kubeDeps.HeartbeatClient, err = clientset.NewForConfig(&heartbeatClientConfig)
    if err != nil {
        return fmt.Errorf("failed to initialize kubelet heartbeat client: %v", err)
    }
}

8. Initialize the authentication/authorization interface

BuildAuth creates an authenticator, an authorizer, and a matching authorizer-attributes getter compatible with what the kubelet needs.

It returns an AuthInterface, a run function that starts internal controllers (such as reloading the client CA certificate), and an error.

if kubeDeps.Auth == nil {
    auth, runAuthenticatorCAReload, err := BuildAuth(nodeName, kubeDeps.KubeClient, s.KubeletConfiguration)
    if err != nil {
        return err
    }
    kubeDeps.Auth = auth
    runAuthenticatorCAReload(stopCh)
}
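
For context, the AuthInterface returned here bundles authentication and authorization into a single value used by the kubelet's HTTPS server filters; roughly as defined in the kubelet's server package:

// AuthInterface contains the methods required by the kubelet server's auth
// filters: authenticate the request, build its attributes, and authorize it.
type AuthInterface interface {
    authenticator.Request
    authorizer.RequestAttributesGetter
    authorizer.Authorizer
}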

9. Initialize the cgroup roots

These include:

  • the kubelet cgroup
  • the container runtime cgroup
  • the system cgroup

var cgroupRoots []string

cgroupRoots = append(cgroupRoots, cm.NodeAllocatableRoot(s.CgroupRoot, s.CgroupDriver))
kubeletCgroup, err := cm.GetKubeletContainer(s.KubeletCgroups)
if err != nil {
    klog.Warningf("failed to get the kubelet's cgroup: %v.  Kubelet system container metrics may be missing.", err)
} else if kubeletCgroup != "" {
    cgroupRoots = append(cgroupRoots, kubeletCgroup)
}

runtimeCgroup, err := cm.GetRuntimeContainer(s.ContainerRuntime, s.RuntimeCgroups)
if err != nil {
    klog.Warningf("failed to get the container runtime's cgroup: %v. Runtime system container metrics may be missing.", err)
} else if runtimeCgroup != "" {
    // RuntimeCgroups is optional, so ignore if it isn't specified
    cgroupRoots = append(cgroupRoots, runtimeCgroup)
}

if s.SystemCgroups != "" {
    // SystemCgroups is optional, so ignore if it isn't specified
    cgroupRoots = append(cgroupRoots, s.SystemCgroups)
}

10. Initialize cAdvisor

The kubelet embeds cAdvisor to collect container metrics; with the Docker runtime it uses the legacy cAdvisor stats provider.

if kubeDeps.CAdvisorInterface == nil {
    imageFsInfoProvider := cadvisor.NewImageFsInfoProvider(s.ContainerRuntime, s.RemoteRuntimeEndpoint)
    kubeDeps.CAdvisorInterface, err = cadvisor.New(imageFsInfoProvider, s.RootDirectory, cgroupRoots, cadvisor.UsingLegacyCadvisorStats(s.ContainerRuntime, s.RemoteRuntimeEndpoint))
    if err != nil {
        return err
    }
}

11. Initialize the event recorder, used to report kubelet events

// Setup event recorder if required.
makeEventRecorder(kubeDeps, nodeName)

Events recorded this way look like the following:

$ kubectl describe pod -n ddd portal-f6f4b4486-grhb7
...
Events:
  Type     Reason     Age                       From            Message
  ----     ------     ----                      ----            -------
  Warning  Unhealthy  7m49s (x22310 over 7d3h)  kubelet, node1  Liveness probe failed: Get http://10.233.90.203:7002/actuator/health: dial tcp 10.233.90.203:7002: connect: connection refused
  Warning  BackOff    2m49s (x27215 over 7d3h)  kubelet, node1  Back-off restarting failed container
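
makeEventRecorder wires an event broadcaster from client-go's record package into kubeDeps: it creates a broadcaster, derives a recorder tagged with the kubelet component and node name, and starts sending events to the API server when an event client is available. A simplified sketch (the exact logging hooks differ between versions):

// Simplified sketch of makeEventRecorder.
func makeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName) {
    if kubeDeps.Recorder != nil {
        return
    }
    eventBroadcaster := record.NewBroadcaster()
    kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme,
        v1.EventSource{Component: "kubelet", Host: string(nodeName)})
    if kubeDeps.EventClient != nil {
        // Forward recorded events to the API server through the event client.
        eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
    } else {
        klog.Warningf("No api server defined - no events will be sent to API server.")
    }
}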

12. Initialize the container manager

The container manager is mainly used to manage containers and node-level resources:

1. If --cgroups-per-qos is enabled and --cgroup-root is not specified, the cgroup root defaults to /. This enables the QoS-based cgroup hierarchy: all Burstable and BestEffort pods are placed under their respective top-level QoS cgroups.

For example:

$ ls /sys/fs/cgroup/cpu/kubepods.slice
cgroup.clone_children  cpuacct.usage_percpu_sys   cpu.rt_period_us           kubepods-pod347e1023_78aa_4aa6_a1bb_c11e60e995e1.slice
cgroup.procs           cpuacct.usage_percpu_user  cpu.rt_runtime_us          kubepods-podafe0da25_4a42_4a71_82c8_afcd7faf3b52.slice
cpuacct.stat           cpuacct.usage_sys          cpu.shares                 kubepods-pode61df7e6_b184_4c86_bd1e_734c818a4a1f.slice
cpuacct.usage          cpuacct.usage_user         cpu.stat                   notify_on_release
cpuacct.usage_all      cpu.cfs_period_us          kubepods-besteffort.slice  tasks
cpuacct.usage_percpu   cpu.cfs_quota_us           kubepods-burstable.slice

2. If --reserved-cpus is non-empty, initialize the CPUs reserved for the system. When --reserved-cpus is set, it overrides the CPU settings in --system-reserved and --kube-reserved. The initialization logic is as follows:

a. Parse --reserved-cpus; if the CPU list is invalid, an error is returned, so it never overrides the CPU settings in --system-reserved or --kube-reserved.

b. Check whether machine info can be obtained from cAdvisor; if not, reservedSystemCPUs falls back to an empty set.

c. Check that the CPU IDs in --reserved-cpus fall within the host's actual core range; otherwise return an error (for example, reserving CPU 12 on an 8-core host).

d. Parse and assign the remaining container manager fields:

kubeReserved, err := parseResourceList(s.KubeReserved)
if err != nil {
    return err
}
systemReserved, err := parseResourceList(s.SystemReserved)
if err != nil {
    return err
}
var hardEvictionThresholds []evictionapi.Threshold
// If the user requested to ignore eviction thresholds, then do not set valid values for hardEvictionThresholds here.
if !s.ExperimentalNodeAllocatableIgnoreEvictionThreshold {
    hardEvictionThresholds, err = eviction.ParseThresholdConfig([]string{}, s.EvictionHard, nil, nil, nil)
    if err != nil {
        return err
    }
}
experimentalQOSReserved, err := cm.ParseQOSReserved(s.QOSReserved)
if err != nil {
    return err
}

devicePluginEnabled := utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins)

kubeDeps.ContainerManager, err = cm.NewContainerManager(
    kubeDeps.Mounter,
    kubeDeps.CAdvisorInterface,
    cm.NodeConfig{
        RuntimeCgroupsName:    s.RuntimeCgroups,
        SystemCgroupsName:     s.SystemCgroups,
        KubeletCgroupsName:    s.KubeletCgroups,
        ContainerRuntime:      s.ContainerRuntime,
        CgroupsPerQOS:         s.CgroupsPerQOS,
        CgroupRoot:            s.CgroupRoot,
        CgroupDriver:          s.CgroupDriver,
        KubeletRootDir:        s.RootDirectory,
        ProtectKernelDefaults: s.ProtectKernelDefaults,
        NodeAllocatableConfig: cm.NodeAllocatableConfig{
            KubeReservedCgroupName:   s.KubeReservedCgroup,
            SystemReservedCgroupName: s.SystemReservedCgroup,
            EnforceNodeAllocatable:   sets.NewString(s.EnforceNodeAllocatable...),
            KubeReserved:             kubeReserved,
            SystemReserved:           systemReserved,
            ReservedSystemCPUs:       reservedSystemCPUs,
            HardEvictionThresholds:   hardEvictionThresholds,
        },
        QOSReserved:                           *experimentalQOSReserved,
        ExperimentalCPUManagerPolicy:          s.CPUManagerPolicy,
        ExperimentalCPUManagerReconcilePeriod: s.CPUManagerReconcilePeriod.Duration,
        ExperimentalPodPidsLimit:              s.PodPidsLimit,
        EnforceCPULimits:                      s.CPUCFSQuota,
        CPUCFSQuotaPeriod:                     s.CPUCFSQuotaPeriod.Duration,
        ExperimentalTopologyManagerPolicy:     s.TopologyManagerPolicy,
    },
    s.FailSwapOn,
    devicePluginEnabled,
    kubeDeps.Recorder)

The complete source for this container manager initialization step:

if kubeDeps.ContainerManager == nil {
    if s.CgroupsPerQOS && s.CgroupRoot == "" {
        klog.Info("--cgroups-per-qos enabled, but --cgroup-root was not specified.  defaulting to /")
        s.CgroupRoot = "/"
    }

    var reservedSystemCPUs cpuset.CPUSet
    var errParse error
    if s.ReservedSystemCPUs != "" {
        reservedSystemCPUs, errParse = cpuset.Parse(s.ReservedSystemCPUs)
        if errParse != nil {
            // invalid cpu list is provided, set reservedSystemCPUs to empty, so it won't overwrite kubeReserved/systemReserved
            klog.Infof("Invalid ReservedSystemCPUs \"%s\"", s.ReservedSystemCPUs)
            return errParse
        }
        // is it safe do use CAdvisor here ??
        machineInfo, err := kubeDeps.CAdvisorInterface.MachineInfo()
        if err != nil {
            // if can't use CAdvisor here, fall back to non-explicit cpu list behavor
            klog.Warning("Failed to get MachineInfo, set reservedSystemCPUs to empty")
            reservedSystemCPUs = cpuset.NewCPUSet()
        } else {
            reservedList := reservedSystemCPUs.ToSlice()
            first := reservedList[0]
            last := reservedList[len(reservedList)-1]
            if first < 0 || last >= machineInfo.NumCores {
                // the specified cpuset is outside of the range of what the machine has
                klog.Infof("Invalid cpuset specified by --reserved-cpus")
                return fmt.Errorf("Invalid cpuset %q specified by --reserved-cpus", s.ReservedSystemCPUs)
            }
        }
    } else {
        reservedSystemCPUs = cpuset.NewCPUSet()
    }

    if reservedSystemCPUs.Size() > 0 {
        // at cmd option valication phase it is tested either --system-reserved-cgroup or --kube-reserved-cgroup is specified, so overwrite should be ok
        klog.Infof("Option --reserved-cpus is specified, it will overwrite the cpu setting in KubeReserved=\"%v\", SystemReserved=\"%v\".", s.KubeReserved, s.SystemReserved)
        if s.KubeReserved != nil {
            delete(s.KubeReserved, "cpu")
        }
        if s.SystemReserved == nil {
            s.SystemReserved = make(map[string]string)
        }
        s.SystemReserved["cpu"] = strconv.Itoa(reservedSystemCPUs.Size())
        klog.Infof("After cpu setting is overwritten, KubeReserved=\"%v\", SystemReserved=\"%v\"", s.KubeReserved, s.SystemReserved)
    }
    kubeReserved, err := parseResourceList(s.KubeReserved)
    if err != nil {
        return err
    }
    systemReserved, err := parseResourceList(s.SystemReserved)
    if err != nil {
        return err
    }
    var hardEvictionThresholds []evictionapi.Threshold
    // If the user requested to ignore eviction thresholds, then do not set valid values for hardEvictionThresholds here.
    if !s.ExperimentalNodeAllocatableIgnoreEvictionThreshold {
        hardEvictionThresholds, err = eviction.ParseThresholdConfig([]string{}, s.EvictionHard, nil, nil, nil)
        if err != nil {
            return err
        }
    }
    experimentalQOSReserved, err := cm.ParseQOSReserved(s.QOSReserved)
    if err != nil {
        return err
    }

    devicePluginEnabled := utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins)

    kubeDeps.ContainerManager, err = cm.NewContainerManager(
        kubeDeps.Mounter,
        kubeDeps.CAdvisorInterface,
        cm.NodeConfig{
            RuntimeCgroupsName:    s.RuntimeCgroups,
            SystemCgroupsName:     s.SystemCgroups,
            KubeletCgroupsName:    s.KubeletCgroups,
            ContainerRuntime:      s.ContainerRuntime,
            CgroupsPerQOS:         s.CgroupsPerQOS,
            CgroupRoot:            s.CgroupRoot,
            CgroupDriver:          s.CgroupDriver,
            KubeletRootDir:        s.RootDirectory,
            ProtectKernelDefaults: s.ProtectKernelDefaults,
            NodeAllocatableConfig: cm.NodeAllocatableConfig{
                KubeReservedCgroupName:   s.KubeReservedCgroup,
                SystemReservedCgroupName: s.SystemReservedCgroup,
                EnforceNodeAllocatable:   sets.NewString(s.EnforceNodeAllocatable...),
                KubeReserved:             kubeReserved,
                SystemReserved:           systemReserved,
                ReservedSystemCPUs:       reservedSystemCPUs,
                HardEvictionThresholds:   hardEvictionThresholds,
            },
            QOSReserved:                           *experimentalQOSReserved,
            ExperimentalCPUManagerPolicy:          s.CPUManagerPolicy,
            ExperimentalCPUManagerReconcilePeriod: s.CPUManagerReconcilePeriod.Duration,
            ExperimentalPodPidsLimit:              s.PodPidsLimit,
            EnforceCPULimits:                      s.CPUCFSQuota,
            CPUCFSQuotaPeriod:                     s.CPUCFSQuotaPeriod.Duration,
            ExperimentalTopologyManagerPolicy:     s.TopologyManagerPolicy,
        },
        s.FailSwapOn,
        devicePluginEnabled,
        kubeDeps.Recorder)

    if err != nil {
        return err
    }
}

13. Check whether the kubelet is running as root

If the kubelet is not running as uid 0, checkPermissions returns an error; note that run only logs it here and continues.

if err := checkPermissions(); err != nil {
    klog.Error(err)
}
...
func checkPermissions() error {
    if uid := os.Getuid(); uid != 0 {
        return fmt.Errorf("kubelet needs to run as uid `0`. It is being run as %d", uid)
    }
    // TODO: Check if kubelet is running in the `initial` user namespace.
    // http://man7.org/linux/man-pages/man7/user_namespaces.7.html
    return nil
}

14. Set the OOM score for the kubelet process

This applies the value of --oom-score-adj, which must be within [-1000, 1000] and defaults to -999; the lower the value, the less likely the process is to be killed by the OOM killer.

oomAdjuster := kubeDeps.OOMAdjuster
if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
    klog.Warning(err)
}
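
On Linux, ApplyOOMScoreAdj(0, value) ultimately writes the value into /proc/<pid>/oom_score_adj, with pid 0 meaning the current process. A minimal sketch of that mechanism (not the kubelet's actual oom package):

// Minimal illustration of how an oom_score_adj value is applied on Linux.
func applyOOMScoreAdj(pid, value int) error {
    if value < -1000 || value > 1000 {
        return fmt.Errorf("invalid oom_score_adj %d, must be within [-1000, 1000]", value)
    }
    if pid == 0 {
        pid = os.Getpid() // pid 0 means "the current process"
    }
    path := fmt.Sprintf("/proc/%d/oom_score_adj", pid)
    return ioutil.WriteFile(path, []byte(strconv.Itoa(value)), 0644)
}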

15. Initialize the container runtime

  • When the container runtime is Docker, the following are initialized:
    • the network plugin name (usually cni)
    • the CIDR
    • the CNI plugin configuration, cache, and binary directories
    • the MTU
    • the bridge mode
    • the CRI shim process (dockershim), created and started as the bridge between the kubelet and the container runtime
    • whether cAdvisor is used to collect container metrics

16. Start the kubelet

if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
    return err
}

The startup flow is roughly as follows:

a. Initialize the event recorder

hostname, err := nodeutil.GetHostname(kubeServer.HostnameOverride)
if err != nil {
    return err
}
// Query the cloud provider for our node name, default to hostname if kubeDeps.Cloud == nil
nodeName, err := getNodeName(kubeDeps.Cloud, hostname)
if err != nil {
    return err
}
// Setup event recorder if required.
makeEventRecorder(kubeDeps, nodeName)

b. Allow privileged containers: capabilities.Initialize sets AllowPrivileged to true for the kubelet process.

capabilities.Initialize(capabilities.Capabilities{
    AllowPrivileged: true,
})

c. Initialize the kubelet's operating-system interface

if kubeDeps.OSInterface == nil {
    kubeDeps.OSInterface = kubecontainer.RealOS{}
}

The interface looks like this:

type OSInterface interface {
    MkdirAll(path string, perm os.FileMode) error
    Symlink(oldname string, newname string) error
    Stat(path string) (os.FileInfo, error)
    Remove(path string) error
    RemoveAll(path string) error
    Create(path string) (*os.File, error)
    Chmod(path string, perm os.FileMode) error
    Hostname() (name string, err error)
    Chtimes(path string, atime time.Time, mtime time.Time) error
    Pipe() (r *os.File, w *os.File, err error)
    ReadDir(dirname string) ([]os.FileInfo, error)
    Glob(pattern string) ([]string, error)
    Open(name string) (*os.File, error)
    OpenFile(name string, flag int, perm os.FileMode) (*os.File, error)
    Rename(oldpath, newpath string) error
}
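
RealOS is the default implementation and simply delegates to the Go standard library; a few representative methods as a sketch:

// RealOS delegates straight to the os/ioutil packages.
type RealOS struct{}

func (RealOS) MkdirAll(path string, perm os.FileMode) error  { return os.MkdirAll(path, perm) }
func (RealOS) Remove(path string) error                      { return os.Remove(path) }
func (RealOS) Hostname() (string, error)                     { return os.Hostname() }
func (RealOS) ReadDir(dirname string) ([]os.FileInfo, error) { return ioutil.ReadDir(dirname) }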

d. Create and initialize the kubelet

We will dig into this initialization logic in a later article.

k, err := createAndInitKubelet(&kubeServer.KubeletConfiguration,
      kubeDeps,
      &kubeServer.ContainerRuntimeOptions,
      kubeServer.ContainerRuntime,
      kubeServer.HostnameOverride,
      kubeServer.NodeIP,
      kubeServer.ProviderID,
      kubeServer.CloudProvider,
      kubeServer.CertDirectory,
      kubeServer.RootDirectory,
      kubeServer.RegisterNode,
      kubeServer.RegisterWithTaints,
      kubeServer.AllowedUnsafeSysctls,
      kubeServer.ExperimentalMounterPath,
      kubeServer.ExperimentalKernelMemcgNotification,
      kubeServer.ExperimentalCheckNodeCapabilitiesBeforeMount,
      kubeServer.ExperimentalNodeAllocatableIgnoreEvictionThreshold,
      kubeServer.MinimumGCAge,
      kubeServer.MaxPerPodContainerCount,
      kubeServer.MaxContainerCount,
      kubeServer.MasterServiceNamespace,
      kubeServer.RegisterSchedulable,
      kubeServer.KeepTerminatedPodVolumes,
      kubeServer.NodeLabels,
      kubeServer.SeccompProfileRoot,
      kubeServer.BootstrapCheckpointPath,
      kubeServer.NodeStatusMaxImages)
if err != nil {
    return fmt.Errorf("failed to create kubelet: %v", err)
}

// NewMainKubelet should have set up a pod source config if one didn't exist
// when the builder was run. This is just a precaution.
if kubeDeps.PodConfig == nil {
    return fmt.Errorf("failed to create kubelet, pod source config was nil")
}
podCfg := kubeDeps.PodConfig

e. Set the maximum number of open files for the kubelet process

rlimit.RlimitNumFiles(uint64(kubeServer.MaxOpenFiles))
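
RlimitNumFiles raises the RLIMIT_NOFILE soft and hard limits for the current process; on Linux it is essentially the following (a sketch using the golang.org/x/sys/unix package):

// Sketch of RlimitNumFiles on Linux: set both the soft and hard limit for the
// maximum number of open file descriptors of the current process.
func rlimitNumFiles(max uint64) error {
    rlimit := unix.Rlimit{Cur: max, Max: max}
    return unix.Setrlimit(unix.RLIMIT_NOFILE, &rlimit)
}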

f. Start the kubelet service

// process pods and exit.
if runOnce {
    if _, err := k.RunOnce(podCfg.Updates()); err != nil {
        return fmt.Errorf("runonce failed: %v", err)
    }
    klog.Info("Started kubelet as runonce")
} else {
    startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableCAdvisorJSONEndpoints, kubeServer.EnableServer)
    klog.Info("Started kubelet")
}

17. If dynamic kubelet config is enabled, watch for configuration changes

// If the kubelet config controller is available, and dynamic config is enabled, start the config and status sync loops
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) && len(s.DynamicConfigDir.Value()) > 0 &&
    kubeDeps.KubeletConfigController != nil && !standaloneMode && !s.RunOnce {
    if err := kubeDeps.KubeletConfigController.StartSync(kubeDeps.KubeClient, kubeDeps.EventClient, string(nodeName)); err != nil {
        return err
    }
}

18. Start the /healthz endpoint

if s.HealthzPort > 0 {
    mux := http.NewServeMux()
    healthz.InstallHandler(mux)
    go wait.Until(func() {
        err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), mux)
        if err != nil {
            klog.Errorf("Starting healthz server failed: %v", err)
        }
    }, 5*time.Second, wait.NeverStop)
}

19. Notify the init process that the kubelet has started

In run-once mode, run returns at this point; otherwise, when the kubelet runs under systemd, SdNotify reports READY=1 and run then blocks until the stop channel or the lock-file watcher fires.

if s.RunOnce {
    return nil
}

// If systemd is used, notify it that we have started
go daemon.SdNotify(false, "READY=1")
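
daemon.SdNotify (from the go-systemd library) sends the READY=1 state over the unix datagram socket that systemd exposes via the NOTIFY_SOCKET environment variable. A rough sketch of the mechanism, not the library code:

// Rough sketch of the sd_notify mechanism used by daemon.SdNotify.
func sdNotify(state string) error {
    socketAddr := os.Getenv("NOTIFY_SOCKET")
    if socketAddr == "" {
        return nil // not started by systemd with Type=notify, nothing to do
    }
    conn, err := net.Dial("unixgram", socketAddr)
    if err != nil {
        return err
    }
    defer conn.Close()
    _, err = conn.Write([]byte(state)) // e.g. "READY=1"
    return err
}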