diff --git a/cmd/kube-apiserver/app/BUILD b/cmd/kube-apiserver/app/BUILD index c746ba891942c..53d700e5eadc1 100644 --- a/cmd/kube-apiserver/app/BUILD +++ b/cmd/kube-apiserver/app/BUILD @@ -25,6 +25,7 @@ go_library( "//pkg/apis/extensions:go_default_library", "//pkg/apis/networking:go_default_library", "//pkg/capabilities:go_default_library", + "//pkg/client/clientset_generated/clientset:go_default_library", "//pkg/client/clientset_generated/internalclientset:go_default_library", "//pkg/client/informers/informers_generated/internalversion:go_default_library", "//pkg/cloudprovider:go_default_library", diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index 1f12cf27e575b..6335dfe79ecf0 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -61,6 +61,7 @@ import ( "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/apis/networking" "k8s.io/kubernetes/pkg/capabilities" + "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" "k8s.io/kubernetes/pkg/cloudprovider" @@ -380,6 +381,10 @@ func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config, // TODO: get rid of KUBE_API_VERSIONS or define sane behaviour if set glog.Errorf("Failed to create clientset with KUBE_API_VERSIONS=%q. KUBE_API_VERSIONS is only for testing. Things will break.", kubeAPIVersions) } + externalClient, err := clientset.NewForConfig(genericConfig.LoopbackClientConfig) + if err != nil { + return nil, nil, nil, fmt.Errorf("failed to create external clientset: %v", err) + } sharedInformers := informers.NewSharedInformerFactory(client, 10*time.Minute) genericConfig.Authenticator, genericConfig.OpenAPIConfig.SecurityDefinitions, err = BuildAuthenticator(s, storageFactory, client, sharedInformers) @@ -398,6 +403,7 @@ func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config, pluginInitializer, err := BuildAdmissionPluginInitializer( s, client, + externalClient, sharedInformers, genericConfig.Authorizer, ) @@ -415,7 +421,7 @@ func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config, } // BuildAdmissionPluginInitializer constructs the admission plugin initializer -func BuildAdmissionPluginInitializer(s *options.ServerRunOptions, client internalclientset.Interface, sharedInformers informers.SharedInformerFactory, apiAuthorizer authorizer.Authorizer) (admission.PluginInitializer, error) { +func BuildAdmissionPluginInitializer(s *options.ServerRunOptions, client internalclientset.Interface, externalClient clientset.Interface, sharedInformers informers.SharedInformerFactory, apiAuthorizer authorizer.Authorizer) (admission.PluginInitializer, error) { var cloudConfig []byte if s.CloudProvider.CloudConfigFile != "" { @@ -433,7 +439,7 @@ func BuildAdmissionPluginInitializer(s *options.ServerRunOptions, client interna // do not require us to open watches for all items tracked by quota. quotaRegistry := quotainstall.NewRegistry(nil, nil) - pluginInitializer := kubeapiserveradmission.NewPluginInitializer(client, sharedInformers, apiAuthorizer, cloudConfig, restMapper, quotaRegistry) + pluginInitializer := kubeapiserveradmission.NewPluginInitializer(client, externalClient, sharedInformers, apiAuthorizer, cloudConfig, restMapper, quotaRegistry) // Read client cert/key for plugins that need to make calls out if len(s.ProxyClientCertFile) > 0 && len(s.ProxyClientKeyFile) > 0 { diff --git a/federation/cluster/common.sh b/federation/cluster/common.sh index a13c69a9f4160..d0610a6523b8f 100644 --- a/federation/cluster/common.sh +++ b/federation/cluster/common.sh @@ -258,7 +258,7 @@ function create-federation-api-objects { export FEDERATION_APISERVER_KEY_BASE64="${FEDERATION_APISERVER_KEY_BASE64}" # Enable the NamespaceLifecycle admission control by default. - export FEDERATION_ADMISSION_CONTROL="${FEDERATION_ADMISSION_CONTROL:-Initializers,NamespaceLifecycle}" + export FEDERATION_ADMISSION_CONTROL="${FEDERATION_ADMISSION_CONTROL:-NamespaceLifecycle}" for file in federation-etcd-pvc.yaml federation-apiserver-{deployment,secrets}.yaml federation-controller-manager-deployment.yaml; do echo "Creating manifest: ${file}" diff --git a/federation/cmd/federation-apiserver/app/BUILD b/federation/cmd/federation-apiserver/app/BUILD index 830442e65d1e7..5c1dd8ef83821 100644 --- a/federation/cmd/federation-apiserver/app/BUILD +++ b/federation/cmd/federation-apiserver/app/BUILD @@ -41,6 +41,7 @@ go_library( "//pkg/apis/extensions:go_default_library", "//pkg/apis/extensions/install:go_default_library", "//pkg/apis/extensions/v1beta1:go_default_library", + "//pkg/client/clientset_generated/clientset:go_default_library", "//pkg/client/clientset_generated/internalclientset:go_default_library", "//pkg/client/informers/informers_generated/internalversion:go_default_library", "//pkg/cloudprovider/providers:go_default_library", diff --git a/federation/cmd/federation-apiserver/app/server.go b/federation/cmd/federation-apiserver/app/server.go index edc2a10c37eec..69e8eaad524ea 100644 --- a/federation/cmd/federation-apiserver/app/server.go +++ b/federation/cmd/federation-apiserver/app/server.go @@ -42,6 +42,7 @@ import ( "k8s.io/kubernetes/pkg/api" apiv1 "k8s.io/kubernetes/pkg/api/v1" extensionsapiv1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" + "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" "k8s.io/kubernetes/pkg/generated/openapi" @@ -180,6 +181,10 @@ func NonBlockingRun(s *options.ServerRunOptions, stopCh <-chan struct{}) error { if err != nil { return fmt.Errorf("failed to create clientset: %v", err) } + externalClient, err := clientset.NewForConfig(genericConfig.LoopbackClientConfig) + if err != nil { + return fmt.Errorf("failed to create external clientset: %v", err) + } sharedInformers := informers.NewSharedInformerFactory(client, 10*time.Minute) authorizationConfig := s.Authorization.ToAuthorizationConfig(sharedInformers) @@ -199,7 +204,7 @@ func NonBlockingRun(s *options.ServerRunOptions, stopCh <-chan struct{}) error { // NOTE: we do not provide informers to the quota registry because admission level decisions // do not require us to open watches for all items tracked by quota. quotaRegistry := quotainstall.NewRegistry(nil, nil) - pluginInitializer := kubeapiserveradmission.NewPluginInitializer(client, sharedInformers, apiAuthorizer, cloudConfig, nil, quotaRegistry) + pluginInitializer := kubeapiserveradmission.NewPluginInitializer(client, externalClient, sharedInformers, apiAuthorizer, cloudConfig, nil, quotaRegistry) err = s.Admission.ApplyTo( genericConfig, diff --git a/federation/pkg/kubefed/init/init.go b/federation/pkg/kubefed/init/init.go index d94a8abd3dcb1..13388216ef403 100644 --- a/federation/pkg/kubefed/init/init.go +++ b/federation/pkg/kubefed/init/init.go @@ -692,7 +692,7 @@ func createAPIServer(clientset client.Interface, namespace, name, federationName "--client-ca-file": "/etc/federation/apiserver/ca.crt", "--tls-cert-file": "/etc/federation/apiserver/server.crt", "--tls-private-key-file": "/etc/federation/apiserver/server.key", - "--admission-control": "Initializers,NamespaceLifecycle", + "--admission-control": "NamespaceLifecycle", } if advertiseAddress != "" { diff --git a/federation/pkg/kubefed/init/init_test.go b/federation/pkg/kubefed/init/init_test.go index 60a847f3f6398..765bbad5ffb91 100644 --- a/federation/pkg/kubefed/init/init_test.go +++ b/federation/pkg/kubefed/init/init_test.go @@ -869,7 +869,7 @@ func fakeInitHostFactory(apiserverServiceType v1.ServiceType, federationName, na fmt.Sprintf("--secure-port=%d", apiServerSecurePort), "--tls-cert-file=/etc/federation/apiserver/server.crt", "--tls-private-key-file=/etc/federation/apiserver/server.key", - "--admission-control=Initializers,NamespaceLifecycle", + "--admission-control=NamespaceLifecycle", fmt.Sprintf("--advertise-address=%s", address), } diff --git a/hack/make-rules/test-federation-cmd.sh b/hack/make-rules/test-federation-cmd.sh index 65e6f6594c6db..45e49a5d3067b 100755 --- a/hack/make-rules/test-federation-cmd.sh +++ b/hack/make-rules/test-federation-cmd.sh @@ -34,7 +34,7 @@ function run_federation_apiserver() { kube::log::status "Starting federation-apiserver" # Admission Controllers to invoke prior to persisting objects in cluster - ADMISSION_CONTROL="Initializers,NamespaceLifecycle" + ADMISSION_CONTROL="NamespaceLifecycle" "${KUBE_OUTPUT_HOSTBIN}/federation-apiserver" \ --insecure-port="${API_PORT}" \ diff --git a/pkg/kubeapiserver/admission/BUILD b/pkg/kubeapiserver/admission/BUILD index 8b796b26016f3..cd6d3bcaa9672 100644 --- a/pkg/kubeapiserver/admission/BUILD +++ b/pkg/kubeapiserver/admission/BUILD @@ -26,6 +26,7 @@ go_library( tags = ["automanaged"], deps = [ "//pkg/apis/admissionregistration:go_default_library", + "//pkg/client/clientset_generated/clientset:go_default_library", "//pkg/client/clientset_generated/internalclientset:go_default_library", "//pkg/client/informers/informers_generated/internalversion:go_default_library", "//pkg/quota:go_default_library", diff --git a/pkg/kubeapiserver/admission/configuration/BUILD b/pkg/kubeapiserver/admission/configuration/BUILD index d557d01dff7e3..8096c526efd9e 100644 --- a/pkg/kubeapiserver/admission/configuration/BUILD +++ b/pkg/kubeapiserver/admission/configuration/BUILD @@ -29,6 +29,8 @@ go_library( tags = ["automanaged"], deps = [ "//pkg/apis/admissionregistration/v1alpha1:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", diff --git a/pkg/kubeapiserver/admission/configuration/configuration_manager.go b/pkg/kubeapiserver/admission/configuration/configuration_manager.go index 95dd3c44c4656..450befa953906 100644 --- a/pkg/kubeapiserver/admission/configuration/configuration_manager.go +++ b/pkg/kubeapiserver/admission/configuration/configuration_manager.go @@ -30,6 +30,11 @@ const ( defaultFailureThreshold = 5 ) +var ( + ErrNotReady = fmt.Errorf("configuration is not ready") + ErrDisabled = fmt.Errorf("disabled") +) + type getFunc func() (runtime.Object, error) // When running, poller calls `get` every `interval`. If `get` is @@ -52,7 +57,8 @@ type poller struct { ready bool mergedConfiguration runtime.Object // lock much be hold when reading ready or mergedConfiguration - lock sync.RWMutex + lock sync.RWMutex + lastErr error } func newPoller(get getFunc) *poller { @@ -63,6 +69,12 @@ func newPoller(get getFunc) *poller { } } +func (a *poller) lastError(err error) { + a.lock.Lock() + defer a.lock.Unlock() + a.lastErr = err +} + func (a *poller) notReady() { a.lock.Lock() defer a.lock.Unlock() @@ -73,7 +85,10 @@ func (a *poller) configuration() (runtime.Object, error) { a.lock.RLock() defer a.lock.RUnlock() if !a.ready { - return nil, fmt.Errorf("configuration is not ready") + if a.lastErr != nil { + return nil, a.lastErr + } + return nil, ErrNotReady } return a.mergedConfiguration, nil } @@ -83,6 +98,7 @@ func (a *poller) setConfigurationAndReady(value runtime.Object) { defer a.lock.Unlock() a.mergedConfiguration = value a.ready = true + a.lastErr = nil } func (a *poller) Run(stopCh <-chan struct{}) { @@ -93,6 +109,7 @@ func (a *poller) sync() { configuration, err := a.get() if err != nil { a.failures++ + a.lastError(err) if a.failures >= a.failureThreshold { a.notReady() } diff --git a/pkg/kubeapiserver/admission/configuration/external_admission_hook_manager.go b/pkg/kubeapiserver/admission/configuration/external_admission_hook_manager.go index 56faf029f0c88..64d4bd3758937 100644 --- a/pkg/kubeapiserver/admission/configuration/external_admission_hook_manager.go +++ b/pkg/kubeapiserver/admission/configuration/external_admission_hook_manager.go @@ -33,6 +33,20 @@ type ExternalAdmissionHookConfigurationManager struct { *poller } +func NewExternalAdmissionHookConfigurationManager(c ExternalAdmissionHookConfigurationLister) *ExternalAdmissionHookConfigurationManager { + getFn := func() (runtime.Object, error) { + list, err := c.List(metav1.ListOptions{}) + if err != nil { + return nil, err + } + return mergeExternalAdmissionHookConfigurations(list), nil + } + + return &ExternalAdmissionHookConfigurationManager{ + newPoller(getFn), + } +} + // ExternalAdmissionHooks returns the merged ExternalAdmissionHookConfiguration. func (im *ExternalAdmissionHookConfigurationManager) ExternalAdmissionHooks() (*v1alpha1.ExternalAdmissionHookConfiguration, error) { configuration, err := im.poller.configuration() @@ -46,17 +60,8 @@ func (im *ExternalAdmissionHookConfigurationManager) ExternalAdmissionHooks() (* return externalAdmissionHookConfiguration, nil } -func NewExternalAdmissionHookConfigurationManager(c ExternalAdmissionHookConfigurationLister) *ExternalAdmissionHookConfigurationManager { - getFn := func() (runtime.Object, error) { - list, err := c.List(metav1.ListOptions{}) - if err != nil { - return nil, err - } - return mergeExternalAdmissionHookConfigurations(list), nil - } - - return &ExternalAdmissionHookConfigurationManager{ - newPoller(getFn)} +func (im *ExternalAdmissionHookConfigurationManager) Run(stopCh <-chan struct{}) { + im.poller.Run(stopCh) } func mergeExternalAdmissionHookConfigurations( diff --git a/pkg/kubeapiserver/admission/configuration/initializer_manager.go b/pkg/kubeapiserver/admission/configuration/initializer_manager.go index eba763f943b0d..86f4de0a56285 100644 --- a/pkg/kubeapiserver/admission/configuration/initializer_manager.go +++ b/pkg/kubeapiserver/admission/configuration/initializer_manager.go @@ -21,6 +21,9 @@ import ( "reflect" "sort" + "github.com/golang/glog" + + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/kubernetes/pkg/apis/admissionregistration/v1alpha1" @@ -34,6 +37,23 @@ type InitializerConfigurationManager struct { *poller } +func NewInitializerConfigurationManager(c InitializerConfigurationLister) *InitializerConfigurationManager { + getFn := func() (runtime.Object, error) { + list, err := c.List(metav1.ListOptions{}) + if err != nil { + if errors.IsNotFound(err) || errors.IsForbidden(err) { + glog.V(5).Infof("Initializers are disabled due to an error: %v", err) + return nil, ErrDisabled + } + return nil, err + } + return mergeInitializerConfigurations(list), nil + } + return &InitializerConfigurationManager{ + newPoller(getFn), + } +} + // Initializers returns the merged InitializerConfiguration. func (im *InitializerConfigurationManager) Initializers() (*v1alpha1.InitializerConfiguration, error) { configuration, err := im.poller.configuration() @@ -47,16 +67,8 @@ func (im *InitializerConfigurationManager) Initializers() (*v1alpha1.Initializer return initializerConfiguration, nil } -func NewInitializerConfigurationManager(c InitializerConfigurationLister) *InitializerConfigurationManager { - getFn := func() (runtime.Object, error) { - list, err := c.List(metav1.ListOptions{}) - if err != nil { - return nil, err - } - return mergeInitializerConfigurations(list), nil - } - return &InitializerConfigurationManager{ - newPoller(getFn)} +func (im *InitializerConfigurationManager) Run(stopCh <-chan struct{}) { + im.poller.Run(stopCh) } func mergeInitializerConfigurations(initializerConfigurationList *v1alpha1.InitializerConfigurationList) *v1alpha1.InitializerConfiguration { diff --git a/pkg/kubeapiserver/admission/init_test.go b/pkg/kubeapiserver/admission/init_test.go index 7176f14a72bc4..d09e4cc601cb0 100644 --- a/pkg/kubeapiserver/admission/init_test.go +++ b/pkg/kubeapiserver/admission/init_test.go @@ -57,7 +57,7 @@ var _ WantsAuthorizer = &WantAuthorizerAdmission{} // TestWantsAuthorizer ensures that the authorizer is injected when the WantsAuthorizer // interface is implemented. func TestWantsAuthorizer(t *testing.T) { - initializer := NewPluginInitializer(nil, nil, &TestAuthorizer{}, nil, nil, nil) + initializer := NewPluginInitializer(nil, nil, nil, &TestAuthorizer{}, nil, nil, nil) wantAuthorizerAdmission := &WantAuthorizerAdmission{} initializer.Initialize(wantAuthorizerAdmission) if wantAuthorizerAdmission.auth == nil { @@ -76,7 +76,7 @@ func (self *WantsCloudConfigAdmissionPlugin) SetCloudConfig(cloudConfig []byte) func TestCloudConfigAdmissionPlugin(t *testing.T) { cloudConfig := []byte("cloud-configuration") - initializer := NewPluginInitializer(nil, nil, &TestAuthorizer{}, cloudConfig, nil, nil) + initializer := NewPluginInitializer(nil, nil, nil, &TestAuthorizer{}, cloudConfig, nil, nil) wantsCloudConfigAdmission := &WantsCloudConfigAdmissionPlugin{} initializer.Initialize(wantsCloudConfigAdmission) diff --git a/pkg/kubeapiserver/admission/initializer.go b/pkg/kubeapiserver/admission/initializer.go index be56e6efdd6e0..275fde87f3792 100644 --- a/pkg/kubeapiserver/admission/initializer.go +++ b/pkg/kubeapiserver/admission/initializer.go @@ -23,6 +23,7 @@ import ( "k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/authorization/authorizer" "k8s.io/kubernetes/pkg/apis/admissionregistration" + "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" "k8s.io/kubernetes/pkg/quota" @@ -36,6 +37,12 @@ type WantsInternalKubeClientSet interface { admission.Validator } +// WantsExternalKubeClientSet defines a function which sets ClientSet for admission plugins that need it +type WantsExternalKubeClientSet interface { + SetExternalKubeClientSet(clientset.Interface) + admission.Validator +} + // WantsInternalKubeInformerFactory defines a function which sets InformerFactory for admission plugins that need it type WantsInternalKubeInformerFactory interface { SetInternalKubeInformerFactory(informers.SharedInformerFactory) @@ -95,6 +102,7 @@ type WebhookSource interface { type PluginInitializer struct { internalClient internalclientset.Interface + externalClient clientset.Interface informers informers.SharedInformerFactory authorizer authorizer.Authorizer cloudConfig []byte @@ -113,14 +121,18 @@ var _ admission.PluginInitializer = &PluginInitializer{} // NewPluginInitializer constructs new instance of PluginInitializer // TODO: switch these parameters to use the builder pattern or just make them // all public, this construction method is pointless boilerplate. -func NewPluginInitializer(internalClient internalclientset.Interface, +func NewPluginInitializer( + internalClient internalclientset.Interface, + externalClient clientset.Interface, sharedInformers informers.SharedInformerFactory, authz authorizer.Authorizer, cloudConfig []byte, restMapper meta.RESTMapper, - quotaRegistry quota.Registry) *PluginInitializer { + quotaRegistry quota.Registry, +) *PluginInitializer { return &PluginInitializer{ internalClient: internalClient, + externalClient: externalClient, informers: sharedInformers, authorizer: authz, cloudConfig: cloudConfig, @@ -157,6 +169,10 @@ func (i *PluginInitializer) Initialize(plugin admission.Interface) { wants.SetInternalKubeClientSet(i.internalClient) } + if wants, ok := plugin.(WantsExternalKubeClientSet); ok { + wants.SetExternalKubeClientSet(i.externalClient) + } + if wants, ok := plugin.(WantsInternalKubeInformerFactory); ok { wants.SetInternalKubeInformerFactory(i.informers) } diff --git a/plugin/pkg/admission/gc/gc_admission_test.go b/plugin/pkg/admission/gc/gc_admission_test.go index 079f6b717c87c..4d7a6aac6a446 100644 --- a/plugin/pkg/admission/gc/gc_admission_test.go +++ b/plugin/pkg/admission/gc/gc_admission_test.go @@ -74,7 +74,7 @@ func newGCPermissionsEnforcement() *gcPermissionsEnforcement { Handler: admission.NewHandler(admission.Create, admission.Update), whiteList: whiteList, } - pluginInitializer := kubeadmission.NewPluginInitializer(nil, nil, fakeAuthorizer{}, nil, api.Registry.RESTMapper(), nil) + pluginInitializer := kubeadmission.NewPluginInitializer(nil, nil, nil, fakeAuthorizer{}, nil, api.Registry.RESTMapper(), nil) pluginInitializer.Initialize(gcAdmit) return gcAdmit } diff --git a/plugin/pkg/admission/initialization/BUILD b/plugin/pkg/admission/initialization/BUILD index 88838997ef70a..e8708bcb8fd25 100644 --- a/plugin/pkg/admission/initialization/BUILD +++ b/plugin/pkg/admission/initialization/BUILD @@ -5,6 +5,7 @@ licenses(["notice"]) load( "@io_bazel_rules_go//go:def.bzl", "go_library", + "go_test", ) go_library( @@ -12,6 +13,10 @@ go_library( srcs = ["initialization.go"], tags = ["automanaged"], deps = [ + "//pkg/api:go_default_library", + "//pkg/apis/admissionregistration/v1alpha1:go_default_library", + "//pkg/client/clientset_generated/clientset:go_default_library", + "//pkg/kubeapiserver/admission/configuration:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library", @@ -19,6 +24,7 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/validation/field:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/apiserver/pkg/admission:go_default_library", "//vendor/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library", ], @@ -36,3 +42,14 @@ filegroup( srcs = [":package-srcs"], tags = ["automanaged"], ) + +go_test( + name = "go_default_test", + srcs = ["initialization_test.go"], + library = ":go_default_library", + tags = ["automanaged"], + deps = [ + "//pkg/apis/admissionregistration/v1alpha1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", + ], +) diff --git a/plugin/pkg/admission/initialization/initialization.go b/plugin/pkg/admission/initialization/initialization.go index 15855053c4088..864347e81e0cf 100644 --- a/plugin/pkg/admission/initialization/initialization.go +++ b/plugin/pkg/admission/initialization/initialization.go @@ -1,5 +1,5 @@ /* -Copyright 2014 The Kubernetes Authors. +Copyright 2017 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -19,6 +19,12 @@ package initialization import ( "fmt" "io" + "net" + "net/url" + "os" + "strings" + "syscall" + "time" "github.com/golang/glog" @@ -28,8 +34,13 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/validation/field" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/authorization/authorizer" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/apis/admissionregistration/v1alpha1" + "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + "k8s.io/kubernetes/pkg/kubeapiserver/admission/configuration" ) // Register registers a plugin @@ -43,38 +54,131 @@ type initializerOptions struct { Initializers []string } +type InitializationConfig interface { + Run(stopCh <-chan struct{}) + Initializers() (*v1alpha1.InitializerConfiguration, error) +} + type initializer struct { - resources map[schema.GroupResource]initializerOptions + config InitializationConfig authorizer authorizer.Authorizer } -// NewAlwaysAdmit creates a new always admit admission handler +// Retry config loading failures for up to four and a half seconds if the config hasn't been loaded +// yet or if the server is down. Creation requests are delayed during this interval, which prevents +// racy failures during startup until the initializer configuration becomes available. +// TODO: move into InitializationConfigurationManager, since these values depend on the config +// refresh loop. +const ( + retryTemporaryConfigFailures = 8 + retryTemporaryConfigInterval = 550 * time.Millisecond +) + +// NewInitializer creates a new initializer plugin which assigns newly created resources initializers +// based on configuration loaded from the admission API group. +// FUTURE: this may be moved to the storage layer of the apiserver, but for now this is an alpha feature +// that can be disabled. func NewInitializer() admission.Interface { - return &initializer{ - resources: map[schema.GroupResource]initializerOptions{ - //schema.GroupResource{Resource: "pods"}: {Initializers: []string{"Test"}}, - }, - } + return &initializer{} } func (i *initializer) Validate() error { + if i.config == nil { + return fmt.Errorf("the Initializer admission plugin requires a Kubernetes client to be provided") + } + i.config.Run(wait.NeverStop) return nil } +func (i *initializer) SetExternalKubeClientSet(client clientset.Interface) { + i.config = configuration.NewInitializerConfigurationManager(client.Admissionregistration().InitializerConfigurations()) +} + func (i *initializer) SetAuthorizer(a authorizer.Authorizer) { i.authorizer = a } var initializerFieldPath = field.NewPath("metadata", "initializers") +// temporaryConnectionError returns true if the error is considered temporary +func temporaryConnectionError(err error) bool { + if urlError, ok := err.(*url.Error); ok { + if urlError.Temporary() { + return true + } + if opError, ok := urlError.Err.(*net.OpError); ok { + if syscallError, ok := opError.Err.(*os.SyscallError); ok { + if errno, ok := syscallError.Err.(syscall.Errno); ok && errno == syscall.ECONNREFUSED { + return true + } + } + } + } + return false +} + +// readConfigWithRetry holds requests instead of failing them if the server is not yet initialized +// or is unresponsive. It formats the returned error for client use if necessary. +func (i *initializer) readConfigWithRetry(a admission.Attributes) (*v1alpha1.InitializerConfiguration, error) { + var lastErr error + for count := 0; count < retryTemporaryConfigFailures; count++ { + if count > 0 { + time.Sleep(retryTemporaryConfigInterval) + } + + // read initializers from config + config, err := i.config.Initializers() + if err == nil { + return config, nil + } + + // if initializer configuration is disabled, fail open + if err == configuration.ErrDisabled { + return &v1alpha1.InitializerConfiguration{}, nil + } + + // retry certain errors + lastErr = err + if err != configuration.ErrNotReady && !temporaryConnectionError(err) { + break + } + } + + e := errors.NewServerTimeout(a.GetResource().GroupResource(), "create", 1) + if lastErr == configuration.ErrNotReady { + e.ErrStatus.Message = fmt.Sprintf("Waiting for initialization configuration to load: %v", lastErr) + e.ErrStatus.Reason = "LoadingConfiguration" + e.ErrStatus.Details.Causes = append(e.ErrStatus.Details.Causes, metav1.StatusCause{ + Type: "InitializerConfigurationPending", + Message: "The server is waiting for the initializer configuration to be loaded.", + }) + } else { + e.ErrStatus.Message = fmt.Sprintf("Unable to refresh the initializer configuration: %v", lastErr) + e.ErrStatus.Reason = "LoadingConfiguration" + e.ErrStatus.Details.Causes = append(e.ErrStatus.Details.Causes, metav1.StatusCause{ + Type: "InitializerConfigurationFailure", + Message: "An error has occurred while refreshing the initializer configuration, no resources can be created until a refresh succeeds.", + }) + } + return nil, e +} + +// Admit checks for create requests to add initializers, or update request to enforce invariants. +// The admission controller fails open if the object doesn't have ObjectMeta (can't be initialized). +// A client with sufficient permission ("initialize" verb on resource) can specify its own initializers +// or an empty initializers struct (which bypasses initialization). Only clients with the initialize verb +// can update objects that have not completed initialization. Sub resources can still be modified on +// resources that are undergoing initialization. +// TODO: once this logic is ready for beta, move it into the REST storage layer. func (i *initializer) Admit(a admission.Attributes) (err error) { - // TODO: sub-resource action should be denied until the object is initialized - if len(a.GetSubresource()) > 0 { + switch a.GetOperation() { + case admission.Create, admission.Update: + default: return nil } - resource, ok := i.resources[a.GetResource().GroupResource()] - if !ok { + // TODO: should sub-resource action should be denied until the object is initialized? + if len(a.GetSubresource()) > 0 { return nil } @@ -87,15 +191,44 @@ func (i *initializer) Admit(a admission.Attributes) (err error) { return nil } existing := accessor.GetInitializers() - // it must be possible for some users to bypass initialization - for now, check the initialize operation if existing != nil { - if err := i.canInitialize(a); err != nil { + glog.V(5).Infof("Admin bypassing initialization for %s", a.GetResource()) + + // it must be possible for some users to bypass initialization - for now, check the initialize operation + if err := i.canInitialize(a, "create with initializers denied"); err != nil { + return err + } + // allow administrators to bypass initialization by setting an empty initializers struct + if len(existing.Pending) == 0 && existing.Result == nil { + accessor.SetInitializers(nil) + return nil + } + } else { + glog.V(5).Infof("Checking initialization for %s", a.GetResource()) + + config, err := i.readConfigWithRetry(a) + if err != nil { return err } - } - // TODO: pull this from config - accessor.SetInitializers(copiedInitializers(resource.Initializers)) + // Mirror pods are exempt from initialization because they are created and initialized + // on the Kubelet before they appear in the API. + // TODO: once this moves to REST storage layer, this becomes a pod specific concern + if pod, ok := a.GetObject().(*api.Pod); ok && pod != nil { + if _, isMirror := pod.Annotations[api.MirrorPodAnnotationKey]; isMirror { + return nil + } + } + + names := findInitializers(config, a.GetResource()) + if len(names) == 0 { + glog.V(5).Infof("No initializers needed") + return nil + } + + glog.V(5).Infof("Found initializers for %s: %v", a.GetResource(), names) + accessor.SetInitializers(newInitializers(names)) + } case admission.Update: accessor, err := meta.Accessor(a.GetObject()) @@ -113,13 +246,20 @@ func (i *initializer) Admit(a admission.Attributes) (err error) { } existing := existingAccessor.GetInitializers() + // updates on initialized resources are allowed + if updated == nil && existing == nil { + return nil + } + + glog.V(5).Infof("Modifying uninitialized resource %s", a.GetResource()) + // because we are called before validation, we need to ensure the update transition is valid. if errs := validation.ValidateInitializersUpdate(updated, existing, initializerFieldPath); len(errs) > 0 { return errors.NewInvalid(a.GetKind().GroupKind(), a.GetName(), errs) } // caller must have the ability to mutate un-initialized resources - if err := i.canInitialize(a); err != nil { + if err := i.canInitialize(a, "update to uninitialized resource denied"); err != nil { return err } @@ -129,7 +269,7 @@ func (i *initializer) Admit(a admission.Attributes) (err error) { return nil } -func (i *initializer) canInitialize(a admission.Attributes) error { +func (i *initializer) canInitialize(a admission.Attributes, message string) error { // if no authorizer is present, the initializer plugin allows modification of uninitialized resources if i.authorizer == nil { glog.V(4).Infof("No authorizer provided to initialization admission control, unable to check permissions") @@ -150,16 +290,17 @@ func (i *initializer) canInitialize(a admission.Attributes) error { return err } if !authorized { - return fmt.Errorf("user must have permission to initialize resources: %s", reason) + return errors.NewForbidden(a.GetResource().GroupResource(), a.GetName(), fmt.Errorf("%s: %s", message, reason)) } return nil } func (i *initializer) Handles(op admission.Operation) bool { - return true + return op == admission.Create || op == admission.Update } -func copiedInitializers(names []string) *metav1.Initializers { +// newInitializers populates an Initializers struct. +func newInitializers(names []string) *metav1.Initializers { if len(names) == 0 { return nil } @@ -171,3 +312,71 @@ func copiedInitializers(names []string) *metav1.Initializers { Pending: init, } } + +// findInitializers returns the list of initializer names that apply to a config. It returns an empty list +// if no initializers apply. +func findInitializers(initializers *v1alpha1.InitializerConfiguration, gvr schema.GroupVersionResource) []string { + var names []string + for _, init := range initializers.Initializers { + if !matchRule(init.Rules, gvr) { + continue + } + names = append(names, init.Name) + } + return names +} + +// matchRule returns true if any rule matches the provided group version resource. +func matchRule(rules []v1alpha1.Rule, gvr schema.GroupVersionResource) bool { + for _, rule := range rules { + if !hasGroup(rule.APIGroups, gvr.Group) { + return false + } + if !hasVersion(rule.APIVersions, gvr.Version) { + return false + } + if !hasResource(rule.Resources, gvr.Resource) { + return false + } + } + return len(rules) > 0 +} + +func hasGroup(groups []string, group string) bool { + if groups[0] == "*" { + return true + } + for _, g := range groups { + if g == group { + return true + } + } + return false +} + +func hasVersion(versions []string, version string) bool { + if versions[0] == "*" { + return true + } + for _, v := range versions { + if v == version { + return true + } + } + return false +} + +func hasResource(resources []string, resource string) bool { + if resources[0] == "*" || resources[0] == "*/*" { + return true + } + for _, r := range resources { + if strings.Contains(r, "/") { + continue + } + if r == resource { + return true + } + } + return false +} diff --git a/plugin/pkg/admission/initialization/initialization_test.go b/plugin/pkg/admission/initialization/initialization_test.go new file mode 100644 index 0000000000000..b45844c93746e --- /dev/null +++ b/plugin/pkg/admission/initialization/initialization_test.go @@ -0,0 +1,99 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package initialization + +import ( + "reflect" + "testing" + + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/kubernetes/pkg/apis/admissionregistration/v1alpha1" +) + +func newInitializer(name string, rules ...v1alpha1.Rule) *v1alpha1.InitializerConfiguration { + return addInitializer(&v1alpha1.InitializerConfiguration{}, name, rules...) +} + +func addInitializer(base *v1alpha1.InitializerConfiguration, name string, rules ...v1alpha1.Rule) *v1alpha1.InitializerConfiguration { + base.Initializers = append(base.Initializers, v1alpha1.Initializer{ + Name: name, + Rules: rules, + }) + return base +} + +func TestFindInitializers(t *testing.T) { + type args struct { + initializers *v1alpha1.InitializerConfiguration + gvr schema.GroupVersionResource + } + tests := []struct { + name string + args args + want []string + }{ + { + name: "empty", + args: args{ + gvr: schema.GroupVersionResource{}, + initializers: newInitializer("1"), + }, + }, + { + name: "everything", + args: args{ + gvr: schema.GroupVersionResource{}, + initializers: newInitializer("1", v1alpha1.Rule{APIGroups: []string{"*"}, APIVersions: []string{"*"}, Resources: []string{"*"}}), + }, + want: []string{"1"}, + }, + { + name: "empty group", + args: args{ + gvr: schema.GroupVersionResource{}, + initializers: newInitializer("1", v1alpha1.Rule{APIGroups: []string{""}, APIVersions: []string{"*"}, Resources: []string{"*"}}), + }, + want: []string{"1"}, + }, + { + name: "pod", + args: args{ + gvr: schema.GroupVersionResource{Resource: "pods"}, + initializers: addInitializer( + newInitializer("1", v1alpha1.Rule{APIGroups: []string{""}, APIVersions: []string{"*"}, Resources: []string{"pods"}}), + "2", v1alpha1.Rule{APIGroups: []string{""}, APIVersions: []string{"*"}, Resources: []string{"pods"}}, + ), + }, + want: []string{"1", "2"}, + }, + { + name: "multiple matches", + args: args{ + gvr: schema.GroupVersionResource{Resource: "pods"}, + initializers: newInitializer("1", v1alpha1.Rule{APIGroups: []string{""}, APIVersions: []string{"*"}, Resources: []string{"pods"}}), + }, + want: []string{"1"}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := findInitializers(tt.args.initializers, tt.args.gvr); !reflect.DeepEqual(got, tt.want) { + t.Errorf("findInitializers() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/plugin/pkg/admission/limitranger/admission_test.go b/plugin/pkg/admission/limitranger/admission_test.go index 0e9d1bf68aa9a..135e9914f0a28 100644 --- a/plugin/pkg/admission/limitranger/admission_test.go +++ b/plugin/pkg/admission/limitranger/admission_test.go @@ -595,7 +595,7 @@ func newHandlerForTest(c clientset.Interface) (admission.Interface, informers.Sh if err != nil { return nil, f, err } - pluginInitializer := kubeadmission.NewPluginInitializer(c, f, nil, nil, nil, nil) + pluginInitializer := kubeadmission.NewPluginInitializer(c, nil, f, nil, nil, nil, nil) pluginInitializer.Initialize(handler) err = admission.Validate(handler) return handler, f, err diff --git a/plugin/pkg/admission/namespace/autoprovision/admission_test.go b/plugin/pkg/admission/namespace/autoprovision/admission_test.go index 36bdb80f6f8e0..b4e7227433e0e 100644 --- a/plugin/pkg/admission/namespace/autoprovision/admission_test.go +++ b/plugin/pkg/admission/namespace/autoprovision/admission_test.go @@ -38,7 +38,7 @@ import ( func newHandlerForTest(c clientset.Interface) (admission.Interface, informers.SharedInformerFactory, error) { f := informers.NewSharedInformerFactory(c, 5*time.Minute) handler := NewProvision() - pluginInitializer := kubeadmission.NewPluginInitializer(c, f, nil, nil, nil, nil) + pluginInitializer := kubeadmission.NewPluginInitializer(c, nil, f, nil, nil, nil, nil) pluginInitializer.Initialize(handler) err := admission.Validate(handler) return handler, f, err diff --git a/plugin/pkg/admission/namespace/exists/admission_test.go b/plugin/pkg/admission/namespace/exists/admission_test.go index ebba2ebc67b49..00bdd805f6352 100644 --- a/plugin/pkg/admission/namespace/exists/admission_test.go +++ b/plugin/pkg/admission/namespace/exists/admission_test.go @@ -37,7 +37,7 @@ import ( func newHandlerForTest(c clientset.Interface) (admission.Interface, informers.SharedInformerFactory, error) { f := informers.NewSharedInformerFactory(c, 5*time.Minute) handler := NewExists() - pluginInitializer := kubeadmission.NewPluginInitializer(c, f, nil, nil, nil, nil) + pluginInitializer := kubeadmission.NewPluginInitializer(c, nil, f, nil, nil, nil, nil) pluginInitializer.Initialize(handler) err := admission.Validate(handler) return handler, f, err diff --git a/plugin/pkg/admission/podnodeselector/admission_test.go b/plugin/pkg/admission/podnodeselector/admission_test.go index a7584cd1dc236..f5bc4ac969e2d 100644 --- a/plugin/pkg/admission/podnodeselector/admission_test.go +++ b/plugin/pkg/admission/podnodeselector/admission_test.go @@ -191,7 +191,7 @@ func TestHandles(t *testing.T) { func newHandlerForTest(c clientset.Interface) (*podNodeSelector, informers.SharedInformerFactory, error) { f := informers.NewSharedInformerFactory(c, 5*time.Minute) handler := NewPodNodeSelector(nil) - pluginInitializer := kubeadmission.NewPluginInitializer(c, f, nil, nil, nil, nil) + pluginInitializer := kubeadmission.NewPluginInitializer(c, nil, f, nil, nil, nil, nil) pluginInitializer.Initialize(handler) err := admission.Validate(handler) return handler, f, err diff --git a/plugin/pkg/admission/podtolerationrestriction/admission_test.go b/plugin/pkg/admission/podtolerationrestriction/admission_test.go index 16035ee6ddd34..53717f613485b 100644 --- a/plugin/pkg/admission/podtolerationrestriction/admission_test.go +++ b/plugin/pkg/admission/podtolerationrestriction/admission_test.go @@ -193,7 +193,7 @@ func newHandlerForTest(c clientset.Interface) (*podTolerationsPlugin, informers. return nil, nil, err } handler := NewPodTolerationsPlugin(pluginConfig) - pluginInitializer := kubeadmission.NewPluginInitializer(c, f, nil, nil, nil, nil) + pluginInitializer := kubeadmission.NewPluginInitializer(c, nil, f, nil, nil, nil, nil) pluginInitializer.Initialize(handler) err = admission.Validate(handler) return handler, f, err diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/admission.go b/staging/src/k8s.io/apiserver/pkg/server/options/admission.go index 0a86aefa0781e..760f4fc3da7c5 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/admission.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/admission.go @@ -41,7 +41,7 @@ type AdmissionOptions struct { func NewAdmissionOptions() *AdmissionOptions { options := &AdmissionOptions{ Plugins: &admission.Plugins{}, - PluginNames: []string{"Initializers"}, + PluginNames: []string{}, } server.RegisterAllAdmissionPlugins(options.Plugins) return options diff --git a/test/e2e/extension/BUILD b/test/e2e/extension/BUILD index bc9bf9064a43d..070760a98c6a9 100644 --- a/test/e2e/extension/BUILD +++ b/test/e2e/extension/BUILD @@ -13,6 +13,9 @@ go_library( tags = ["automanaged"], deps = [ "//pkg/api/v1:go_default_library", + "//pkg/apis/admissionregistration/v1alpha1:go_default_library", + "//pkg/client/clientset_generated/clientset:go_default_library", + "//pkg/client/retry:go_default_library", "//test/e2e/framework:go_default_library", "//vendor/github.com/onsi/ginkgo:go_default_library", "//vendor/github.com/onsi/gomega:go_default_library", diff --git a/test/e2e/extension/initializers.go b/test/e2e/extension/initializers.go index 49922ac2b3fa5..4b7a8ffa78d0a 100644 --- a/test/e2e/extension/initializers.go +++ b/test/e2e/extension/initializers.go @@ -23,10 +23,14 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/apis/admissionregistration/v1alpha1" + "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + clientretry "k8s.io/kubernetes/pkg/client/retry" "k8s.io/kubernetes/test/e2e/framework" ) @@ -65,10 +69,19 @@ var _ = framework.KubeDescribe("Initializers", func() { // verify that we can update an initializing pod pod, err := c.Core().Pods(ns).Get(podName, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) pod.Annotations = map[string]string{"update-1": "test"} pod, err = c.Core().Pods(ns).Update(pod) Expect(err).NotTo(HaveOccurred()) + // verify the list call filters out uninitialized pods + pods, err := c.Core().Pods(ns).List(metav1.ListOptions{IncludeUninitialized: true}) + Expect(err).NotTo(HaveOccurred()) + Expect(pods.Items).To(HaveLen(1)) + pods, err = c.Core().Pods(ns).List(metav1.ListOptions{}) + Expect(err).NotTo(HaveOccurred()) + Expect(pods.Items).To(HaveLen(0)) + // clear initializers pod.Initializers = nil pod, err = c.Core().Pods(ns).Update(pod) @@ -93,17 +106,120 @@ var _ = framework.KubeDescribe("Initializers", func() { } }) + It("should dynamically register and apply initializers to pods [Serial]", func() { + ns := f.Namespace.Name + c := f.ClientSet + + podName := "uninitialized-pod" + framework.Logf("Creating pod %s", podName) + + // create and register an initializer + initializerName := "pod.test.e2e.kubernetes.io" + initializerConfigName := "e2e-test-initializer" + _, err := c.AdmissionregistrationV1alpha1().InitializerConfigurations().Create(&v1alpha1.InitializerConfiguration{ + ObjectMeta: metav1.ObjectMeta{Name: initializerConfigName}, + Initializers: []v1alpha1.Initializer{ + { + Name: initializerName, + Rules: []v1alpha1.Rule{ + {APIGroups: []string{""}, APIVersions: []string{"*"}, Resources: []string{"pods"}}, + }, + }, + }, + }) + Expect(err).NotTo(HaveOccurred()) + + // we must remove the initializer when the test is complete and ensure no pods are pending for that initializer + defer func() { + if err := c.AdmissionregistrationV1alpha1().InitializerConfigurations().Delete(initializerConfigName, nil); err != nil && !errors.IsNotFound(err) { + framework.Logf("got error on deleting %s", initializerConfigName) + } + // poller configuration is 1 second, wait at least that long + time.Sleep(3 * time.Second) + // clear our initializer from anyone who got it + removeInitializersFromAllPods(c, initializerName) + }() + + // poller configuration is 1 second, wait at least that long + time.Sleep(3 * time.Second) + + // run create that blocks + ch := make(chan struct{}) + go func() { + defer close(ch) + _, err := c.Core().Pods(ns).Create(newPod(podName)) + Expect(err).NotTo(HaveOccurred()) + }() + + // wait until the pod shows up uninitialized + By("Waiting until the pod is visible to a client") + var pod *v1.Pod + err = wait.PollImmediate(2*time.Second, 15*time.Second, func() (bool, error) { + pod, err = c.Core().Pods(ns).Get(podName, metav1.GetOptions{IncludeUninitialized: true}) + if errors.IsNotFound(err) { + return false, nil + } + if err != nil { + return false, err + } + return true, nil + }) + Expect(err).NotTo(HaveOccurred()) + Expect(pod.Initializers).NotTo(BeNil()) + Expect(pod.Initializers.Pending).To(HaveLen(1)) + Expect(pod.Initializers.Pending[0].Name).To(Equal(initializerName)) + + // pretend we are an initializer + By("Completing initialization") + pod.Initializers = nil + pod, err = c.Core().Pods(ns).Update(pod) + Expect(err).NotTo(HaveOccurred()) + + // ensure create call returns + <-ch + + // pod should now start running + err = framework.WaitForPodRunningInNamespace(c, pod) + Expect(err).NotTo(HaveOccurred()) + + // bypass initialization by explicitly passing an empty pending list + By("Setting an empty initializer as an admin to bypass initialization") + podName = "preinitialized-pod" + pod = newUninitializedPod(podName) + pod.Initializers.Pending = nil + pod, err = c.Core().Pods(ns).Create(pod) + Expect(err).NotTo(HaveOccurred()) + Expect(pod.Initializers).To(BeNil()) + + // bypass initialization for mirror pods + By("Creating a mirror pod that bypasses initialization") + podName = "mirror-pod" + pod = newPod(podName) + pod.Annotations = map[string]string{ + v1.MirrorPodAnnotationKey: "true", + } + pod.Spec.NodeName = "node-does-not-yet-exist" + pod, err = c.Core().Pods(ns).Create(pod) + Expect(err).NotTo(HaveOccurred()) + Expect(pod.Initializers).To(BeNil()) + Expect(pod.Annotations[v1.MirrorPodAnnotationKey]).To(Equal("true")) + }) }) func newUninitializedPod(podName string) *v1.Pod { + pod := newPod(podName) + pod.Initializers = &metav1.Initializers{ + Pending: []metav1.Initializer{{Name: "Test"}}, + } + return pod +} + +func newPod(podName string) *v1.Pod { containerName := fmt.Sprintf("%s-container", podName) port := 8080 pod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: podName, - Initializers: &metav1.Initializers{ - Pending: []metav1.Initializer{{Name: "Test"}}, - }, }, Spec: v1.PodSpec{ Containers: []v1.Container{ @@ -119,3 +235,48 @@ func newUninitializedPod(podName string) *v1.Pod { } return pod } + +// removeInitializersFromAllPods walks all pods and ensures they don't have the provided initializer, +// to guarantee completing the test doesn't block the entire cluster. +func removeInitializersFromAllPods(c clientset.Interface, initializerName string) { + pods, err := c.Core().Pods("").List(metav1.ListOptions{IncludeUninitialized: true}) + if err != nil { + return + } + for _, p := range pods.Items { + if p.Initializers == nil { + continue + } + err := clientretry.RetryOnConflict(clientretry.DefaultRetry, func() error { + pod, err := c.Core().Pods(p.Namespace).Get(p.Name, metav1.GetOptions{IncludeUninitialized: true}) + if err != nil { + if errors.IsNotFound(err) { + return nil + } + return err + } + if pod.Initializers == nil { + return nil + } + var updated []metav1.Initializer + for _, pending := range pod.Initializers.Pending { + if pending.Name != initializerName { + updated = append(updated, pending) + } + } + if len(updated) == len(pod.Initializers.Pending) { + return nil + } + pod.Initializers.Pending = updated + if len(updated) == 0 { + pod.Initializers = nil + } + framework.Logf("Found initializer on pod %s in ns %s", pod.Name, pod.Namespace) + _, err = c.Core().Pods(p.Namespace).Update(pod) + return err + }) + if err != nil { + framework.Logf("Unable to remove initializer from pod %s in ns %s: %v", p.Name, p.Namespace, err) + } + } +}