diff --git a/pkg/apis/internalversion/stage_types.go b/pkg/apis/internalversion/stage_types.go index 3e368cb6a6..d73d5cd7ed 100644 --- a/pkg/apis/internalversion/stage_types.go +++ b/pkg/apis/internalversion/stage_types.go @@ -137,12 +137,20 @@ type StageSelector struct { MatchAnnotations map[string]string // MatchExpressions is a list of label selector requirements. The requirements are ANDed. MatchExpressions []SelectorRequirement + // MatchConditions is a list of label selector conditions. The conditions are ANDed. + MatchConditions []SelectorCondition +} + +// SelectorCondition is a resource selector condition is a set of conditions that must be true for a match. +type SelectorCondition struct { + // Expression represents the expression which will be evaluated by CEL. + Expression string } // SelectorRequirement is a resource selector requirement is a selector that contains values, a key, // and an operator that relates the key and values. type SelectorRequirement struct { - // The name of the scope that the selector applies to. + // Key represents the expression which will be evaluated by JQ. Key string // Represents a scope's relationship to a set of values. Operator SelectorOperator diff --git a/pkg/apis/internalversion/zz_generated.conversion.go b/pkg/apis/internalversion/zz_generated.conversion.go index 0b9a3c6aa9..7e62a98ecd 100644 --- a/pkg/apis/internalversion/zz_generated.conversion.go +++ b/pkg/apis/internalversion/zz_generated.conversion.go @@ -550,6 +550,16 @@ func RegisterConversions(s *runtime.Scheme) error { }); err != nil { return err } + if err := s.AddGeneratedConversionFunc((*SelectorCondition)(nil), (*v1alpha1.SelectorCondition)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_internalversion_SelectorCondition_To_v1alpha1_SelectorCondition(a.(*SelectorCondition), b.(*v1alpha1.SelectorCondition), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*v1alpha1.SelectorCondition)(nil), (*SelectorCondition)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1alpha1_SelectorCondition_To_internalversion_SelectorCondition(a.(*v1alpha1.SelectorCondition), b.(*SelectorCondition), scope) + }); err != nil { + return err + } if err := s.AddGeneratedConversionFunc((*SelectorRequirement)(nil), (*v1alpha1.SelectorRequirement)(nil), func(a, b interface{}, scope conversion.Scope) error { return Convert_internalversion_SelectorRequirement_To_v1alpha1_SelectorRequirement(a.(*SelectorRequirement), b.(*v1alpha1.SelectorRequirement), scope) }); err != nil { @@ -2330,6 +2340,26 @@ func Convert_v1alpha1_SecurityContext_To_internalversion_SecurityContext(in *v1a return autoConvert_v1alpha1_SecurityContext_To_internalversion_SecurityContext(in, out, s) } +func autoConvert_internalversion_SelectorCondition_To_v1alpha1_SelectorCondition(in *SelectorCondition, out *v1alpha1.SelectorCondition, s conversion.Scope) error { + out.Expression = in.Expression + return nil +} + +// Convert_internalversion_SelectorCondition_To_v1alpha1_SelectorCondition is an autogenerated conversion function. +func Convert_internalversion_SelectorCondition_To_v1alpha1_SelectorCondition(in *SelectorCondition, out *v1alpha1.SelectorCondition, s conversion.Scope) error { + return autoConvert_internalversion_SelectorCondition_To_v1alpha1_SelectorCondition(in, out, s) +} + +func autoConvert_v1alpha1_SelectorCondition_To_internalversion_SelectorCondition(in *v1alpha1.SelectorCondition, out *SelectorCondition, s conversion.Scope) error { + out.Expression = in.Expression + return nil +} + +// Convert_v1alpha1_SelectorCondition_To_internalversion_SelectorCondition is an autogenerated conversion function. +func Convert_v1alpha1_SelectorCondition_To_internalversion_SelectorCondition(in *v1alpha1.SelectorCondition, out *SelectorCondition, s conversion.Scope) error { + return autoConvert_v1alpha1_SelectorCondition_To_internalversion_SelectorCondition(in, out, s) +} + func autoConvert_internalversion_SelectorRequirement_To_v1alpha1_SelectorRequirement(in *SelectorRequirement, out *v1alpha1.SelectorRequirement, s conversion.Scope) error { out.Key = in.Key out.Operator = v1alpha1.SelectorOperator(in.Operator) @@ -2516,6 +2546,7 @@ func autoConvert_internalversion_StageSelector_To_v1alpha1_StageSelector(in *Sta out.MatchLabels = *(*map[string]string)(unsafe.Pointer(&in.MatchLabels)) out.MatchAnnotations = *(*map[string]string)(unsafe.Pointer(&in.MatchAnnotations)) out.MatchExpressions = *(*[]v1alpha1.SelectorRequirement)(unsafe.Pointer(&in.MatchExpressions)) + out.MatchConditions = *(*[]v1alpha1.SelectorCondition)(unsafe.Pointer(&in.MatchConditions)) return nil } @@ -2528,6 +2559,7 @@ func autoConvert_v1alpha1_StageSelector_To_internalversion_StageSelector(in *v1a out.MatchLabels = *(*map[string]string)(unsafe.Pointer(&in.MatchLabels)) out.MatchAnnotations = *(*map[string]string)(unsafe.Pointer(&in.MatchAnnotations)) out.MatchExpressions = *(*[]SelectorRequirement)(unsafe.Pointer(&in.MatchExpressions)) + out.MatchConditions = *(*[]SelectorCondition)(unsafe.Pointer(&in.MatchConditions)) return nil } diff --git a/pkg/apis/internalversion/zz_generated.deepcopy.go b/pkg/apis/internalversion/zz_generated.deepcopy.go index b3d4420a58..42ac067572 100644 --- a/pkg/apis/internalversion/zz_generated.deepcopy.go +++ b/pkg/apis/internalversion/zz_generated.deepcopy.go @@ -1162,6 +1162,22 @@ func (in *SecurityContext) DeepCopy() *SecurityContext { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SelectorCondition) DeepCopyInto(out *SelectorCondition) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SelectorCondition. +func (in *SelectorCondition) DeepCopy() *SelectorCondition { + if in == nil { + return nil + } + out := new(SelectorCondition) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *SelectorRequirement) DeepCopyInto(out *SelectorRequirement) { *out = *in @@ -1350,6 +1366,11 @@ func (in *StageSelector) DeepCopyInto(out *StageSelector) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.MatchConditions != nil { + in, out := &in.MatchConditions, &out.MatchConditions + *out = make([]SelectorCondition, len(*in)) + copy(*out, *in) + } return } diff --git a/pkg/apis/v1alpha1/stage_types.go b/pkg/apis/v1alpha1/stage_types.go index a7f6baede0..07aa48f38f 100644 --- a/pkg/apis/v1alpha1/stage_types.go +++ b/pkg/apis/v1alpha1/stage_types.go @@ -176,12 +176,20 @@ type StageSelector struct { MatchAnnotations map[string]string `json:"matchAnnotations,omitempty"` // MatchExpressions is a list of label selector requirements. The requirements are ANDed. MatchExpressions []SelectorRequirement `json:"matchExpressions,omitempty"` + // MatchConditions is a list of label selector conditions. The conditions are ANDed. + MatchConditions []SelectorCondition `json:"matchConditions,omitempty"` +} + +// SelectorCondition is a resource selector condition is a set of conditions that must be true for a match. +type SelectorCondition struct { + // Expression represents the expression which will be evaluated by CEL. + Expression string `json:"expression"` } // SelectorRequirement is a resource selector requirement is a selector that contains values, a key, // and an operator that relates the key and values. type SelectorRequirement struct { - // The name of the scope that the selector applies to. + // Key represents the expression which will be evaluated by JQ. Key string `json:"key"` // Represents a scope's relationship to a set of values. Operator SelectorOperator `json:"operator"` diff --git a/pkg/apis/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/v1alpha1/zz_generated.deepcopy.go index 2d726b62a5..ca3aaedc26 100644 --- a/pkg/apis/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/v1alpha1/zz_generated.deepcopy.go @@ -1628,6 +1628,22 @@ func (in *SecurityContext) DeepCopy() *SecurityContext { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SelectorCondition) DeepCopyInto(out *SelectorCondition) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SelectorCondition. +func (in *SelectorCondition) DeepCopy() *SelectorCondition { + if in == nil { + return nil + } + out := new(SelectorCondition) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *SelectorRequirement) DeepCopyInto(out *SelectorRequirement) { *out = *in @@ -1864,6 +1880,11 @@ func (in *StageSelector) DeepCopyInto(out *StageSelector) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.MatchConditions != nil { + in, out := &in.MatchConditions, &out.MatchConditions + *out = make([]SelectorCondition, len(*in)) + copy(*out, *in) + } return } diff --git a/pkg/kwok/controllers/controller.go b/pkg/kwok/controllers/controller.go index 7b7c0d024d..3660b75ca0 100644 --- a/pkg/kwok/controllers/controller.go +++ b/pkg/kwok/controllers/controller.go @@ -20,6 +20,7 @@ import ( "context" "encoding/json" "fmt" + "maps" "os" "strconv" "strings" @@ -47,6 +48,7 @@ import ( "sigs.k8s.io/kwok/pkg/config/resources" "sigs.k8s.io/kwok/pkg/consts" "sigs.k8s.io/kwok/pkg/log" + "sigs.k8s.io/kwok/pkg/utils/cel" "sigs.k8s.io/kwok/pkg/utils/client" "sigs.k8s.io/kwok/pkg/utils/gotpl" "sigs.k8s.io/kwok/pkg/utils/informer" @@ -103,6 +105,7 @@ var ( // Controller is a fake kubelet implementation that can be used to test type Controller struct { conf Config + env *cel.Environment stagesManager *StagesManager @@ -198,7 +201,26 @@ func NewController(conf Config) (*Controller, error) { return nil, err } + types := slices.Clone(cel.DefaultTypes) + conversions := slices.Clone(cel.DefaultConversions) + funcs := maps.Clone(cel.DefaultFuncs) + methods := maps.Clone(cel.FuncsToMethods(cel.DefaultFuncs)) + vars := map[string]any{ + "self": nil, + } + env, err := cel.NewEnvironment(cel.EnvironmentConfig{ + Types: types, + Conversions: conversions, + Methods: methods, + Funcs: funcs, + Vars: vars, + }) + if err != nil { + return nil, err + } + c := &Controller{ + env: env, conf: conf, } @@ -581,7 +603,7 @@ func (c *Controller) Start(ctx context.Context) error { if len(c.conf.LocalStages) != 0 { for ref, stage := range c.conf.LocalStages { - lifecycle, err := NewLifecycle(stage) + lifecycle, err := NewLifecycle(stage, c.env) if err != nil { return err } diff --git a/pkg/kwok/controllers/lifecycle.go b/pkg/kwok/controllers/lifecycle.go index 937b0b73b1..fc5533cfd2 100644 --- a/pkg/kwok/controllers/lifecycle.go +++ b/pkg/kwok/controllers/lifecycle.go @@ -25,15 +25,16 @@ import ( "k8s.io/apimachinery/pkg/labels" "sigs.k8s.io/kwok/pkg/apis/internalversion" + "sigs.k8s.io/kwok/pkg/utils/cel" "sigs.k8s.io/kwok/pkg/utils/expression" "sigs.k8s.io/kwok/pkg/utils/format" ) // NewLifecycle returns a new Lifecycle. -func NewLifecycle(stages []*internalversion.Stage) (Lifecycle, error) { +func NewLifecycle(stages []*internalversion.Stage, env *cel.Environment) (Lifecycle, error) { lcs := Lifecycle{} for _, stage := range stages { - lc, err := NewLifecycleStage(stage) + lc, err := NewLifecycleStage(stage, env) if err != nil { return nil, fmt.Errorf("lifecycle stage: %w", err) } @@ -48,10 +49,10 @@ func NewLifecycle(stages []*internalversion.Stage) (Lifecycle, error) { // Lifecycle is a list of lifecycle stage. type Lifecycle []*LifecycleStage -func (s Lifecycle) match(label, annotation labels.Set, data interface{}) ([]*LifecycleStage, error) { +func (s Lifecycle) match(ctx context.Context, label, annotation labels.Set, data, jsonStandard any) ([]*LifecycleStage, error) { out := []*LifecycleStage{} for _, stage := range s { - ok, err := stage.match(label, annotation, data) + ok, err := stage.match(ctx, label, annotation, data, jsonStandard) if err != nil { return nil, err } @@ -63,12 +64,8 @@ func (s Lifecycle) match(label, annotation labels.Set, data interface{}) ([]*Lif } // Match returns matched stage. -func (s Lifecycle) Match(label, annotation labels.Set, data interface{}) (*LifecycleStage, error) { - data, err := expression.ToJSONStandard(data) - if err != nil { - return nil, err - } - stages, err := s.match(label, annotation, data) +func (s Lifecycle) Match(ctx context.Context, label, annotation labels.Set, data, jsonStandard any) (*LifecycleStage, error) { + stages, err := s.match(ctx, label, annotation, data, jsonStandard) if err != nil { return nil, err } @@ -102,7 +99,7 @@ func (s Lifecycle) Match(label, annotation labels.Set, data interface{}) (*Lifec } // NewLifecycleStage returns a new LifecycleStage. -func NewLifecycleStage(s *internalversion.Stage) (*LifecycleStage, error) { +func NewLifecycleStage(s *internalversion.Stage, env *cel.Environment) (*LifecycleStage, error) { stage := &LifecycleStage{ name: s.Name, } @@ -126,6 +123,15 @@ func NewLifecycleStage(s *internalversion.Stage) (*LifecycleStage, error) { stage.matchExpressions = append(stage.matchExpressions, requirement) } } + if selector.MatchConditions != nil { + for _, express := range selector.MatchConditions { + program, err := env.Compile(express.Expression) + if err != nil { + return nil, err + } + stage.matchConditions = append(stage.matchConditions, program) + } + } stage.next = &s.Spec.Next if delay := s.Spec.Delay; delay != nil { @@ -177,6 +183,7 @@ type LifecycleStage struct { matchLabels labels.Selector matchAnnotations labels.Selector matchExpressions []*expression.Requirement + matchConditions []cel.Program weight int next *internalversion.StageNext @@ -187,7 +194,7 @@ type LifecycleStage struct { immediateNextStage bool } -func (s *LifecycleStage) match(label, annotation labels.Set, jsonStandard interface{}) (bool, error) { +func (s *LifecycleStage) match(ctx context.Context, label, annotation labels.Set, data, jsonStandard any) (bool, error) { if s.matchLabels != nil { if !s.matchLabels.Matches(label) { return false, nil @@ -201,7 +208,7 @@ func (s *LifecycleStage) match(label, annotation labels.Set, jsonStandard interf if s.matchExpressions != nil { for _, requirement := range s.matchExpressions { - ok, err := requirement.Matches(context.Background(), jsonStandard) + ok, err := requirement.Matches(ctx, jsonStandard) if err != nil { return false, err } @@ -210,6 +217,25 @@ func (s *LifecycleStage) match(label, annotation labels.Set, jsonStandard interf } } } + + if s.matchConditions != nil { + for _, program := range s.matchConditions { + val, _, err := program.ContextEval(ctx, map[string]any{ + "self": data, + }) + if err != nil { + return false, err + } + ok, err := cel.AsBool(val) + if err != nil { + return false, err + } + if !ok { + return false, nil + } + } + } + return true, nil } diff --git a/pkg/kwok/controllers/node_controller.go b/pkg/kwok/controllers/node_controller.go index fffd96f575..daa9d997f4 100644 --- a/pkg/kwok/controllers/node_controller.go +++ b/pkg/kwok/controllers/node_controller.go @@ -352,7 +352,7 @@ func (c *NodeController) preprocess(ctx context.Context, node *corev1.Node) erro } lifecycle := c.lifecycle.Get() - stage, err := lifecycle.Match(node.Labels, node.Annotations, data) + stage, err := lifecycle.Match(ctx, node.Labels, node.Annotations, node, data) if err != nil { return fmt.Errorf("stage match: %w", err) } diff --git a/pkg/kwok/controllers/node_controller_test.go b/pkg/kwok/controllers/node_controller_test.go index a35e298b58..179a974d2a 100644 --- a/pkg/kwok/controllers/node_controller_test.go +++ b/pkg/kwok/controllers/node_controller_test.go @@ -78,7 +78,7 @@ func TestNodeController(t *testing.T) { nodeInit, _ := config.UnmarshalWithType[*internalversion.Stage](nodefast.DefaultNodeInit) nodeStages := []*internalversion.Stage{nodeInit} - lifecycle, _ := NewLifecycle(nodeStages) + lifecycle, _ := NewLifecycle(nodeStages, nil) nodes, err := NewNodeController(NodeControllerConfig{ TypedClient: clientset, NodeIP: "10.0.0.1", diff --git a/pkg/kwok/controllers/pod_controller.go b/pkg/kwok/controllers/pod_controller.go index bf173487b3..d2cf5e053f 100644 --- a/pkg/kwok/controllers/pod_controller.go +++ b/pkg/kwok/controllers/pod_controller.go @@ -248,7 +248,7 @@ func (c *PodController) preprocess(ctx context.Context, pod *corev1.Pod) error { } lifecycle := c.lifecycle.Get() - stage, err := lifecycle.Match(pod.Labels, pod.Annotations, data) + stage, err := lifecycle.Match(ctx, pod.Labels, pod.Annotations, pod, data) if err != nil { return fmt.Errorf("stage match: %w", err) } diff --git a/pkg/kwok/controllers/pod_controller_test.go b/pkg/kwok/controllers/pod_controller_test.go index 036003eaf9..ca92b63a90 100644 --- a/pkg/kwok/controllers/pod_controller_test.go +++ b/pkg/kwok/controllers/pod_controller_test.go @@ -190,7 +190,7 @@ func TestPodController(t *testing.T) { t.Fatal(fmt.Errorf("failed to watch nodes: %w", err)) } - lifecycle, _ := NewLifecycle(podStages) + lifecycle, _ := NewLifecycle(podStages, nil) annotationSelector, _ := labels.Parse("fake=custom") pods, err := NewPodController(PodControllerConfig{ TypedClient: clientset, diff --git a/pkg/kwok/controllers/stage_controller.go b/pkg/kwok/controllers/stage_controller.go index 7d2a99116a..792fb0cf85 100644 --- a/pkg/kwok/controllers/stage_controller.go +++ b/pkg/kwok/controllers/stage_controller.go @@ -228,7 +228,7 @@ func (c *StageController) preprocess(ctx context.Context, resource *unstructured } lifecycle := c.lifecycle.Get() - stage, err := lifecycle.Match(resource.GetLabels(), resource.GetAnnotations(), data) + stage, err := lifecycle.Match(ctx, resource.GetLabels(), resource.GetAnnotations(), resource, data) if err != nil { return fmt.Errorf("stage match: %w", err) } diff --git a/pkg/kwok/controllers/stage_controller_test.go b/pkg/kwok/controllers/stage_controller_test.go index 5654d59c00..a2640c12cc 100644 --- a/pkg/kwok/controllers/stage_controller_test.go +++ b/pkg/kwok/controllers/stage_controller_test.go @@ -73,7 +73,7 @@ func TestStageController(t *testing.T) { }, }, }, - }) + }, nil) patchMeta, _ := strategicpatch.NewPatchMetaFromStruct(corev1.PersistentVolume{}) controller, err := NewStageController(StageControllerConfig{ PlayStageParallelism: 1, diff --git a/pkg/kwok/controllers/stages_manager.go b/pkg/kwok/controllers/stages_manager.go index 90fef95b2a..7c3bb0fb6a 100644 --- a/pkg/kwok/controllers/stages_manager.go +++ b/pkg/kwok/controllers/stages_manager.go @@ -22,12 +22,14 @@ import ( "sigs.k8s.io/kwok/pkg/apis/internalversion" "sigs.k8s.io/kwok/pkg/config/resources" "sigs.k8s.io/kwok/pkg/log" + "sigs.k8s.io/kwok/pkg/utils/cel" "sigs.k8s.io/kwok/pkg/utils/sets" "sigs.k8s.io/kwok/pkg/utils/slices" ) // StagesManagerConfig is the configuration for a stages manager type StagesManagerConfig struct { + Env *cel.Environment StageGetter resources.DynamicGetter[[]*internalversion.Stage] StartFunc func(ctx context.Context, ref internalversion.StageResourceRef, lifecycle resources.Getter[Lifecycle]) error } @@ -35,6 +37,7 @@ type StagesManagerConfig struct { // StagesManager is a stages manager // It is a dynamic getter for stages and start a stage controller type StagesManager struct { + env *cel.Environment stageGetter resources.DynamicGetter[[]*internalversion.Stage] startFunc func(ctx context.Context, ref internalversion.StageResourceRef, lifecycle resources.Getter[Lifecycle]) error cache map[internalversion.StageResourceRef]context.CancelCauseFunc @@ -43,6 +46,7 @@ type StagesManager struct { // NewStagesManager creates a stage controller manager func NewStagesManager(conf StagesManagerConfig) *StagesManager { return &StagesManager{ + env: conf.Env, stageGetter: conf.StageGetter, startFunc: conf.StartFunc, cache: map[internalversion.StageResourceRef]context.CancelCauseFunc{}, @@ -89,7 +93,7 @@ func (c *StagesManager) manage(ctx context.Context) { return nil, false } - lifecycleStage, err := NewLifecycleStage(stage) + lifecycleStage, err := NewLifecycleStage(stage, c.env) if err != nil { logger.Error("failed to create lifecycle stage", err, "ref", ref) return nil, false diff --git a/pkg/utils/cel/environment.go b/pkg/utils/cel/environment.go index acbf162394..d87ec9d7f7 100644 --- a/pkg/utils/cel/environment.go +++ b/pkg/utils/cel/environment.go @@ -144,3 +144,12 @@ func AsString(refVal ref.Val) (string, error) { } return string(v), nil } + +// AsBool returns the bool value of a ref.Val +func AsBool(refVal ref.Val) (bool, error) { + v, ok := refVal.(types.Bool) + if !ok { + return false, fmt.Errorf("unsupported type: %T", v) + } + return bool(v), nil +}