概述
我们在学习kubernetes除了要了解其相关概念外,我们还需要深入了解整个kubernetes的实现机制,如果还能了解其源码,那我们才算是对kubernetes比较熟悉。我将用kubernetes是如何生成一个deployment的资源,并且附带源码解读的方式讲解kubernetes的整个实现机制。源码版本是1.22
之前的文章
- kubernetes中apply命令执行的全过程源码解析-概述
- kubectl执行apply命令的源码解析
- kube-apiserver接受创建deploy资源请求的源码详解
- kube-controller-manager处理创建deployment资源之informer监听处理
- kube-controller-manager在创建deployment时如何处理数据
- kube-scheduler资源调度原理深入解析
前面的文章提到过,一个depoyment资源的创建
- 由kubectl发送请求到apiserver,apiserver负责将请求的资源信息存入到etcd中。
- 而实际对这个资源进行控制的是controller-manager。deployment-controller-manager和replicaset-controller-manager配合最终生成pod资源。
- pod资源存储在etcd后,kube-scheduler获取到资源变更事件后,对资源进行调度计算,确定调度节点后,将信息再次写入etcd中。
- 最终pod如何在对应节点启动的呢?而这启动的过程又经历了哪些事情呢?这篇文章将会剖析下k8s是如何在调度节点上生成最终期望的pod。
原理分析
kubectl apply -f deploy.yaml这个命令执行后,从之前的文章分析来看,pod还一直都未创建。而kubelet就是用来干这个活的。当然kubelet还不仅仅只是干这个活,它还有负责整个节点上资源的状态监听,static pod的维护等。本文主要还是分析kubelet是如何来创建这个pod的。要弄清这个问题,我们可以思考如下几个问题:
- 这个pod具体有哪些信息?
- kubelet如何来根据这个pod的信息来进行创建?
- 在代码层面上它是如何实现这些配置创建的呢?
pod信息
pod和container的关系
在说明pod信息之前,我们先说明下pod和container的关系。pod是作为kubernetes里面的一个概念,它是作为容器调度的最小单位。pod相对于container是更高纬度的一个抽象,一个pod是可以有多个container的。container更多提供一个运行时的生命周期管理,而pod还要包含一个服务的生命周期的管理。
pod可提供的信息
我们再回过头来看我们之前的文章中提到的deployment的信息,而kubelet最终要生成的pod的信息就是基于deployment中的apps.template里的信息。
## pod的模板
template:
metadata:
creationTimestamp: null
## 这里的labels要与上面选中的一致
labels:
app: nginx
## pod的期望状态
spec:
## 容器配置
containers:
- image: nginx
## 镜像拉取策略
imagePullPolicy: Always
name: nginx
ports:
- containerPort: 80
protocol: TCP
resources: {}
## 中断日志
terminationMessagePath: /dev/termination-log
terminationMessagePolicy: File
## dns策略
dnsPolicy: ClusterFirst
## 重启策略,一般配合liveness使用
restartPolicy: Always
## 该pod的调度器名称
schedulerName: default-scheduler
## 安全上下文
securityContext: {}
## 当删除pod的时,最长等待终止时间。
terminationGracePeriodSeconds: 30
复制代码
从上面我们可以看出,spec中的信息相对比较少,主要是基于containers的信息,而实际上pod可以配置的信息还远不止于此。实际上我们pod中可以
- 配置pod的QoS
- 配置卷
- 配置pod账户
- 配置存活,就绪和启动探测器
- 配置pod初始化
这里我就不再一一赘述了,大家可以参考官方文档:kubernetes.io/zh/docs/tas…
kubelet如何创建pod
kubelet不仅仅只是创建pod,大部分时间kubelet都是在维护节点上的各pod的运行状态,并将状态信息与apiserver进行交互。下面这张图应该是比较经典的一张图,主要表现的kubelet一个大SyncLoop,用于监听pod各类型事件。多个小syncLoop用于对node,network,pleg等的状态管理。
而pod是如何创建的,那依据这个大的SyncLoop获取到要添加pod的事件,并将该事件交给worker去执行。而事件执行的过程还要经历各个manager来进行状态的更新,最终达到一个期望的状态。整个过程可以看成如下几个步骤:
- 获取pod信息
- SyncPod
- 运行时生成
另外这整个过程涉及到的一些组件,可以参考下图:
获取pod信息
- 把所有的 pod 按照创建日期进行排序,保证最先创建的 pod 会最先被处理
- 把它加入到
podManager
中,因为podManager
是 kubelet 的 source of truth,所有被管理的 pod 都要出现在里面。如果podManager
中找不到某个 pod,就认为这个 pod 被删除了 - 如果是 mirror pod调用其单独的方法
- 验证 pod 是否能在该节点运行,如果不可以直接拒绝
- 把 pod 分配给给 worker 做异步处理
- 在
probeManager
中添加 pod,如果 pod 中定义了 readiness 和 liveness 健康检查,启动 goroutine 定期进行检测
SyncPod
- 如果是删除 pod,立即执行并返回
- 检查 pod 是否能运行在本节点,主要是权限检查(是否能使用主机网络模式,是否可以以 privileged 权限运行等)。如果没有权限,就删除本地旧的 pod 并返回错误信息
- 如果是 static Pod,就创建或者更新对应的 mirrorPod
- 创建 pod 的数据目录,存放 volume 和 plugin 信息
- 如果定义了 PV,等待所有的 volume mount 完成(volumeManager 会在后台做这些事情)
- 如果有 image secrets,去 apiserver 获取对应的 secrets 数据
- 调用 container runtime 的 SyncPod 方法,去实现真正的容器创建逻辑
运行时生成
- pasue 镜像
- 网络模型有没有变化
- 暴露的端口号有没有变化
- 镜像拉取策略
- 环境变量
以上的一个创建过程,你会发现与pod中的相关信息都是可以对应上的。这是这些信息的创建的先后顺序会有差异,那这些顺序为什么要这样呢,大家可以思考下这个问题。
源码分析
这张图呈现了kubelet的代码架构,让我们对kubelet的实现方式有个大体印象。源码这块我就从启动过程,和创建过程两个角度去分析。因为这两个过程是相互呼应。
启动分析
所有的源码分析,我建议都可以从启动分析入手,这里我们就清楚了kubelet是启动了哪些模块,这些模块具体做了什么事情。
这种图是kubernetes1.16版本的启动流程图,可以作为一个参考。
本文还着重去看pod的监听SyncLoop()函数是如何一步一步的启动的,后面pod的创建就要依赖这个函数的监听事件来触发。
// cmd/kubelet/kubelet.go
// 这里是kubelet的主函数
func main() {
rand.Seed(time.Now().UnixNano())
command := app.NewKubeletCommand()
logs.InitLogs()
defer logs.FlushLogs()
if err := command.Execute(); err != nil {
os.Exit(1)
}
}
// cmd/kubelet/app/server.go
// 生成一个kubelet的command,该commadn会有一个Run方法
// NewKubeletCommand creates a *cobra.Command object with default parameters
func NewKubeletCommand() *cobra.Command {
cleanFlagSet := pflag.NewFlagSet(componentKubelet, pflag.ContinueOnError)
cleanFlagSet.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
kubeletFlags := options.NewKubeletFlags()
kubeletConfig, err := options.NewKubeletConfiguration()
// programmer error
if err != nil {
klog.ErrorS(err, "Failed to create a new kubelet configuration")
os.Exit(1)
}
cmd := &cobra.Command{
...
}
}
// cmd/kubelet/app/server.go
// 这里主要是为kubelet的启动做一些基本的配置及检查工作
// 其中核心方法是RunKubelet
func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) (err error) {
...
if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
return err
}
...
return nil
}
// cmd/kubelet/app/server.go
// RunKubelet 中主要调用了 createAndInitKubelet 方法执行 kubelet 组件的初始化,然后调用 startKubelet 启动 kubelet 中的组件。
// RunKubelet is responsible for setting up and running a kubelet. It is used in three different applications:
// 1 Integration tests
// 2 Kubelet binary
// 3 Standalone 'kubernetes' binary
// Eventually, #2 will be replaced with instances of #3
func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
...
// 该函数进行kubelet的初始化,各个控制器的初始化,kubelet依赖的所有模块的初始化
k, err := createAndInitKubelet(&kubeServer.KubeletConfiguration,
kubeDeps,
&kubeServer.ContainerRuntimeOptions,
kubeServer.ContainerRuntime,
hostname,
hostnameOverridden,
nodeName,
nodeIPs,
kubeServer.ProviderID,
kubeServer.CloudProvider,
kubeServer.CertDirectory,
kubeServer.RootDirectory,
kubeServer.ImageCredentialProviderConfigFile,
kubeServer.ImageCredentialProviderBinDir,
kubeServer.RegisterNode,
kubeServer.RegisterWithTaints,
kubeServer.AllowedUnsafeSysctls,
kubeServer.ExperimentalMounterPath,
kubeServer.KernelMemcgNotification,
kubeServer.ExperimentalCheckNodeCapabilitiesBeforeMount,
kubeServer.ExperimentalNodeAllocatableIgnoreEvictionThreshold,
kubeServer.MinimumGCAge,
kubeServer.MaxPerPodContainerCount,
kubeServer.MaxContainerCount,
kubeServer.MasterServiceNamespace,
kubeServer.RegisterSchedulable,
kubeServer.KeepTerminatedPodVolumes,
kubeServer.NodeLabels,
kubeServer.SeccompProfileRoot,
kubeServer.NodeStatusMaxImages)
if err != nil {
return fmt.Errorf("failed to create kubelet: %w", err)
}
...
// process pods and exit.
if runOnce {
if _, err := k.RunOnce(podCfg.Updates()); err != nil {
return fmt.Errorf("runonce failed: %w", err)
}
klog.InfoS("Started kubelet as runonce")
} else {
// 启动kubelet服务
startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)
klog.InfoS("Started kubelet")
}
return nil
}
// cmd/kubelet/app/server.go
// 在startKubelet 中通过调用 k.Run 来启动 kubelet 中的所有模块以及主流程,然后启动 kubelet 所需要的 http server
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {
// start the kubelet
go k.Run(podCfg.Updates())
// start the kubelet server
if enableServer {
go k.ListenAndServe(kubeCfg, kubeDeps.TLSOptions, kubeDeps.Auth)
}
if kubeCfg.ReadOnlyPort > 0 {
go k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
}
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) {
go k.ListenAndServePodResources()
}
}
复制代码
// kubernetes/pkg/kubelet/kubelet.go
// 这个函数将对之前初始化的模块,进行逐一的启动,我们主要看看SyncLoop
// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
...
// 调用 kl.syncLoop 监听 pod 变化
kl.syncLoop(updates, kl)
}
// kubernetes/pkg/kubelet/kubelet.go
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
...
for {
...
// syncLoopIteration 方法会监听多个 channel,当发现任何一个 channel 有数据就交给 handler 去处理,在 handler 中通过调用 dispatchWork 分发任务
if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
break
}
...
}
}
// kubernetes/pkg/kubelet/kubelet.go
// 这里我们就看到了监听创建pod事件以及对应的处理方法
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
select {
case u, open := <-configCh:
...
switch u.Op {
case kubetypes.ADD:
...
handler.HandlePodAdditions(u.Pods)
...
}
return true
}
复制代码
创建pod流程
这里有张图展示个pod创建过程中使用到的函数,仅供参考。
在前面的kubelet的启动流程中, 我们已经知道syncLoop会监听pod的事件变化,当创建一个pod时,监听到事件变化,具体是哪个模块实例监听,哪个模块实例进行事件处理呢?具体的处理过程是怎样的呢?
我们基于前面的启动分析,从syncLoopiteration来分析
监听 pod 变化(syncLoopIteration)
我们看syncLoopIteration的传参
// kubernetes/pkg/kubelet/kubelet.go
// 这里我们就看到了监听创建pod事件以及对应的处理方法
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
select {
case u, open := <-configCh:
...
switch u.Op {
case kubetypes.ADD:
...
handler.HandlePodAdditions(u.Pods)
...
}
return true
}
复制代码
从这里我们可以分别去看下configCh和SyncHandler
- configCh:该信息源由 kubeDeps 对象中的 PodConfig 子模块提供,该模块将同时 watch 3 个不同来源的 pod 信息的变化(file,http,apiserver),一旦某个来源的 pod 信息发生了更新(创建/更新/删除),这个 channel 中就会出现被更新的 pod 信息和更新的具体操作。
- SyncHandler中包含HandlePodAddtions方法,用于对pod新增事件进行处理
pod事件监听configCh
我们通过kubelet的初始化查看,configCh是如何生成
// kubernetes/pkg/kubelet/kubelet.go
// makePodSourceConfig creates a config.PodConfig from the given
// KubeletConfiguration or returns an error.
func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, nodeHasSynced func() bool) (*config.PodConfig, error) {
...
// source of all configuration
cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)
...
return cfg, nil
}
// kubernetes/pkg/kubelet/config/config.go
// 创建podConfig
// NewPodConfig creates an object that can merge many configuration sources into a stream
// of normalized updates to a pod configuration.
func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder) *PodConfig {
updates := make(chan kubetypes.PodUpdate, 50)
storage := newPodStorage(updates, mode, recorder)
podConfig := &PodConfig{
pods: storage,
mux: config.NewMux(storage),
updates: updates,
sources: sets.String{},
}
return podConfig
}
复制代码
弄清楚podConfig的结构以及初始化流程后,那kubelet是如何监听到事件转换成podConfig的呢?这里其实也同样用到了informer,我就不再赘述了。大家可以参考:juejin.cn/post/697582…
pod新增处理(HandlePodAddtions)
我们可以基于启动流程分析中查看,找到当时kubelet创建时,初始化的是什么handler,其对应的方法是什么逻辑。handler其实是个接口,而kubelet都满足这个接口的要求,所以kubelet也是一个handler实例。
// kubernetes/pkg/kubelet/kubelet.go
// HandlePodAdditions is the callback in SyncHandler for pods being added from
// a config source.
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
start := kl.clock.Now()
sort.Sort(sliceutils.PodsByCreationTime(pods))
for _, pod := range pods {
existingPods := kl.podManager.GetPods()
// Always add the pod to the pod manager. Kubelet relies on the pod
// manager as the source of truth for the desired state. If a pod does
// not exist in the pod manager, it means that it has been deleted in
// the apiserver and no action (other than cleanup) is required.
kl.podManager.AddPod(pod)
if kubetypes.IsMirrorPod(pod) {
kl.handleMirrorPod(pod, start)
continue
}
if !kl.podIsTerminated(pod) {
// Only go through the admission process if the pod is not
// terminated.
// We failed pods that we rejected, so activePods include all admitted
// pods that are alive.
activePods := kl.filterOutTerminatedPods(existingPods)
// Check if we can admit the pod; if not, reject it.
if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
kl.rejectPod(pod, reason, message)
continue
}
}
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
// 通过 dispatchWork 分发 pod 做异步处理,dispatchWork 主要工作就是把接收到的参数封装成 UpdatePodOptions,调用 UpdatePod 方法.
kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
kl.probeManager.AddPod(pod)
}
}
复制代码
下发任务
dispatchWorker 的主要作用是把某个对 Pod 的操作(创建/更新/删除)下发给 podWorkers。
// kubernetes/pkg/kubelet/kubelet.go
// dispatchWork starts the asynchronous sync of the pod in a pod worker.
// If the pod has completed termination, dispatchWork will perform no action.
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
...
// Run the sync in an async worker.
kl.podWorkers.UpdatePod(&UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: syncType,
OnCompleteFunc: func(err error) {
if err != nil {
metrics.PodWorkerDuration.WithLabelValues(syncType.String()).Observe(metrics.SinceInSeconds(start))
}
},
})
...
}
复制代码
更新事件
podWorkers 子模块主要的作用就是处理针对每一个的 Pod 的更新事件,比如 Pod 的创建,删除,更新。而 podWorkers 采取的基本思路是:为每一个 Pod 都单独创建一个 goroutine 和更新事件的 channel,goroutine 会阻塞式的等待 channel 中的事件,并且对获取的事件进行处理。而 podWorkers 对象自身则主要负责对更新事件进行下发。
// UpdatePod apply the new setting to the specified pod.
// If the options provide an OnCompleteFunc, the function is invoked if the update is accepted.
// Update requests are ignored if a kill pod request is pending.
func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
...
if podUpdates, exists = p.podUpdates[uid]; !exists {
// We need to have a buffer here, because checkForUpdates() method that
// puts an update into channel is called from the same goroutine where
// the channel is consumed. However, it is guaranteed that in such case
// the channel is empty, so buffer of size 1 is enough.
podUpdates = make(chan UpdatePodOptions, 1)
p.podUpdates[uid] = podUpdates
// Creating a new pod worker either means this is a new pod, or that the
// kubelet just restarted. In either case the kubelet is willing to believe
// the status of the pod for the first pod worker sync. See corresponding
// comment in syncPod.
go func() {
defer runtime.HandleCrash()
p.managePodLoop(podUpdates)
}()
}
...
}
复制代码
用 syncPodFn 方法同步 pod(managePodLoop)
managePodLoop 调用 syncPodFn 方法去同步 pod,syncPodFn 实际上就是kubelet.SyncPod。在完成这次 sync 动作之后,会调用 wrapUp 函数,这个函数将会做几件事情:
- 将这个 pod 信息插入 kubelet 的 workQueue 队列中,等待下一次周期性的对这个 pod 的状态进行 sync
- 将在这次 sync 期间堆积的没有能够来得及处理的最近一次 update 操作加入 goroutine 的事件 channel 中,立即处理。
func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
var lastSyncTime time.Time
for update := range podUpdates {
...
p.wrapUp(update.Pod.UID, err)
}
}
复制代码
完成创建容器前的准备工作(SyncPod)
managePodLoop将pod的更新信息插入workQueue中,而kubelet将有一个goroutine去监听这个队列,进行pod的创建。创建kubelet时,有一个函数newPodWorkers(klet.syncPod, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)。这里我们就看看这个函数。
// kubernetes/pkg/kubelet/kubelet.go
func (kl *Kubelet) syncPod(o syncPodOptions) error {
// pull out the required options
pod := o.pod
mirrorPod := o.mirrorPod
podStatus := o.podStatus
updateType := o.updateType
// if we want to kill a pod, do it now!
/// 判断是否删除pod
if updateType == kubetypes.SyncPodKill {
...
}
...
// 判断pod是否能在本节点运行
runnable := kl.canRunPod(pod)
if !runnable.Admit {
...
}
// 更新pod的状态
// Update status in the status manager
kl.statusManager.SetPodStatus(pod, apiPodStatus)
// 如果pod非running,则直接kill掉
// Kill pod if it should not be running
if !runnable.Admit || pod.DeletionTimestamp != nil || apiPodStatus.Phase == v1.PodFailed {
...
}
// 加载网络插件
// If the network plugin is not ready, only start the pod if it uses the host network
if err := kl.runtimeState.networkErrors(); err != nil && !kubecontainer.IsHostNetworkPod(pod) {
...
}
// Create Cgroups for the pod and apply resource parameters
// to them if cgroups-per-qos flag is enabled.
pcm := kl.containerManager.NewPodContainerManager()
// If pod has already been terminated then we need not create
// or update the pod's cgroup
if !kl.podIsTerminated(pod) {
// 创建并更新pod的cgroups
if !(podKilled && pod.Spec.RestartPolicy == v1.RestartPolicyNever) {
if !pcm.Exists(pod) {
...
}
}
}
// Create Mirror Pod for Static Pod if it doesn't already exist
// 如果是static pod,则创建对应的mirror pod,用于客户端调用apiserver能查到信息。
if kubetypes.IsStaticPod(pod) {
...
}
// 创建数据目录
// Make data directories for the pod
if err := kl.makePodDataDirs(pod); err != nil {
...
}
// 挂载volume
// Volume manager will not mount volumes for terminated pods
if !kl.podIsTerminated(pod) {
// Wait for volumes to attach/mount
if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
...
}
}
// 获取secret信息
// Fetch the pull secrets for the pod
pullSecrets := kl.getPullSecretsForPod(pod)
// 调用containerRuntime的SyncPod方法开始创建容器
// Call the container runtime's SyncPod callback
result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff)
kl.reasonCache.Update(pod.UID, result)
if err := result.Error(); err != nil {
...
return nil
}
return nil
}
复制代码
创建容器
containerRuntime(pkg/kubelet/kuberuntime)子模块的 SyncPod 函数才是真正完成 pod 内容器实体的创建。
// kubernetes/pkg/kubelet/kuberuntime/kuberuntime_manager.go
/ SyncPod syncs the running pod into the desired pod by executing following steps:
//
// 1. Compute sandbox and container changes.
// 2. Kill pod sandbox if necessary.
// 3. Kill any containers that should not be running.
// 4. Create sandbox if necessary.
// 5. Create ephemeral containers.
// 6. Create init containers.
// 7. Create normal containers.
func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
// Step 1: Compute sandbox and container changes.
podContainerChanges := m.computePodActions(pod, podStatus)
...
// Step 2: Kill the pod if the sandbox has changed.
if podContainerChanges.KillPod {
...
} else {
// Step 3: kill any running containers in this pod which are not to keep.
for containerID, containerInfo := range podContainerChanges.ContainersToKill {
...
if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, containerInfo.reason, nil); err != nil {
...
}
}
}
...
// Step 4: Create a sandbox for the pod if necessary.
podSandboxID := podContainerChanges.SandboxID
if podContainerChanges.CreateSandbox {
...
}
}
...
// Step 5: start ephemeral containers
// These are started "prior" to init containers to allow running ephemeral containers even when there
// are errors starting an init container. In practice init containers will start first since ephemeral
// containers cannot be specified on pod creation.
if utilfeature.DefaultFeatureGate.Enabled(features.EphemeralContainers) {
for _, idx := range podContainerChanges.EphemeralContainersToStart {
start("ephemeral container", ephemeralContainerStartSpec(&pod.Spec.EphemeralContainers[idx]))
}
}
// Step 6: start the init container.
if container := podContainerChanges.NextInitContainerToStart; container != nil {
// Start the next init container.
if err := start("init container", containerStartSpec(container)); err != nil {
return
}
// Successfully started the container; clear the entry in the failure
klog.V(4).InfoS("Completed init container for pod", "containerName", container.Name, "pod", klog.KObj(pod))
}
// Step 7: start containers in podContainerChanges.ContainersToStart.
for _, idx := range podContainerChanges.ContainersToStart {
start("container", containerStartSpec(&pod.Spec.Containers[idx]))
}
return
}
复制代码
启动容器
最终由 startContainer 完成容器的启动,启动容器实际上是调用的runtime的api,根据你使用的runtime不同,实现逻辑也会不一样。
// startContainer starts a container and returns a message indicates why it is failed on error.
// It starts the container through the following steps:
// * pull the image
// * create the container
// * start the container
// * run the post start lifecycle hooks (if applicable)
func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, spec *startSpec, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, podIPs []string) (string, error) {
container := spec.container
// Step 1: pull the image.
imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets, podSandboxConfig)
if err != nil {
...
}
// Step 2: create the container.
// For a new container, the RestartCount should be 0
restartCount := 0
containerStatus := podStatus.FindContainerStatusByName(container.Name)
...
// Step 3: start the container.
err = m.runtimeService.StartContainer(containerID)
if err != nil {
...
}
...
// Step 4: execute the post start hook.
if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
kubeContainerID := kubecontainer.ContainerID{
Type: m.runtimeName,
ID: containerID,
}
// runner.Run 这个方法的主要作用就是在业务容器起来的时候,
// 首先会执行一个 container hook(PostStart 和 PreStop),做一些预处理工作。
// 只有 container hook 执行成功才会运行具体的业务服务,否则容器异常。
msg, handlerErr := m.runner.Run(kubeContainerID, pod, container, container.Lifecycle.PostStart)
if handlerErr != nil {
...
}
}
return "", nil
}
复制代码
总结
本文主要讲述了 kubelet 从监听到容器调度至本节点再到创建容器的一个过程,kubelet 最终调用CRI的接口来创建容器的。我们再次梳理下本篇文章的重点
- pod的创建通过informer监听到事件,再通过syncLoop处理进行任务的下发,实际上是由podWorker来实现最终任务的处理
- kubelet创建pod不仅仅是启动容器,还要考虑到volume,secret等配置
- kubelet的实现过程中,通过goroutine进行循环监听处理。除了监听pod事件外,还有很多manger进行状态监听。
结束语
kubelet最终创建完pod后,此次系列文章也告一段落。下一阶段我将会抽空出一些番外篇,例如etcd的原理分析。kubernetes庖丁解牛系列中,文章中必然会有一些不严谨的地方,还希望大家包涵,大家吸取精华(如果有的话),去其糟粕。如果大家感兴趣可以关我的公众号:gungunxi。我的微信号:lcomedy2021
近期评论