自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

API Server源碼分析之入口點(diǎn)解析

云計(jì)算 云原生
從本文開始,我們將對 K8s API Server 的代碼進(jìn)行詳細(xì)分析,并探討其應(yīng)用入口點(diǎn)、框架以及與 etcd 的通信。


Kubernetes(K8s)集群中最關(guān)鍵的組件之一是 API Server,它是所有集群管理活動的入口點(diǎn)。從本文開始,我們將對 K8s API Server 的代碼進(jìn)行詳細(xì)分析,并探討其應(yīng)用入口點(diǎn)、框架以及與 etcd 的通信。

應(yīng)用入口點(diǎn)

K8s API Server 的主要入口點(diǎn)位于 cmd/kube-apiserver/apiserver.go 文件的。

// cmd/kube-apiserver/apiserver.go 

// apiserver is the main api server and master for the cluster.
// it is responsible for serving the cluster management API.
package main

import (
"os"

"k8s.io/component-base/cli"
_ "k8s.io/component-base/logs/json/register" // 用于JSON日志格式注冊
_ "k8s.io/component-base/metrics/prometheus/clientgo" // 加載所有的 prometheus client-go 插件
_ "k8s.io/component-base/metrics/prometheus/version" // 用于版本指標(biāo)注冊
"k8s.io/kubernetes/cmd/kube-apiserver/app"
)

func main() {
command := app.NewAPIServerCommand()
code := cli.Run(command)
os.Exit(code)
}

其中的 app.NewAPIServerCommand() 是構(gòu)建的一個 cobra 的命令對象,cli.Run 然后執(zhí)行該命令即可,所以我們直接查看 NewAPIServerCommand 函數(shù)是如果構(gòu)造 cobra.Command 對象的:

// cmd/kube-apiserver/app/server.go

// NewAPIServerCommand 使用默認(rèn)參數(shù)創(chuàng)建一個 *cobra.Command 對象
func NewAPIServerCommand() *cobra.Command {
// NewServerRunOptions 使用默認(rèn)參數(shù)創(chuàng)建一個新的 ServerRunOptions 對象。
// ServerRunOption 對象是運(yùn)行 apiserver 需要的對象
s := options.NewServerRunOptions()
cmd := &cobra.Command{
Use: "kube-apiserver",
Long: `The Kubernetes API server validates and configures data
for the api objects which include pods, services, replicationcontrollers, and
others. The API Server services REST operations and provides the frontend to the
cluster's shared state through which all other components interact.`,

// ......
RunE: func(cmd *cobra.Command, args []string) error {
verflag.PrintAndExitIfRequested()
fs := cmd.Flags()

if err := s.Logs.ValidateAndApply(); err != nil {
return err
}
cliflag.PrintFlags(fs)

err := checkNonZeroInsecurePort(fs)
if err != nil {
return err
}
// 設(shè)置默認(rèn)選項(xiàng)
completedOptions, err := Complete(s)
if err != nil {
return err
}
// 校驗(yàn)選項(xiàng)
if errs := completedOptions.Validate(); len(errs) != 0 {
return utilerrors.NewAggregate(errs)
}
return Run(completedOptions, genericapiserver.SetupSignalHandler())
},
}

// ......

return cmd
}

該函數(shù)最核心的功能就是使用 Complete(s) 函數(shù)來生成 apiserver 啟動需要的默認(rèn)參數(shù),然后將默認(rèn)參數(shù)傳遞給 Run 函數(shù)進(jìn)行啟動。

// cmd/kube-apiserver/app/server.go
// Run 運(yùn)行指定的 APIServer,不能退出.
func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {

// 創(chuàng)建服務(wù)鏈(包含的3個server組件)
server, err := CreateServerChain(completeOptions, stopCh)


// 服務(wù)啟動前的準(zhǔn)備工作,包括健康檢查、存活檢查、OpenAPI路由注冊等
prepared, err := server.PrepareRun()

// 正式啟動運(yùn)行
return prepared.Run(stopCh)
}

在 Run 函數(shù)中首先會通過 CreateServerChain 函數(shù)通過委托創(chuàng)建連接的 APIServer 對象。

// cmd/kube-apiserver/app/server.go

// CreateServerChain 通過委托創(chuàng)建連接的APIServer
func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*aggregatorapiserver.APIAggregator, error) {
// CreateKubeAPIServerConfig 創(chuàng)建用于運(yùn)行 APIServer 的所有配置資源,但不運(yùn)行任何資源
kubeAPIServerConfig, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions)

