Skip to content

Commit

Permalink
refactor: use the provided sharedInformerFactory for params
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander Zielenski committed Jul 20, 2023
1 parent 9a59520 commit 6323c10
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ func NewAdmissionController(
restMapper,
client,
dynamicClient,
informerFactory,
nil,
NewMatcher(matching.NewMatcher(informerFactory.Core().V1().Namespaces().Lister(), client)),
generic.NewInformer[*v1alpha1.ValidatingAdmissionPolicy](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ import (
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
k8sscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache"
)

type policyController struct {
once sync.Once
context context.Context
dynamicClient dynamic.Interface
informerFactory informers.SharedInformerFactory
restMapper meta.RESTMapper
policyDefinitionsController generic.Controller[*v1alpha1.ValidatingAdmissionPolicy]
policyBindingController generic.Controller[*v1alpha1.ValidatingAdmissionPolicyBinding]
Expand All @@ -61,13 +61,10 @@ type policyController struct {

newValidator

// Lock which protects:
// - cachedPolicies
// - paramCRDControllers
// - definitionInfo
// - bindingInfos
// - definitionsToBindings
// All other fields should be assumed constant
client kubernetes.Interface
// Lock which protects
// All Below fields
// All above fields should be assumed constant
mutex sync.RWMutex

cachedPolicies []policyData
Expand All @@ -88,8 +85,6 @@ type policyController struct {
// All keys must have at least one dependent binding
// All binding names MUST exist as a key bindingInfos
definitionsToBindings map[namespacedName]sets.Set[namespacedName]

client kubernetes.Interface
}

type newValidator func(validationFilter cel.Filter, celMatcher matchconditions.Matcher, auditAnnotationFilter, messageFilter cel.Filter, failurePolicy *v1.FailurePolicyType) Validator
Expand All @@ -98,6 +93,7 @@ func newPolicyController(
restMapper meta.RESTMapper,
client kubernetes.Interface,
dynamicClient dynamic.Interface,
informerFactory informers.SharedInformerFactory,
filterCompiler cel.FilterCompiler,
matcher Matcher,
policiesInformer generic.Informer[*v1alpha1.ValidatingAdmissionPolicy],
Expand Down Expand Up @@ -128,9 +124,10 @@ func newPolicyController(
Name: "cel-policy-bindings",
},
),
restMapper: restMapper,
dynamicClient: dynamicClient,
client: client,
restMapper: restMapper,
dynamicClient: dynamicClient,
informerFactory: informerFactory,
client: client,
}
return res
}
Expand Down Expand Up @@ -235,7 +232,6 @@ func (c *policyController) reconcilePolicyDefinitionSpec(namespace, name string,
// Skip setting up controller for empty param type
return nil
}

// find GVR for params
// Parse param source into a GVK

Expand All @@ -262,101 +258,74 @@ func (c *policyController) reconcilePolicyDefinitionSpec(namespace, name string,
return info.configurationError
}

if info, ok := c.paramsCRDControllers[*paramSource]; ok {
// If a param controller is already active for this paramsource, make
// sure it is tracking this policy's dependency upon it
info.dependentDefinitions.Insert(nn)
paramInfo := c.ensureParamInfo(paramSource, paramsGVR.Resource)
paramInfo.dependentDefinitions.Insert(nn)

} else {
instanceContext, instanceCancel := context.WithCancel(c.context)

var informer cache.SharedIndexInformer

// Informer Factory is optional
if c.client != nil {
// Create temporary informer factory
// Cannot use the k8s shared informer factory for dynamic params informer.
// Would leak unnecessary informers when we are done since we would have to
// call informerFactory.Start() with a longer-lived stopCh than necessary.
// SharedInformerFactory does not support temporary usage.
dynamicFactory := informers.NewSharedInformerFactory(c.client, 10*time.Minute)

// Look for a typed informer. If it does not exist
genericInformer, err := dynamicFactory.ForResource(paramsGVR.Resource)

// Ignore error. We fallback to dynamic informer if there is no
// typed informer
if err != nil {
informer = nil
} else {
informer = genericInformer.Informer()

// Set transformer on the informer to workaround inconsistency
// where typed objects have TypeMeta wiped out but dynamic
// objects keep kind/apiVersion fields
informer.SetTransform(func(i interface{}) (interface{}, error) {
// Ensure param is populated with its GVK for consistency
// (CRD dynamic informer always returns objects with kind/apiversion,
// but native types do not include populated TypeMeta.
if param := i.(runtime.Object); param != nil {
if param.GetObjectKind().GroupVersionKind().Empty() {
// https://github.com/kubernetes/client-go/issues/413#issue-324586398
gvks, _, _ := k8sscheme.Scheme.ObjectKinds(param)
for _, gvk := range gvks {
if len(gvk.Kind) == 0 {
continue
}
if len(gvk.Version) == 0 || gvk.Version == runtime.APIVersionInternal {
continue
}
param.GetObjectKind().SetGroupVersionKind(gvk)
break
}
}
}
return nil
}

return i, nil
})
}
}
// Ensures that there is an informer started for the given GVK to be used as a
// param
func (c *policyController) ensureParamInfo(paramSource *v1alpha1.ParamKind, paramsGVR schema.GroupVersionResource) *paramInfo {
if info, ok := c.paramsCRDControllers[*paramSource]; ok {
return info
}

if informer == nil {
// Dynamic JSON informer fallback.
// Cannot use shared dynamic informer since it would be impossible
// to clean CRD informers properly with multiple dependents
// (cannot start ahead of time, and cannot track dependencies via stopCh)
informer = dynamicinformer.NewFilteredDynamicInformer(
c.dynamicClient,
paramsGVR.Resource,
corev1.NamespaceAll,
// Use same interval as is used for k8s typed sharedInformerFactory
// https://github.com/kubernetes/kubernetes/blob/7e0923899fed622efbc8679cca6b000d43633e38/cmd/kube-apiserver/app/server.go#L430
10*time.Minute,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
nil,
).Informer()
}
// We are not watching this param. Start an informer for it.
instanceContext, instanceCancel := context.WithCancel(c.context)

controller := generic.NewController(
generic.NewInformer[runtime.Object](informer),
c.reconcileParams,
generic.ControllerOptions{
Workers: 1,
Name: paramSource.String() + "-controller",
},
)
var informer cache.SharedIndexInformer

c.paramsCRDControllers[*paramSource] = &paramInfo{
controller: controller,
stop: instanceCancel,
dependentDefinitions: sets.New(nn),
}
// Try to see if our provided informer factory has an informer for this type.
// We assume the informer is already started, and starts all types associated
// with it.
if genericInformer, err := c.informerFactory.ForResource(paramsGVR); err == nil {
informer = genericInformer.Informer()

go controller.Run(instanceContext)
// Ensure the informer is started
// Use policyController's context rather than the instance context.
// PolicyController context is expected to last until app shutdown
// This is due to behavior of informerFactory which would cause the
// informer to stop running once the context is cancelled, and
// never started again.
c.informerFactory.Start(c.context.Done())
} else {
// Dynamic JSON informer fallback.
// Cannot use shared dynamic informer since it would be impossible
// to clean CRD informers properly with multiple dependents
// (cannot start ahead of time, and cannot track dependencies via stopCh)
informer = dynamicinformer.NewFilteredDynamicInformer(
c.dynamicClient,
paramsGVR,
corev1.NamespaceAll,
// Use same interval as is used for k8s typed sharedInformerFactory
// https://github.com/kubernetes/kubernetes/blob/7e0923899fed622efbc8679cca6b000d43633e38/cmd/kube-apiserver/app/server.go#L430
10*time.Minute,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
nil,
).Informer()
go informer.Run(instanceContext.Done())
}

return nil
controller := generic.NewController(
generic.NewInformer[runtime.Object](informer),
c.reconcileParams,
generic.ControllerOptions{
Workers: 1,
Name: paramSource.String() + "-controller",
},
)

ret := &paramInfo{
controller: controller,
stop: instanceCancel,
dependentDefinitions: sets.New[namespacedName](),
}
c.paramsCRDControllers[*paramSource] = ret

go controller.Run(instanceContext)
return ret

}

func (c *policyController) reconcilePolicyBinding(namespace, name string, binding *v1alpha1.ValidatingAdmissionPolicyBinding) error {
Expand Down

0 comments on commit 6323c10

Please sign in to comment.