From 6323c106e9b5b0edd452a2a223d569a5dae8a832 Mon Sep 17 00:00:00 2001 From: Alexander Zielenski Date: Mon, 12 Jun 2023 18:19:33 -0700 Subject: [PATCH] refactor: use the provided sharedInformerFactory for params --- .../validatingadmissionpolicy/controller.go | 1 + .../controller_reconcile.go | 169 +++++++----------- 2 files changed, 70 insertions(+), 100 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/controller.go b/staging/src/k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/controller.go index f0ca174380905..f228cab53e157 100644 --- a/staging/src/k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/controller.go +++ b/staging/src/k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/controller.go @@ -134,6 +134,7 @@ func NewAdmissionController( restMapper, client, dynamicClient, + informerFactory, nil, NewMatcher(matching.NewMatcher(informerFactory.Core().V1().Namespaces().Lister(), client)), generic.NewInformer[*v1alpha1.ValidatingAdmissionPolicy]( diff --git a/staging/src/k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/controller_reconcile.go b/staging/src/k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/controller_reconcile.go index 5e0773d8bd744..b7e62d068614e 100644 --- a/staging/src/k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/controller_reconcile.go +++ b/staging/src/k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/controller_reconcile.go @@ -40,7 +40,6 @@ import ( "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" - k8sscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/cache" ) @@ -48,6 +47,7 @@ type policyController struct { once sync.Once context context.Context dynamicClient dynamic.Interface + informerFactory informers.SharedInformerFactory restMapper meta.RESTMapper policyDefinitionsController generic.Controller[*v1alpha1.ValidatingAdmissionPolicy] policyBindingController generic.Controller[*v1alpha1.ValidatingAdmissionPolicyBinding] @@ -61,13 +61,10 @@ type policyController struct { newValidator - // Lock which protects: - // - cachedPolicies - // - paramCRDControllers - // - definitionInfo - // - bindingInfos - // - definitionsToBindings - // All other fields should be assumed constant + client kubernetes.Interface + // Lock which protects + // All Below fields + // All above fields should be assumed constant mutex sync.RWMutex cachedPolicies []policyData @@ -88,8 +85,6 @@ type policyController struct { // All keys must have at least one dependent binding // All binding names MUST exist as a key bindingInfos definitionsToBindings map[namespacedName]sets.Set[namespacedName] - - client kubernetes.Interface } type newValidator func(validationFilter cel.Filter, celMatcher matchconditions.Matcher, auditAnnotationFilter, messageFilter cel.Filter, failurePolicy *v1.FailurePolicyType) Validator @@ -98,6 +93,7 @@ func newPolicyController( restMapper meta.RESTMapper, client kubernetes.Interface, dynamicClient dynamic.Interface, + informerFactory informers.SharedInformerFactory, filterCompiler cel.FilterCompiler, matcher Matcher, policiesInformer generic.Informer[*v1alpha1.ValidatingAdmissionPolicy], @@ -128,9 +124,10 @@ func newPolicyController( Name: "cel-policy-bindings", }, ), - restMapper: restMapper, - dynamicClient: dynamicClient, - client: client, + restMapper: restMapper, + dynamicClient: dynamicClient, + informerFactory: informerFactory, + client: client, } return res } @@ -235,7 +232,6 @@ func (c *policyController) reconcilePolicyDefinitionSpec(namespace, name string, // Skip setting up controller for empty param type return nil } - // find GVR for params // Parse param source into a GVK @@ -262,101 +258,74 @@ func (c *policyController) reconcilePolicyDefinitionSpec(namespace, name string, return info.configurationError } - if info, ok := c.paramsCRDControllers[*paramSource]; ok { - // If a param controller is already active for this paramsource, make - // sure it is tracking this policy's dependency upon it - info.dependentDefinitions.Insert(nn) + paramInfo := c.ensureParamInfo(paramSource, paramsGVR.Resource) + paramInfo.dependentDefinitions.Insert(nn) - } else { - instanceContext, instanceCancel := context.WithCancel(c.context) - - var informer cache.SharedIndexInformer - - // Informer Factory is optional - if c.client != nil { - // Create temporary informer factory - // Cannot use the k8s shared informer factory for dynamic params informer. - // Would leak unnecessary informers when we are done since we would have to - // call informerFactory.Start() with a longer-lived stopCh than necessary. - // SharedInformerFactory does not support temporary usage. - dynamicFactory := informers.NewSharedInformerFactory(c.client, 10*time.Minute) - - // Look for a typed informer. If it does not exist - genericInformer, err := dynamicFactory.ForResource(paramsGVR.Resource) - - // Ignore error. We fallback to dynamic informer if there is no - // typed informer - if err != nil { - informer = nil - } else { - informer = genericInformer.Informer() - - // Set transformer on the informer to workaround inconsistency - // where typed objects have TypeMeta wiped out but dynamic - // objects keep kind/apiVersion fields - informer.SetTransform(func(i interface{}) (interface{}, error) { - // Ensure param is populated with its GVK for consistency - // (CRD dynamic informer always returns objects with kind/apiversion, - // but native types do not include populated TypeMeta. - if param := i.(runtime.Object); param != nil { - if param.GetObjectKind().GroupVersionKind().Empty() { - // https://github.com/kubernetes/client-go/issues/413#issue-324586398 - gvks, _, _ := k8sscheme.Scheme.ObjectKinds(param) - for _, gvk := range gvks { - if len(gvk.Kind) == 0 { - continue - } - if len(gvk.Version) == 0 || gvk.Version == runtime.APIVersionInternal { - continue - } - param.GetObjectKind().SetGroupVersionKind(gvk) - break - } - } - } + return nil +} - return i, nil - }) - } - } +// Ensures that there is an informer started for the given GVK to be used as a +// param +func (c *policyController) ensureParamInfo(paramSource *v1alpha1.ParamKind, paramsGVR schema.GroupVersionResource) *paramInfo { + if info, ok := c.paramsCRDControllers[*paramSource]; ok { + return info + } - if informer == nil { - // Dynamic JSON informer fallback. - // Cannot use shared dynamic informer since it would be impossible - // to clean CRD informers properly with multiple dependents - // (cannot start ahead of time, and cannot track dependencies via stopCh) - informer = dynamicinformer.NewFilteredDynamicInformer( - c.dynamicClient, - paramsGVR.Resource, - corev1.NamespaceAll, - // Use same interval as is used for k8s typed sharedInformerFactory - // https://github.com/kubernetes/kubernetes/blob/7e0923899fed622efbc8679cca6b000d43633e38/cmd/kube-apiserver/app/server.go#L430 - 10*time.Minute, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, - nil, - ).Informer() - } + // We are not watching this param. Start an informer for it. + instanceContext, instanceCancel := context.WithCancel(c.context) - controller := generic.NewController( - generic.NewInformer[runtime.Object](informer), - c.reconcileParams, - generic.ControllerOptions{ - Workers: 1, - Name: paramSource.String() + "-controller", - }, - ) + var informer cache.SharedIndexInformer - c.paramsCRDControllers[*paramSource] = ¶mInfo{ - controller: controller, - stop: instanceCancel, - dependentDefinitions: sets.New(nn), - } + // Try to see if our provided informer factory has an informer for this type. + // We assume the informer is already started, and starts all types associated + // with it. + if genericInformer, err := c.informerFactory.ForResource(paramsGVR); err == nil { + informer = genericInformer.Informer() - go controller.Run(instanceContext) + // Ensure the informer is started + // Use policyController's context rather than the instance context. + // PolicyController context is expected to last until app shutdown + // This is due to behavior of informerFactory which would cause the + // informer to stop running once the context is cancelled, and + // never started again. + c.informerFactory.Start(c.context.Done()) + } else { + // Dynamic JSON informer fallback. + // Cannot use shared dynamic informer since it would be impossible + // to clean CRD informers properly with multiple dependents + // (cannot start ahead of time, and cannot track dependencies via stopCh) + informer = dynamicinformer.NewFilteredDynamicInformer( + c.dynamicClient, + paramsGVR, + corev1.NamespaceAll, + // Use same interval as is used for k8s typed sharedInformerFactory + // https://github.com/kubernetes/kubernetes/blob/7e0923899fed622efbc8679cca6b000d43633e38/cmd/kube-apiserver/app/server.go#L430 + 10*time.Minute, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + nil, + ).Informer() go informer.Run(instanceContext.Done()) } - return nil + controller := generic.NewController( + generic.NewInformer[runtime.Object](informer), + c.reconcileParams, + generic.ControllerOptions{ + Workers: 1, + Name: paramSource.String() + "-controller", + }, + ) + + ret := ¶mInfo{ + controller: controller, + stop: instanceCancel, + dependentDefinitions: sets.New[namespacedName](), + } + c.paramsCRDControllers[*paramSource] = ret + + go controller.Run(instanceContext) + return ret + } func (c *policyController) reconcilePolicyBinding(namespace, name string, binding *v1alpha1.ValidatingAdmissionPolicyBinding) error {