Reading the kubelet Source Code (Part 1)
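1 Preface
I have been using k8s for a long time but had never read its source code. There were two reasons. First, I was only ever a user, and whenever something went wrong I just called the container team. Second, the project is large and reading it would take a lot of time. I have now moved to a job that is less busy, so I want to fill this gap and at least take a quick look at each of the main k8s components.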
1.1 Kubelet
As I understand it, kubelet has two main jobs:
- Reporting node data to the API server
- Keeping the pods on the node in the state declared in the API server
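2 Kubelet main loop
The main flow of kubelet is:
- Receive pod updates from the pod sources (mainly the API server; static pod manifest files and HTTP URLs are the other sources).
- podWorkers starts one goroutine per pod to process that pod's updates.
- Deleting a pod:
  - Run the preStop hooks
  - Stop the pod's podWorkers goroutine
  - Iterate over the pod's containers and kill each one
- Updating a pod:
  - Compute the pod's status
  - Wait for the pod's volumes to be attached and mounted
  - Call kubeGenericRuntimeManager.SyncPod, which:
    - calls computePodActions (which uses podSandboxChanged to decide whether the sandbox has to be recreated) to work out what to do with the pod sandbox and each container
    - calls the CRI to carry out those actions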
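2.1 Watching the API server for changes
kubelet watches for changes to the pods on its node and then merges the changes coming from multiple sources. The API server source list-watches all pods whose spec.nodeName equals this node's name; podStorage.Merge then merges the updates from all sources. The relevant code: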
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, updates chan<- interface{}) {
	lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))
	newSourceApiserverFromLW(lw, updates)
}

func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
	send := func(objs []interface{}) {
		var pods []*v1.Pod
		for _, o := range objs {
			pods = append(pods, o.(*v1.Pod))
		}
		updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource}
	}
	r := cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0)
	go r.Run(wait.NeverStop)
}
func (s *podStorage) Merge(source string, change interface{}) error {
	s.updateLock.Lock()
	defer s.updateLock.Unlock()

	seenBefore := s.sourcesSeen.Has(source)
	adds, updates, deletes, removes, reconciles := s.merge(source, change)
	firstSet := !seenBefore && s.sourcesSeen.Has(source)

	// deliver update notifications
	switch s.mode {
	case PodConfigNotificationIncremental:
		if len(removes.Pods) > 0 {
			s.updates <- *removes
		}
		if len(adds.Pods) > 0 {
			s.updates <- *adds
		}
		if len(updates.Pods) > 0 {
			s.updates <- *updates
		}
		if len(deletes.Pods) > 0 {
			s.updates <- *deletes
		}
		if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 && len(deletes.Pods) == 0 {
			// Send an empty update when first seeing the source and there are
			// no ADD or UPDATE or DELETE pods from the source. This signals kubelet that
			// the source is ready.
			s.updates <- *adds
		}
		// Only add reconcile support here, because kubelet doesn't support Snapshot update now.
		if len(reconciles.Pods) > 0 {
			s.updates <- *reconciles
		}

	case PodConfigNotificationSnapshotAndUpdates:
		if len(removes.Pods) > 0 || len(adds.Pods) > 0 || firstSet {
			s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source}
		}
		if len(updates.Pods) > 0 {
			s.updates <- *updates
		}
		if len(deletes.Pods) > 0 {
			s.updates <- *deletes
		}

	case PodConfigNotificationSnapshot:
		if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 || len(removes.Pods) > 0 || firstSet {
			s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source}
		}

	case PodConfigNotificationUnknown:
		fallthrough
	default:
		panic(fmt.Sprintf("unsupported PodConfigNotificationMode: %#v", s.mode))
	}

	return nil
}
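For a SET update, podStorage.merge compares the snapshot with the pods previously recorded for that source, keyed by UID, and splits it into the adds, updates, deletes and removes that the switch above then sends. The sketch below shows that idea in isolation. It is a simplification under stated assumptions: `Pod` and `diffSnapshot` are hypothetical, `Spec` stands in for the whole pod spec, `Deleting` stands in for a non-nil DeletionTimestamp, and the reconcile case (status-only changes) is left out.

```go
package main

import "fmt"

// Pod is a hypothetical, trimmed-down pod. Spec stands in for the whole
// spec and Deleting for DeletionTimestamp != nil.
type Pod struct {
	UID      string
	Spec     string
	Deleting bool
}

// diffSnapshot turns a full SET snapshot from one source into incremental
// changes against the pods previously seen from that source (keyed by UID).
// It is a simplified illustration, not the real podStorage.merge.
func diffSnapshot(old map[string]Pod, snapshot []Pod) (adds, updates, deletes, removes []Pod, next map[string]Pod) {
	next = make(map[string]Pod, len(snapshot))
	for _, p := range snapshot {
		next[p.UID] = p
		prev, seen := old[p.UID]
		switch {
		case !seen:
			adds = append(adds, p) // first time this pod appears
		case p.Deleting && !prev.Deleting:
			deletes = append(deletes, p) // graceful deletion has started
		case p.Spec != prev.Spec:
			updates = append(updates, p) // spec changed
		}
	}
	for uid, p := range old {
		if _, still := next[uid]; !still {
			removes = append(removes, p) // gone from the snapshot entirely
		}
	}
	return adds, updates, deletes, removes, next
}

func main() {
	old := map[string]Pod{
		"a": {UID: "a", Spec: "v1"},
		"b": {UID: "b", Spec: "v1"},
	}
	snapshot := []Pod{
		{UID: "a", Spec: "v2"}, // changed spec -> update
		{UID: "c", Spec: "v1"}, // new -> add
	}
	adds, updates, deletes, removes, _ := diffSnapshot(old, snapshot)
	fmt.Println(len(adds), len(updates), len(deletes), len(removes)) // 1 1 0 1
}
```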
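Kubelet implements the SyncHandler interface; its main sync loop calls these methods to handle the merged updates:
type SyncHandler interface {
	HandlePodAdditions(pods []*v1.Pod)
	HandlePodUpdates(pods []*v1.Pod)
	HandlePodRemoves(pods []*v1.Pod)
	HandlePodReconcile(pods []*v1.Pod)
	HandlePodSyncs(pods []*v1.Pod)
	HandlePodCleanups() error
}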
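2.2 podWorkers: one goroutine per pod for processing pod updates
The first time UpdatePod sees a pod UID, it creates a channel with a buffer of one and starts a goroutine running managePodLoop for that pod. If the pod's worker is idle, the update is sent to the channel directly; if it is busy, the update is saved in lastUndeliveredWorkUpdate, replacing any earlier pending update unless that one is a SyncPodKill.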
func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
	pod := options.Pod
	uid := pod.UID
	var podUpdates chan UpdatePodOptions
	var exists bool

	p.podLock.Lock()
	defer p.podLock.Unlock()
	// First update for this pod: create its channel and worker goroutine.
	if podUpdates, exists = p.podUpdates[uid]; !exists {
		podUpdates = make(chan UpdatePodOptions, 1)
		p.podUpdates[uid] = podUpdates
		go func() {
			defer runtime.HandleCrash()
			p.managePodLoop(podUpdates)
		}()
	}
	if !p.isWorking[pod.UID] {
		// Worker idle: hand the update over right away.
		p.isWorking[pod.UID] = true
		podUpdates <- *options
	} else {
		// Worker busy: keep only the latest update, but never overwrite a pending kill.
		update, found := p.lastUndeliveredWorkUpdate[pod.UID]
		if !found || update.UpdateType != kubetypes.SyncPodKill {
			p.lastUndeliveredWorkUpdate[pod.UID] = *options
		}
	}
}
2.3 Computing pod changes and the actions to take
// SyncPod syncs the running pod into the desired pod by executing following steps:
//
// 1. Compute sandbox and container changes.
// 2. Kill pod sandbox if necessary.
// 3. Kill any containers that should not be running.
// 4. Create sandbox if necessary.
// 5. Create ephemeral containers.
// 6. Create init containers.
// 7. Create normal containers.
func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
	// Step 1: Compute sandbox and container changes.
	podContainerChanges := m.computePodActions(pod, podStatus)
	klog.V(3).Infof("computePodActions got %+v for pod %q", podContainerChanges, format.Pod(pod))
	if podContainerChanges.CreateSandbox {
		ref, err := ref.GetReference(legacyscheme.Scheme, pod)
		if err != nil {
			klog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), err)
		}
		if podContainerChanges.SandboxID != "" {
			m.recorder.Eventf(ref, v1.EventTypeNormal, events.SandboxChanged, "Pod sandbox changed, it will be killed and re-created.")
		} else {
			klog.V(4).Infof("SyncPod received new pod %q, will create a sandbox for it", format.Pod(pod))
		}
	}
	// ... (steps 2-7 omitted)
}
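3 ProbeManager: role and implementation
ProbeManager runs the pods' probes. Each probe (one per container and probe type: liveness, readiness or startup) gets its own goroutine that runs it periodically, and the result is written into the pod status.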
4 ImageManager: role and implementation
ImageManager manages the lifecycle of the images on the host.
type ImageGCManager interface {
	GarbageCollect() error
	Start()
	GetImageList() ([]container.Image, error)
	DeleteUnusedImages() error
}
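- Start runs a goroutine that, every five minutes, detects images newly present on the host and updates each image's last-used time.
- Start also runs a goroutine that, every 30 s, lists all images on the host and stores them in its cache.
- GetImageList returns the host's images from that cache.
- GarbageCollect checks the image disk usage. Once usage reaches the high threshold (ImageGCHighThresholdPercent, 85% by default), it deletes unused images, least recently used first, until usage falls back to the low threshold (ImageGCLowThresholdPercent, 80% by default). Images first seen less than MinAge ago (2 minutes by default) are skipped, so a freshly pulled image is not deleted before it has been used.
- DeleteUnusedImages deletes unused images to reclaim disk space. Unlike GarbageCollect, which only triggers once usage reaches the high threshold and frees just enough space to get back down to the low threshold, this function tries to reclaim as much space as possible.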
5 VolumeManager: role and implementation
type VolumeManager interface {
	Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})
	WaitForAttachAndMount(pod *v1.Pod) error
	GetMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap
	GetExtraSupplementalGroupsForPod(pod *v1.Pod) []int64
	GetVolumesInUse() []v1.UniqueVolumeName
	VolumeIsAttached(volumeName v1.UniqueVolumeName) bool
	MarkVolumesAsReportedInUse(volumesReportedAsInUse []v1.UniqueVolumeName)
}
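- Run starts two goroutines, desiredStateOfWorldPopulator and reconciler.
- desiredStateOfWorldPopulator gets pods from PodManager, computes which volumes each pod should have, and stores the result in desiredStateOfWorld.
- reconciler uses desiredStateOfWorld to move the actual state (actualStateOfWorld) step by step towards the desired state. Each pass performs the following three operations: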
func (rc *reconciler) reconcile() {
	// Unmounts are triggered before mounts so that a volume that was
	// referenced by a pod that was deleted and is now referenced by another
	// pod is unmounted from the first pod before being mounted to the new
	// pod.
	rc.unmountVolumes()

	// Next we mount required volumes. This function could also trigger
	// attach if kubelet is responsible for attaching volumes.
	// If underlying PVC was resized while in-use then this function also handles volume
	// resizing.
	rc.mountAttachVolumes()

	// Ensure devices that should be detached/unmounted are detached/unmounted.
	rc.unmountDetachDevices()
}
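- WaitForAttachAndMount checks whether all of the pod's volumes have reached the desired state (attached and mounted), waiting up to 2 minutes before giving up with an error.
- GetMountedVolumesForPod returns the volumes actually mounted for the pod, according to the actual state.
- GetVolumesInUse returns the volumes that are in use according to the actual and desired states.
- VolumeIsAttached returns true if volumeName is present in the actual state, i.e. the volume has been attached.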
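6 Other simple background goroutines
- Syncing node status to the API server
- Renewing the node lease
- Updating the container runtime status
- Running the pod killer (podKiller) once per second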
7 StatusManager: role and implementation
type Manager interface {
	PodStatusProvider
	Start()
	SetPodStatus(pod *v1.Pod, status v1.PodStatus)
	SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool)
	SetContainerStartup(podUID types.UID, containerID kubecontainer.ContainerID, started bool)
	TerminatePod(pod *v1.Pod)
	RemoveOrphanedStatuses(podUIDs map[types.UID]bool)
}
- Start launches a goroutine that periodically syncs pod statuses to the API server.
- SetPodStatus updates a pod's cached status when a change in the pod's status is observed.