自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Kubernetes API Server handler 注冊(cè)過(guò)程分析

云計(jì)算
本文以內(nèi)置資源的 handler 注冊(cè)過(guò)程為線索介紹了 APiServer 的啟動(dòng)過(guò)程和 handler 注冊(cè)過(guò)程。
作者|韓偉森,就職于中國(guó)移動(dòng)云能力中心,專注于云原生領(lǐng)域。

前言

K8s提供 Aggregated APIServer? 的擴(kuò)展方式,編寫(xiě) Aggregated APIServer? 本質(zhì)上和K8s構(gòu)建方式類似,理解 APiServer 資源的加載方式,能更好好的理解如何開(kāi)發(fā)Aggregated APIServer。本文以內(nèi)置資源的 handler 注冊(cè)過(guò)程為線索介紹了 APiServer 的啟動(dòng)過(guò)程和 handler 注冊(cè)過(guò)程。使用k8s代碼commit id為c6970e64528ba78b74bf77b86f9b78b7b61bd0cd

APIServer啟動(dòng)過(guò)程介紹

圖片

圖1 APIServer啟動(dòng)流程

圖1給出了 ApiServer 的初始化流程,首先通過(guò) CreateServerChain 構(gòu)造出3個(gè)APIServer:

  • AggregatorServer:攔截Aggregated APIServer? 中定義的資源對(duì)象請(qǐng)求,并轉(zhuǎn)發(fā)給相關(guān)的Aggregated APIServer 處理。
  • KubeAPIServer:用于處理 k8s 的內(nèi)建資源,如:Deployment,ConfigMap 等。
  • APIExtensionServer:負(fù)責(zé)處理用戶自定義資源。

它們之間的處理順序?yàn)槿缦聢D所示,當(dāng)用戶請(qǐng)求進(jìn)來(lái),先判斷 AggregatorServer? 能否處理,否則代理給 kubeApiServer? ,如果 kubeApiServer? 不能處代理給 ApiExtensionServer 處理,如果都不能處理則交給 notFoundHandler 處理。

圖片

圖2 三種 APIServer 請(qǐng)求順序

限于篇幅原因,本文主要分析 kubeapiserver 的啟動(dòng)過(guò)程。

CreateApiServerConfig? 通過(guò)調(diào)用 buildGenericConfig? 構(gòu)建 genericapiserver.Config。genericapiserver.Config? 中包含了啟動(dòng)Genericapiserver? 所需要的配置信息,比如:RequestTimeout? 定義了請(qǐng)求的超時(shí)時(shí)間,AdmissionControl? 對(duì)象進(jìn)行準(zhǔn)入控制。buildGenericConfig? 中需要注意的是 BuildHandlerChainFunc?,請(qǐng)求在路由給資源對(duì)象的handler前先經(jīng)過(guò)的BuildHandlerChainFunc? 中定義的 Filter? 。參考圖1,通過(guò)深入 buildGenericConfig? 可以發(fā)現(xiàn) BuildHandlerChainFunc? 傳入的是 DefaultBuildHandlerChain? ,其中 Filter 先定義的后調(diào)用。

// k8s.io/apiserver/pkg/server/config.go

func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
handler := filterlatency.TrackCompleted(apiHandler)
// 構(gòu)造權(quán)限檢查filter
handler = genericapifilters.WithAuthorization(handler, c.Authorization.Authorizer, c.Serializer)
...
// 構(gòu)造認(rèn)證filter
handler = genericapifilters.WithAuthentication(handler, c.Authentication.Authenticator, failedHandler, c.Authentication.APIAudiences)
...
// 構(gòu)造請(qǐng)求超時(shí)filter, LongRunningFunc會(huì)判斷該請(qǐng)求是否是需要LongRunning的,比如watch的請(qǐng)求,如果是,該filter不會(huì)對(duì)這類請(qǐng)求生效
// WithTimeoutForNonLongRunningRequests will call the rest of the request handling in a go-routine with the
// context with deadline. The go-routine can keep running, while the timeout logic will return a timeout to the client.
handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc)
handler = genericapifilters.WithRequestDeadline(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator,
c.LongRunningFunc, c.Serializer, c.RequestTimeout)

handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.HandlerChainWaitGroup)
...
// 初始化RequestInfo的filter并將其放入context中,后續(xù)的處理邏輯可以從context直接獲取RequestInfo
handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
....
return handler
}

CreateKubeAPIServer? 中調(diào)用了kubeAPIServerConfig.Complete().New?構(gòu)造出了 kubeAPIServer? 的 GenericServer。kubeAPIServerConfig.Complete().New?中通過(guò)調(diào)用 m.InstallLegacyAPI? 初始化核心資源并添加進(jìn)路由中,對(duì)應(yīng)的是以 api 開(kāi)頭的資源,如:Pod,ConfigMap 等。調(diào)用 m.InstallAPI 初始化以 apis 開(kāi)頭的內(nèi)置資源如:Deployment。

handler的注冊(cè)過(guò)程

從圖1可以看出 InstallAPI? 與 InstallLegacyAPI? 的創(chuàng)建過(guò)程基本類似,本文主要介紹 InstallAPI 的初始化過(guò)程。

在調(diào)用 InstallAPI? 之前kubeAPIServerConfig.Complete().New?會(huì)先創(chuàng)建內(nèi)置資源對(duì)象的RESTStorageProvider? 作為 InstallAPI 的入?yún)?/p>

//pkg/controlplane/instance.go

func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Instance, error) {
...
// 構(gòu)造內(nèi)置資源的RESTStorageProvider
restStorageProviders := []RESTStorageProvider{
apiserverinternalrest.StorageProvider{},
authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig.Authentication.APIAudiences},
authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, RuleResolver: c.GenericConfig.RuleResolver},
autoscalingrest.RESTStorageProvider{},
batchrest.RESTStorageProvider{},
certificatesrest.RESTStorageProvider{},
coordinationrest.RESTStorageProvider{},
discoveryrest.StorageProvider{},
networkingrest.RESTStorageProvider{},
noderest.RESTStorageProvider{},
policyrest.RESTStorageProvider{},
rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer},
schedulingrest.RESTStorageProvider{},
storagerest.RESTStorageProvider{},
flowcontrolrest.RESTStorageProvider{InformerFactory: c.GenericConfig.SharedInformerFactory},
// keep apps after extensions so legacy clients resolve the extensions versions of shared resource names.
// See https://github.com/kubernetes/kubernetes/issues/42392
appsrest.StorageProvider{},
admissionregistrationrest.RESTStorageProvider{},
eventsrest.RESTStorageProvider{TTL: c.ExtraConfig.EventTTL},
}
if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil {
return nil, err
}
...
}

RESTStorageProvider? 是一個(gè)接口,通過(guò)其 NewRESTStorage? 構(gòu)造出 APIGroupInfo? ,APIGroupInfo? 包含注冊(cè)資源所需的基本信息比如編解碼器,組下所有資源的 Storage 對(duì)象VersionedResourcesStorageMap。

//k8s.io/apiserver/pkg/server/genericapiserver.go

// Info about an API group.
type APIGroupInfo struct {
PrioritizedVersions []schema.GroupVersion
// Info about the resources in this group. It's a map from version to resource to the storage.
VersionedResourcesStorageMap map[string]map[string]rest.Storage
...
// NegotiatedSerializer controls how this group encodes and decodes data
NegotiatedSerializer runtime.NegotiatedSerializer
// ParameterCodec performs conversions for query parameters passed to API calls
ParameterCodec runtime.ParameterCodec
...
}

