UP | HOME

kubelet 源码阅读(一)

Table of Contents

1 前言

一直在使用k8s,但是也一直没有看过k8s的源码,一来是因为一直都是使用的状态,有问题就呼叫容器组;二来是觉得项目比较大,会耗费很多时间。目前换了个比较不大忙的工作,所以就想着把这块内容补起来,起码得把k8s几大组件都简单看一遍。

1.1 Kubelet

kubelet 就个人理解来说,主要作用有两个:

  • 上报节点数据
  • 保持Pod的状态跟api server中声明的一样。

2 Kubelet 主循环

kubelet 主流程如下:

  • 从api server(主要是api server)中获取更新的数据
  • podWorkers 每个Pod开启一个goroutinue,处理pod的更新
  • 删除Pod
    • 执行删除前的Hook
    • 停止podWorkers的goroutinue
    • 遍历pod里的容器,对每个容器执行删除操作
  • 更新Pod
    • 计算Pod的状态
    • 等待volume挂载完成
    • 调用kubeGenericRuntimeManager的SyncPod处理
      • 调用podSandboxChanged计算应该如何处理的容器跟pod
      • 调用CRI接口进行处理

2.1 TODO 监听api server的变更

kubelet会监听node上的pod资源的变更,然后合并多个源的变更. 代码如下:

func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, updates chan<- interface{}) {
	lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))
	newSourceApiserverFromLW(lw, updates)
}
func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
	send := func(objs []interface{}) {
		var pods []*v1.Pod
		for _, o := range objs {
			pods = append(pods, o.(*v1.Pod))
		}
		updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource}
	}
	r := cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0)
	go r.Run(wait.NeverStop)
}
func (s *podStorage) Merge(source string, change interface{}) error {
	s.updateLock.Lock()
	defer s.updateLock.Unlock()

	seenBefore := s.sourcesSeen.Has(source)
	adds, updates, deletes, removes, reconciles := s.merge(source, change)
	firstSet := !seenBefore && s.sourcesSeen.Has(source)

	// deliver update notifications
	switch s.mode {
	case PodConfigNotificationIncremental:
		if len(removes.Pods) > 0 {
			s.updates <- *removes
		}
		if len(adds.Pods) > 0 {
			s.updates <- *adds
		}
		if len(updates.Pods) > 0 {
			s.updates <- *updates
		}
		if len(deletes.Pods) > 0 {
			s.updates <- *deletes
		}
		if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 && len(deletes.Pods) == 0 {
			// Send an empty update when first seeing the source and there are
			// no ADD or UPDATE or DELETE pods from the source. This signals kubelet that
			// the source is ready.
			s.updates <- *adds
		}
		// Only add reconcile support here, because kubelet doesn't support Snapshot update now.
		if len(reconciles.Pods) > 0 {
			s.updates <- *reconciles
		}

	case PodConfigNotificationSnapshotAndUpdates:
		if len(removes.Pods) > 0 || len(adds.Pods) > 0 || firstSet {
			s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source}
		}
		if len(updates.Pods) > 0 {
			s.updates <- *updates
		}
		if len(deletes.Pods) > 0 {
			s.updates <- *deletes
		}

	case PodConfigNotificationSnapshot:
		if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 || len(removes.Pods) > 0 || firstSet {
			s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source}
		}

	case PodConfigNotificationUnknown:
		fallthrough
	default:
		panic(fmt.Sprintf("unsupported PodConfigNotificationMode: %#v", s.mode))
	}

	return nil
}
type SyncHandler interface {
	HandlePodAdditions(pods []*v1.Pod)
	HandlePodUpdates(pods []*v1.Pod)
	HandlePodRemoves(pods []*v1.Pod)
	HandlePodReconcile(pods []*v1.Pod)
	HandlePodSyncs(pods []*v1.Pod)
	HandlePodCleanups() error
}

2.2 TODO podWorkers 通过每个Pod一个goroutinue的方式处理Pod消息的更新

func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
	pod := options.Pod
	uid := pod.UID
	var podUpdates chan UpdatePodOptions
	var exists bool

	p.podLock.Lock()
	defer p.podLock.Unlock()
	if podUpdates, exists = p.podUpdates[uid]; !exists {
		podUpdates = make(chan UpdatePodOptions, 1)
		p.podUpdates[uid] = podUpdates
		go func() {
			defer runtime.HandleCrash()
			p.managePodLoop(podUpdates)
		}()
	}
	if !p.isWorking[pod.UID] {
		p.isWorking[pod.UID] = true
		podUpdates <- *options
	} else {
		update, found := p.lastUndeliveredWorkUpdate[pod.UID]
		if !found || update.UpdateType != kubetypes.SyncPodKill {
			p.lastUndeliveredWorkUpdate[pod.UID] = *options
		}
	}
}

2.3 TODO 计算pod的变更跟需要的执行的动作

