kubelet 源码阅读(一)
Table of Contents
1 前言
一直在使用k8s,但是也一直没有看过k8s的源码,一来是因为一直都是使用的状态,有问题就呼叫容器组;二来是觉得项目比较大,会耗费很多时间。目前换了个比较不大忙的工作,所以就想着把这块内容补起来,起码得把k8s几大组件都简单看一遍。
1.1 Kubelet
kubelet 就个人理解来说,主要作用有两个:
- 上报节点数据
- 保持Pod的状态跟api server中声明的一样。
2 Kubelet 主循环
kubelet 主流程如下:
- 从api server(主要是api server)中获取更新的数据
- podWorkers 每个Pod开启一个goroutinue,处理pod的更新
- 删除Pod
- 执行删除前的Hook
- 停止podWorkers的goroutinue
- 遍历pod里的容器,对每个容器执行删除操作
- 更新Pod
- 计算Pod的状态
- 等待volume挂载完成
- 调用kubeGenericRuntimeManager的SyncPod处理
- 调用podSandboxChanged计算应该如何处理的容器跟pod
- 调用CRI接口进行处理
2.1 TODO 监听api server的变更
kubelet会监听node上的pod资源的变更,然后合并多个源的变更. 代码如下:
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, updates chan<- interface{}) { lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName))) newSourceApiserverFromLW(lw, updates) } func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) { send := func(objs []interface{}) { var pods []*v1.Pod for _, o := range objs { pods = append(pods, o.(*v1.Pod)) } updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource} } r := cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0) go r.Run(wait.NeverStop) }
func (s *podStorage) Merge(source string, change interface{}) error { s.updateLock.Lock() defer s.updateLock.Unlock() seenBefore := s.sourcesSeen.Has(source) adds, updates, deletes, removes, reconciles := s.merge(source, change) firstSet := !seenBefore && s.sourcesSeen.Has(source) // deliver update notifications switch s.mode { case PodConfigNotificationIncremental: if len(removes.Pods) > 0 { s.updates <- *removes } if len(adds.Pods) > 0 { s.updates <- *adds } if len(updates.Pods) > 0 { s.updates <- *updates } if len(deletes.Pods) > 0 { s.updates <- *deletes } if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 && len(deletes.Pods) == 0 { // Send an empty update when first seeing the source and there are // no ADD or UPDATE or DELETE pods from the source. This signals kubelet that // the source is ready. s.updates <- *adds } // Only add reconcile support here, because kubelet doesn't support Snapshot update now. if len(reconciles.Pods) > 0 { s.updates <- *reconciles } case PodConfigNotificationSnapshotAndUpdates: if len(removes.Pods) > 0 || len(adds.Pods) > 0 || firstSet { s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source} } if len(updates.Pods) > 0 { s.updates <- *updates } if len(deletes.Pods) > 0 { s.updates <- *deletes } case PodConfigNotificationSnapshot: if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 || len(removes.Pods) > 0 || firstSet { s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source} } case PodConfigNotificationUnknown: fallthrough default: panic(fmt.Sprintf("unsupported PodConfigNotificationMode: %#v", s.mode)) } return nil }
type SyncHandler interface { HandlePodAdditions(pods []*v1.Pod) HandlePodUpdates(pods []*v1.Pod) HandlePodRemoves(pods []*v1.Pod) HandlePodReconcile(pods []*v1.Pod) HandlePodSyncs(pods []*v1.Pod) HandlePodCleanups() error }
2.2 TODO podWorkers 通过每个Pod一个goroutinue的方式处理Pod消息的更新
func (p *podWorkers) UpdatePod(options *UpdatePodOptions) { pod := options.Pod uid := pod.UID var podUpdates chan UpdatePodOptions var exists bool p.podLock.Lock() defer p.podLock.Unlock() if podUpdates, exists = p.podUpdates[uid]; !exists { podUpdates = make(chan UpdatePodOptions, 1) p.podUpdates[uid] = podUpdates go func() { defer runtime.HandleCrash() p.managePodLoop(podUpdates) }() } if !p.isWorking[pod.UID] { p.isWorking[pod.UID] = true podUpdates <- *options } else { update, found := p.lastUndeliveredWorkUpdate[pod.UID] if !found || update.UpdateType != kubetypes.SyncPodKill { p.lastUndeliveredWorkUpdate[pod.UID] = *options } } }
2.3 TODO 计算pod的变更跟需要的执行的动作
// SyncPod syncs the running pod into the desired pod by executing following steps: // // 1. Compute sandbox and container changes. // 2. Kill pod sandbox if necessary. // 3. Kill any containers that should not be running. // 4. Create sandbox if necessary. // 5. Create ephemeral containers. // 6. Create init containers. // 7. Create normal containers. func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) { // Step 1: Compute sandbox and container changes. podContainerChanges := m.computePodActions(pod, podStatus) klog.V(3).Infof("computePodActions got %+v for pod %q", podContainerChanges, format.Pod(pod)) if podContainerChanges.CreateSandbox { ref, err := ref.GetReference(legacyscheme.Scheme, pod) if err != nil { klog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), err) } if podContainerChanges.SandboxID != "" { m.recorder.Eventf(ref, v1.EventTypeNormal, events.SandboxChanged, "Pod sandbox changed, it will be killed and re-created.") } else { klog.V(4).Infof("SyncPod received new pod %q, will create a sandbox for it", format.Pod(pod)) } } //省略 }
3 ProbeManager 的作用与实现
ProbeManager 作用是用于执行pod的探针。每个探针都开启一个goroutinue周期执行。并把结果设置到status中。
4 ImageManager 的作用与实现
ImageManager 用于管理主机上的image的生命周期。
type ImageGCManager interface { GarbageCollect() error Start() GetImageList() ([]container.Image, error) DeleteUnusedImages() error }
- Start 函数每五分钟检测本机是否有新的镜像,更新镜像使用时间。
- Start 函数每30s获取本机的所有镜像,更新到自己的缓存中。
- GetImageList 从缓存中获取本地的所有镜像。
- GarbageCollect 获取本机的镜像使用情况,如果使用量达到一定的水位,则会尝试删除未使用的镜像。(如果镜像存在时间过短也不会删除,防止刚拉取的镜像还未使用就被删除,这个时间默认是2分钟)
- DeleteUnusedImages 删除未使用的镜像,回收磁盘空间,跟 GarbageCollect 不同的 GC 是当镜像磁盘使用率达到一定空间后才会触发,而且只回收一定的空间(满足空间未达到高水位), 而此函数会尽可能的回收所有空间。
5 VolumeManager 的作用与实现
type VolumeManager interface { Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) WaitForAttachAndMount(pod *v1.Pod) error GetMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap GetExtraSupplementalGroupsForPod(pod *v1.Pod) []int64 GetVolumesInUse() []v1.UniqueVolumeName VolumeIsAttached(volumeName v1.UniqueVolumeName) bool MarkVolumesAsReportedInUse(volumesReportedAsInUse []v1.UniqueVolumeName) }
- Run 会启动desiredStateOfWorldPopulator 和 reconciler 两个goroutinue。
- desiredStateOfWorldPopulator 从PodManager中获取Pods,计算最终的数据,存放在desiredStateOfWorld.
- reconciler 通过desiredStateOfWorld 把现在的状态慢慢的迁移到期望的状态, reconciler 主要执行以下三个操作.
func (rc *reconciler) reconcile() { // Unmounts are triggered before mounts so that a volume that was // referenced by a pod that was deleted and is now referenced by another // pod is unmounted from the first pod before being mounted to the new // pod. rc.unmountVolumes() // Next we mount required volumes. This function could also trigger // attach if kubelet is responsible for attaching volumes. // If underlying PVC was resized while in-use then this function also handles volume // resizing. rc.mountAttachVolumes() // Ensure devices that should be detached/unmounted are detached/unmounted. rc.unmountDetachDevices() }
- WaitForAttachAndMount 会检查pod是否达到期望状态,会等待2分钟。
- GetMountedVolumesForPod 获取当前pod的卷的实际状态。
- GetVolumesInUse 获取实际状态和期望状态的在使用的卷。
- VolumeIsAttached 如果volumeName在实际状态中,则表示在已经attach
6 其他一些简单goroutinue的工作
- 同步节点状态
- 同步租约
- 更新容器运行时状态
- 每秒执行PodKilling
7 StatusManager 的作用与实现
type Manager interface { PodStatusProvider Start() SetPodStatus(pod *v1.Pod, status v1.PodStatus) SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool) SetContainerStartup(podUID types.UID, containerID kubecontainer.ContainerID, started bool) TerminatePod(pod *v1.Pod) RemoveOrphanedStatuses(podUIDs map[types.UID]bool) }
- Start 启动一个goroutinue 定时同步Pod的状态到api server
- SetPodStatus 在监听到pod状态改变的时候更新Pod的状态