VersionedResourcesStorageMap? 需要重點(diǎn)注意,編寫(xiě) Aggregated APIServer?主要邏輯是通過(guò) NewDefaultAPIGroupInfo? 初始化 APIGroupInfo? 以后設(shè)置 VersionedResourcesStorageMap? 屬性。VersionedResourcesStorageMap?的簽名是 map[string]map[string]rest.Storage?。第一個(gè)key是版本號(hào),第二個(gè)key是資源名稱,資源名稱可以是 deployment 這種資源,同時(shí)也能是子資源如 pod/status? , pod/log? 等是pod的子資源有單獨(dú)的storage。最終構(gòu)建handler的請(qǐng)求路徑是基于 VersionedResourcesStorageMap? 中提供的版本號(hào)和資源名稱確定的 。rest.Storage 用于處理具體的請(qǐng)求,其聲明如下:

// k8s.io/apiserver/pkg/registry/rest/rest.go

// Storage is a generic interface for RESTful storage services.
// Resources which are exported to the RESTful API of apiserver need to implement this interface. It is expected
// that objects may implement any of the below interfaces.
type Storage interface {
// New returns an empty object that can be used with Create and Update after request data has been put into it.
// This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object)
New() runtime.Object

// Destroy cleans up its resources on shutdown.
// Destroy has to be implemented in thread-safe way and be prepared
// for being called more than once.
Destroy()
}

實(shí)現(xiàn) rest.Storage? 的接口最基本的,如果需要支持不同的請(qǐng)求,還需要實(shí)現(xiàn)其他的接口,相關(guān)定義在 k8s.io/apiserver/pkg/registry/rest/rest.go中,如:

// k8s.io/apiserver/pkg/registry/rest/rest.go

// 資源對(duì)象支持POST請(qǐng)求,例入通過(guò)kubectl create一個(gè)資源對(duì)象。
// Creater is an object that can create an instance of a RESTful object.
type Creater interface {
// New returns an empty object that can be used with Create after request data has been put into it.
// This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object)
New() runtime.Object

// Create creates a new version of a resource.
Create(ctx context.Context, obj runtime.Object, createValidation ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error)
}
// 資源對(duì)象支持GET請(qǐng)求,例如通過(guò)kubectl get 一個(gè)資源對(duì)象。
// Getter is an object that can retrieve a named RESTful resource.
type Getter interface {
// Get finds a resource in the storage by name and returns it.
// Although it can return an arbitrary error value, IsNotFound(err) is true for the
// returned error value err when the specified resource is not found.
Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error)
}
// 支持對(duì)資源對(duì)象進(jìn)行watch操作 例如通過(guò)kubectl get 資源對(duì)象 -w。
type Watcher interface {
// 'label' selects on labels; 'field' selects on the object's fields. Not all fields
// are supported; an error should be returned if 'field' tries to select on a field that
// isn't supported. 'resourceVersion' allows for continuing/starting a watch at a
// particular version.
Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error)
}

后續(xù)的處理中會(huì)依據(jù) Creater? ,Getter? 和 Watcher? 等接口生成對(duì)應(yīng)請(qǐng)求的handler,后文會(huì)進(jìn)行具體的分析。k8s的內(nèi)置資源存儲(chǔ)都使用 etcd,因此內(nèi)置資源的 Storage 是通過(guò) Store? 構(gòu)建。Store? 定義在 /k8s.io/apiserver/pkg/registry/generic/registry/store.go?文件中,已經(jīng)實(shí)現(xiàn) Creater? , Getter?, Watcher?等接口,其他的資源只需在初始化 Store 時(shí)傳入一些必須的參數(shù)即可,無(wú)需編寫(xiě)存儲(chǔ)層的交互代碼。下面給出了構(gòu)造 deployment 的 store 的過(guò)程,其他內(nèi)置資源大同小異。

