apiserver 源码阅读(一)
Table of Contents
1 前言
apiserver 为其他组件提供了对资源对象的增删改查等接口;其他组件通过监听(watch)apiserver 中的资源变化来执行相应的操作。
2 Authorizer
主要通过 Role、RoleBinding 获取对应的配置,再根据配置判断角色是否有对应权限。
func (r *RBACAuthorizer) Authorize(ctx context.Context, requestAttributes authorizer.Attributes) (authorizer.Decision, string, error) { ruleCheckingVisitor := &authorizingVisitor{requestAttributes: requestAttributes} r.authorizationRuleResolver.VisitRulesFor(requestAttributes.GetUser(), requestAttributes.GetNamespace(), ruleCheckingVisitor.visit) if ruleCheckingVisitor.allowed { return authorizer.DecisionAllow, ruleCheckingVisitor.reason, nil } // Build a detailed log of the denial. // Make the whole block conditional so we don't do a lot of string-building we won't use. if klog.V(5).Enabled() { var operation string if requestAttributes.IsResourceRequest() { b := &bytes.Buffer{} b.WriteString(`"`) b.WriteString(requestAttributes.GetVerb()) b.WriteString(`" resource "`) b.WriteString(requestAttributes.GetResource()) if len(requestAttributes.GetAPIGroup()) > 0 { b.WriteString(`.`) b.WriteString(requestAttributes.GetAPIGroup()) } if len(requestAttributes.GetSubresource()) > 0 { b.WriteString(`/`) b.WriteString(requestAttributes.GetSubresource()) } b.WriteString(`"`) if len(requestAttributes.GetName()) > 0 { b.WriteString(` named "`) b.WriteString(requestAttributes.GetName()) b.WriteString(`"`) } operation = b.String() } else { operation = fmt.Sprintf("%q nonResourceURL %q", requestAttributes.GetVerb(), requestAttributes.GetPath()) } var scope string if ns := requestAttributes.GetNamespace(); len(ns) > 0 { scope = fmt.Sprintf("in namespace %q", ns) } else { scope = "cluster-wide" } klog.Infof("RBAC: no rules authorize user %q with groups %q to %s %s", requestAttributes.GetUser().GetName(), requestAttributes.GetUser().GetGroups(), operation, scope) } reason := "" if len(ruleCheckingVisitor.errors) > 0 { reason = fmt.Sprintf("RBAC: %v", utilerrors.NewAggregate(ruleCheckingVisitor.errors)) } return authorizer.DecisionNoOpinion, reason, nil }
3 APIServerHandler
// APIServerHandler bundles the HTTP handlers used to serve API server
// requests.
type APIServerHandler struct {
	// FullHandlerChain is the handler for all incoming requests.
	// NOTE(review): presumably this is Director wrapped with additional
	// middleware; the construction is not shown here — confirm.
	FullHandlerChain http.Handler

	// GoRestfulContainer holds the go-restful web services, i.e. the
	// RESTful resource endpoints.
	GoRestfulContainer *restful.Container

	// NonGoRestfulMux serves the paths that are not go-restful web services.
	// NOTE(review): presumably metrics, health probes, pprof and similar —
	// confirm against the registration code.
	NonGoRestfulMux *mux.PathRecorderMux

	// Director is the handler that chooses between the go-restful container
	// and the non-go-restful mux for a request.
	// NOTE(review): assumed from the director type's ServeHTTP; the wiring
	// is not shown here — confirm.
	Director http.Handler
}
- FullHandlerChain 处理所有的请求,是对Director的封装,多了一个自定义中间件功能。
- GoRestfulContainer 处理所有 RESTful 风格的请求;对资源的操作一般都是 RESTful 风格的。
- NonGoRestfulMux 主要处理一些非restful风格的请求,比如监控,探针,pprof等。
- Director 负责把请求分发给 GoRestfulContainer 或 NonGoRestfulMux。
func (d director) ServeHTTP(w http.ResponseWriter, req *http.Request) { path := req.URL.Path // check to see if our webservices want to claim this path for _, ws := range d.goRestfulContainer.RegisteredWebServices() { switch { case ws.RootPath() == "/apis": // if we are exactly /apis or /apis/, then we need special handling in loop. // normally these are passed to the nonGoRestfulMux, but if discovery is enabled, it will go directly. // We can't rely on a prefix match since /apis matches everything (see the big comment on Director above) if path == "/apis" || path == "/apis/" { klog.V(5).Infof("%v: %v %q satisfied by gorestful with webservice %v", d.name, req.Method, path, ws.RootPath()) // don't use servemux here because gorestful servemuxes get messed up when removing webservices // TODO fix gorestful, remove TPRs, or stop using gorestful d.goRestfulContainer.Dispatch(w, req) return } case strings.HasPrefix(path, ws.RootPath()): // ensure an exact match or a path boundary match if len(path) == len(ws.RootPath()) || path[len(ws.RootPath())] == '/' { klog.V(5).Infof("%v: %v %q satisfied by gorestful with webservice %v", d.name, req.Method, path, ws.RootPath()) // don't use servemux here because gorestful servemuxes get messed up when removing webservices // TODO fix gorestful, remove TPRs, or stop using gorestful d.goRestfulContainer.Dispatch(w, req) return } } } // if we didn't find a match, then we just skip gorestful altogether klog.V(5).Infof("%v: %v %q satisfied by nonGoRestful", d.name, req.Method, path) d.nonGoRestfulMux.ServeHTTP(w, req) }
从上面可以看出Director结构体的逻辑:
- 如果某个已注册 WebService 的根路径是 /apis,则只有请求路径恰好是 /apis 或 /apis/ 时才由 GoRestfulContainer 处理(/apis 是所有 API 组路径的前缀,不能按前缀匹配)
- 对于其他 WebService,如果请求路径与它的根路径完全相同,或者以「根路径 + /」开头,则由 GoRestfulContainer 处理
- 其他情况交给 NonGoRestfulMux(非 RESTful 风格的 handler)处理
4 ServiceResolver
主要通过 Service 和 Endpoints 找到服务名对应的 IP 地址。
5 主流程
// Run logs the build version, builds the server chain with CreateServerChain,
// prepares the resulting server, and then runs it with stopCh.
//
// Any error from building or preparing the chain is returned unchanged;
// otherwise the result of the prepared server's Run is returned.
func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
	// To help debugging, immediately log version
	klog.Infof("Version: %+v", version.Get())

	server, err := CreateServerChain(completeOptions, stopCh)
	if err != nil {
		return err
	}

	prepared, err := server.PrepareRun()
	if err != nil {
		return err
	}

	return prepared.Run(stopCh)
}

// CreateServerChain creates the apiservers connected via delegation.
//
// The servers are built in this order: the API extensions server (created
// with an empty delegate), the kube API server (given the extensions server's
// GenericAPIServer), and finally the aggregator (given the kube API server's
// GenericAPIServer and the extensions server's informers). The aggregator is
// the value returned. The first error encountered aborts construction and is
// returned with a nil server.
//
// stopCh is not referenced in this body.
func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*aggregatorapiserver.APIAggregator, error) {
	nodeTunneler, proxyTransport, err := CreateNodeDialer(completedOptions)
	if err != nil {
		return nil, err
	}

	// The config step also yields the service resolver and plugin initializer
	// that the extensions and aggregator configs below reuse.
	kubeAPIServerConfig, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport)
	if err != nil {
		return nil, err
	}

	// If additional API servers are added, they should be gated.
	apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
		serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, kubeAPIServerConfig.GenericConfig.EgressSelector, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig))
	if err != nil {
		return nil, err
	}
	// The extensions server sits at the end of the chain, hence the empty delegate.
	apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate())
	if err != nil {
		return nil, err
	}

	kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)
	if err != nil {
		return nil, err
	}

	// aggregator comes last in the chain
	aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, proxyTransport, pluginInitializer)
	if err != nil {
		return nil, err
	}
	aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)
	if err != nil {
		// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
		return nil, err
	}

	return aggregatorServer, nil
}
// APIAggregator contains state for a Kubernetes cluster master/api server.
type APIAggregator struct {
	GenericAPIServer *genericapiserver.GenericAPIServer

	delegateHandler http.Handler

	// proxyCurrentCertKeyContent holds the client cert used to identify this proxy. Backing APIServices use this to confirm the proxy's identity
	proxyCurrentCertKeyContent certKeyFunc
	proxyTransport             *http.Transport

	// proxyHandlers are the proxy handlers that are currently registered, keyed by apiservice.name
	proxyHandlers map[string]*proxyHandler
	// handledGroups are the groups that already have routes
	handledGroups sets.String

	// lister is used to add group handling for /apis/<group> aggregator lookups based on
	// controller state
	lister listers.APIServiceLister

	// provided for easier embedding
	APIRegistrationInformers informers.SharedInformerFactory

	// Information needed to determine routing for the aggregator
	serviceResolver ServiceResolver

	// Enable swagger and/or OpenAPI if these configs are non-nil.
	openAPIConfig *openapicommon.Config

	// openAPIAggregationController downloads and merges OpenAPI specs.
	openAPIAggregationController *openapicontroller.AggregationController

	// egressSelector selects the proper egress dialer to communicate with the custom apiserver
	// overwrites proxyTransport dialer if not nil
	egressSelector *egressselector.EgressSelector
}

