Skip to content

Commit

Permalink
Add matchConditions to Stage
Browse files Browse the repository at this point in the history
  • Loading branch information
wzshiming committed Mar 11, 2024
1 parent b727209 commit 8df7342
Show file tree
Hide file tree
Showing 15 changed files with 170 additions and 22 deletions.
10 changes: 9 additions & 1 deletion pkg/apis/internalversion/stage_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,20 @@ type StageSelector struct {
MatchAnnotations map[string]string
// MatchExpressions is a list of label selector requirements. The requirements are ANDed.
MatchExpressions []SelectorRequirement
// MatchConditions is a list of label selector conditions. The conditions are ANDed.
MatchConditions []SelectorCondition
}

// SelectorCondition is a resource selector condition is a set of conditions that must be true for a match.
type SelectorCondition struct {
// Expression represents the expression which will be evaluated by CEL.
Expression string
}

// SelectorRequirement is a resource selector requirement is a selector that contains values, a key,
// and an operator that relates the key and values.
type SelectorRequirement struct {
// The name of the scope that the selector applies to.
// Key represents the expression which will be evaluated by JQ.
Key string
// Represents a scope's relationship to a set of values.
Operator SelectorOperator
Expand Down
32 changes: 32 additions & 0 deletions pkg/apis/internalversion/zz_generated.conversion.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions pkg/apis/internalversion/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion pkg/apis/v1alpha1/stage_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,12 +176,20 @@ type StageSelector struct {
MatchAnnotations map[string]string `json:"matchAnnotations,omitempty"`
// MatchExpressions is a list of label selector requirements. The requirements are ANDed.
MatchExpressions []SelectorRequirement `json:"matchExpressions,omitempty"`
// MatchConditions is a list of label selector conditions. The conditions are ANDed.
MatchConditions []SelectorCondition `json:"matchConditions,omitempty"`
}

// SelectorCondition is a resource selector condition is a set of conditions that must be true for a match.
type SelectorCondition struct {
// Expression represents the expression which will be evaluated by CEL.
Expression string `json:"expression"`
}

// SelectorRequirement is a resource selector requirement is a selector that contains values, a key,
// and an operator that relates the key and values.
type SelectorRequirement struct {
// The name of the scope that the selector applies to.
// Key represents the expression which will be evaluated by JQ.
Key string `json:"key"`
// Represents a scope's relationship to a set of values.
Operator SelectorOperator `json:"operator"`
Expand Down
21 changes: 21 additions & 0 deletions pkg/apis/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 22 additions & 1 deletion pkg/kwok/controllers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"encoding/json"
"fmt"
"maps"
"os"
"strconv"
"strings"
Expand Down Expand Up @@ -47,6 +48,7 @@ import (
"sigs.k8s.io/kwok/pkg/config/resources"
"sigs.k8s.io/kwok/pkg/consts"
"sigs.k8s.io/kwok/pkg/log"
"sigs.k8s.io/kwok/pkg/utils/cel"
"sigs.k8s.io/kwok/pkg/utils/client"
"sigs.k8s.io/kwok/pkg/utils/gotpl"
"sigs.k8s.io/kwok/pkg/utils/informer"
Expand Down Expand Up @@ -103,6 +105,7 @@ var (
// Controller is a fake kubelet implementation that can be used to test
type Controller struct {
conf Config
env *cel.Environment

stagesManager *StagesManager

Expand Down Expand Up @@ -198,7 +201,25 @@ func NewController(conf Config) (*Controller, error) {
return nil, err
}

types := slices.Clone(cel.DefaultTypes)
conversions := slices.Clone(cel.DefaultConversions)
funcs := maps.Clone(cel.DefaultFuncs)
methods := maps.Clone(cel.FuncsToMethods(cel.DefaultFuncs))

env, err := cel.NewEnvironment(cel.EnvironmentConfig{
EnableCompileCache: true,

Types: types,
Conversions: conversions,
Methods: methods,
Funcs: funcs,
})
if err != nil {
return nil, err
}

c := &Controller{
env: env,
conf: conf,
}

Expand Down Expand Up @@ -581,7 +602,7 @@ func (c *Controller) Start(ctx context.Context) error {

if len(c.conf.LocalStages) != 0 {
for ref, stage := range c.conf.LocalStages {
lifecycle, err := NewLifecycle(stage)
lifecycle, err := NewLifecycle(stage, c.env)
if err != nil {
return err
}
Expand Down
48 changes: 36 additions & 12 deletions pkg/kwok/controllers/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,16 @@ import (
"k8s.io/apimachinery/pkg/labels"

"sigs.k8s.io/kwok/pkg/apis/internalversion"
"sigs.k8s.io/kwok/pkg/utils/cel"
"sigs.k8s.io/kwok/pkg/utils/expression"
"sigs.k8s.io/kwok/pkg/utils/format"
)

// NewLifecycle returns a new Lifecycle.
func NewLifecycle(stages []*internalversion.Stage) (Lifecycle, error) {
func NewLifecycle(stages []*internalversion.Stage, env *cel.Environment) (Lifecycle, error) {
lcs := Lifecycle{}
for _, stage := range stages {
lc, err := NewLifecycleStage(stage)
lc, err := NewLifecycleStage(stage, env)
if err != nil {
return nil, fmt.Errorf("lifecycle stage: %w", err)
}
Expand All @@ -48,10 +49,10 @@ func NewLifecycle(stages []*internalversion.Stage) (Lifecycle, error) {
// Lifecycle is a list of lifecycle stage.
type Lifecycle []*LifecycleStage

func (s Lifecycle) match(label, annotation labels.Set, data interface{}) ([]*LifecycleStage, error) {
func (s Lifecycle) match(label, annotation labels.Set, data, jsonStandard any) ([]*LifecycleStage, error) {
out := []*LifecycleStage{}
for _, stage := range s {
ok, err := stage.match(label, annotation, data)
ok, err := stage.match(label, annotation, data, jsonStandard)
if err != nil {
return nil, err
}
Expand All @@ -63,12 +64,8 @@ func (s Lifecycle) match(label, annotation labels.Set, data interface{}) ([]*Lif
}

// Match returns matched stage.
func (s Lifecycle) Match(label, annotation labels.Set, data interface{}) (*LifecycleStage, error) {
data, err := expression.ToJSONStandard(data)
if err != nil {
return nil, err
}
stages, err := s.match(label, annotation, data)
func (s Lifecycle) Match(label, annotation labels.Set, data, jsonStandard any) (*LifecycleStage, error) {
stages, err := s.match(label, annotation, data, jsonStandard)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -102,7 +99,7 @@ func (s Lifecycle) Match(label, annotation labels.Set, data interface{}) (*Lifec
}

// NewLifecycleStage returns a new LifecycleStage.
func NewLifecycleStage(s *internalversion.Stage) (*LifecycleStage, error) {
func NewLifecycleStage(s *internalversion.Stage, env *cel.Environment) (*LifecycleStage, error) {
stage := &LifecycleStage{
name: s.Name,
}
Expand All @@ -126,6 +123,15 @@ func NewLifecycleStage(s *internalversion.Stage) (*LifecycleStage, error) {
stage.matchExpressions = append(stage.matchExpressions, requirement)
}
}
if selector.MatchConditions != nil {
for _, express := range selector.MatchConditions {
program, err := env.Compile(express.Expression)
if err != nil {
return nil, err
}
stage.matchConditions = append(stage.matchConditions, program)
}
}

stage.next = &s.Spec.Next
if delay := s.Spec.Delay; delay != nil {
Expand Down Expand Up @@ -177,6 +183,7 @@ type LifecycleStage struct {
matchLabels labels.Selector
matchAnnotations labels.Selector
matchExpressions []*expression.Requirement
matchConditions []cel.Program

weight int
next *internalversion.StageNext
Expand All @@ -187,7 +194,7 @@ type LifecycleStage struct {
immediateNextStage bool
}

func (s *LifecycleStage) match(label, annotation labels.Set, jsonStandard interface{}) (bool, error) {
func (s *LifecycleStage) match(label, annotation labels.Set, data, jsonStandard any) (bool, error) {
if s.matchLabels != nil {
if !s.matchLabels.Matches(label) {
return false, nil
Expand All @@ -210,6 +217,23 @@ func (s *LifecycleStage) match(label, annotation labels.Set, jsonStandard interf
}
}
}

if s.matchConditions != nil {
for _, program := range s.matchConditions {
val, _, err := program.Eval(data)
if err != nil {
return false, err
}
ok, err := cel.AsBool(val)
if err != nil {
return false, err
}
if !ok {
return false, nil
}
}
}

return true, nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/kwok/controllers/node_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ func (c *NodeController) preprocess(ctx context.Context, node *corev1.Node) erro
}

lifecycle := c.lifecycle.Get()
stage, err := lifecycle.Match(node.Labels, node.Annotations, data)
stage, err := lifecycle.Match(node.Labels, node.Annotations, node, data)
if err != nil {
return fmt.Errorf("stage match: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kwok/controllers/node_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func TestNodeController(t *testing.T) {
nodeInit, _ := config.UnmarshalWithType[*internalversion.Stage](nodefast.DefaultNodeInit)
nodeStages := []*internalversion.Stage{nodeInit}

lifecycle, _ := NewLifecycle(nodeStages)
lifecycle, _ := NewLifecycle(nodeStages, nil)
nodes, err := NewNodeController(NodeControllerConfig{
TypedClient: clientset,
NodeIP: "10.0.0.1",
Expand Down
2 changes: 1 addition & 1 deletion pkg/kwok/controllers/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func (c *PodController) preprocess(ctx context.Context, pod *corev1.Pod) error {
}

lifecycle := c.lifecycle.Get()
stage, err := lifecycle.Match(pod.Labels, pod.Annotations, data)
stage, err := lifecycle.Match(pod.Labels, pod.Annotations, pod, data)
if err != nil {
return fmt.Errorf("stage match: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kwok/controllers/pod_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func TestPodController(t *testing.T) {
t.Fatal(fmt.Errorf("failed to watch nodes: %w", err))
}

lifecycle, _ := NewLifecycle(podStages)
lifecycle, _ := NewLifecycle(podStages, nil)
annotationSelector, _ := labels.Parse("fake=custom")
pods, err := NewPodController(PodControllerConfig{
TypedClient: clientset,
Expand Down
2 changes: 1 addition & 1 deletion pkg/kwok/controllers/stage_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func (c *StageController) preprocess(ctx context.Context, resource *unstructured
}

lifecycle := c.lifecycle.Get()
stage, err := lifecycle.Match(resource.GetLabels(), resource.GetAnnotations(), data)
stage, err := lifecycle.Match(resource.GetLabels(), resource.GetAnnotations(), resource, data)
if err != nil {
return fmt.Errorf("stage match: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kwok/controllers/stage_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestStageController(t *testing.T) {
},
},
},
})
}, nil)
patchMeta, _ := strategicpatch.NewPatchMetaFromStruct(corev1.PersistentVolume{})
controller, err := NewStageController(StageControllerConfig{
PlayStageParallelism: 1,
Expand Down
Loading

0 comments on commit 8df7342

Please sign in to comment.