// SyncPod syncs the running pod into the desired pod by executing following steps:
//
//  1. Compute sandbox and container changes.
//  2. Kill pod sandbox if necessary.
//  3. Kill any containers that should not be running.
//  4. Create sandbox if necessary.
//  5. Create ephemeral containers.
//  6. Create init containers.
//  7. Create normal containers.
func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
	// Step 1: Compute sandbox and container changes.
	podContainerChanges := m.computePodActions(pod, podStatus)
	klog.V(3).Infof("computePodActions got %+v for pod %q", podContainerChanges, format.Pod(pod))
	if podContainerChanges.CreateSandbox {
		ref, err := ref.GetReference(legacyscheme.Scheme, pod)
		if err != nil {
			klog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), err)
		}
		if podContainerChanges.SandboxID != "" {
			m.recorder.Eventf(ref, v1.EventTypeNormal, events.SandboxChanged, "Pod sandbox changed, it will be killed and re-created.")
		} else {
			klog.V(4).Infof("SyncPod received new pod %q, will create a sandbox for it", format.Pod(pod))
		}
	}
//省略
}

3 ProbeManager 的作用与实现

ProbeManager 作用是用于执行pod的探针。每个探针都开启一个goroutinue周期执行。并把结果设置到status中。

4 ImageManager 的作用与实现

ImageManager 用于管理主机上的image的生命周期。

type ImageGCManager interface {
       GarbageCollect() error
       Start()
       GetImageList() ([]container.Image, error)
       DeleteUnusedImages() error
}
  • Start 函数每五分钟检测本机是否有新的镜像,更新镜像使用时间。
  • Start 函数每30s获取本机的所有镜像,更新到自己的缓存中。
  • GetImageList 从缓存中获取本地的所有镜像。
  • GarbageCollect 获取本机的镜像使用情况,如果使用量达到一定的水位,则会尝试删除未使用的镜像。(如果镜像存在时间过短也不会删除,防止刚拉取的镜像还未使用就被删除,这个时间默认是2分钟)
  • DeleteUnusedImages 删除未使用的镜像,回收磁盘空间,跟 GarbageCollect 不同的 GC 是当镜像磁盘使用率达到一定空间后才会触发,而且只回收一定的空间(满足空间未达到高水位), 而此函数会尽可能的回收所有空间。

5 VolumeManager 的作用与实现

type VolumeManager interface {
	Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})
	WaitForAttachAndMount(pod *v1.Pod) error
	GetMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap
	GetExtraSupplementalGroupsForPod(pod *v1.Pod) []int64
	GetVolumesInUse() []v1.UniqueVolumeName
	VolumeIsAttached(volumeName v1.UniqueVolumeName) bool
	MarkVolumesAsReportedInUse(volumesReportedAsInUse []v1.UniqueVolumeName)
}
  • Run 会启动desiredStateOfWorldPopulator 和 reconciler 两个goroutinue。
  • desiredStateOfWorldPopulator 从PodManager中获取Pods,计算最终的数据,存放在desiredStateOfWorld.
  • reconciler 通过desiredStateOfWorld 把现在的状态慢慢的迁移到期望的状态, reconciler 主要执行以下三个操作.
func (rc *reconciler) reconcile() {
	// Unmounts are triggered before mounts so that a volume that was
	// referenced by a pod that was deleted and is now referenced by another
	// pod is unmounted from the first pod before being mounted to the new
	// pod.
	rc.unmountVolumes()

	// Next we mount required volumes. This function could also trigger
	// attach if kubelet is responsible for attaching volumes.
	// If underlying PVC was resized while in-use then this function also handles volume
	// resizing.
	rc.mountAttachVolumes()

	// Ensure devices that should be detached/unmounted are detached/unmounted.
	rc.unmountDetachDevices()
}
  • WaitForAttachAndMount 会检查pod是否达到期望状态,会等待2分钟。
  • GetMountedVolumesForPod 获取当前pod的卷的实际状态。
  • GetVolumesInUse 获取实际状态和期望状态的在使用的卷。
  • VolumeIsAttached 如果volumeName在实际状态中,则表示在已经attach

6 其他一些简单goroutinue的工作

  • 同步节点状态
  • 同步租约
  • 更新容器运行时状态
  • 每秒执行PodKilling

7 StatusManager 的作用与实现

type Manager interface {
	PodStatusProvider
	Start()
	SetPodStatus(pod *v1.Pod, status v1.PodStatus)
	SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool)
	SetContainerStartup(podUID types.UID, containerID kubecontainer.ContainerID, started bool)
	TerminatePod(pod *v1.Pod)
	RemoveOrphanedStatuses(podUIDs map[types.UID]bool)
}
  • Start 启动一个goroutinue 定时同步Pod的状态到api server
  • SetPodStatus 在监听到pod状态改变的时候更新Pod的状态