// NewREST returns a RESTStorage object that will work against deployments.
func NewREST(optsGetter generic.RESTOptionsGetter) (*REST, *StatusREST, *RollbackREST, error) {
// 創(chuàng)建一個(gè)deployments的genericregistry.Store
store := &genericregistry.Store{
// 初始化一個(gè)空資源對(duì)象,這里使用的是internal的版本,下面定義的各種strategy操作的對(duì)象也是internal版本,這樣就不用為每一種版本編寫(xiě)一個(gè)strategy策略
NewFunc: func() runtime.Object { return &apps.Deployment{} },
// 初始化一個(gè)空資源對(duì)象列表
NewListFunc: func() runtime.Object { return &apps.DeploymentList{} },

DefaultQualifiedResource: apps.Resource("deployments"),
// 創(chuàng)建更新刪除策略 主要是做校驗(yàn)及控制那些字段不能被用戶覆蓋用
CreateStrategy: deployment.Strategy,
UpdateStrategy: deployment.Strategy,
DeleteStrategy: deployment.Strategy,
ResetFieldsStrategy: deployment.Strategy,

TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
}
options := &generic.StoreOptions{RESTOptions: optsGetter}
// 繼續(xù)完成store其他屬性的初始化,比如初始化store.Storage屬性。Storage主要用于和底層存儲(chǔ)層交互
if err := store.CompleteWithOptions(options); err != nil {
return nil, nil, nil, err
}

statusStore := *store
// deployment的status子資源也是使用store, 區(qū)別是更新策略不一樣, 即在update時(shí)會(huì)用舊對(duì)象的spec和lable覆蓋新對(duì)象的,防止非status字段被用戶意外覆蓋
statusStore.UpdateStrategy = deployment.StatusStrategy
statusStore.ResetFieldsStrategy = deployment.StatusStrategy
return &REST{store}, &StatusREST{store: &statusStore}, &RollbackREST{store: store}, nil
}

InstallAPIs? 調(diào)用鏈條比較深。參考圖1,最終會(huì)來(lái)到k8s.io/apiserver/pkg/endpoints/groupversion.go?的 InstallREST? 方法。InstallREST? 方法構(gòu)造出 handler 的前綴,創(chuàng)建APIInstaller?,然后調(diào)用installer.Install()方法繼續(xù)handler的注冊(cè)

// k8s.io/apiserver/pkg/endpoints/groupversion.go

func (g *APIGroupVersion) InstallREST(container *restful.Container) ([]*storageversion.ResourceInfo, error) {
// 從InstallAPI調(diào)用鏈下來(lái)這里的g.Root為/apis,這樣就可以確定handler的前綴為/apis/{goup}/{version}
prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
installer := &APIInstaller{
group: g,
prefix: prefix,
minRequestTimeout: g.MinRequestTimeout,
}

apiResources, resourceInfos, ws, registrationErrors := installer.Install()
versionDiscoveryHandler := discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, staticLister{apiResources})
versionDiscoveryHandler.AddToWebService(ws)
container.Add(ws)
return removeNonPersistedResources(resourceInfos), utilerrors.NewAggregate(registrationErrors)
}

installer.Install()? 方法會(huì)調(diào)用registerResourceHandlers? 方法,真正開(kāi)始創(chuàng)建和注冊(cè)處理請(qǐng)求的 handler,需要說(shuō)明的是a.group.Storage? 是上文提到的VersionedResourcesStorageMap? 傳入版本號(hào)后獲得的 map。讀者可以自行參考圖1的調(diào)用鏈進(jìn)行分析。a.registerResourceHandlers? 就是為每一種Storage注冊(cè)handlers

