Skip to content

Commit

Permalink
Load initializers from dynamic config
Browse files Browse the repository at this point in the history
Handle failure cases on startup gracefully to avoid causing cascading
errors and poor initialization in other components. Initial errors from
config load cause the initializer to pause and hold requests. Return
typed errors to better communicate failures to clients.

Add code to handle two specific cases - admin wants to bypass
initialization defaulting, and mirror pods (which want to bypass
initialization because the kubelet owns their lifecycle).
  • Loading branch information
smarterclayton committed Jun 5, 2017
1 parent 034f06d commit 772ab8e
Show file tree
Hide file tree
Showing 23 changed files with 617 additions and 62 deletions.
1 change: 1 addition & 0 deletions cmd/kube-apiserver/app/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ go_library(
"//pkg/apis/extensions:go_default_library",
"//pkg/apis/networking:go_default_library",
"//pkg/capabilities:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/clientset_generated/internalclientset:go_default_library",
"//pkg/client/informers/informers_generated/internalversion:go_default_library",
"//pkg/cloudprovider:go_default_library",
Expand Down
10 changes: 8 additions & 2 deletions cmd/kube-apiserver/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ import (
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/apis/networking"
"k8s.io/kubernetes/pkg/capabilities"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion"
"k8s.io/kubernetes/pkg/cloudprovider"
Expand Down Expand Up @@ -379,6 +380,10 @@ func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config,
// TODO: get rid of KUBE_API_VERSIONS or define sane behaviour if set
glog.Errorf("Failed to create clientset with KUBE_API_VERSIONS=%q. KUBE_API_VERSIONS is only for testing. Things will break.", kubeAPIVersions)
}
externalClient, err := clientset.NewForConfig(genericConfig.LoopbackClientConfig)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to create external clientset: %v", err)
}
sharedInformers := informers.NewSharedInformerFactory(client, 10*time.Minute)

genericConfig.Authenticator, genericConfig.OpenAPIConfig.SecurityDefinitions, err = BuildAuthenticator(s, storageFactory, client, sharedInformers)
Expand All @@ -397,6 +402,7 @@ func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config,
pluginInitializer, err := BuildAdmissionPluginInitializer(
s,
client,
externalClient,
sharedInformers,
genericConfig.Authorizer,
)
Expand All @@ -414,7 +420,7 @@ func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config,
}