// // 創(chuàng)建 APIExtensionsServer 配置
apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(kubeAPIServerConfig.ExtraConfig.ProxyTransport, kubeAPIServerConfig.GenericConfig.EgressSelector, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig, kubeAPIServerConfig.GenericConfig.TracerProvider))

// 創(chuàng)建APIExtensionsServer并注冊路由
apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler))

// 創(chuàng)建KubeAPIServer并注冊路由
kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)

// // 創(chuàng)建 aggregatorServer 配置
aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, kubeAPIServerConfig.ExtraConfig.ProxyTransport, pluginInitializer)

// 創(chuàng)建aggregatorServer并注冊路由
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)

return aggregatorServer, nil
}

上面的函數(shù)中可以看到 CreateServerChain 會創(chuàng)建3個 server:APIExtensionServer、KubeAPIServer、AggregratorServer,APIServer 就是依靠這3個組件來對不同類型的請求進(jìn)行處理的:

  • APIExtensionServer: 主要負(fù)責(zé)處理 CustomResourceDefinition(CRD)方面的請求。
  • KubeAPIServer: 主要負(fù)責(zé)處理 K8s 內(nèi)置資源的請求,此外還會包括通用處理、認(rèn)證、鑒權(quán)等。
  • AggregratorServer: 主要負(fù)責(zé)聚合器方面的處理,它充當(dāng)一個代理服務(wù)器,將請求轉(zhuǎn)發(fā)到聚合進(jìn)來的 K8s service 中。

圖片

創(chuàng)建每個 server 都有對應(yīng)的 config,可以看出上面函數(shù)中的 apiExtensionServer 和 aggregatorServer 的 Config 需要依賴 kubeAPIServerConfig,而這幾個 ServerConfig 都需要依賴 GenericConfig,CreateKubeAPIServerConfig 函數(shù)創(chuàng)建 kubeAPIServerConfig ,在該函數(shù)中通過調(diào)用 buildGenericConfig 來創(chuàng)建 GenericConfig 對象,如下代碼所示。

// cmd/kube-apiserver/app/server.go
// CreateKubeAPIServerConfig 創(chuàng)建用于運(yùn)行 APIServer 的所有配置資源
func CreateKubeAPIServerConfig(s completedServerRunOptions) (
*controlplane.Config,
aggregatorapiserver.ServiceResolver,
[]admission.PluginInitializer,
error,
) {
proxyTransport := CreateProxyTransport()
// 構(gòu)建通用配置
genericConfig, versionedInformers, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, err := buildGenericConfig(s.ServerRunOptions, proxyTransport)

// ......

config := &controlplane.Config{
GenericConfig: genericConfig,
ExtraConfig: controlplane.ExtraConfig{
APIResourceConfigSource: storageFactory.APIResourceConfigSource,
StorageFactory: storageFactory,
EventTTL: s.EventTTL,
KubeletClientConfig: s.KubeletConfig,
EnableLogsSupport: s.EnableLogsHandler,
ProxyTransport: proxyTransport,

ServiceIPRange: s.PrimaryServiceClusterIPRange,
APIServerServiceIP: s.APIServerServiceIP,
SecondaryServiceIPRange: s.SecondaryServiceClusterIPRange,

APIServerServicePort: 443,

ServiceNodePortRange: s.ServiceNodePortRange,
KubernetesServiceNodePort: s.KubernetesServiceNodePort,

EndpointReconcilerType: reconcilers.Type(s.EndpointReconcilerType),
MasterCount: s.MasterCount,

ServiceAccountIssuer: s.ServiceAccountIssuer,
ServiceAccountMaxExpiration: s.ServiceAccountTokenMaxExpiration,
ExtendExpiration: s.Authentication.ServiceAccounts.ExtendExpiration,

VersionedInformers: versionedInformers,

IdentityLeaseDurationSeconds: s.IdentityLeaseDurationSeconds,
IdentityLeaseRenewIntervalSeconds: s.IdentityLeaseRenewIntervalSeconds,
},
}

// ......

return config, serviceResolver, pluginInitializers, nil
}

func buildGenericConfig(
s *options.ServerRunOptions,
proxyTransport *http.Transport,
)(...){
//創(chuàng)建一個通用配置對象
genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs)

// ......