// Install handlers for API resources.
func (a *APIInstaller) Install() ([]metav1.APIResource, []*storageversion.ResourceInfo, *restful.WebService, []error) {
var apiResources []metav1.APIResource
var resourceInfos []*storageversion.ResourceInfo
var errors []error
ws := a.newWebService()

// Register the paths in a deterministic (sorted) order to get a deterministic swagger spec.
paths := make([]string, len(a.group.Storage))
var i int = 0
// a.goup.Storage的簽名是 map[string]Storage, for循環(huán)的path是map的key,即資源名稱
for path := range a.group.Storage {
paths[i] = path
i++
}
sort.Strings(paths)
for _, path := range paths {
apiResource, resourceInfo, err := a.registerResourceHandlers(path, a.group.Storage[path], ws)
...
}

registerResourceHandlers? 會(huì)依據(jù)rest.Storage實(shí)現(xiàn)的接口生成相關(guān)的action。最終根據(jù)action生成handler并注冊(cè)到rest容器中。

// k8s.io/apiserver/pkg/endpoints/installer.go
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, *storageversion.ResourceInfo, error) {
...
// 初始化rest容器,根目錄是APIInstaller的prefix屬性,從InstallAPI調(diào)用鏈下來(lái)值為/apis/{goup}/{version}
ws := a.newWebService()
...
// 進(jìn)行類型轉(zhuǎn)換判斷當(dāng)前的storage支持哪些類型的操作
creater, isCreater := storage.(rest.Creater)
namedCreater, isNamedCreater := storage.(rest.NamedCreater)
lister, isLister := storage.(rest.Lister)
getter, isGetter := storage.(rest.Getter)
getterWithOptions, isGetterWithOptions := storage.(rest.GetterWithOptions)
gracefulDeleter, isGracefulDeleter := storage.(rest.GracefulDeleter)
collectionDeleter, isCollectionDeleter := storage.(rest.CollectionDeleter)
updater, isUpdater := storage.(rest.Updater)
patcher, isPatcher := storage.(rest.Patcher)
watcher, isWatcher := storage.(rest.Watcher)
connecter, isConnecter := storage.(rest.Connecter)
storageMeta, isMetadata := storage.(rest.StorageMetadata)
storageVersionProvider, isStorageVersionProvider := storage.(rest.StorageVersionProvider)

// Get the list of actions for the given scope.
switch {
case !namespaceScoped:
// 構(gòu)造有無(wú)namespace資源的action
// Handle non-namespace scoped resources like nodes.
...
default:
// 構(gòu)造有namespace資源的action
// 構(gòu)造handler的注冊(cè)路徑
namespaceParamName := "namespaces"
// Handler for standard REST verbs (GET, PUT, POST and DELETE).
namespaceParam := ws.PathParameter("namespace", "object name and auth scope, such as for teams and projects").DataType("string")
namespacedPath := namespaceParamName + "/{namespace}/" + resource
namespaceParams := []*restful.Parameter{namespaceParam}

//resourcePath的值為 /namespaces/{namespace}/{resource}
resourcePath := namespacedPath
resourceParams := namespaceParams
// itemPath: /namespaces/{namespace}/{resource}/{name}
// name是請(qǐng)求資源對(duì)象的名字
itemPath := namespacedPath + "/{name}"
nameParams := append(namespaceParams, nameParam)
proxyParams := append(nameParams, pathParam)
itemPathSuffix := ""
if isSubresource {
itemPathSuffix = "/" + subresource
// 有子資源等情況下 resourcePath被定義為:/namespaces/{namespace}/{resource}/{name}/{subResource}
itemPath = itemPath + itemPathSuffix
// itemPath與resourcePath的值一樣
resourcePath = itemPath
resourceParams = nameParams
}
apiResource.Name = path
apiResource.Namespaced = true
apiResource.Kind = resourceKind
namer := handlers.ContextBasedNaming{
Namer: a.group.Namer,
ClusterScoped: false,
}
// 根據(jù)storage實(shí)現(xiàn)的接口添加添加相關(guān)的action
actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister)
actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer, false}, isCreater)
actions = appendIf(actions, action{"DELETECOLLECTION", resourcePath, resourceParams, namer, false}, isCollectionDeleter)
// DEPRECATED in 1.11
actions = appendIf(actions, action{"WATCHLIST", "watch/" + resourcePath, resourceParams, namer, false}, allowWatchList)