// BuildAdmissionPluginInitializer constructs the admission plugin initializer
func BuildAdmissionPluginInitializer(s *options.ServerRunOptions, client internalclientset.Interface, sharedInformers informers.SharedInformerFactory, apiAuthorizer authorizer.Authorizer) (admission.PluginInitializer, error) {
func BuildAdmissionPluginInitializer(s *options.ServerRunOptions, client internalclientset.Interface, externalClient clientset.Interface, sharedInformers informers.SharedInformerFactory, apiAuthorizer authorizer.Authorizer) (admission.PluginInitializer, error) {
var cloudConfig []byte

if s.CloudProvider.CloudConfigFile != "" {
Expand All @@ -432,7 +438,7 @@ func BuildAdmissionPluginInitializer(s *options.ServerRunOptions, client interna
// do not require us to open watches for all items tracked by quota.
quotaRegistry := quotainstall.NewRegistry(nil, nil)

pluginInitializer := kubeapiserveradmission.NewPluginInitializer(client, sharedInformers, apiAuthorizer, cloudConfig, restMapper, quotaRegistry)
pluginInitializer := kubeapiserveradmission.NewPluginInitializer(client, externalClient, sharedInformers, apiAuthorizer, cloudConfig, restMapper, quotaRegistry)

// Read client cert/key for plugins that need to make calls out
if len(s.ProxyClientCertFile) > 0 && len(s.ProxyClientKeyFile) > 0 {
Expand Down
1 change: 1 addition & 0 deletions federation/cmd/federation-apiserver/app/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ go_library(
"//pkg/apis/extensions:go_default_library",
"//pkg/apis/extensions/install:go_default_library",
"//pkg/apis/extensions/v1beta1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/clientset_generated/internalclientset:go_default_library",
"//pkg/client/informers/informers_generated/internalversion:go_default_library",
"//pkg/cloudprovider/providers:go_default_library",
Expand Down
7 changes: 6 additions & 1 deletion federation/cmd/federation-apiserver/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"k8s.io/kubernetes/pkg/api"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
extensionsapiv1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion"
"k8s.io/kubernetes/pkg/generated/openapi"
Expand Down Expand Up @@ -180,6 +181,10 @@ func NonBlockingRun(s *options.ServerRunOptions, stopCh <-chan struct{}) error {
if err != nil {
return fmt.Errorf("failed to create clientset: %v", err)
}
externalClient, err := clientset.NewForConfig(genericConfig.LoopbackClientConfig)
if err != nil {
return fmt.Errorf("failed to create external clientset: %v", err)
}
sharedInformers := informers.NewSharedInformerFactory(client, 10*time.Minute)

authorizationConfig := s.Authorization.ToAuthorizationConfig(sharedInformers)
Expand All @@ -199,7 +204,7 @@ func NonBlockingRun(s *options.ServerRunOptions, stopCh <-chan struct{}) error {
// NOTE: we do not provide informers to the quota registry because admission level decisions
// do not require us to open watches for all items tracked by quota.
quotaRegistry := quotainstall.NewRegistry(nil, nil)
pluginInitializer := kubeapiserveradmission.NewPluginInitializer(client, sharedInformers, apiAuthorizer, cloudConfig, nil, quotaRegistry)
pluginInitializer := kubeapiserveradmission.NewPluginInitializer(client, externalClient, sharedInformers, apiAuthorizer, cloudConfig, nil, quotaRegistry)

err = s.Admission.ApplyTo(
genericConfig,
Expand Down
1 change: 1 addition & 0 deletions pkg/kubeapiserver/admission/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ go_library(
tags = ["automanaged"],
deps = [
"//pkg/apis/admissionregistration:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/clientset_generated/internalclientset:go_default_library",
"//pkg/client/informers/informers_generated/internalversion:go_default_library",
"//pkg/quota:go_default_library",
Expand Down
2 changes: 2 additions & 0 deletions pkg/kubeapiserver/admission/configuration/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ go_library(
tags = ["automanaged"],
deps = [
"//pkg/apis/admissionregistration/v1alpha1:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
Expand Down
21 changes: 19 additions & 2 deletions pkg/kubeapiserver/admission/configuration/configuration_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ const (
defaultFailureThreshold = 5
)

var (
ErrNotReady = fmt.Errorf("configuration is not ready")
ErrDisabled = fmt.Errorf("disabled")
)

type getFunc func() (runtime.Object, error)

// When running, poller calls `get` every `interval`. If `get` is
Expand All @@ -52,7 +57,8 @@ type poller struct {
ready bool
mergedConfiguration runtime.Object
// lock much be hold when reading ready or mergedConfiguration
lock sync.RWMutex
lock sync.RWMutex
lastErr error
}

func newPoller(get getFunc) *poller {
Expand All @@ -63,6 +69,12 @@ func newPoller(get getFunc) *poller {
}
}

func (a *poller) lastError(err error) {
a.lock.Lock()
defer a.lock.Unlock()
a.lastErr = err
}

func (a *poller) notReady() {
a.lock.Lock()
defer a.lock.Unlock()
Expand All @@ -73,7 +85,10 @@ func (a *poller) configuration() (runtime.Object, error) {
a.lock.RLock()
defer a.lock.RUnlock()
if !a.ready {
return nil, fmt.Errorf("configuration is not ready")
if a.lastErr != nil {
return nil, a.lastErr
}
return nil, ErrNotReady
}
return a.mergedConfiguration, nil
}
Expand All @@ -83,6 +98,7 @@ func (a *poller) setConfigurationAndReady(value runtime.Object) {
defer a.lock.Unlock()
a.mergedConfiguration = value
a.ready = true
a.lastErr = nil
}

func (a *poller) Run(stopCh <-chan struct{}) {
Expand All @@ -93,6 +109,7 @@ func (a *poller) sync() {
configuration, err := a.get()
if err != nil {
a.failures++
a.lastError(err)
if a.failures >= a.failureThreshold {
a.notReady()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,20 @@ type ExternalAdmissionHookConfigurationManager struct {
*poller
}

func NewExternalAdmissionHookConfigurationManager(c ExternalAdmissionHookConfigurationLister) *ExternalAdmissionHookConfigurationManager {
getFn := func() (runtime.Object, error) {
list, err := c.List(metav1.ListOptions{})
if err != nil {
return nil, err
}
return mergeExternalAdmissionHookConfigurations(list), nil
}

return &ExternalAdmissionHookConfigurationManager{
newPoller(getFn),
}
}

// ExternalAdmissionHooks returns the merged ExternalAdmissionHookConfiguration.
func (im *ExternalAdmissionHookConfigurationManager) ExternalAdmissionHooks() (*v1alpha1.ExternalAdmissionHookConfiguration, error) {
configuration, err := im.poller.configuration()
Expand All @@ -46,17 +60,8 @@ func (im *ExternalAdmissionHookConfigurationManager) ExternalAdmissionHooks() (*
return externalAdmissionHookConfiguration, nil
}

func NewExternalAdmissionHookConfigurationManager(c ExternalAdmissionHookConfigurationLister) *ExternalAdmissionHookConfigurationManager {
getFn := func() (runtime.Object, error) {
list, err := c.List(metav1.ListOptions{})
if err != nil {
return nil, err
}
return mergeExternalAdmissionHookConfigurations(list), nil
}

return &ExternalAdmissionHookConfigurationManager{
newPoller(getFn)}
func (im *ExternalAdmissionHookConfigurationManager) Run(stopCh <-chan struct{}) {
im.poller.Run(stopCh)
}

func mergeExternalAdmissionHookConfigurations(
Expand Down
32 changes: 22 additions & 10 deletions pkg/kubeapiserver/admission/configuration/initializer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import (
"reflect"
"sort"

"github.com/golang/glog"

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/kubernetes/pkg/apis/admissionregistration/v1alpha1"
Expand All @@ -34,6 +37,23 @@ type InitializerConfigurationManager struct {
*poller
}

func NewInitializerConfigurationManager(c InitializerConfigurationLister) *InitializerConfigurationManager {
getFn := func() (runtime.Object, error) {
list, err := c.List(metav1.ListOptions{})
if err != nil {
if errors.IsNotFound(err) || errors.IsForbidden(err) {
glog.V(5).Infof("Initializers are disabled due to an error: %v", err)
return nil, ErrDisabled
}
return nil, err
}
return mergeInitializerConfigurations(list), nil
}
return &InitializerConfigurationManager{
newPoller(getFn),
}
}

// Initializers returns the merged InitializerConfiguration.
func (im *InitializerConfigurationManager) Initializers() (*v1alpha1.InitializerConfiguration, error) {
configuration, err := im.poller.configuration()
Expand All @@ -47,16 +67,8 @@ func (im *InitializerConfigurationManager) Initializers() (*v1alpha1.Initializer
return initializerConfiguration, nil
}

func NewInitializerConfigurationManager(c InitializerConfigurationLister) *InitializerConfigurationManager {
getFn := func() (runtime.Object, error) {
list, err := c.List(metav1.ListOptions{})
if err != nil {
return nil, err
}
return mergeInitializerConfigurations(list), nil
}
return &InitializerConfigurationManager{
newPoller(getFn)}
func (im *InitializerConfigurationManager) Run(stopCh <-chan struct{}) {
im.poller.Run(stopCh)
}

func mergeInitializerConfigurations(initializerConfigurationList *v1alpha1.InitializerConfigurationList) *v1alpha1.InitializerConfiguration {
Expand Down
4 changes: 2 additions & 2 deletions pkg/kubeapiserver/admission/init_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ var _ WantsAuthorizer = &WantAuthorizerAdmission{}
// TestWantsAuthorizer ensures that the authorizer is injected when the WantsAuthorizer
// interface is implemented.
func TestWantsAuthorizer(t *testing.T) {
initializer := NewPluginInitializer(nil, nil, &TestAuthorizer{}, nil, nil, nil)
initializer := NewPluginInitializer(nil, nil, nil, &TestAuthorizer{}, nil, nil, nil)
wantAuthorizerAdmission := &WantAuthorizerAdmission{}
initializer.Initialize(wantAuthorizerAdmission)
if wantAuthorizerAdmission.auth == nil {
Expand All @@ -76,7 +76,7 @@ func (self *WantsCloudConfigAdmissionPlugin) SetCloudConfig(cloudConfig []byte)

func TestCloudConfigAdmissionPlugin(t *testing.T) {
cloudConfig := []byte("cloud-configuration")
initializer := NewPluginInitializer(nil, nil, &TestAuthorizer{}, cloudConfig, nil, nil)
initializer := NewPluginInitializer(nil, nil, nil, &TestAuthorizer{}, cloudConfig, nil, nil)
wantsCloudConfigAdmission := &WantsCloudConfigAdmissionPlugin{}
initializer.Initialize(wantsCloudConfigAdmission)

Expand Down
20 changes: 18 additions & 2 deletions pkg/kubeapiserver/admission/initializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/kubernetes/pkg/apis/admissionregistration"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion"
"k8s.io/kubernetes/pkg/quota"
Expand All @@ -36,6 +37,12 @@ type WantsInternalKubeClientSet interface {
admission.Validator
}

// WantsExternalKubeClientSet defines a function which sets ClientSet for admission plugins that need it
type WantsExternalKubeClientSet interface {
SetExternalKubeClientSet(clientset.Interface)
admission.Validator
}

// WantsInternalKubeInformerFactory defines a function which sets InformerFactory for admission plugins that need it
type WantsInternalKubeInformerFactory interface {
SetInternalKubeInformerFactory(informers.SharedInformerFactory)
Expand Down Expand Up @@ -95,6 +102,7 @@ type WebhookSource interface {

type PluginInitializer struct {
internalClient internalclientset.Interface
externalClient clientset.Interface
informers informers.SharedInformerFactory
authorizer authorizer.Authorizer
cloudConfig []byte
Expand All @@ -113,14 +121,18 @@ var _ admission.PluginInitializer = &PluginInitializer{}
// NewPluginInitializer constructs new instance of PluginInitializer
// TODO: switch these parameters to use the builder pattern or just make them
// all public, this construction method is pointless boilerplate.
func NewPluginInitializer(internalClient internalclientset.Interface,
func NewPluginInitializer(
internalClient internalclientset.Interface,
externalClient clientset.Interface,
sharedInformers informers.SharedInformerFactory,
authz authorizer.Authorizer,
cloudConfig []byte,
restMapper meta.RESTMapper,
quotaRegistry quota.Registry) *PluginInitializer {
quotaRegistry quota.Registry,
) *PluginInitializer {
return &PluginInitializer{
internalClient: internalClient,
externalClient: externalClient,
informers: sharedInformers,
authorizer: authz,
cloudConfig: cloudConfig,
Expand Down Expand Up @@ -157,6 +169,10 @@ func (i *PluginInitializer) Initialize(plugin admission.Interface) {
wants.SetInternalKubeClientSet(i.internalClient)
}

if wants, ok := plugin.(WantsExternalKubeClientSet); ok {
wants.SetExternalKubeClientSet(i.externalClient)
}

if wants, ok := plugin.(WantsInternalKubeInformerFactory); ok {
wants.SetInternalKubeInformerFactory(i.informers)
}
Expand Down
2 changes: 1 addition & 1 deletion plugin/pkg/admission/gc/gc_admission_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func newGCPermissionsEnforcement() *gcPermissionsEnforcement {
Handler: admission.NewHandler(admission.Create, admission.Update),
whiteList: whiteList,
}
pluginInitializer := kubeadmission.NewPluginInitializer(nil, nil, fakeAuthorizer{}, nil, api.Registry.RESTMapper(), nil)
pluginInitializer := kubeadmission.NewPluginInitializer(nil, nil, nil, fakeAuthorizer{}, nil, api.Registry.RESTMapper(), nil)
pluginInitializer.Initialize(gcAdmit)
return gcAdmit
}
Expand Down
17 changes: 17 additions & 0 deletions plugin/pkg/admission/initialization/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,26 @@ licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)

go_library(
name = "go_default_library",
srcs = ["initialization.go"],
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/apis/admissionregistration/v1alpha1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/kubeapiserver/admission/configuration:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/validation:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/validation/field:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apiserver/pkg/admission:go_default_library",
"//vendor/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library",
],
Expand All @@ -36,3 +42,14 @@ filegroup(
srcs = [":package-srcs"],
tags = ["automanaged"],
)

go_test(
name = "go_default_test",
srcs = ["initialization_test.go"],
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/apis/admissionregistration/v1alpha1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
],
)
Loading

0 comments on commit 772ab8e

Please sign in to comment.