// PrepareRun prepares the embedded GenericAPIServer and returns a
// preparedAPIAggregator wrapping both the aggregator and the prepared server.
//
// When openAPIConfig is non-nil it also registers a post-start hook that runs
// the OpenAPI aggregation controller, and builds that controller after the
// generic server has been prepared. An error from building the OpenAPI
// aggregator is returned together with a zero preparedAPIAggregator.
func (s *APIAggregator) PrepareRun() (preparedAPIAggregator, error) {
	// add post start hook before generic PrepareRun in order to be before /healthz installation
	if s.openAPIConfig != nil {
		// The closure reads s.openAPIAggregationController when the hook is
		// invoked, not when it is registered; the field is assigned further
		// down in this method.
		s.GenericAPIServer.AddPostStartHookOrDie("apiservice-openapi-controller", func(context genericapiserver.PostStartHookContext) error {
			go s.openAPIAggregationController.Run(context.StopCh)
			return nil
		})
	}

	prepared := s.GenericAPIServer.PrepareRun()

	// delay OpenAPI setup until the delegate had a chance to setup their OpenAPI handlers
	if s.openAPIConfig != nil {
		specDownloader := openapiaggregator.NewDownloader()
		openAPIAggregator, err := openapiaggregator.BuildAndRegisterAggregator(
			&specDownloader,
			s.GenericAPIServer.NextDelegate(),
			s.GenericAPIServer.Handler.GoRestfulContainer.RegisteredWebServices(),
			s.openAPIConfig,
			s.GenericAPIServer.Handler.NonGoRestfulMux)
		if err != nil {
			return preparedAPIAggregator{}, err
		}
		s.openAPIAggregationController = openapicontroller.NewAggregationController(&specDownloader, openAPIAggregator)
	}

	return preparedAPIAggregator{APIAggregator: s, runnable: prepared}, nil
}