actions = appendIf(actions, action{"GET", itemPath, nameParams, namer, false}, isGetter)
if getSubpath {
actions = appendIf(actions, action{"GET", itemPath + "/{path:*}", proxyParams, namer, false}, isGetter)
}
actions = appendIf(actions, action{"PUT", itemPath, nameParams, namer, false}, isUpdater)
actions = appendIf(actions, action{"PATCH", itemPath, nameParams, namer, false}, isPatcher)
actions = appendIf(actions, action{"DELETE", itemPath, nameParams, namer, false}, isGracefulDeleter)
// DEPRECATED in 1.11
actions = appendIf(actions, action{"WATCH", "watch/" + itemPath, nameParams, namer, false}, isWatcher)
actions = appendIf(actions, action{"CONNECT", itemPath, nameParams, namer, false}, isConnecter)
actions = appendIf(actions, action{"CONNECT", itemPath + "/{path:*}", proxyParams, namer, false}, isConnecter && connectSubpath)

// list or post across namespace.
// For ex: LIST all pods in all namespaces by sending a LIST request at /api/apiVersion/pods.
// TODO: more strongly type whether a resource allows these actions on "all namespaces" (bulk delete)
if !isSubresource {
actions = appendIf(actions, action{"LIST", resource, params, namer, true}, isLister)
// DEPRECATED in 1.11
actions = appendIf(actions, action{"WATCHLIST", "watch/" + resource, params, namer, true}, allowWatchList)
}
}
...
for _, action := range actions {
...
switch action.Verb {
case "GET": // Get a resource.
var handler restful.RouteFunction
// 構(gòu)造get請(qǐng)求的handler
// restfulGetResourceWithOptions和restfulGetResource將handlers.GetResource函數(shù)轉(zhuǎn)換成restful.RouteFunction,即handler的函數(shù)簽名
if isGetterWithOptions {
handler = restfulGetResourceWithOptions(getterWithOptions, reqScope, isSubresource)
} else {
handler = restfulGetResource(getter, reqScope)
}
...
// 將handler注冊(cè)到rest容器中
// action.Path是上面定義的itemPath或resourcePath,對(duì)于GET來(lái)說(shuō)是itemPath
// 當(dāng)前注冊(cè)的handler的路徑是ws的根路徑加上ation.Path. 完整的路徑為:/apis/{goup}/{version}/namespaces/{namespace}/{resource}/{name}
route := ws.GET(action.Path).To(handler).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
Operation("read"+namespaced+kind+strings.Title(subresource)+operationSuffix).
Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
Returns(http.StatusOK, "OK", producedObject).
Writes(producedObject)
if isGetterWithOptions {
if err := AddObjectParams(ws, route, versionedGetOptions); err != nil {
return nil, nil, err
}
}
addParams(route, action.Params)
routes = append(routes, route)
}
case "LIST": // List all resources of a kind.
...
case "PUT": // Update a resource.
...
case "PATCH": // Partially update a resource
...
case "POST": // Create a resource.
...
case "DELETE": // Delete a resource.
....
}
...

}

registerResourceHandlers? 中創(chuàng)建的handler并不是直接調(diào)用Creater? ,Updater?等接口定義的方法,而是在外面包了一層代碼進(jìn)行一些額外的處理,例如對(duì)象的編解碼,admission control 的處理邏輯,針對(duì) watch 這種長(zhǎng)鏈接需要進(jìn)行協(xié)議的處理等,相關(guān)的定義在k8s.io/apiserver/pkg/endpoints/handlers包下。文本以Get和Create例,分析請(qǐng)求的處理邏輯。

Get請(qǐng)求的處理過(guò)程比較簡(jiǎn)單,通過(guò)請(qǐng)求的查詢串構(gòu)造出metav1.GetOptions ,然后交給 Getter 接口處理,最后在將查詢結(jié)果進(jìn)行轉(zhuǎn)換發(fā)回給請(qǐng)求者。

// k8s.io/apiserver/pkg/endpoints/handlers/get.go