//創(chuàng)建認(rèn)證實(shí)例
if lastErr = s.Authentication.ApplyTo(&genericConfig.Authentication, genericConfig.SecureServing, genericConfig.EgressSelector, genericConfig.OpenAPIConfig, clientgoExternalClient, versionedInformers); lastErr != nil {
return
}

// ...
// openapi/swagger配置,OpenAPIConfig 用于生成 OpenAPI 規(guī)范
getOpenAPIDefinitions := openapi.GetOpenAPIDefinitionsWithoutDisabledFeatures(generatedopenapi.GetOpenAPIDefinitions)
genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(getOpenAPIDefinitions, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensionsapiserver.Scheme, aggregatorscheme.Scheme))
genericConfig.OpenAPIConfig.Info.Title = "Kubernetes"
genericConfig.LongRunningFunc = filters.BasicLongRunningRequestCheck(
sets.NewString("watch", "proxy"),
sets.NewString("attach", "exec", "proxy", "log", "portforward"),
)

// storageFactoryConfig 對象定義了 kube-apiserver 與 etcd 的交互方式,如:etcd認(rèn)證、地址、存儲前綴等
// 該對象也定義了資源存儲方式,如:資源信息、資源編碼信息、資源狀態(tài)等
storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig()
storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig
completedStorageFactoryConfig, err := storageFactoryConfig.Complete(s.Etcd)

storageFactory, lastErr = completedStorageFactoryConfig.New()

if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil {
return
}

// ......

// 初始化 SharedInformerFactory
kubeClientConfig := genericConfig.LoopbackClientConfig
clientgoExternalClient, err := clientgoclientset.NewForConfig(kubeClientConfig)
versionedInformers = clientgoinformers.NewSharedInformerFactory(clientgoExternalClient, 10*time.Minute)

// 認(rèn)證配置,內(nèi)部調(diào)用 authenticatorConfig.New()
// K8s提供了9種認(rèn)證機(jī)制,每種認(rèn)證機(jī)制被實(shí)例化后都成為認(rèn)證器
if lastErr = s.Authentication.ApplyTo(&genericConfig.Authentication, genericConfig.SecureServing, genericConfig.EgressSelector, genericConfig.OpenAPIConfig, clientgoExternalClient, versionedInformers); lastErr != nil {
return
}

// 創(chuàng)建鑒權(quán)實(shí)例,K8s也提供了6種授權(quán)機(jī)制,每種授權(quán)機(jī)制被實(shí)例化后都成為授權(quán)器
genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, genericConfig.EgressSelector, versionedInformers)

// ...
// 審計(jì)
lastErr = s.Audit.ApplyTo(genericConfig)

// 準(zhǔn)入控制器
// k8s資源在認(rèn)證和授權(quán)通過,被持久化到etcd之前進(jìn)入準(zhǔn)入控制邏輯
// 準(zhǔn)入控制包括:對請求的資源進(jìn)行自定義操作(校驗(yàn)、修改、拒絕)
// 準(zhǔn)入控制器通過 Plugins 數(shù)據(jù)結(jié)構(gòu)統(tǒng)一注冊、存放、管理
admissionConfig := &kubeapiserveradmission.Config{
ExternalInformers: versionedInformers,
LoopbackClientConfig: genericConfig.LoopbackClientConfig,
CloudConfigFile: s.CloudProvider.CloudConfigFile,
}
serviceResolver = buildServiceResolver(s.EnableAggregatorRouting, genericConfig.LoopbackClientConfig.Host, versionedInformers)
pluginInitializers, admissionPostStartHook, err = admissionConfig.New(proxyTransport, genericConfig.EgressSelector, serviceResolver, genericConfig.TracerProvider)

err = s.Admission.ApplyTo(
genericConfig,
versionedInformers,
kubeClientConfig,
feature.DefaultFeatureGate,
pluginInitializers...)

// ...

}

然后我們再來分別看看這3個 Server 是如何構(gòu)建的。

go-restful框架

這里我們就不得不先了解下 go-restful 這個框架了,因?yàn)?APIServer 就使用的這個框架。下面的代碼是 go-restful 官方的一個示例,這個 demo 了解后基本上就知道 go-restful 框架是如何使用的了:

package main

import (
"log"
"net/http"

restfulspec "github.com/emicklei/go-restful-openapi/v2"
restful "github.com/emicklei/go-restful/v3"
"github.com/go-openapi/spec"
)

// UserResource is the REST layer to the User domain
type UserResource struct {
// normally one would use DAO (data access object)
users map[string]User
}

