k8s APIServer源码: 服务启动

基于版本 1.6.7

启动流程

  • cmd/kube-apiserver/apiserver.go
func main() {
   app.Run(s)
}
  • cmd/kube-apiserver/app/server.go
func Run(s *options.ServerRunOptions) error {
	// 构建master配置信息
	config, sharedInformers, err := BuildMasterConfig(s)
	// 调用RunServer
	return RunServer(config, sharedInformers, wait.NeverStop)
}

func RunServer(config *master.Config, sharedInformers informers.SharedInformerFactory, stopCh <-chan struct{}) error {
	// 执行相关初始化
	m, err := config.Complete().New()     // => TO: Container初始化
	// 启动
	return m.GenericAPIServer.PrepareRun().Run(stopCh)  // => next
}
  • vendor/k8s.io/apiserver/pkg/server/genericapiserver.go

启动主体函数都在这个文件中, 绑定地址/端口号, 并最终启动

func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
	s.NonBlockingRun(stopCh)
}

func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) error {
	s.serveSecurely(internalStopCh)
	// or
	s.serveInsecurely(internalStopCh) // => next
}
  • vendor/k8s.io/apiserver/pkg/server/serve.go
func (s *GenericAPIServer) serveInsecurely(stopCh <-chan struct{}) error {
	insecureServer := &http.Server{
		Addr:           s.InsecureServingInfo.BindAddress,
		Handler:        s.InsecureHandler,   // s.Hnalder for secure
		MaxHeaderBytes: 1 << 20,
	}
   runServer(insecureServer, s.InsecureServingInfo.BindNetwork, stopCh) // => next
}


func runServer(server *http.Server, network string, stopCh <-chan struct{}) (int, error) {
	go func() {
		for {
			var listener net.Listener
			listener = tcpKeepAliveListener{ln.(*net.TCPListener)}
			// *http.Server
			err := server.Serve(listener)
			}
	}()
}

Container初始化

  • cmd/kube-apiserver/app/server.go
func RunServer(config *master.Config, sharedInformers informers.SharedInformerFactory, stopCh <-chan struct{}) error {
	// 执行相关初始化
	m, err := config.Complete().New()     // => TO: Container初始化
	// 启动
	return m.GenericAPIServer.PrepareRun().Run(stopCh)  // => next
}
  • kubernetes/pkg/master/master.go
func (c completedConfig) New() (*Master, error) {
   // m.GenericAPIServer.HandlerContainer = APIContainer,   APIContainer.Container =  restful.NewContainer()
	s, err := c.Config.GenericConfig.SkipComplete().New() // completion is done in Complete, no need for a second time
   m := &Master{
		GenericAPIServer: s,
	}
}
  • vendor/k8s.io/apiserver/pkg/server/config.go

到这里, 完成了 s.Handler, s.InsecureHandler 的初始化

func (c completedConfig) New() (*GenericAPIServer, error) {
  s := &GenericAPIServer{
  }
  // s.HandlerContainer = APIContainer
  	s.HandlerContainer = mux.NewAPIContainer(http.NewServeMux(), c.Serializer)  // => next 1

  // 生成 Handler
  	s.Handler, s.InsecureHandler = c.BuildHandlerChainsFunc(s.HandlerContainer.ServeMux, c.Config)  // => next 2
}
  • 1: vendor/k8s.io/apiserver/pkg/server/mux/container.go

新建一个APIContainer, 包含

// NewAPIContainer constructs a new container for APIs
func NewAPIContainer(mux *http.ServeMux, s runtime.NegotiatedSerializer) *APIContainer {
	c := APIContainer{
		// 新建一个Container
		Container: restful.NewContainer(),
		NonSwaggerRoutes: PathRecorderMux{
			mux: mux,
		},
		UnlistedRoutes: mux,
	}
	// 配置 http.ServerMux
	c.Container.ServeMux = mux
	// 配置路由方式, 使用CurlyRouter
	c.Container.Router(restful.CurlyRouter{}) // e.g. for proxy/{kind}/{name}/{*}
	return &c
}
  • 2: vendor/k8s.io/apiserver/pkg/server/config.go
type Config struct {
	BuildHandlerChainsFunc func(apiHandler http.Handler, c *Config) (secure, insecure http.Handler)
}


func NewConfig() *Config {
	return &Config{
			BuildHandlerChainsFunc:      DefaultBuildHandlerChain,
	}
}


func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) (secure, insecure http.Handler) {
	return generic(protect(apiHandler)), generic(audit(apiHandler)) // add filters to handler
}

注意, 这里传递的参数是: s.HandlerContainer.ServeMux, DefaultBuildHandlerChain的参数是apiHandler http.Handler, 前者包含后者interface定义的方法.

  • net/http/server.go
type Handler interface {
        ServeHTTP(ResponseWriter, *Request)
}

// ServeHTTP dispatches the request to the handler whose
// pattern most closely matches the request URL.
func (mux *ServeMux) ServeHTTP(w ResponseWriter, r *Request) {
	if r.RequestURI == "*" {
		if r.ProtoAtLeast(1, 1) {
			w.Header().Set("Connection", "close")
		}
		w.WriteHeader(StatusBadRequest)
		return
	}
	h, _ := mux.Handler(r)
	h.ServeHTTP(w, r)
}

初始化后, Hnalder 以及 InsecureHandler赋值Container, 然后在new Server前, 将handler放入

&http.Server{
		Addr:           s.InsecureServingInfo.BindAddress,
		Handler:        s.InsecureHandler,   // s.Hanlder for secure
		MaxHeaderBytes: 1 << 20,
}

paas

710 Words

2017-09-23 13:00 +0800