Kubernetes API Server handler 注冊(cè)過(guò)程分析
前言
K8s提供 Aggregated APIServer? 的擴(kuò)展方式,編寫(xiě) Aggregated APIServer? 本質(zhì)上和K8s構(gòu)建方式類似,理解 APiServer 資源的加載方式,能更好好的理解如何開(kāi)發(fā)Aggregated APIServer。本文以內(nèi)置資源的 handler 注冊(cè)過(guò)程為線索介紹了 APiServer 的啟動(dòng)過(guò)程和 handler 注冊(cè)過(guò)程。使用k8s代碼commit id為c6970e64528ba78b74bf77b86f9b78b7b61bd0cd
APIServer啟動(dòng)過(guò)程介紹
圖1 APIServer啟動(dòng)流程
圖1給出了 ApiServer 的初始化流程,首先通過(guò) CreateServerChain 構(gòu)造出3個(gè)APIServer:
- AggregatorServer:攔截Aggregated APIServer? 中定義的資源對(duì)象請(qǐng)求,并轉(zhuǎn)發(fā)給相關(guān)的Aggregated APIServer 處理。
- KubeAPIServer:用于處理 k8s 的內(nèi)建資源,如:Deployment,ConfigMap 等。
- APIExtensionServer:負(fù)責(zé)處理用戶自定義資源。
它們之間的處理順序?yàn)槿缦聢D所示,當(dāng)用戶請(qǐng)求進(jìn)來(lái),先判斷 AggregatorServer? 能否處理,否則代理給 kubeApiServer? ,如果 kubeApiServer? 不能處代理給 ApiExtensionServer 處理,如果都不能處理則交給 notFoundHandler 處理。
圖2 三種 APIServer 請(qǐng)求順序
限于篇幅原因,本文主要分析 kubeapiserver 的啟動(dòng)過(guò)程。
CreateApiServerConfig? 通過(guò)調(diào)用 buildGenericConfig? 構(gòu)建 genericapiserver.Config。genericapiserver.Config? 中包含了啟動(dòng)Genericapiserver? 所需要的配置信息,比如:RequestTimeout? 定義了請(qǐng)求的超時(shí)時(shí)間,AdmissionControl? 對(duì)象進(jìn)行準(zhǔn)入控制。buildGenericConfig? 中需要注意的是 BuildHandlerChainFunc?,請(qǐng)求在路由給資源對(duì)象的handler前先經(jīng)過(guò)的BuildHandlerChainFunc? 中定義的 Filter? 。參考圖1,通過(guò)深入 buildGenericConfig? 可以發(fā)現(xiàn) BuildHandlerChainFunc? 傳入的是 DefaultBuildHandlerChain? ,其中 Filter 先定義的后調(diào)用。
// k8s.io/apiserver/pkg/server/config.go
func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
handler := filterlatency.TrackCompleted(apiHandler)
// 構(gòu)造權(quán)限檢查filter
handler = genericapifilters.WithAuthorization(handler, c.Authorization.Authorizer, c.Serializer)
...
// 構(gòu)造認(rèn)證filter
handler = genericapifilters.WithAuthentication(handler, c.Authentication.Authenticator, failedHandler, c.Authentication.APIAudiences)
...
// 構(gòu)造請(qǐng)求超時(shí)filter, LongRunningFunc會(huì)判斷該請(qǐng)求是否是需要LongRunning的,比如watch的請(qǐng)求,如果是,該filter不會(huì)對(duì)這類請(qǐng)求生效
// WithTimeoutForNonLongRunningRequests will call the rest of the request handling in a go-routine with the
// context with deadline. The go-routine can keep running, while the timeout logic will return a timeout to the client.
handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc)
handler = genericapifilters.WithRequestDeadline(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator,
c.LongRunningFunc, c.Serializer, c.RequestTimeout)
handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.HandlerChainWaitGroup)
...
// 初始化RequestInfo的filter并將其放入context中,后續(xù)的處理邏輯可以從context直接獲取RequestInfo
handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
....
return handler
}
CreateKubeAPIServer? 中調(diào)用了kubeAPIServerConfig.Complete().New?構(gòu)造出了 kubeAPIServer? 的 GenericServer。kubeAPIServerConfig.Complete().New?中通過(guò)調(diào)用 m.InstallLegacyAPI? 初始化核心資源并添加進(jìn)路由中,對(duì)應(yīng)的是以 api 開(kāi)頭的資源,如:Pod,ConfigMap 等。調(diào)用 m.InstallAPI 初始化以 apis 開(kāi)頭的內(nèi)置資源如:Deployment。
handler的注冊(cè)過(guò)程
從圖1可以看出 InstallAPI? 與 InstallLegacyAPI? 的創(chuàng)建過(guò)程基本類似,本文主要介紹 InstallAPI 的初始化過(guò)程。
在調(diào)用 InstallAPI? 之前kubeAPIServerConfig.Complete().New?會(huì)先創(chuàng)建內(nèi)置資源對(duì)象的RESTStorageProvider? 作為 InstallAPI 的入?yún)?/p>
//pkg/controlplane/instance.go
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Instance, error) {
...
// 構(gòu)造內(nèi)置資源的RESTStorageProvider
restStorageProviders := []RESTStorageProvider{
apiserverinternalrest.StorageProvider{},
authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig.Authentication.APIAudiences},
authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, RuleResolver: c.GenericConfig.RuleResolver},
autoscalingrest.RESTStorageProvider{},
batchrest.RESTStorageProvider{},
certificatesrest.RESTStorageProvider{},
coordinationrest.RESTStorageProvider{},
discoveryrest.StorageProvider{},
networkingrest.RESTStorageProvider{},
noderest.RESTStorageProvider{},
policyrest.RESTStorageProvider{},
rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer},
schedulingrest.RESTStorageProvider{},
storagerest.RESTStorageProvider{},
flowcontrolrest.RESTStorageProvider{InformerFactory: c.GenericConfig.SharedInformerFactory},
// keep apps after extensions so legacy clients resolve the extensions versions of shared resource names.
// See https://github.com/kubernetes/kubernetes/issues/42392
appsrest.StorageProvider{},
admissionregistrationrest.RESTStorageProvider{},
eventsrest.RESTStorageProvider{TTL: c.ExtraConfig.EventTTL},
}
if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil {
return nil, err
}
...
}
RESTStorageProvider? 是一個(gè)接口,通過(guò)其 NewRESTStorage? 構(gòu)造出 APIGroupInfo? ,APIGroupInfo? 包含注冊(cè)資源所需的基本信息比如編解碼器,組下所有資源的 Storage 對(duì)象VersionedResourcesStorageMap。
//k8s.io/apiserver/pkg/server/genericapiserver.go
// Info about an API group.
type APIGroupInfo struct {
PrioritizedVersions []schema.GroupVersion
// Info about the resources in this group. It's a map from version to resource to the storage.
VersionedResourcesStorageMap map[string]map[string]rest.Storage
...
// NegotiatedSerializer controls how this group encodes and decodes data
NegotiatedSerializer runtime.NegotiatedSerializer
// ParameterCodec performs conversions for query parameters passed to API calls
ParameterCodec runtime.ParameterCodec
...
}
VersionedResourcesStorageMap? 需要重點(diǎn)注意,編寫(xiě) Aggregated APIServer?主要邏輯是通過(guò) NewDefaultAPIGroupInfo? 初始化 APIGroupInfo? 以后設(shè)置 VersionedResourcesStorageMap? 屬性。VersionedResourcesStorageMap?的簽名是 map[string]map[string]rest.Storage?。第一個(gè)key是版本號(hào),第二個(gè)key是資源名稱,資源名稱可以是 deployment 這種資源,同時(shí)也能是子資源如 pod/status? , pod/log? 等是pod的子資源有單獨(dú)的storage。最終構(gòu)建handler的請(qǐng)求路徑是基于 VersionedResourcesStorageMap? 中提供的版本號(hào)和資源名稱確定的 。rest.Storage 用于處理具體的請(qǐng)求,其聲明如下:
// k8s.io/apiserver/pkg/registry/rest/rest.go
// Storage is a generic interface for RESTful storage services.
// Resources which are exported to the RESTful API of apiserver need to implement this interface. It is expected
// that objects may implement any of the below interfaces.
type Storage interface {
// New returns an empty object that can be used with Create and Update after request data has been put into it.
// This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object)
New() runtime.Object
// Destroy cleans up its resources on shutdown.
// Destroy has to be implemented in thread-safe way and be prepared
// for being called more than once.
Destroy()
}
實(shí)現(xiàn) rest.Storage? 的接口最基本的,如果需要支持不同的請(qǐng)求,還需要實(shí)現(xiàn)其他的接口,相關(guān)定義在 k8s.io/apiserver/pkg/registry/rest/rest.go中,如:
// k8s.io/apiserver/pkg/registry/rest/rest.go
// 資源對(duì)象支持POST請(qǐng)求,例入通過(guò)kubectl create一個(gè)資源對(duì)象。
// Creater is an object that can create an instance of a RESTful object.
type Creater interface {
// New returns an empty object that can be used with Create after request data has been put into it.
// This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object)
New() runtime.Object
// Create creates a new version of a resource.
Create(ctx context.Context, obj runtime.Object, createValidation ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error)
}
// 資源對(duì)象支持GET請(qǐng)求,例如通過(guò)kubectl get 一個(gè)資源對(duì)象。
// Getter is an object that can retrieve a named RESTful resource.
type Getter interface {
// Get finds a resource in the storage by name and returns it.
// Although it can return an arbitrary error value, IsNotFound(err) is true for the
// returned error value err when the specified resource is not found.
Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error)
}
// 支持對(duì)資源對(duì)象進(jìn)行watch操作 例如通過(guò)kubectl get 資源對(duì)象 -w。
type Watcher interface {
// 'label' selects on labels; 'field' selects on the object's fields. Not all fields
// are supported; an error should be returned if 'field' tries to select on a field that
// isn't supported. 'resourceVersion' allows for continuing/starting a watch at a
// particular version.
Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error)
}
后續(xù)的處理中會(huì)依據(jù) Creater? ,Getter? 和 Watcher? 等接口生成對(duì)應(yīng)請(qǐng)求的handler,后文會(huì)進(jìn)行具體的分析。k8s的內(nèi)置資源存儲(chǔ)都使用 etcd,因此內(nèi)置資源的 Storage 是通過(guò) Store? 構(gòu)建。Store? 定義在 /k8s.io/apiserver/pkg/registry/generic/registry/store.go?文件中,已經(jīng)實(shí)現(xiàn) Creater? , Getter?, Watcher?等接口,其他的資源只需在初始化 Store 時(shí)傳入一些必須的參數(shù)即可,無(wú)需編寫(xiě)存儲(chǔ)層的交互代碼。下面給出了構(gòu)造 deployment 的 store 的過(guò)程,其他內(nèi)置資源大同小異。
// NewREST returns a RESTStorage object that will work against deployments.
func NewREST(optsGetter generic.RESTOptionsGetter) (*REST, *StatusREST, *RollbackREST, error) {
// 創(chuàng)建一個(gè)deployments的genericregistry.Store
store := &genericregistry.Store{
// 初始化一個(gè)空資源對(duì)象,這里使用的是internal的版本,下面定義的各種strategy操作的對(duì)象也是internal版本,這樣就不用為每一種版本編寫(xiě)一個(gè)strategy策略
NewFunc: func() runtime.Object { return &apps.Deployment{} },
// 初始化一個(gè)空資源對(duì)象列表
NewListFunc: func() runtime.Object { return &apps.DeploymentList{} },
DefaultQualifiedResource: apps.Resource("deployments"),
// 創(chuàng)建更新刪除策略 主要是做校驗(yàn)及控制那些字段不能被用戶覆蓋用
CreateStrategy: deployment.Strategy,
UpdateStrategy: deployment.Strategy,
DeleteStrategy: deployment.Strategy,
ResetFieldsStrategy: deployment.Strategy,
TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
}
options := &generic.StoreOptions{RESTOptions: optsGetter}
// 繼續(xù)完成store其他屬性的初始化,比如初始化store.Storage屬性。Storage主要用于和底層存儲(chǔ)層交互
if err := store.CompleteWithOptions(options); err != nil {
return nil, nil, nil, err
}
statusStore := *store
// deployment的status子資源也是使用store, 區(qū)別是更新策略不一樣, 即在update時(shí)會(huì)用舊對(duì)象的spec和lable覆蓋新對(duì)象的,防止非status字段被用戶意外覆蓋
statusStore.UpdateStrategy = deployment.StatusStrategy
statusStore.ResetFieldsStrategy = deployment.StatusStrategy
return &REST{store}, &StatusREST{store: &statusStore}, &RollbackREST{store: store}, nil
}
InstallAPIs? 調(diào)用鏈條比較深。參考圖1,最終會(huì)來(lái)到k8s.io/apiserver/pkg/endpoints/groupversion.go?的 InstallREST? 方法。InstallREST? 方法構(gòu)造出 handler 的前綴,創(chuàng)建APIInstaller?,然后調(diào)用installer.Install()方法繼續(xù)handler的注冊(cè)
// k8s.io/apiserver/pkg/endpoints/groupversion.go
func (g *APIGroupVersion) InstallREST(container *restful.Container) ([]*storageversion.ResourceInfo, error) {
// 從InstallAPI調(diào)用鏈下來(lái)這里的g.Root為/apis,這樣就可以確定handler的前綴為/apis/{goup}/{version}
prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
installer := &APIInstaller{
group: g,
prefix: prefix,
minRequestTimeout: g.MinRequestTimeout,
}
apiResources, resourceInfos, ws, registrationErrors := installer.Install()
versionDiscoveryHandler := discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, staticLister{apiResources})
versionDiscoveryHandler.AddToWebService(ws)
container.Add(ws)
return removeNonPersistedResources(resourceInfos), utilerrors.NewAggregate(registrationErrors)
}
installer.Install()? 方法會(huì)調(diào)用registerResourceHandlers? 方法,真正開(kāi)始創(chuàng)建和注冊(cè)處理請(qǐng)求的 handler,需要說(shuō)明的是a.group.Storage? 是上文提到的VersionedResourcesStorageMap? 傳入版本號(hào)后獲得的 map。讀者可以自行參考圖1的調(diào)用鏈進(jìn)行分析。a.registerResourceHandlers? 就是為每一種Storage注冊(cè)handlers
// Install handlers for API resources.
func (a *APIInstaller) Install() ([]metav1.APIResource, []*storageversion.ResourceInfo, *restful.WebService, []error) {
var apiResources []metav1.APIResource
var resourceInfos []*storageversion.ResourceInfo
var errors []error
ws := a.newWebService()
// Register the paths in a deterministic (sorted) order to get a deterministic swagger spec.
paths := make([]string, len(a.group.Storage))
var i int = 0
// a.goup.Storage的簽名是 map[string]Storage, for循環(huán)的path是map的key,即資源名稱
for path := range a.group.Storage {
paths[i] = path
i++
}
sort.Strings(paths)
for _, path := range paths {
apiResource, resourceInfo, err := a.registerResourceHandlers(path, a.group.Storage[path], ws)
...
}
registerResourceHandlers? 會(huì)依據(jù)rest.Storage實(shí)現(xiàn)的接口生成相關(guān)的action。最終根據(jù)action生成handler并注冊(cè)到rest容器中。
// k8s.io/apiserver/pkg/endpoints/installer.go
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, *storageversion.ResourceInfo, error) {
...
// 初始化rest容器,根目錄是APIInstaller的prefix屬性,從InstallAPI調(diào)用鏈下來(lái)值為/apis/{goup}/{version}
ws := a.newWebService()
...
// 進(jìn)行類型轉(zhuǎn)換判斷當(dāng)前的storage支持哪些類型的操作
creater, isCreater := storage.(rest.Creater)
namedCreater, isNamedCreater := storage.(rest.NamedCreater)
lister, isLister := storage.(rest.Lister)
getter, isGetter := storage.(rest.Getter)
getterWithOptions, isGetterWithOptions := storage.(rest.GetterWithOptions)
gracefulDeleter, isGracefulDeleter := storage.(rest.GracefulDeleter)
collectionDeleter, isCollectionDeleter := storage.(rest.CollectionDeleter)
updater, isUpdater := storage.(rest.Updater)
patcher, isPatcher := storage.(rest.Patcher)
watcher, isWatcher := storage.(rest.Watcher)
connecter, isConnecter := storage.(rest.Connecter)
storageMeta, isMetadata := storage.(rest.StorageMetadata)
storageVersionProvider, isStorageVersionProvider := storage.(rest.StorageVersionProvider)
// Get the list of actions for the given scope.
switch {
case !namespaceScoped:
// 構(gòu)造有無(wú)namespace資源的action
// Handle non-namespace scoped resources like nodes.
...
default:
// 構(gòu)造有namespace資源的action
// 構(gòu)造handler的注冊(cè)路徑
namespaceParamName := "namespaces"
// Handler for standard REST verbs (GET, PUT, POST and DELETE).
namespaceParam := ws.PathParameter("namespace", "object name and auth scope, such as for teams and projects").DataType("string")
namespacedPath := namespaceParamName + "/{namespace}/" + resource
namespaceParams := []*restful.Parameter{namespaceParam}
//resourcePath的值為 /namespaces/{namespace}/{resource}
resourcePath := namespacedPath
resourceParams := namespaceParams
// itemPath: /namespaces/{namespace}/{resource}/{name}
// name是請(qǐng)求資源對(duì)象的名字
itemPath := namespacedPath + "/{name}"
nameParams := append(namespaceParams, nameParam)
proxyParams := append(nameParams, pathParam)
itemPathSuffix := ""
if isSubresource {
itemPathSuffix = "/" + subresource
// 有子資源等情況下 resourcePath被定義為:/namespaces/{namespace}/{resource}/{name}/{subResource}
itemPath = itemPath + itemPathSuffix
// itemPath與resourcePath的值一樣
resourcePath = itemPath
resourceParams = nameParams
}
apiResource.Name = path
apiResource.Namespaced = true
apiResource.Kind = resourceKind
namer := handlers.ContextBasedNaming{
Namer: a.group.Namer,
ClusterScoped: false,
}
// 根據(jù)storage實(shí)現(xiàn)的接口添加添加相關(guān)的action
actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister)
actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer, false}, isCreater)
actions = appendIf(actions, action{"DELETECOLLECTION", resourcePath, resourceParams, namer, false}, isCollectionDeleter)
// DEPRECATED in 1.11
actions = appendIf(actions, action{"WATCHLIST", "watch/" + resourcePath, resourceParams, namer, false}, allowWatchList)
actions = appendIf(actions, action{"GET", itemPath, nameParams, namer, false}, isGetter)
if getSubpath {
actions = appendIf(actions, action{"GET", itemPath + "/{path:*}", proxyParams, namer, false}, isGetter)
}
actions = appendIf(actions, action{"PUT", itemPath, nameParams, namer, false}, isUpdater)
actions = appendIf(actions, action{"PATCH", itemPath, nameParams, namer, false}, isPatcher)
actions = appendIf(actions, action{"DELETE", itemPath, nameParams, namer, false}, isGracefulDeleter)
// DEPRECATED in 1.11
actions = appendIf(actions, action{"WATCH", "watch/" + itemPath, nameParams, namer, false}, isWatcher)
actions = appendIf(actions, action{"CONNECT", itemPath, nameParams, namer, false}, isConnecter)
actions = appendIf(actions, action{"CONNECT", itemPath + "/{path:*}", proxyParams, namer, false}, isConnecter && connectSubpath)
// list or post across namespace.
// For ex: LIST all pods in all namespaces by sending a LIST request at /api/apiVersion/pods.
// TODO: more strongly type whether a resource allows these actions on "all namespaces" (bulk delete)
if !isSubresource {
actions = appendIf(actions, action{"LIST", resource, params, namer, true}, isLister)
// DEPRECATED in 1.11
actions = appendIf(actions, action{"WATCHLIST", "watch/" + resource, params, namer, true}, allowWatchList)
}
}
...
for _, action := range actions {
...
switch action.Verb {
case "GET": // Get a resource.
var handler restful.RouteFunction
// 構(gòu)造get請(qǐng)求的handler
// restfulGetResourceWithOptions和restfulGetResource將handlers.GetResource函數(shù)轉(zhuǎn)換成restful.RouteFunction,即handler的函數(shù)簽名
if isGetterWithOptions {
handler = restfulGetResourceWithOptions(getterWithOptions, reqScope, isSubresource)
} else {
handler = restfulGetResource(getter, reqScope)
}
...
// 將handler注冊(cè)到rest容器中
// action.Path是上面定義的itemPath或resourcePath,對(duì)于GET來(lái)說(shuō)是itemPath
// 當(dāng)前注冊(cè)的handler的路徑是ws的根路徑加上ation.Path. 完整的路徑為:/apis/{goup}/{version}/namespaces/{namespace}/{resource}/{name}
route := ws.GET(action.Path).To(handler).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
Operation("read"+namespaced+kind+strings.Title(subresource)+operationSuffix).
Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
Returns(http.StatusOK, "OK", producedObject).
Writes(producedObject)
if isGetterWithOptions {
if err := AddObjectParams(ws, route, versionedGetOptions); err != nil {
return nil, nil, err
}
}
addParams(route, action.Params)
routes = append(routes, route)
}
case "LIST": // List all resources of a kind.
...
case "PUT": // Update a resource.
...
case "PATCH": // Partially update a resource
...
case "POST": // Create a resource.
...
case "DELETE": // Delete a resource.
....
}
...
}
registerResourceHandlers? 中創(chuàng)建的handler并不是直接調(diào)用Creater? ,Updater?等接口定義的方法,而是在外面包了一層代碼進(jìn)行一些額外的處理,例如對(duì)象的編解碼,admission control 的處理邏輯,針對(duì) watch 這種長(zhǎng)鏈接需要進(jìn)行協(xié)議的處理等,相關(guān)的定義在k8s.io/apiserver/pkg/endpoints/handlers包下。文本以Get和Create例,分析請(qǐng)求的處理邏輯。
Get請(qǐng)求的處理過(guò)程比較簡(jiǎn)單,通過(guò)請(qǐng)求的查詢串構(gòu)造出metav1.GetOptions ,然后交給 Getter 接口處理,最后在將查詢結(jié)果進(jìn)行轉(zhuǎn)換發(fā)回給請(qǐng)求者。
// k8s.io/apiserver/pkg/endpoints/handlers/get.go
// GetResource returns a function that handles retrieving a single resource from a rest.Storage object.
func GetResource(r rest.Getter, scope *RequestScope) http.HandlerFunc {
return getResourceHandler(scope,
func(ctx context.Context, name string, req *http.Request, trace *utiltrace.Trace) (runtime.Object, error) {
// check for export
options := metav1.GetOptions{}
// 獲取查詢串
if values := req.URL.Query(); len(values) > 0 {
...
// 將查詢串解碼成metav1.GetOptions
if err := metainternalversionscheme.ParameterCodec.DecodeParameters(values, scope.MetaGroupVersion, &options); err != nil {
err = errors.NewBadRequest(err.Error())
return nil, err
}
}
if trace != nil {
trace.Step("About to Get from storage")
}
// 交給Getter接口處理
return r.Get(ctx, name, &options)
})
}
// getResourceHandler is an HTTP handler function for get requests. It delegates to the
// passed-in getterFunc to perform the actual get.
func getResourceHandler(scope *RequestScope, getter getterFunc) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
...
namespace, name, err := scope.Namer.Name(req)
...
ctx := req.Context()
ctx = request.WithNamespace(ctx, namespace)
...
result, err := getter(ctx, name, req, trace)
...
// 對(duì)處理結(jié)果進(jìn)行轉(zhuǎn)化為用戶期望的格式并寫(xiě)入到response中返回給用戶
transformResponseObject(ctx, scope, trace, req, w, http.StatusOK, outputMediaType, result)
}
}
Create的處理邏輯在 createHandler 中,代碼較長(zhǎng),主要做以下幾件事情:
- 對(duì)查詢串進(jìn)行解碼生成 metav1.CreateOptions 。
- 對(duì)請(qǐng)求的body體中的數(shù)據(jù)進(jìn)行解碼,生成資源對(duì)象。解碼的對(duì)象版本是 internal 版本,internal 版本是該資源對(duì)象所有版本字段的全集。針對(duì)不同版本的對(duì)象內(nèi)部可以使用相同的代碼進(jìn)行處理。
- 對(duì)對(duì)象進(jìn)行修改的準(zhǔn)入控制,判斷是否修需要修改對(duì)象。
- 交給creater接口創(chuàng)建資源對(duì)象。
- 將數(shù)據(jù)轉(zhuǎn)換為期望的格式寫(xiě)入 response 中,調(diào)用 creater 接口返回的結(jié)果仍然是 internal 版本,編碼時(shí),會(huì)編碼成用戶請(qǐng)求的版本返回給用戶。
// k8s.io/apiserver/pkg/endpoints/handlers/create.go
// CreateNamedResource returns a function that will handle a resource creation with name.
func CreateNamedResource(r rest.NamedCreater, scope *RequestScope, admission admission.Interface) http.HandlerFunc {
return createHandler(r, scope, admission, true)
}
func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Interface, includeName bool) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
...
// 從request中取出請(qǐng)求body
body, err := limitedReadBody(req, scope.MaxRequestBodyBytes)
...
// 對(duì)查詢傳進(jìn)行解碼生成metav1.CreateOptions
options := &metav1.CreateOptions{}
values := req.URL.Query()
if err := metainternalversionscheme.ParameterCodec.DecodeParameters(values, scope.MetaGroupVersion, options); err != nil {
...
}
// 將請(qǐng)求body解碼成資源對(duì)象, defaultGVK是用戶請(qǐng)求的版本,這里decoder解碼出來(lái)的對(duì)象是internal版本的對(duì)象
obj, gvk, err := decoder.Decode(body, &defaultGVK, original)
...
admissionAttributes := admission.NewAttributesRecord(obj, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Create, options, dryrun.IsDryRun(options.DryRun), userInfo)
// 構(gòu)建調(diào)用create方法的函數(shù)
requestFunc := func() (runtime.Object, error) {
return r.Create(
ctx,
name,
obj,
rest.AdmissionToValidateObjectFunc(admit, admissionAttributes, scope),
options,
)
}
// Dedup owner references before updating managed fields
dedupOwnerReferencesAndAddWarning(obj, req.Context(), false)
result, err := finisher.FinishRequest(ctx, func() (runtime.Object, error) {
...
// 執(zhí)行mutation的admission操作,即在創(chuàng)建時(shí)對(duì)象進(jìn)行修改操作。
// admin在buildGenericConfig中初始化,通過(guò)config傳遞給genericsever,然后傳遞到此處
if mutatingAdmission, ok := admit.(admission.MutationInterface); ok && mutatingAdmission.Handles(admission.Create) {
if err := mutatingAdmission.Admit(ctx, admissionAttributes, scope); err != nil {
return nil, err
}
}
// Dedup owner references again after mutating admission happens
dedupOwnerReferencesAndAddWarning(obj, req.Context(), true)
// 調(diào)用創(chuàng)建方法
result, err := requestFunc()
...
return result, err
})
...
// resutl也是internal版本的對(duì)象,transformResponseObject會(huì)轉(zhuǎn)換為用戶請(qǐng)求的版本并輸出
transformResponseObject(ctx, scope, trace, req, w, code, outputMediaType, result)
}
Create請(qǐng)求的流程可以總結(jié)為下圖
圖3 create請(qǐng)求處理流程
總結(jié)
本文介紹了 K8s內(nèi)置資源的注冊(cè)過(guò)程,對(duì)APIServer的訪問(wèn)會(huì)先經(jīng)過(guò) filter,再路由給具體的 handler。filter 在 DefaultBuildHandlerChain? 中定義,主要對(duì)請(qǐng)求做超時(shí)處理,認(rèn)證,鑒權(quán)等操作。handler 的注冊(cè)則是初始化 APIGoupInfo? 并設(shè)置其 VersionedResourcesStorageMap? 后作為入?yún)ⅲ{(diào)用 GenericAPIServer.InstallAPIGroups?即可完成 handler 的注冊(cè)。k8s.io/apiserver/pkg/endpoints/handlers?包中的代碼則是對(duì)用戶請(qǐng)求做編解碼,對(duì)象版本轉(zhuǎn)換,協(xié)議處理等操作,最后在交給rest.Storage 具體實(shí)現(xiàn)的接口進(jìn)行處理。
參考
? https://blog.tianfeiyu.com/source-code-reading-notes/kubernetes/kube_apiserver.html#kube-apiserver-處理流程[1]
? https://hackerain.me/2020/10/05/kubernetes/kube-apiserver-genericapiserver.html
? https://hackerain.me/2020/09/19/kubernetes/kube-apiserver-storage-overview.html
? https://github.com/gosoon/source-code-reading-notes/blob/master/kubernetes/kube_apiserver.md
? https://time.geekbang.org/column/article/41876