// GetResource returns a function that handles retrieving a single resource from a rest.Storage object.
func GetResource(r rest.Getter, scope *RequestScope) http.HandlerFunc {
return getResourceHandler(scope,
func(ctx context.Context, name string, req *http.Request, trace *utiltrace.Trace) (runtime.Object, error) {
// check for export
options := metav1.GetOptions{}
// 獲取查詢串
if values := req.URL.Query(); len(values) > 0 {
...
// 將查詢串解碼成metav1.GetOptions
if err := metainternalversionscheme.ParameterCodec.DecodeParameters(values, scope.MetaGroupVersion, &options); err != nil {
err = errors.NewBadRequest(err.Error())
return nil, err
}
}
if trace != nil {
trace.Step("About to Get from storage")
}
// 交給Getter接口處理
return r.Get(ctx, name, &options)
})
}

// getResourceHandler is an HTTP handler function for get requests. It delegates to the
// passed-in getterFunc to perform the actual get.
func getResourceHandler(scope *RequestScope, getter getterFunc) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
...
namespace, name, err := scope.Namer.Name(req)
...
ctx := req.Context()
ctx = request.WithNamespace(ctx, namespace)
...
result, err := getter(ctx, name, req, trace)
...
// 對(duì)處理結(jié)果進(jìn)行轉(zhuǎn)化為用戶期望的格式并寫(xiě)入到response中返回給用戶
transformResponseObject(ctx, scope, trace, req, w, http.StatusOK, outputMediaType, result)
}
}

Create的處理邏輯在 createHandler 中,代碼較長(zhǎng),主要做以下幾件事情:

  • 對(duì)查詢串進(jìn)行解碼生成 metav1.CreateOptions 。
  • 對(duì)請(qǐng)求的body體中的數(shù)據(jù)進(jìn)行解碼,生成資源對(duì)象。解碼的對(duì)象版本是 internal 版本,internal 版本是該資源對(duì)象所有版本字段的全集。針對(duì)不同版本的對(duì)象內(nèi)部可以使用相同的代碼進(jìn)行處理。
  • 對(duì)對(duì)象進(jìn)行修改的準(zhǔn)入控制,判斷是否修需要修改對(duì)象。
  • 交給creater接口創(chuàng)建資源對(duì)象。
  • 將數(shù)據(jù)轉(zhuǎn)換為期望的格式寫(xiě)入 response 中,調(diào)用 creater 接口返回的結(jié)果仍然是 internal 版本,編碼時(shí),會(huì)編碼成用戶請(qǐng)求的版本返回給用戶。
// k8s.io/apiserver/pkg/endpoints/handlers/create.go

// CreateNamedResource returns a function that will handle a resource creation with name.
func CreateNamedResource(r rest.NamedCreater, scope *RequestScope, admission admission.Interface) http.HandlerFunc {
return createHandler(r, scope, admission, true)
}

func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Interface, includeName bool) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
...
// 從request中取出請(qǐng)求body
body, err := limitedReadBody(req, scope.MaxRequestBodyBytes)
...
// 對(duì)查詢傳進(jìn)行解碼生成metav1.CreateOptions
options := &metav1.CreateOptions{}
values := req.URL.Query()
if err := metainternalversionscheme.ParameterCodec.DecodeParameters(values, scope.MetaGroupVersion, options); err != nil {
...
}
// 將請(qǐng)求body解碼成資源對(duì)象, defaultGVK是用戶請(qǐng)求的版本,這里decoder解碼出來(lái)的對(duì)象是internal版本的對(duì)象
obj, gvk, err := decoder.Decode(body, &defaultGVK, original)
...
admissionAttributes := admission.NewAttributesRecord(obj, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Create, options, dryrun.IsDryRun(options.DryRun), userInfo)
// 構(gòu)建調(diào)用create方法的函數(shù)
requestFunc := func() (runtime.Object, error) {
return r.Create(
ctx,
name,
obj,
rest.AdmissionToValidateObjectFunc(admit, admissionAttributes, scope),
options,
)
}
// Dedup owner references before updating managed fields
dedupOwnerReferencesAndAddWarning(obj, req.Context(), false)
result, err := finisher.FinishRequest(ctx, func() (runtime.Object, error) {
...
// 執(zhí)行mutation的admission操作,即在創(chuàng)建時(shí)對(duì)象進(jìn)行修改操作。
// admin在buildGenericConfig中初始化,通過(guò)config傳遞給genericsever,然后傳遞到此處
if mutatingAdmission, ok := admit.(admission.MutationInterface); ok && mutatingAdmission.Handles(admission.Create) {
if err := mutatingAdmission.Admit(ctx, admissionAttributes, scope); err != nil {
return nil, err
}
}
// Dedup owner references again after mutating admission happens
dedupOwnerReferencesAndAddWarning(obj, req.Context(), true)
// 調(diào)用創(chuàng)建方法
result, err := requestFunc()
...
return result, err
})
...
// resutl也是internal版本的對(duì)象,transformResponseObject會(huì)轉(zhuǎn)換為用戶請(qǐng)求的版本并輸出
transformResponseObject(ctx, scope, trace, req, w, code, outputMediaType, result)
}