// Run runs the prepared generic server held in runnable with stopCh and
// returns its result.
func (s preparedAPIAggregator) Run(stopCh <-chan struct{}) error {
	return s.runnable.Run(stopCh)
}
5.1 注册API Handler
CreateKubeAPIServer调用
kubeAPIServerConfig.Complete().New(delegateAPIServer)
注册api
// InstallLegacyAPI builds the legacy REST storage, registers the bootstrap
// controller's lifecycle hooks on the generic API server, and installs the
// legacy API group under genericapiserver.DefaultLegacyAPIPrefix.
//
// It returns an error when the storage cannot be built or when the API group
// cannot be installed. The two *OrDie hook registrations do not return an
// error to this function.
func (m *Instance) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.RESTOptionsGetter, legacyRESTStorageProvider corerest.LegacyRESTStorageProvider) error {
	legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)
	if err != nil {
		return fmt.Errorf("error building core storage: %v", err)
	}

	// Both hooks are registered under the same name.
	controllerName := "bootstrap-controller"
	// The client is built from the loopback client config and passed to the
	// bootstrap controller for several of its parameters.
	coreClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
	bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient, coreClient.RESTClient())
	m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook)
	m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook)

	if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
		return fmt.Errorf("error in registering group versions: %v", err)
	}
	return nil
}
5.2 启动HTTP Server
- APIAggregator 通过调用GenericAPIServer 的PrepareRun 生成可以Run的preparedGenericAPIServer
- preparedGenericAPIServer.Run 启动一个http server.