// WebService creates a new service that can handle REST requests for User resources.
func (u UserResource) WebService() *restful.WebService {
ws := new(restful.WebService)
ws.
Path("/users").
Consumes(restful.MIME_XML, restful.MIME_JSON).
Produces(restful.MIME_JSON, restful.MIME_XML) // you can specify this per route as well

tags := []string{"users"}

ws.Route(ws.GET("/").To(u.findAllUsers).
// docs
Doc("get all users").
Metadata(restfulspec.KeyOpenAPITags, tags).
Writes([]User{}).
Returns(200, "OK", []User{}))

ws.Route(ws.GET("/{user-id}").To(u.findUser).
// docs
Doc("get a user").
Param(ws.PathParameter("user-id", "identifier of the user").DataType("integer").DefaultValue("1")).
Metadata(restfulspec.KeyOpenAPITags, tags).
Writes(User{}). // on the response
Returns(200, "OK", User{}).
Returns(404, "Not Found", nil))

ws.Route(ws.PUT("/{user-id}").To(u.updateUser).
// docs
Doc("update a user").
Param(ws.PathParameter("user-id", "identifier of the user").DataType("string")).
Metadata(restfulspec.KeyOpenAPITags, tags).
Reads(User{})) // from the request

ws.Route(ws.PUT("").To(u.createUser).
// docs
Doc("create a user").
Metadata(restfulspec.KeyOpenAPITags, tags).
Reads(User{})) // from the request

ws.Route(ws.DELETE("/{user-id}").To(u.removeUser).
// docs
Doc("delete a user").
Metadata(restfulspec.KeyOpenAPITags, tags).
Param(ws.PathParameter("user-id", "identifier of the user").DataType("string")))

return ws
}

// GET http://localhost:8080/users
//
func (u UserResource) findAllUsers(request *restful.Request, response *restful.Response) {
list := []User{}
for _, each := range u.users {
list = append(list, each)
}
response.WriteEntity(list)
}

// GET http://localhost:8080/users/1
//
func (u UserResource) findUser(request *restful.Request, response *restful.Response) {
id := request.PathParameter("user-id")
usr := u.users[id]
if len(usr.ID) == 0 {
response.WriteErrorString(http.StatusNotFound, "User could not be found.")
} else {
response.WriteEntity(usr)
}
}

// PUT http://localhost:8080/users/1
// <User><Id>1</Id><Name>Melissa Raspberry</Name></User>
//
func (u *UserResource) updateUser(request *restful.Request, response *restful.Response) {
usr := new(User)
err := request.ReadEntity(&usr)
if err == nil {
u.users[usr.ID] = *usr
response.WriteEntity(usr)
} else {
response.WriteError(http.StatusInternalServerError, err)
}
}

// PUT http://localhost:8080/users/1
// <User><Id>1</Id><Name>Melissa</Name></User>
//
func (u *UserResource) createUser(request *restful.Request, response *restful.Response) {
usr := User{ID: request.PathParameter("user-id")}
err := request.ReadEntity(&usr)
if err == nil {
u.users[usr.ID] = usr
response.WriteHeaderAndEntity(http.StatusCreated, usr)
} else {
response.WriteError(http.StatusInternalServerError, err)
}
}

// DELETE http://localhost:8080/users/1
//
func (u *UserResource) removeUser(request *restful.Request, response *restful.Response) {
id := request.PathParameter("user-id")
delete(u.users, id)
}

func main() {
u := UserResource{map[string]User{}}
restful.DefaultContainer.Add(u.WebService())

config := restfulspec.Config{
WebServices: restful.RegisteredWebServices(), // you control what services are visible
APIPath: "/apidocs.json",
PostBuildSwaggerObjectHandler: enrichSwaggerObject}
restful.DefaultContainer.Add(restfulspec.NewOpenAPIService(config))

// Optionally, you can install the Swagger Service which provides a nice Web UI on your REST API
// You need to download the Swagger HTML5 assets and change the FilePath location in the config below.
// Open http://localhost:8080/apidocs/?url=http://localhost:8080/apidocs.json
http.Handle("/apidocs/", http.StripPrefix("/apidocs/", http.FileServer(http.Dir("/Users/emicklei/Projects/swagger-ui/dist"))))

log.Printf("start listening on localhost:8080")
log.Fatal(http.ListenAndServe(":8080", nil))
}