Create請(qǐng)求的流程可以總結(jié)為下圖

圖片

圖3 create請(qǐng)求處理流程

總結(jié)

本文介紹了 K8s內(nèi)置資源的注冊(cè)過(guò)程,對(duì)APIServer的訪問(wèn)會(huì)先經(jīng)過(guò) filter,再路由給具體的 handler。filter 在 DefaultBuildHandlerChain? 中定義,主要對(duì)請(qǐng)求做超時(shí)處理,認(rèn)證,鑒權(quán)等操作。handler 的注冊(cè)則是初始化 APIGoupInfo? 并設(shè)置其 VersionedResourcesStorageMap? 后作為入?yún)ⅲ{(diào)用 GenericAPIServer.InstallAPIGroups?即可完成 handler 的注冊(cè)。k8s.io/apiserver/pkg/endpoints/handlers?包中的代碼則是對(duì)用戶請(qǐng)求做編解碼,對(duì)象版本轉(zhuǎn)換,協(xié)議處理等操作,最后在交給rest.Storage 具體實(shí)現(xiàn)的接口進(jìn)行處理。

參考

? https://blog.tianfeiyu.com/source-code-reading-notes/kubernetes/kube_apiserver.html#kube-apiserver-處理流程[1]

? https://hackerain.me/2020/10/05/kubernetes/kube-apiserver-genericapiserver.html

? https://hackerain.me/2020/09/19/kubernetes/kube-apiserver-storage-overview.html

? https://github.com/gosoon/source-code-reading-notes/blob/master/kubernetes/kube_apiserver.md

? https://time.geekbang.org/column/article/41876

責(zé)任編輯:未麗燕 來(lái)源: 云原生社區(qū)動(dòng)態(tài)
相關(guān)推薦

2022-01-06 07:06:52

KubernetesResourceAPI

2024-06-26 00:22:35

2023-11-29 16:21:30

Kubernetes服務(wù)注冊(cè)

2012-02-20 14:47:08

JavaPlay

2023-03-17 07:53:20

K8sAPIServerKubernetes

2022-06-21 08:12:17

K8sAPI對(duì)象Kubernetes

2022-07-01 17:57:45

KubernetesAPI

2016-10-21 13:03:18

androidhandlerlooper

2022-06-07 16:17:45

KubernetesAPI Schema

2023-11-07 07:08:57

2024-01-30 07:58:41

KubernetesGAMMA網(wǎng)關(guān)

2021-10-15 08:27:14

Kubernetes 工具Mizu

2011-08-29 10:55:03

SQL Server分頁(yè)存儲(chǔ)過(guò)程優(yōu)化效率分

2010-07-15 12:38:14

SQL Server存

2014-01-06 16:51:06

Mesos注冊(cè)

2009-07-10 11:28:39

2021-07-12 08:00:21

Nacos 服務(wù)注冊(cè)源碼分析

2022-01-06 07:46:01

Traefik 開(kāi)源Gateway API

2015-08-10 14:41:39

Kubernetes監(jiān)控開(kāi)源容器管理

2016-06-15 10:35:59

云計(jì)算
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)