From 034f06d7e416f6ce077fad03b6902c7052025d0f Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Mon, 5 Jun 2017 13:27:08 -0400 Subject: [PATCH 1/2] Remove Initializers from federation --- federation/cluster/common.sh | 2 +- federation/pkg/kubefed/init/init.go | 2 +- federation/pkg/kubefed/init/init_test.go | 2 +- hack/make-rules/test-federation-cmd.sh | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/federation/cluster/common.sh b/federation/cluster/common.sh index a13c69a9f4160..d0610a6523b8f 100644 --- a/federation/cluster/common.sh +++ b/federation/cluster/common.sh @@ -258,7 +258,7 @@ function create-federation-api-objects { export FEDERATION_APISERVER_KEY_BASE64="${FEDERATION_APISERVER_KEY_BASE64}" # Enable the NamespaceLifecycle admission control by default. - export FEDERATION_ADMISSION_CONTROL="${FEDERATION_ADMISSION_CONTROL:-Initializers,NamespaceLifecycle}" + export FEDERATION_ADMISSION_CONTROL="${FEDERATION_ADMISSION_CONTROL:-NamespaceLifecycle}" for file in federation-etcd-pvc.yaml federation-apiserver-{deployment,secrets}.yaml federation-controller-manager-deployment.yaml; do echo "Creating manifest: ${file}" diff --git a/federation/pkg/kubefed/init/init.go b/federation/pkg/kubefed/init/init.go index d94a8abd3dcb1..13388216ef403 100644 --- a/federation/pkg/kubefed/init/init.go +++ b/federation/pkg/kubefed/init/init.go @@ -692,7 +692,7 @@ func createAPIServer(clientset client.Interface, namespace, name, federationName "--client-ca-file": "/etc/federation/apiserver/ca.crt", "--tls-cert-file": "/etc/federation/apiserver/server.crt", "--tls-private-key-file": "/etc/federation/apiserver/server.key", - "--admission-control": "Initializers,NamespaceLifecycle", + "--admission-control": "NamespaceLifecycle", } if advertiseAddress != "" { diff --git a/federation/pkg/kubefed/init/init_test.go b/federation/pkg/kubefed/init/init_test.go index 60a847f3f6398..765bbad5ffb91 100644 --- a/federation/pkg/kubefed/init/init_test.go +++ b/federation/pkg/kubefed/init/init_test.go @@ -869,7 +869,7 @@ func fakeInitHostFactory(apiserverServiceType v1.ServiceType, federationName, na fmt.Sprintf("--secure-port=%d", apiServerSecurePort), "--tls-cert-file=/etc/federation/apiserver/server.crt", "--tls-private-key-file=/etc/federation/apiserver/server.key", - "--admission-control=Initializers,NamespaceLifecycle", + "--admission-control=NamespaceLifecycle", fmt.Sprintf("--advertise-address=%s", address), } diff --git a/hack/make-rules/test-federation-cmd.sh b/hack/make-rules/test-federation-cmd.sh index 65e6f6594c6db..45e49a5d3067b 100755 --- a/hack/make-rules/test-federation-cmd.sh +++ b/hack/make-rules/test-federation-cmd.sh @@ -34,7 +34,7 @@ function run_federation_apiserver() { kube::log::status "Starting federation-apiserver" # Admission Controllers to invoke prior to persisting objects in cluster - ADMISSION_CONTROL="Initializers,NamespaceLifecycle" + ADMISSION_CONTROL="NamespaceLifecycle" "${KUBE_OUTPUT_HOSTBIN}/federation-apiserver" \ --insecure-port="${API_PORT}" \ From 772ab8e1b4163c17d285a2789321762a8f2dc9f3 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Tue, 30 May 2017 22:58:57 -0400 Subject: [PATCH 2/2] Load initializers from dynamic config Handle failure cases on startup gracefully to avoid causing cascading errors and poor initialization in other components. Initial errors from config load cause the initializer to pause and hold requests. Return typed errors to better communicate failures to clients. Add code to handle two specific cases - admin wants to bypass initialization defaulting, and mirror pods (which want to bypass initialization because the kubelet owns their lifecycle). --- cmd/kube-apiserver/app/BUILD | 1 + cmd/kube-apiserver/app/server.go | 10 +- federation/cmd/federation-apiserver/app/BUILD | 1 + .../cmd/federation-apiserver/app/server.go | 7 +- pkg/kubeapiserver/admission/BUILD | 1 + .../admission/configuration/BUILD | 2 + .../configuration/configuration_manager.go | 21 +- .../external_admission_hook_manager.go | 27 +- .../configuration/initializer_manager.go | 32 ++- pkg/kubeapiserver/admission/init_test.go | 4 +- pkg/kubeapiserver/admission/initializer.go | 20 +- plugin/pkg/admission/gc/gc_admission_test.go | 2 +- plugin/pkg/admission/initialization/BUILD | 17 ++ .../initialization/initialization.go | 253 ++++++++++++++++-- .../initialization/initialization_test.go | 99 +++++++ .../admission/limitranger/admission_test.go | 2 +- .../namespace/autoprovision/admission_test.go | 2 +- .../namespace/exists/admission_test.go | 2 +- .../podnodeselector/admission_test.go | 2 +- .../admission_test.go | 2 +- .../apiserver/pkg/server/options/admission.go | 2 +- test/e2e/extension/BUILD | 3 + test/e2e/extension/initializers.go | 167 +++++++++++- 23 files changed, 617 insertions(+), 62 deletions(-) create mode 100644 plugin/pkg/admission/initialization/initialization_test.go diff --git a/cmd/kube-apiserver/app/BUILD b/cmd/kube-apiserver/app/BUILD index e5012da611c61..97bac39362f10 100644 --- a/cmd/kube-apiserver/app/BUILD +++ b/cmd/kube-apiserver/app/BUILD @@ -25,6 +25,7 @@ go_library( "//pkg/apis/extensions:go_default_library", "//pkg/apis/networking:go_default_library", "//pkg/capabilities:go_default_library", + "//pkg/client/clientset_generated/clientset:go_default_library", "//pkg/client/clientset_generated/internalclientset:go_default_library", "//pkg/client/informers/informers_generated/internalversion:go_default_library", "//pkg/cloudprovider:go_default_library", diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index ae52ed0f62b62..08f13ffb9adaa 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -60,6 +60,7 @@ import ( "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/apis/networking" "k8s.io/kubernetes/pkg/capabilities" + "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" "k8s.io/kubernetes/pkg/cloudprovider" @@ -379,6 +380,10 @@ func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config, // TODO: get rid of KUBE_API_VERSIONS or define sane behaviour if set glog.Errorf("Failed to create clientset with KUBE_API_VERSIONS=%q. KUBE_API_VERSIONS is only for testing. Things will break.", kubeAPIVersions) } + externalClient, err := clientset.NewForConfig(genericConfig.LoopbackClientConfig) + if err != nil { + return nil, nil, nil, fmt.Errorf("failed to create external clientset: %v", err) + } sharedInformers := informers.NewSharedInformerFactory(client, 10*time.Minute) genericConfig.Authenticator, genericConfig.OpenAPIConfig.SecurityDefinitions, err = BuildAuthenticator(s, storageFactory, client, sharedInformers) @@ -397,6 +402,7 @@ func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config, pluginInitializer, err := BuildAdmissionPluginInitializer( s, client, + externalClient, sharedInformers, genericConfig.Authorizer, ) @@ -414,7 +420,7 @@ func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config, } // BuildAdmissionPluginInitializer constructs the admission plugin initializer -func BuildAdmissionPluginInitializer(s *options.ServerRunOptions, client internalclientset.Interface, sharedInformers informers.SharedInformerFactory, apiAuthorizer authorizer.Authorizer) (admission.PluginInitializer, error) { +func BuildAdmissionPluginInitializer(s *options.ServerRunOptions, client internalclientset.Interface, externalClient clientset.Interface, sharedInformers informers.SharedInformerFactory, apiAuthorizer authorizer.Authorizer) (admission.PluginInitializer, error) { var cloudConfig []byte if s.CloudProvider.CloudConfigFile != "" { @@ -432,7 +438,7 @@ func BuildAdmissionPluginInitializer(s *options.ServerRunOptions, client interna // do not require us to open watches for all items tracked by quota. quotaRegistry := quotainstall.NewRegistry(nil, nil) - pluginInitializer := kubeapiserveradmission.NewPluginInitializer(client, sharedInformers, apiAuthorizer, cloudConfig, restMapper, quotaRegistry) + pluginInitializer := kubeapiserveradmission.NewPluginInitializer(client, externalClient, sharedInformers, apiAuthorizer, cloudConfig, restMapper, quotaRegistry) // Read client cert/key for plugins that need to make calls out if len(s.ProxyClientCertFile) > 0 && len(s.ProxyClientKeyFile) > 0 { diff --git a/federation/cmd/federation-apiserver/app/BUILD b/federation/cmd/federation-apiserver/app/BUILD index 830442e65d1e7..5c1dd8ef83821 100644 --- a/federation/cmd/federation-apiserver/app/BUILD +++ b/federation/cmd/federation-apiserver/app/BUILD @@ -41,6 +41,7 @@ go_library( "//pkg/apis/extensions:go_default_library", "//pkg/apis/extensions/install:go_default_library", "//pkg/apis/extensions/v1beta1:go_default_library", + "//pkg/client/clientset_generated/clientset:go_default_library", "//pkg/client/clientset_generated/internalclientset:go_default_library", "//pkg/client/informers/informers_generated/internalversion:go_default_library", "//pkg/cloudprovider/providers:go_default_library", diff --git a/federation/cmd/federation-apiserver/app/server.go b/federation/cmd/federation-apiserver/app/server.go index edc2a10c37eec..69e8eaad524ea 100644 --- a/federation/cmd/federation-apiserver/app/server.go +++ b/federation/cmd/federation-apiserver/app/server.go @@ -42,6 +42,7 @@ import ( "k8s.io/kubernetes/pkg/api" apiv1 "k8s.io/kubernetes/pkg/api/v1" extensionsapiv1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" + "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" "k8s.io/kubernetes/pkg/generated/openapi" @@ -180,6 +181,10 @@ func NonBlockingRun(s *options.ServerRunOptions, stopCh <-chan struct{}) error { if err != nil { return fmt.Errorf("failed to create clientset: %v", err) } + externalClient, err := clientset.NewForConfig(genericConfig.LoopbackClientConfig) + if err != nil { + return fmt.Errorf("failed to create external clientset: %v", err) + } sharedInformers := informers.NewSharedInformerFactory(client, 10*time.Minute) authorizationConfig := s.Authorization.ToAuthorizationConfig(sharedInformers) @@ -199,7 +204,7 @@ func NonBlockingRun(s *options.ServerRunOptions, stopCh <-chan struct{}) error { // NOTE: we do not provide informers to the quota registry because admission level decisions // do not require us to open watches for all items tracked by quota. quotaRegistry := quotainstall.NewRegistry(nil, nil) - pluginInitializer := kubeapiserveradmission.NewPluginInitializer(client, sharedInformers, apiAuthorizer, cloudConfig, nil, quotaRegistry) + pluginInitializer := kubeapiserveradmission.NewPluginInitializer(client, externalClient, sharedInformers, apiAuthorizer, cloudConfig, nil, quotaRegistry) err = s.Admission.ApplyTo( genericConfig, diff --git a/pkg/kubeapiserver/admission/BUILD b/pkg/kubeapiserver/admission/BUILD index 8b796b26016f3..cd6d3bcaa9672 100644 --- a/pkg/kubeapiserver/admission/BUILD +++ b/pkg/kubeapiserver/admission/BUILD @@ -26,6 +26,7 @@ go_library( tags = ["automanaged"], deps = [ "//pkg/apis/admissionregistration:go_default_library", + "//pkg/client/clientset_generated/clientset:go_default_library", "//pkg/client/clientset_generated/internalclientset:go_default_library", "//pkg/client/informers/informers_generated/internalversion:go_default_library", "//pkg/quota:go_default_library", diff --git a/pkg/kubeapiserver/admission/configuration/BUILD b/pkg/kubeapiserver/admission/configuration/BUILD index d557d01dff7e3..8096c526efd9e 100644 --- a/pkg/kubeapiserver/admission/configuration/BUILD +++ b/pkg/kubeapiserver/admission/configuration/BUILD @@ -29,6 +29,8 @@ go_library( tags = ["automanaged"], deps = [ "//pkg/apis/admissionregistration/v1alpha1:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", diff --git a/pkg/kubeapiserver/admission/configuration/configuration_manager.go b/pkg/kubeapiserver/admission/configuration/configuration_manager.go index 95dd3c44c4656..450befa953906 100644 --- a/pkg/kubeapiserver/admission/configuration/configuration_manager.go +++ b/pkg/kubeapiserver/admission/configuration/configuration_manager.go @@ -30,6 +30,11 @@ const ( defaultFailureThreshold = 5 ) +var ( + ErrNotReady = fmt.Errorf("configuration is not ready") + ErrDisabled = fmt.Errorf("disabled") +) + type getFunc func() (runtime.Object, error) // When running, poller calls `get` every `interval`. If `get` is @@ -52,7 +57,8 @@ type poller struct { ready bool mergedConfiguration runtime.Object // lock much be hold when reading ready or mergedConfiguration - lock sync.RWMutex + lock sync.RWMutex + lastErr error } func newPoller(get getFunc) *poller { @@ -63,6 +69,12 @@ func newPoller(get getFunc) *poller { } } +func (a *poller) lastError(err error) { + a.lock.Lock() + defer a.lock.Unlock() + a.lastErr = err +} + func (a *poller) notReady() { a.lock.Lock() defer a.lock.Unlock() @@ -73,7 +85,10 @@ func (a *poller) configuration() (runtime.Object, error) { a.lock.RLock() defer a.lock.RUnlock() if !a.ready { - return nil, fmt.Errorf("configuration is not ready") + if a.lastErr != nil { + return nil, a.lastErr + } + return nil, ErrNotReady } return a.mergedConfiguration, nil } @@ -83,6 +98,7 @@ func (a *poller) setConfigurationAndReady(value runtime.Object) { defer a.lock.Unlock() a.mergedConfiguration = value a.ready = true + a.lastErr = nil } func (a *poller) Run(stopCh <-chan struct{}) { @@ -93,6 +109,7 @@ func (a *poller) sync() { configuration, err := a.get() if err != nil { a.failures++ + a.lastError(err) if a.failures >= a.failureThreshold { a.notReady() } diff --git a/pkg/kubeapiserver/admission/configuration/external_admission_hook_manager.go b/pkg/kubeapiserver/admission/configuration/external_admission_hook_manager.go index 56faf029f0c88..64d4bd3758937 100644 --- a/pkg/kubeapiserver/admission/configuration/external_admission_hook_manager.go +++ b/pkg/kubeapiserver/admission/configuration/external_admission_hook_manager.go @@ -33,6 +33,20 @@ type ExternalAdmissionHookConfigurationManager struct { *poller } +func NewExternalAdmissionHookConfigurationManager(c ExternalAdmissionHookConfigurationLister) *ExternalAdmissionHookConfigurationManager { + getFn := func() (runtime.Object, error) { + list, err := c.List(metav1.ListOptions{}) + if err != nil { + return nil, err + } + return mergeExternalAdmissionHookConfigurations(list), nil + } + + return &ExternalAdmissionHookConfigurationManager{ + newPoller(getFn), + } +} + // ExternalAdmissionHooks returns the merged ExternalAdmissionHookConfiguration. func (im *ExternalAdmissionHookConfigurationManager) ExternalAdmissionHooks() (*v1alpha1.ExternalAdmissionHookConfiguration, error) { configuration, err := im.poller.configuration() @@ -46,17 +60,8 @@ func (im *ExternalAdmissionHookConfigurationManager) ExternalAdmissionHooks() (* return externalAdmissionHookConfiguration, nil } -func NewExternalAdmissionHookConfigurationManager(c ExternalAdmissionHookConfigurationLister) *ExternalAdmissionHookConfigurationManager { - getFn := func() (runtime.Object, error) { - list, err := c.List(metav1.ListOptions{}) - if err != nil { - return nil, err - } - return mergeExternalAdmissionHookConfigurations(list), nil - } - - return &ExternalAdmissionHookConfigurationManager{ - newPoller(getFn)} +func (im *ExternalAdmissionHookConfigurationManager) Run(stopCh <-chan struct{}) { + im.poller.Run(stopCh) } func mergeExternalAdmissionHookConfigurations( diff --git a/pkg/kubeapiserver/admission/configuration/initializer_manager.go b/pkg/kubeapiserver/admission/configuration/initializer_manager.go index eba763f943b0d..86f4de0a56285 100644 --- a/pkg/kubeapiserver/admission/configuration/initializer_manager.go +++ b/pkg/kubeapiserver/admission/configuration/initializer_manager.go @@ -21,6 +21,9 @@ import ( "reflect" "sort" + "github.com/golang/glog" + + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/kubernetes/pkg/apis/admissionregistration/v1alpha1" @@ -34,6 +37,23 @@ type InitializerConfigurationManager struct { *poller } +func NewInitializerConfigurationManager(c InitializerConfigurationLister) *InitializerConfigurationManager { + getFn := func() (runtime.Object, error) { + list, err := c.List(metav1.ListOptions{}) + if err != nil { + if errors.IsNotFound(err) || errors.IsForbidden(err) { + glog.V(5).Infof("Initializers are disabled due to an error: %v", err) + return nil, ErrDisabled + } + return nil, err + } + return mergeInitializerConfigurations(list), nil + } + return &InitializerConfigurationManager{ + newPoller(getFn), + } +} + // Initializers returns the merged InitializerConfiguration. func (im *InitializerConfigurationManager) Initializers() (*v1alpha1.InitializerConfiguration, error) { configuration, err := im.poller.configuration() @@ -47,16 +67,8 @@ func (im *InitializerConfigurationManager) Initializers() (*v1alpha1.Initializer return initializerConfiguration, nil } -func NewInitializerConfigurationManager(c InitializerConfigurationLister) *InitializerConfigurationManager { - getFn := func() (runtime.Object, error) { - list, err := c.List(metav1.ListOptions{}) - if err != nil { - return nil, err - } - return mergeInitializerConfigurations(list), nil - } - return &InitializerConfigurationManager{ - newPoller(getFn)} +func (im *InitializerConfigurationManager) Run(stopCh <-chan struct{}) { + im.poller.Run(stopCh) } func mergeInitializerConfigurations(initializerConfigurationList *v1alpha1.InitializerConfigurationList) *v1alpha1.InitializerConfiguration { diff --git a/pkg/kubeapiserver/admission/init_test.go b/pkg/kubeapiserver/admission/init_test.go index 7176f14a72bc4..d09e4cc601cb0 100644 --- a/pkg/kubeapiserver/admission/init_test.go +++ b/pkg/kubeapiserver/admission/init_test.go @@ -57,7 +57,7 @@ var _ WantsAuthorizer = &WantAuthorizerAdmission{} // TestWantsAuthorizer ensures that the authorizer is injected when the WantsAuthorizer // interface is implemented. func TestWantsAuthorizer(t *testing.T) { - initializer := NewPluginInitializer(nil, nil, &TestAuthorizer{}, nil, nil, nil) + initializer := NewPluginInitializer(nil, nil, nil, &TestAuthorizer{}, nil, nil, nil) wantAuthorizerAdmission := &WantAuthorizerAdmission{} initializer.Initialize(wantAuthorizerAdmission) if wantAuthorizerAdmission.auth == nil { @@ -76,7 +76,7 @@ func (self *WantsCloudConfigAdmissionPlugin) SetCloudConfig(cloudConfig []byte) func TestCloudConfigAdmissionPlugin(t *testing.T) { cloudConfig := []byte("cloud-configuration") - initializer := NewPluginInitializer(nil, nil, &TestAuthorizer{}, cloudConfig, nil, nil) + initializer := NewPluginInitializer(nil, nil, nil, &TestAuthorizer{}, cloudConfig, nil, nil) wantsCloudConfigAdmission := &WantsCloudConfigAdmissionPlugin{} initializer.Initialize(wantsCloudConfigAdmission) diff --git a/pkg/kubeapiserver/admission/initializer.go b/pkg/kubeapiserver/admission/initializer.go index be56e6efdd6e0..275fde87f3792 100644 --- a/pkg/kubeapiserver/admission/initializer.go +++ b/pkg/kubeapiserver/admission/initializer.go @@ -23,6 +23,7 @@ import ( "k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/authorization/authorizer" "k8s.io/kubernetes/pkg/apis/admissionregistration" + "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" "k8s.io/kubernetes/pkg/quota" @@ -36,6 +37,12 @@ type WantsInternalKubeClientSet interface { admission.Validator } +// WantsExternalKubeClientSet defines a function which sets ClientSet for admission plugins that need it +type WantsExternalKubeClientSet interface { + SetExternalKubeClientSet(clientset.Interface) + admission.Validator +} + // WantsInternalKubeInformerFactory defines a function which sets InformerFactory for admission plugins that need it type WantsInternalKubeInformerFactory interface { SetInternalKubeInformerFactory(informers.SharedInformerFactory) @@ -95,6 +102,7 @@ type WebhookSource interface { type PluginInitializer struct { internalClient internalclientset.Interface + externalClient clientset.Interface informers informers.SharedInformerFactory authorizer authorizer.Authorizer cloudConfig []byte @@ -113,14 +121,18 @@ var _ admission.PluginInitializer = &PluginInitializer{} // NewPluginInitializer constructs new instance of PluginInitializer // TODO: switch these parameters to use the builder pattern or just make them // all public, this construction method is pointless boilerplate. -func NewPluginInitializer(internalClient internalclientset.Interface, +func NewPluginInitializer( + internalClient internalclientset.Interface, + externalClient clientset.Interface, sharedInformers informers.SharedInformerFactory, authz authorizer.Authorizer, cloudConfig []byte, restMapper meta.RESTMapper, - quotaRegistry quota.Registry) *PluginInitializer { + quotaRegistry quota.Registry, +) *PluginInitializer { return &PluginInitializer{ internalClient: internalClient, + externalClient: externalClient, informers: sharedInformers, authorizer: authz, cloudConfig: cloudConfig, @@ -157,6 +169,10 @@ func (i *PluginInitializer) Initialize(plugin admission.Interface) { wants.SetInternalKubeClientSet(i.internalClient) } + if wants, ok := plugin.(WantsExternalKubeClientSet); ok { + wants.SetExternalKubeClientSet(i.externalClient) + } + if wants, ok := plugin.(WantsInternalKubeInformerFactory); ok { wants.SetInternalKubeInformerFactory(i.informers) } diff --git a/plugin/pkg/admission/gc/gc_admission_test.go b/plugin/pkg/admission/gc/gc_admission_test.go index 079f6b717c87c..4d7a6aac6a446 100644 --- a/plugin/pkg/admission/gc/gc_admission_test.go +++ b/plugin/pkg/admission/gc/gc_admission_test.go @@ -74,7 +74,7 @@ func newGCPermissionsEnforcement() *gcPermissionsEnforcement { Handler: admission.NewHandler(admission.Create, admission.Update), whiteList: whiteList, } - pluginInitializer := kubeadmission.NewPluginInitializer(nil, nil, fakeAuthorizer{}, nil, api.Registry.RESTMapper(), nil) + pluginInitializer := kubeadmission.NewPluginInitializer(nil, nil, nil, fakeAuthorizer{}, nil, api.Registry.RESTMapper(), nil) pluginInitializer.Initialize(gcAdmit) return gcAdmit } diff --git a/plugin/pkg/admission/initialization/BUILD b/plugin/pkg/admission/initialization/BUILD index 88838997ef70a..e8708bcb8fd25 100644 --- a/plugin/pkg/admission/initialization/BUILD +++ b/plugin/pkg/admission/initialization/BUILD @@ -5,6 +5,7 @@ licenses(["notice"]) load( "@io_bazel_rules_go//go:def.bzl", "go_library", + "go_test", ) go_library( @@ -12,6 +13,10 @@ go_library( srcs = ["initialization.go"], tags = ["automanaged"], deps = [ + "//pkg/api:go_default_library", + "//pkg/apis/admissionregistration/v1alpha1:go_default_library", + "//pkg/client/clientset_generated/clientset:go_default_library", + "//pkg/kubeapiserver/admission/configuration:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library", @@ -19,6 +24,7 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/validation/field:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/apiserver/pkg/admission:go_default_library", "//vendor/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library", ], @@ -36,3 +42,14 @@ filegroup( srcs = [":package-srcs"], tags = ["automanaged"], ) + +go_test( + name = "go_default_test", + srcs = ["initialization_test.go"], + library = ":go_default_library", + tags = ["automanaged"], + deps = [ + "//pkg/apis/admissionregistration/v1alpha1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", + ], +) diff --git a/plugin/pkg/admission/initialization/initialization.go b/plugin/pkg/admission/initialization/initialization.go index 15855053c4088..864347e81e0cf 100644 --- a/plugin/pkg/admission/initialization/initialization.go +++ b/plugin/pkg/admission/initialization/initialization.go @@ -1,5 +1,5 @@ /* -Copyright 2014 The Kubernetes Authors. +Copyright 2017 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -19,6 +19,12 @@ package initialization import ( "fmt" "io" + "net" + "net/url" + "os" + "strings" + "syscall" + "time" "github.com/golang/glog" @@ -28,8 +34,13 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/validation/field" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/authorization/authorizer" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/apis/admissionregistration/v1alpha1" + "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + "k8s.io/kubernetes/pkg/kubeapiserver/admission/configuration" ) // Register registers a plugin @@ -43,38 +54,131 @@ type initializerOptions struct { Initializers []string } +type InitializationConfig interface { + Run(stopCh <-chan struct{}) + Initializers() (*v1alpha1.InitializerConfiguration, error) +} + type initializer struct { - resources map[schema.GroupResource]initializerOptions + config InitializationConfig authorizer authorizer.Authorizer } -// NewAlwaysAdmit creates a new always admit admission handler +// Retry config loading failures for up to four and a half seconds if the config hasn't been loaded +// yet or if the server is down. Creation requests are delayed during this interval, which prevents +// racy failures during startup until the initializer configuration becomes available. +// TODO: move into InitializationConfigurationManager, since these values depend on the config +// refresh loop. +const ( + retryTemporaryConfigFailures = 8 + retryTemporaryConfigInterval = 550 * time.Millisecond +) + +// NewInitializer creates a new initializer plugin which assigns newly created resources initializers +// based on configuration loaded from the admission API group. +// FUTURE: this may be moved to the storage layer of the apiserver, but for now this is an alpha feature +// that can be disabled. func NewInitializer() admission.Interface { - return &initializer{ - resources: map[schema.GroupResource]initializerOptions{ - //schema.GroupResource{Resource: "pods"}: {Initializers: []string{"Test"}}, - }, - } + return &initializer{} } func (i *initializer) Validate() error { + if i.config == nil { + return fmt.Errorf("the Initializer admission plugin requires a Kubernetes client to be provided") + } + i.config.Run(wait.NeverStop) return nil } +func (i *initializer) SetExternalKubeClientSet(client clientset.Interface) { + i.config = configuration.NewInitializerConfigurationManager(client.Admissionregistration().InitializerConfigurations()) +} + func (i *initializer) SetAuthorizer(a authorizer.Authorizer) { i.authorizer = a } var initializerFieldPath = field.NewPath("metadata", "initializers") +// temporaryConnectionError returns true if the error is considered temporary +func temporaryConnectionError(err error) bool { + if urlError, ok := err.(*url.Error); ok { + if urlError.Temporary() { + return true + } + if opError, ok := urlError.Err.(*net.OpError); ok { + if syscallError, ok := opError.Err.(*os.SyscallError); ok { + if errno, ok := syscallError.Err.(syscall.Errno); ok && errno == syscall.ECONNREFUSED { + return true + } + } + } + } + return false +} + +// readConfigWithRetry holds requests instead of failing them if the server is not yet initialized +// or is unresponsive. It formats the returned error for client use if necessary. +func (i *initializer) readConfigWithRetry(a admission.Attributes) (*v1alpha1.InitializerConfiguration, error) { + var lastErr error + for count := 0; count < retryTemporaryConfigFailures; count++ { + if count > 0 { + time.Sleep(retryTemporaryConfigInterval) + } + + // read initializers from config + config, err := i.config.Initializers() + if err == nil { + return config, nil + } + + // if initializer configuration is disabled, fail open + if err == configuration.ErrDisabled { + return &v1alpha1.InitializerConfiguration{}, nil + } + + // retry certain errors + lastErr = err + if err != configuration.ErrNotReady && !temporaryConnectionError(err) { + break + } + } + + e := errors.NewServerTimeout(a.GetResource().GroupResource(), "create", 1) + if lastErr == configuration.ErrNotReady { + e.ErrStatus.Message = fmt.Sprintf("Waiting for initialization configuration to load: %v", lastErr) + e.ErrStatus.Reason = "LoadingConfiguration" + e.ErrStatus.Details.Causes = append(e.ErrStatus.Details.Causes, metav1.StatusCause{ + Type: "InitializerConfigurationPending", + Message: "The server is waiting for the initializer configuration to be loaded.", + }) + } else { + e.ErrStatus.Message = fmt.Sprintf("Unable to refresh the initializer configuration: %v", lastErr) + e.ErrStatus.Reason = "LoadingConfiguration" + e.ErrStatus.Details.Causes = append(e.ErrStatus.Details.Causes, metav1.StatusCause{ + Type: "InitializerConfigurationFailure", + Message: "An error has occurred while refreshing the initializer configuration, no resources can be created until a refresh succeeds.", + }) + } + return nil, e +} + +// Admit checks for create requests to add initializers, or update request to enforce invariants. +// The admission controller fails open if the object doesn't have ObjectMeta (can't be initialized). +// A client with sufficient permission ("initialize" verb on resource) can specify its own initializers +// or an empty initializers struct (which bypasses initialization). Only clients with the initialize verb +// can update objects that have not completed initialization. Sub resources can still be modified on +// resources that are undergoing initialization. +// TODO: once this logic is ready for beta, move it into the REST storage layer. func (i *initializer) Admit(a admission.Attributes) (err error) { - // TODO: sub-resource action should be denied until the object is initialized - if len(a.GetSubresource()) > 0 { + switch a.GetOperation() { + case admission.Create, admission.Update: + default: return nil } - resource, ok := i.resources[a.GetResource().GroupResource()] - if !ok { + // TODO: should sub-resource action should be denied until the object is initialized? + if len(a.GetSubresource()) > 0 { return nil } @@ -87,15 +191,44 @@ func (i *initializer) Admit(a admission.Attributes) (err error) { return nil } existing := accessor.GetInitializers() - // it must be possible for some users to bypass initialization - for now, check the initialize operation if existing != nil { - if err := i.canInitialize(a); err != nil { + glog.V(5).Infof("Admin bypassing initialization for %s", a.GetResource()) + + // it must be possible for some users to bypass initialization - for now, check the initialize operation + if err := i.canInitialize(a, "create with initializers denied"); err != nil { + return err + } + // allow administrators to bypass initialization by setting an empty initializers struct + if len(existing.Pending) == 0 && existing.Result == nil { + accessor.SetInitializers(nil) + return nil + } + } else { + glog.V(5).Infof("Checking initialization for %s", a.GetResource()) + + config, err := i.readConfigWithRetry(a) + if err != nil { return err } - } - // TODO: pull this from config - accessor.SetInitializers(copiedInitializers(resource.Initializers)) + // Mirror pods are exempt from initialization because they are created and initialized + // on the Kubelet before they appear in the API. + // TODO: once this moves to REST storage layer, this becomes a pod specific concern + if pod, ok := a.GetObject().(*api.Pod); ok && pod != nil { + if _, isMirror := pod.Annotations[api.MirrorPodAnnotationKey]; isMirror { + return nil + } + } + + names := findInitializers(config, a.GetResource()) + if len(names) == 0 { + glog.V(5).Infof("No initializers needed") + return nil + } + + glog.V(5).Infof("Found initializers for %s: %v", a.GetResource(), names) + accessor.SetInitializers(newInitializers(names)) + } case admission.Update: accessor, err := meta.Accessor(a.GetObject()) @@ -113,13 +246,20 @@ func (i *initializer) Admit(a admission.Attributes) (err error) { } existing := existingAccessor.GetInitializers() + // updates on initialized resources are allowed + if updated == nil && existing == nil { + return nil + } + + glog.V(5).Infof("Modifying uninitialized resource %s", a.GetResource()) + // because we are called before validation, we need to ensure the update transition is valid. if errs := validation.ValidateInitializersUpdate(updated, existing, initializerFieldPath); len(errs) > 0 { return errors.NewInvalid(a.GetKind().GroupKind(), a.GetName(), errs) } // caller must have the ability to mutate un-initialized resources - if err := i.canInitialize(a); err != nil { + if err := i.canInitialize(a, "update to uninitialized resource denied"); err != nil { return err } @@ -129,7 +269,7 @@ func (i *initializer) Admit(a admission.Attributes) (err error) { return nil } -func (i *initializer) canInitialize(a admission.Attributes) error { +func (i *initializer) canInitialize(a admission.Attributes, message string) error { // if no authorizer is present, the initializer plugin allows modification of uninitialized resources if i.authorizer == nil { glog.V(4).Infof("No authorizer provided to initialization admission control, unable to check permissions") @@ -150,16 +290,17 @@ func (i *initializer) canInitialize(a admission.Attributes) error { return err } if !authorized { - return fmt.Errorf("user must have permission to initialize resources: %s", reason) + return errors.NewForbidden(a.GetResource().GroupResource(), a.GetName(), fmt.Errorf("%s: %s", message, reason)) } return nil } func (i *initializer) Handles(op admission.Operation) bool { - return true + return op == admission.Create || op == admission.Update } -func copiedInitializers(names []string) *metav1.Initializers { +// newInitializers populates an Initializers struct. +func newInitializers(names []string) *metav1.Initializers { if len(names) == 0 { return nil } @@ -171,3 +312,71 @@ func copiedInitializers(names []string) *metav1.Initializers { Pending: init, } } + +// findInitializers returns the list of initializer names that apply to a config. It returns an empty list +// if no initializers apply. +func findInitializers(initializers *v1alpha1.InitializerConfiguration, gvr schema.GroupVersionResource) []string { + var names []string + for _, init := range initializers.Initializers { + if !matchRule(init.Rules, gvr) { + continue + } + names = append(names, init.Name) + } + return names +} + +// matchRule returns true if any rule matches the provided group version resource. +func matchRule(rules []v1alpha1.Rule, gvr schema.GroupVersionResource) bool { + for _, rule := range rules { + if !hasGroup(rule.APIGroups, gvr.Group) { + return false + } + if !hasVersion(rule.APIVersions, gvr.Version) { + return false + } + if !hasResource(rule.Resources, gvr.Resource) { + return false + } + } + return len(rules) > 0 +} + +func hasGroup(groups []string, group string) bool { + if groups[0] == "*" { + return true + } + for _, g := range groups { + if g == group { + return true + } + } + return false +} + +func hasVersion(versions []string, version string) bool { + if versions[0] == "*" { + return true + } + for _, v := range versions { + if v == version { + return true + } + } + return false +} + +func hasResource(resources []string, resource string) bool { + if resources[0] == "*" || resources[0] == "*/*" { + return true + } + for _, r := range resources { + if strings.Contains(r, "/") { + continue + } + if r == resource { + return true + } + } + return false +} diff --git a/plugin/pkg/admission/initialization/initialization_test.go b/plugin/pkg/admission/initialization/initialization_test.go new file mode 100644 index 0000000000000..b45844c93746e --- /dev/null +++ b/plugin/pkg/admission/initialization/initialization_test.go @@ -0,0 +1,99 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package initialization + +import ( + "reflect" + "testing" + + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/kubernetes/pkg/apis/admissionregistration/v1alpha1" +) + +func newInitializer(name string, rules ...v1alpha1.Rule) *v1alpha1.InitializerConfiguration { + return addInitializer(&v1alpha1.InitializerConfiguration{}, name, rules...) +} + +func addInitializer(base *v1alpha1.InitializerConfiguration, name string, rules ...v1alpha1.Rule) *v1alpha1.InitializerConfiguration { + base.Initializers = append(base.Initializers, v1alpha1.Initializer{ + Name: name, + Rules: rules, + }) + return base +} + +func TestFindInitializers(t *testing.T) { + type args struct { + initializers *v1alpha1.InitializerConfiguration + gvr schema.GroupVersionResource + } + tests := []struct { + name string + args args + want []string + }{ + { + name: "empty", + args: args{ + gvr: schema.GroupVersionResource{}, + initializers: newInitializer("1"), + }, + }, + { + name: "everything", + args: args{ + gvr: schema.GroupVersionResource{}, + initializers: newInitializer("1", v1alpha1.Rule{APIGroups: []string{"*"}, APIVersions: []string{"*"}, Resources: []string{"*"}}), + }, + want: []string{"1"}, + }, + { + name: "empty group", + args: args{ + gvr: schema.GroupVersionResource{}, + initializers: newInitializer("1", v1alpha1.Rule{APIGroups: []string{""}, APIVersions: []string{"*"}, Resources: []string{"*"}}), + }, + want: []string{"1"}, + }, + { + name: "pod", + args: args{ + gvr: schema.GroupVersionResource{Resource: "pods"}, + initializers: addInitializer( + newInitializer("1", v1alpha1.Rule{APIGroups: []string{""}, APIVersions: []string{"*"}, Resources: []string{"pods"}}), + "2", v1alpha1.Rule{APIGroups: []string{""}, APIVersions: []string{"*"}, Resources: []string{"pods"}}, + ), + }, + want: []string{"1", "2"}, + }, + { + name: "multiple matches", + args: args{ + gvr: schema.GroupVersionResource{Resource: "pods"}, + initializers: newInitializer("1", v1alpha1.Rule{APIGroups: []string{""}, APIVersions: []string{"*"}, Resources: []string{"pods"}}), + }, + want: []string{"1"}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := findInitializers(tt.args.initializers, tt.args.gvr); !reflect.DeepEqual(got, tt.want) { + t.Errorf("findInitializers() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/plugin/pkg/admission/limitranger/admission_test.go b/plugin/pkg/admission/limitranger/admission_test.go index 0e9d1bf68aa9a..135e9914f0a28 100644 --- a/plugin/pkg/admission/limitranger/admission_test.go +++ b/plugin/pkg/admission/limitranger/admission_test.go @@ -595,7 +595,7 @@ func newHandlerForTest(c clientset.Interface) (admission.Interface, informers.Sh if err != nil { return nil, f, err } - pluginInitializer := kubeadmission.NewPluginInitializer(c, f, nil, nil, nil, nil) + pluginInitializer := kubeadmission.NewPluginInitializer(c, nil, f, nil, nil, nil, nil) pluginInitializer.Initialize(handler) err = admission.Validate(handler) return handler, f, err diff --git a/plugin/pkg/admission/namespace/autoprovision/admission_test.go b/plugin/pkg/admission/namespace/autoprovision/admission_test.go index 36bdb80f6f8e0..b4e7227433e0e 100644 --- a/plugin/pkg/admission/namespace/autoprovision/admission_test.go +++ b/plugin/pkg/admission/namespace/autoprovision/admission_test.go @@ -38,7 +38,7 @@ import ( func newHandlerForTest(c clientset.Interface) (admission.Interface, informers.SharedInformerFactory, error) { f := informers.NewSharedInformerFactory(c, 5*time.Minute) handler := NewProvision() - pluginInitializer := kubeadmission.NewPluginInitializer(c, f, nil, nil, nil, nil) + pluginInitializer := kubeadmission.NewPluginInitializer(c, nil, f, nil, nil, nil, nil) pluginInitializer.Initialize(handler) err := admission.Validate(handler) return handler, f, err diff --git a/plugin/pkg/admission/namespace/exists/admission_test.go b/plugin/pkg/admission/namespace/exists/admission_test.go index ebba2ebc67b49..00bdd805f6352 100644 --- a/plugin/pkg/admission/namespace/exists/admission_test.go +++ b/plugin/pkg/admission/namespace/exists/admission_test.go @@ -37,7 +37,7 @@ import ( func newHandlerForTest(c clientset.Interface) (admission.Interface, informers.SharedInformerFactory, error) { f := informers.NewSharedInformerFactory(c, 5*time.Minute) handler := NewExists() - pluginInitializer := kubeadmission.NewPluginInitializer(c, f, nil, nil, nil, nil) + pluginInitializer := kubeadmission.NewPluginInitializer(c, nil, f, nil, nil, nil, nil) pluginInitializer.Initialize(handler) err := admission.Validate(handler) return handler, f, err diff --git a/plugin/pkg/admission/podnodeselector/admission_test.go b/plugin/pkg/admission/podnodeselector/admission_test.go index a7584cd1dc236..f5bc4ac969e2d 100644 --- a/plugin/pkg/admission/podnodeselector/admission_test.go +++ b/plugin/pkg/admission/podnodeselector/admission_test.go @@ -191,7 +191,7 @@ func TestHandles(t *testing.T) { func newHandlerForTest(c clientset.Interface) (*podNodeSelector, informers.SharedInformerFactory, error) { f := informers.NewSharedInformerFactory(c, 5*time.Minute) handler := NewPodNodeSelector(nil) - pluginInitializer := kubeadmission.NewPluginInitializer(c, f, nil, nil, nil, nil) + pluginInitializer := kubeadmission.NewPluginInitializer(c, nil, f, nil, nil, nil, nil) pluginInitializer.Initialize(handler) err := admission.Validate(handler) return handler, f, err diff --git a/plugin/pkg/admission/podtolerationrestriction/admission_test.go b/plugin/pkg/admission/podtolerationrestriction/admission_test.go index 16035ee6ddd34..53717f613485b 100644 --- a/plugin/pkg/admission/podtolerationrestriction/admission_test.go +++ b/plugin/pkg/admission/podtolerationrestriction/admission_test.go @@ -193,7 +193,7 @@ func newHandlerForTest(c clientset.Interface) (*podTolerationsPlugin, informers. return nil, nil, err } handler := NewPodTolerationsPlugin(pluginConfig) - pluginInitializer := kubeadmission.NewPluginInitializer(c, f, nil, nil, nil, nil) + pluginInitializer := kubeadmission.NewPluginInitializer(c, nil, f, nil, nil, nil, nil) pluginInitializer.Initialize(handler) err = admission.Validate(handler) return handler, f, err diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/admission.go b/staging/src/k8s.io/apiserver/pkg/server/options/admission.go index 0a86aefa0781e..760f4fc3da7c5 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/admission.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/admission.go @@ -41,7 +41,7 @@ type AdmissionOptions struct { func NewAdmissionOptions() *AdmissionOptions { options := &AdmissionOptions{ Plugins: &admission.Plugins{}, - PluginNames: []string{"Initializers"}, + PluginNames: []string{}, } server.RegisterAllAdmissionPlugins(options.Plugins) return options diff --git a/test/e2e/extension/BUILD b/test/e2e/extension/BUILD index bc9bf9064a43d..070760a98c6a9 100644 --- a/test/e2e/extension/BUILD +++ b/test/e2e/extension/BUILD @@ -13,6 +13,9 @@ go_library( tags = ["automanaged"], deps = [ "//pkg/api/v1:go_default_library", + "//pkg/apis/admissionregistration/v1alpha1:go_default_library", + "//pkg/client/clientset_generated/clientset:go_default_library", + "//pkg/client/retry:go_default_library", "//test/e2e/framework:go_default_library", "//vendor/github.com/onsi/ginkgo:go_default_library", "//vendor/github.com/onsi/gomega:go_default_library", diff --git a/test/e2e/extension/initializers.go b/test/e2e/extension/initializers.go index 49922ac2b3fa5..4b7a8ffa78d0a 100644 --- a/test/e2e/extension/initializers.go +++ b/test/e2e/extension/initializers.go @@ -23,10 +23,14 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/apis/admissionregistration/v1alpha1" + "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + clientretry "k8s.io/kubernetes/pkg/client/retry" "k8s.io/kubernetes/test/e2e/framework" ) @@ -65,10 +69,19 @@ var _ = framework.KubeDescribe("Initializers", func() { // verify that we can update an initializing pod pod, err := c.Core().Pods(ns).Get(podName, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) pod.Annotations = map[string]string{"update-1": "test"} pod, err = c.Core().Pods(ns).Update(pod) Expect(err).NotTo(HaveOccurred()) + // verify the list call filters out uninitialized pods + pods, err := c.Core().Pods(ns).List(metav1.ListOptions{IncludeUninitialized: true}) + Expect(err).NotTo(HaveOccurred()) + Expect(pods.Items).To(HaveLen(1)) + pods, err = c.Core().Pods(ns).List(metav1.ListOptions{}) + Expect(err).NotTo(HaveOccurred()) + Expect(pods.Items).To(HaveLen(0)) + // clear initializers pod.Initializers = nil pod, err = c.Core().Pods(ns).Update(pod) @@ -93,17 +106,120 @@ var _ = framework.KubeDescribe("Initializers", func() { } }) + It("should dynamically register and apply initializers to pods [Serial]", func() { + ns := f.Namespace.Name + c := f.ClientSet + + podName := "uninitialized-pod" + framework.Logf("Creating pod %s", podName) + + // create and register an initializer + initializerName := "pod.test.e2e.kubernetes.io" + initializerConfigName := "e2e-test-initializer" + _, err := c.AdmissionregistrationV1alpha1().InitializerConfigurations().Create(&v1alpha1.InitializerConfiguration{ + ObjectMeta: metav1.ObjectMeta{Name: initializerConfigName}, + Initializers: []v1alpha1.Initializer{ + { + Name: initializerName, + Rules: []v1alpha1.Rule{ + {APIGroups: []string{""}, APIVersions: []string{"*"}, Resources: []string{"pods"}}, + }, + }, + }, + }) + Expect(err).NotTo(HaveOccurred()) + + // we must remove the initializer when the test is complete and ensure no pods are pending for that initializer + defer func() { + if err := c.AdmissionregistrationV1alpha1().InitializerConfigurations().Delete(initializerConfigName, nil); err != nil && !errors.IsNotFound(err) { + framework.Logf("got error on deleting %s", initializerConfigName) + } + // poller configuration is 1 second, wait at least that long + time.Sleep(3 * time.Second) + // clear our initializer from anyone who got it + removeInitializersFromAllPods(c, initializerName) + }() + + // poller configuration is 1 second, wait at least that long + time.Sleep(3 * time.Second) + + // run create that blocks + ch := make(chan struct{}) + go func() { + defer close(ch) + _, err := c.Core().Pods(ns).Create(newPod(podName)) + Expect(err).NotTo(HaveOccurred()) + }() + + // wait until the pod shows up uninitialized + By("Waiting until the pod is visible to a client") + var pod *v1.Pod + err = wait.PollImmediate(2*time.Second, 15*time.Second, func() (bool, error) { + pod, err = c.Core().Pods(ns).Get(podName, metav1.GetOptions{IncludeUninitialized: true}) + if errors.IsNotFound(err) { + return false, nil + } + if err != nil { + return false, err + } + return true, nil + }) + Expect(err).NotTo(HaveOccurred()) + Expect(pod.Initializers).NotTo(BeNil()) + Expect(pod.Initializers.Pending).To(HaveLen(1)) + Expect(pod.Initializers.Pending[0].Name).To(Equal(initializerName)) + + // pretend we are an initializer + By("Completing initialization") + pod.Initializers = nil + pod, err = c.Core().Pods(ns).Update(pod) + Expect(err).NotTo(HaveOccurred()) + + // ensure create call returns + <-ch + + // pod should now start running + err = framework.WaitForPodRunningInNamespace(c, pod) + Expect(err).NotTo(HaveOccurred()) + + // bypass initialization by explicitly passing an empty pending list + By("Setting an empty initializer as an admin to bypass initialization") + podName = "preinitialized-pod" + pod = newUninitializedPod(podName) + pod.Initializers.Pending = nil + pod, err = c.Core().Pods(ns).Create(pod) + Expect(err).NotTo(HaveOccurred()) + Expect(pod.Initializers).To(BeNil()) + + // bypass initialization for mirror pods + By("Creating a mirror pod that bypasses initialization") + podName = "mirror-pod" + pod = newPod(podName) + pod.Annotations = map[string]string{ + v1.MirrorPodAnnotationKey: "true", + } + pod.Spec.NodeName = "node-does-not-yet-exist" + pod, err = c.Core().Pods(ns).Create(pod) + Expect(err).NotTo(HaveOccurred()) + Expect(pod.Initializers).To(BeNil()) + Expect(pod.Annotations[v1.MirrorPodAnnotationKey]).To(Equal("true")) + }) }) func newUninitializedPod(podName string) *v1.Pod { + pod := newPod(podName) + pod.Initializers = &metav1.Initializers{ + Pending: []metav1.Initializer{{Name: "Test"}}, + } + return pod +} + +func newPod(podName string) *v1.Pod { containerName := fmt.Sprintf("%s-container", podName) port := 8080 pod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: podName, - Initializers: &metav1.Initializers{ - Pending: []metav1.Initializer{{Name: "Test"}}, - }, }, Spec: v1.PodSpec{ Containers: []v1.Container{ @@ -119,3 +235,48 @@ func newUninitializedPod(podName string) *v1.Pod { } return pod } + +// removeInitializersFromAllPods walks all pods and ensures they don't have the provided initializer, +// to guarantee completing the test doesn't block the entire cluster. +func removeInitializersFromAllPods(c clientset.Interface, initializerName string) { + pods, err := c.Core().Pods("").List(metav1.ListOptions{IncludeUninitialized: true}) + if err != nil { + return + } + for _, p := range pods.Items { + if p.Initializers == nil { + continue + } + err := clientretry.RetryOnConflict(clientretry.DefaultRetry, func() error { + pod, err := c.Core().Pods(p.Namespace).Get(p.Name, metav1.GetOptions{IncludeUninitialized: true}) + if err != nil { + if errors.IsNotFound(err) { + return nil + } + return err + } + if pod.Initializers == nil { + return nil + } + var updated []metav1.Initializer + for _, pending := range pod.Initializers.Pending { + if pending.Name != initializerName { + updated = append(updated, pending) + } + } + if len(updated) == len(pod.Initializers.Pending) { + return nil + } + pod.Initializers.Pending = updated + if len(updated) == 0 { + pod.Initializers = nil + } + framework.Logf("Found initializer on pod %s in ns %s", pod.Name, pod.Namespace) + _, err = c.Core().Pods(p.Namespace).Update(pod) + return err + }) + if err != nil { + framework.Logf("Unable to remove initializer from pod %s in ns %s: %v", p.Name, p.Namespace, err) + } + } +}