func enrichSwaggerObject(swo *spec.Swagger) {
swo.Info = &spec.Info{
InfoProps: spec.InfoProps{
Title: "UserService",
Description: "Resource for managing Users",
Contact: &spec.ContactInfo{
ContactInfoProps: spec.ContactInfoProps{
Name: "john",
Email: "john@doe.rp",
URL: "http://johndoe.org",
},
},
License: &spec.License{
LicenseProps: spec.LicenseProps{
Name: "MIT",
URL: "http://mit.org",
},
},
Version: "1.0.0",
},
}
swo.Tags = []spec.Tag{spec.Tag{TagProps: spec.TagProps{
Name: "users",
Description: "Managing users"}}}
}

// User is just a sample type
type User struct {
ID string `json:"id" description:"identifier of the user"`
Name string `json:"name" description:"name of the user" default:"john"`
Age int `json:"age" description:"age of the user" default:"21"`
}

這個示例代碼,就是使用 go-restful 的核心功能實(shí)現(xiàn)了一個簡單的 RESTful 的 API,實(shí)現(xiàn)了對 User 的增刪查改,其中有這么幾個核心概念:Container、WebService、Route。

  • Container:服務(wù)器容器,包含多個 WebService 和一個 http.ServerMux。
  • WebService:服務(wù),由多個 Route 組成,一個 WebService 其實(shí)代表某一個對象相關(guān)的服務(wù),如上例中的 /users,針對該 /users 要實(shí)現(xiàn)RESTful API,那么需要向其添加增刪查改的路由,即 Route,它是 Route 的集合。
  • Route:路由,包含了 url,http 方法,接收和響應(yīng)的媒體類型以及處理函數(shù)。每一個 Route,根據(jù) Method 和 Path,映射到對應(yīng)的方法中,即是 Method/Path 到 Function 映射關(guān)系的抽象,如上例中的 ws.Route(ws.GET("/{user-id}").To(u.findUser)),就是針對 /users/{user-id}該路徑的GET請求,則被路由到 findUser 方法中進(jìn)行處理。
  • Container 是 WebService 的集合,可以向 Container 中添加多個 WebService,而 Container 因?yàn)閷?shí)現(xiàn)了 ServeHTTP() 方法,其本質(zhì)上還是一個http Handler,可以直接用在 http Server 中。

Kubernetes 中對 go-restful 的使用比較基礎(chǔ),就使用到了其最基礎(chǔ)的路由功能,由于 K8s 有很多內(nèi)置的資源對象,也包括 CRD 這種自定義資源對象,所以一開始并不是直接將這些資源對應(yīng)對應(yīng)的接口硬編碼的,而是通過一系列代碼動態(tài)注冊的,所以接下來我們分析的其實(shí)就是想辦法讓 APIServer 能夠提供如下所示的路由處理出來:

GET   /apis/apps/v1/namespaces/{namespace}/deployments/{name}
POST /apis/apps/v1/namespaces/{namespace}/deployments

GET /apis/apps/v1/namespaces/{namespace}/daemonsets/{name}
POST /apis/apps/v1/namespaces/{namespace}/daemonsets

對 go-restful 有一個基礎(chǔ)了解后,后面就可以去了解下這3個 Server 具體是如何實(shí)例化的了。

責(zé)任編輯:姜華 來源: k8s技術(shù)圈
相關(guān)推薦

2022-01-06 07:06:52

KubernetesResourceAPI

2021-09-16 15:08:08

鴻蒙HarmonyOS應(yīng)用

2021-11-25 09:54:54

鴻蒙HarmonyOS應(yīng)用

2022-07-19 20:04:31

NAPI模塊鴻蒙

2023-11-19 20:16:43

RESTAPIPOST

2011-05-26 10:05:48

MongoDB

2010-08-03 12:53:51

FlexBuilder

2009-12-10 13:43:08

使用PHPExcel

2021-02-20 06:09:46

libtask協(xié)程鎖機(jī)制

2021-05-20 11:13:22

Linux紅外文件

2010-06-17 15:54:24

UML總結(jié)

2021-03-23 09:17:58

SpringMVCHttpServletJavaEE

2021-07-06 09:29:38

Cobar源碼AST

2024-06-13 07:55:19

2023-02-26 08:42:10

源碼demouseEffect

2012-09-20 10:07:29

Nginx源碼分析Web服務(wù)器

2017-04-10 13:26:06

Go語言源碼

2009-12-29 16:36:47

Silverlight

2011-05-26 16:18:51

Mongodb

2022-02-14 14:47:11

SystemUIOpenHarmon鴻蒙
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號