diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index e1bafb72f..962b38911 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -69,6 +69,13 @@ jobs: BUILDER=${{ matrix.builder }} make build-cluster-image test-kwok: + strategy: + fail-fast: false + matrix: + case: + - kwok + - kwok-with-cni + continue-on-error: false runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 @@ -92,33 +99,7 @@ jobs: - name: Test Workable shell: bash run: | - ./hack/e2e-test.sh kwok/kwok - - test-kwok-with-cni: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v3 - - name: Set up Go - uses: actions/setup-go@v4 - with: - go-version: "1.20" - - - name: Install Kind - shell: bash - run: | - ./hack/requirements.sh kind - kind version - - - name: Install kubectl - shell: bash - run: | - ./hack/requirements.sh kubectl - kubectl version || : - - - name: Test Workable - shell: bash - run: | - ./hack/e2e-test.sh kwok-with-cni/kwok-with-cni + ./hack/e2e-test.sh ${{ matrix.case }}/${{ matrix.case }} test-kwokctl: # https://docs.github.com/en/actions/using-jobs/using-a-matrix-for-your-jobs diff --git a/kustomize/kwok-with-cni/deployment-patch.yaml b/kustomize/kwok-with-cni/deployment-patch.yaml index 40928ded8..d12f14bc8 100644 --- a/kustomize/kwok-with-cni/deployment-patch.yaml +++ b/kustomize/kwok-with-cni/deployment-patch.yaml @@ -29,6 +29,7 @@ spec: - --node-ip=$(POD_IP) - --node-port=10247 - --node-lease-duration-seconds=40 + - --enable-crd=Stage,Attach,Exec,PortForward,Logs,ClusterAttach,ClusterExec,ClusterPortForward,ClusterLogs - --experimental-enable-cni=true volumeMounts: - name: etc-cni diff --git a/kustomize/kwok/deployment.yaml b/kustomize/kwok/deployment.yaml index 6413675d2..0b91f5613 100644 --- a/kustomize/kwok/deployment.yaml +++ b/kustomize/kwok/deployment.yaml @@ -28,6 +28,7 @@ spec: - --node-port=10247 - --cidr=10.0.0.1/24 - --node-lease-duration-seconds=40 + - 
--enable-crd=Stage,Attach,Exec,PortForward,Logs,ClusterAttach,ClusterExec,ClusterPortForward,ClusterLogs env: - name: POD_IP valueFrom: diff --git a/pkg/config/resources/cache.go b/pkg/config/resources/cache.go new file mode 100644 index 000000000..8171f8be7 --- /dev/null +++ b/pkg/config/resources/cache.go @@ -0,0 +1,61 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resources + +import ( + "sync" +) + +type cacheGetter[O any] struct { + getter Getter[O] + + currentVer string + data O + + mut sync.RWMutex +} + +func withCache[O any](getter Getter[O]) Getter[O] { + return &cacheGetter[O]{getter: getter} +} + +func (g *cacheGetter[O]) Get() O { + g.mut.RLock() + latestVer := g.getter.Version() + if g.currentVer == latestVer { + data := g.data + g.mut.RUnlock() + return data + } + g.mut.RUnlock() + + g.mut.Lock() + defer g.mut.Unlock() + if g.currentVer == latestVer { + data := g.data + return data + } + + data := g.getter.Get() + g.data = data + g.currentVer = latestVer + return data +} + +func (g *cacheGetter[O]) Version() string { + return g.getter.Version() +} diff --git a/pkg/config/resources/dynamic.go b/pkg/config/resources/dynamic.go index 640b7fc4b..11faebd3c 100644 --- a/pkg/config/resources/dynamic.go +++ b/pkg/config/resources/dynamic.go @@ -18,7 +18,6 @@ package resources import ( "context" - "sync" "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -39,10 +38,18 @@ type ConvertFunc[O any, T runtime.Object, S 
~[]T] func(objs S) O // NewDynamicGetter returns a new Getter that returns the latest list of resources. func NewDynamicGetter[O any, T runtime.Object, L runtime.Object](syncer Syncer[T, L], convertFunc ConvertFunc[O, T, []T]) DynamicGetter[O] { - return &dynamicGetter[O, T, L]{ + getter := &dynamicGetter[O, T, L]{ syncer: syncer, convertFunc: convertFunc, } + + return struct { + Getter[O] + Starter + }{ + Getter: withCache[O](getter), + Starter: getter, + } } type dynamicGetter[O any, T runtime.Object, L runtime.Object] struct { @@ -52,11 +59,6 @@ type dynamicGetter[O any, T runtime.Object, L runtime.Object] struct { store cache.Store controller cache.Controller - - currentVer string - data O - - mut sync.RWMutex } func (c *dynamicGetter[O, T, L]) Start(ctx context.Context) error { @@ -82,25 +84,6 @@ func (c *dynamicGetter[O, T, L]) Start(ctx context.Context) error { } func (c *dynamicGetter[O, T, L]) Get() O { - latestVer := c.controller.LastSyncResourceVersion() - - c.mut.RLock() - if latestVer == c.currentVer { - data := c.data - c.mut.RUnlock() - return data - } - c.mut.RUnlock() - return c.updateAndReturn(latestVer) -} - -func (c *dynamicGetter[O, T, L]) updateAndReturn(latestVer string) O { - c.mut.Lock() - defer c.mut.Unlock() - if latestVer == c.currentVer { - return c.data - } - list := c.store.List() currentList := make([]T, 0, len(list)) for _, obj := range list { @@ -108,7 +91,9 @@ func (c *dynamicGetter[O, T, L]) updateAndReturn(latestVer string) O { } data := c.convertFunc(currentList) - c.data = data - c.currentVer = latestVer return data } + +func (c *dynamicGetter[O, T, L]) Version() string { + return c.controller.LastSyncResourceVersion() +} diff --git a/pkg/config/resources/filter.go b/pkg/config/resources/filter.go new file mode 100644 index 000000000..820a1a75b --- /dev/null +++ b/pkg/config/resources/filter.go @@ -0,0 +1,35 @@ +/* +Copyright 2023 The Kubernetes Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resources + +type filterGetter[O any, T any] struct { + getter Getter[T] + filterFunc func(T) O +} + +// NewFilter returns a new Getter that applies filterFunc to the value returned by getter and caches the result until getter's version changes. +func NewFilter[O any, T any](getter Getter[T], filterFunc func(T) O) Getter[O] { + return withCache[O](&filterGetter[O, T]{getter: getter, filterFunc: filterFunc}) +} + +func (f *filterGetter[O, T]) Get() O { + return f.filterFunc(f.getter.Get()) +} + +func (f *filterGetter[O, T]) Version() string { + return f.getter.Version() +} diff --git a/pkg/config/resources/resource.go b/pkg/config/resources/resource.go index 2bd8372fc..775a9843c 100644 --- a/pkg/config/resources/resource.go +++ b/pkg/config/resources/resource.go @@ -16,11 +16,14 @@ limitations under the License. package resources -import "context" +import ( + "context" +) // Getter is an interface for getting resources. type Getter[O any] interface { Get() O + Version() string } // DynamicGetter is an interface for getting resources.
diff --git a/pkg/config/resources/static.go b/pkg/config/resources/static.go index 1f7182fd7..a81c96c5a 100644 --- a/pkg/config/resources/static.go +++ b/pkg/config/resources/static.go @@ -28,3 +28,7 @@ func NewStaticGetter[T any](data T) Getter[T] { func (s *staticGetter[T]) Get() T { return s.data } + +func (s *staticGetter[T]) Version() string { + return "" +} diff --git a/pkg/kwok/cmd/root.go b/pkg/kwok/cmd/root.go index d531c1735..de9b05e64 100644 --- a/pkg/kwok/cmd/root.go +++ b/pkg/kwok/cmd/root.go @@ -29,6 +29,7 @@ import ( "k8s.io/utils/clock" "sigs.k8s.io/kwok/pkg/apis/internalversion" + "sigs.k8s.io/kwok/pkg/apis/v1alpha1" "sigs.k8s.io/kwok/pkg/config" "sigs.k8s.io/kwok/pkg/kwok/controllers" "sigs.k8s.io/kwok/pkg/kwok/server" @@ -84,6 +85,7 @@ func NewCommand(ctx context.Context) *cobra.Command { cmd.Flags().StringVar(&flags.Master, "master", flags.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig).") cmd.Flags().StringVar(&flags.Options.ServerAddress, "server-address", flags.Options.ServerAddress, "Address to expose the server on") cmd.Flags().UintVar(&flags.Options.NodeLeaseDurationSeconds, "node-lease-duration-seconds", flags.Options.NodeLeaseDurationSeconds, "Duration of node lease seconds") + cmd.Flags().StringArrayVar(&flags.Options.EnableCRDs, "enable-crd", flags.Options.EnableCRDs, "List of CRDs to enable") cmd.Flags().BoolVar(&flags.Options.EnableCNI, "experimental-enable-cni", flags.Options.EnableCNI, "Experimental support for getting pod ip from CNI, for CNI-related components, Only works with Linux") if config.GOOS != "linux" { @@ -145,26 +147,34 @@ func runE(ctx context.Context, flags *flagpole) error { } stagesData := config.FilterWithTypeFromContext[*internalversion.Stage](ctx) + var nodeStages []*internalversion.Stage + var podStages []*internalversion.Stage - nodeStages := filterStages(stagesData, "v1", "Node") - if len(nodeStages) == 0 { - nodeStages, err = 
controllers.NewStagesFromYaml([]byte(stages.DefaultNodeStages)) - if err != nil { - return err + if slices.Contains(flags.Options.EnableCRDs, v1alpha1.StageKind) { + if len(stagesData) != 0 { + return fmt.Errorf("stage already exists, cannot watch CRD") } - if flags.Options.NodeLeaseDurationSeconds == 0 { - nodeHeartbeatStages, err := controllers.NewStagesFromYaml([]byte(stages.DefaultNodeHeartbeatStages)) + } else { + nodeStages = filterStages(stagesData, "v1", "Node") + if len(nodeStages) == 0 { + nodeStages, err = controllers.NewStagesFromYaml([]byte(stages.DefaultNodeStages)) if err != nil { return err } - nodeStages = append(nodeStages, nodeHeartbeatStages...) + if flags.Options.NodeLeaseDurationSeconds == 0 { + nodeHeartbeatStages, err := controllers.NewStagesFromYaml([]byte(stages.DefaultNodeHeartbeatStages)) + if err != nil { + return err + } + nodeStages = append(nodeStages, nodeHeartbeatStages...) + } } - } - podStages := filterStages(stagesData, "v1", "Pod") - if len(podStages) == 0 { - podStages, err = controllers.NewStagesFromYaml([]byte(stages.DefaultPodStages)) - if err != nil { - return err + podStages = filterStages(stagesData, "v1", "Pod") + if len(podStages) == 0 { + podStages, err = controllers.NewStagesFromYaml([]byte(stages.DefaultPodStages)) + if err != nil { + return err + } } } @@ -179,6 +189,7 @@ func runE(ctx context.Context, flags *flagpole) error { ctr, err := controllers.NewController(controllers.Config{ Clock: clock.RealClock{}, TypedClient: typedClient, + TypedKwokClient: typedKwokClient, EnableCNI: flags.Options.EnableCNI, ManageAllNodes: flags.Options.ManageAllNodes, ManageNodesWithAnnotationSelector: flags.Options.ManageNodesWithAnnotationSelector, diff --git a/pkg/kwok/controllers/controller.go b/pkg/kwok/controllers/controller.go index 41ecae93d..aa0f301e5 100644 --- a/pkg/kwok/controllers/controller.go +++ b/pkg/kwok/controllers/controller.go @@ -37,8 +37,13 @@ import ( "sigs.k8s.io/yaml" 
"sigs.k8s.io/kwok/pkg/apis/internalversion" + "sigs.k8s.io/kwok/pkg/apis/v1alpha1" + "sigs.k8s.io/kwok/pkg/client/clientset/versioned" + "sigs.k8s.io/kwok/pkg/config/resources" "sigs.k8s.io/kwok/pkg/consts" + "sigs.k8s.io/kwok/pkg/log" "sigs.k8s.io/kwok/pkg/utils/gotpl" + "sigs.k8s.io/kwok/pkg/utils/slices" ) var ( @@ -85,6 +90,7 @@ var ( // Controller is a fake kubelet implementation that can be used to test type Controller struct { + conf Config nodes *NodeController pods *PodController nodeLeases *NodeLeaseController @@ -97,6 +103,7 @@ type Config struct { Clock clock.Clock EnableCNI bool TypedClient kubernetes.Interface + TypedKwokClient versioned.Interface ManageAllNodes bool ManageNodesWithAnnotationSelector string ManageNodesWithLabelSelector string @@ -118,33 +125,34 @@ type Config struct { // NewController creates a new fake kubelet controller func NewController(conf Config) (*Controller, error) { - var nodeSelectorFunc func(node *corev1.Node) bool switch { case conf.ManageAllNodes: - nodeSelectorFunc = func(node *corev1.Node) bool { - return true - } conf.ManageNodesWithAnnotationSelector = "" conf.ManageNodesWithLabelSelector = "" case conf.ManageNodesWithAnnotationSelector != "": - selector, err := labels.Parse(conf.ManageNodesWithAnnotationSelector) - if err != nil { - return nil, err - } - nodeSelectorFunc = func(node *corev1.Node) bool { - return selector.Matches(labels.Set(node.Annotations)) - } case conf.ManageNodesWithLabelSelector != "": - // client-go supports label filtering, so return true is ok. 
- nodeSelectorFunc = func(node *corev1.Node) bool { - return true - } default: return nil, fmt.Errorf("no nodes are managed") } - eventBroadcaster := record.NewBroadcaster() - recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "kwok_controller"}) + n := &Controller{ + conf: conf, + broadcaster: record.NewBroadcaster(), + typedClient: conf.TypedClient, + } + + return n, nil +} + +// Start starts the controller +func (c *Controller) Start(ctx context.Context) error { + if c.pods != nil || c.nodes != nil || c.nodeLeases != nil { + return fmt.Errorf("controller already started") + } + + conf := c.conf + + recorder := c.broadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "kwok_controller"}) var ( nodeLeases *NodeLeaseController @@ -152,7 +160,27 @@ func NewController(conf Config) (*Controller, error) { onLeaseNodeManageFunc func(nodeName string) onNodeManagedFunc func(nodeName string) readOnlyFunc func(nodeName string) bool + nodeSelectorFunc func(node *corev1.Node) bool ) + switch { + case conf.ManageAllNodes: + nodeSelectorFunc = func(node *corev1.Node) bool { + return true + } + case conf.ManageNodesWithAnnotationSelector != "": + selector, err := labels.Parse(conf.ManageNodesWithAnnotationSelector) + if err != nil { + return err + } + nodeSelectorFunc = func(node *corev1.Node) bool { + return selector.Matches(labels.Set(node.Annotations)) + } + case conf.ManageNodesWithLabelSelector != "": + // client-go supports label filtering, so return true is ok. 
+ nodeSelectorFunc = func(node *corev1.Node) bool { + return true + } + } if conf.NodeLeaseDurationSeconds != 0 { leaseDuration := time.Duration(conf.NodeLeaseDurationSeconds) * time.Second @@ -177,7 +205,7 @@ func NewController(conf Config) (*Controller, error) { }, }) if err != nil { - return nil, fmt.Errorf("failed to create node leases controller: %w", err) + return fmt.Errorf("failed to create node leases controller: %w", err) } nodeLeases = l @@ -187,6 +215,75 @@ func NewController(conf Config) (*Controller, error) { } } + logger := log.FromContext(ctx) + + var nodeLifecycleGetter resources.Getter[Lifecycle] + var podLifecycleGetter resources.Getter[Lifecycle] + + if len(conf.PodStages) == 0 && len(conf.NodeStages) == 0 { + getter := resources.NewDynamicGetter[ + []*internalversion.Stage, + *v1alpha1.Stage, + *v1alpha1.StageList, + ]( + conf.TypedKwokClient.KwokV1alpha1().Stages(), + func(objs []*v1alpha1.Stage) []*internalversion.Stage { + return slices.FilterAndMap(objs, func(obj *v1alpha1.Stage) (*internalversion.Stage, bool) { + r, err := internalversion.ConvertToInternalStage(obj) + if err != nil { + logger.Error("failed to convert to internal stage", err, "obj", obj) + return nil, false + } + return r, true + }) + }, + ) + + nodeLifecycleGetter = resources.NewFilter[Lifecycle, []*internalversion.Stage](getter, func(stages []*internalversion.Stage) Lifecycle { + lifecycle := slices.FilterAndMap(stages, func(stage *internalversion.Stage) (*LifecycleStage, bool) { + if stage.Spec.ResourceRef.Kind != "Node" { + return nil, false + } + + lifecycleStage, err := NewLifecycleStage(stage) + if err != nil { + logger.Error("failed to create node lifecycle stage", err, "stage", stage) + return nil, false + } + return lifecycleStage, true + }) + return lifecycle + }) + + podLifecycleGetter = resources.NewFilter[Lifecycle, []*internalversion.Stage](getter, func(stages []*internalversion.Stage) Lifecycle { + lifecycle := slices.FilterAndMap(stages, func(stage 
*internalversion.Stage) (*LifecycleStage, bool) { + if stage.Spec.ResourceRef.Kind != "Pod" { + return nil, false + } + + lifecycleStage, err := NewLifecycleStage(stage) + if err != nil { + logger.Error("failed to create node lifecycle stage", err, "stage", stage) + return nil, false + } + return lifecycleStage, true + }) + return lifecycle + }) + } else { + lifecycle, err := NewLifecycle(conf.PodStages) + if err != nil { + return fmt.Errorf("failed to create pod lifecycle: %w", err) + } + podLifecycleGetter = resources.NewStaticGetter(lifecycle) + + lifecycle, err = NewLifecycle(conf.NodeStages) + if err != nil { + return fmt.Errorf("failed to create node lifecycle: %w", err) + } + nodeLifecycleGetter = resources.NewStaticGetter(lifecycle) + } + nodes, err := NewNodeController(NodeControllerConfig{ Clock: conf.Clock, TypedClient: conf.TypedClient, @@ -200,14 +297,14 @@ func NewController(conf Config) (*Controller, error) { OnNodeManagedFunc: func(nodeName string) { onNodeManagedFunc(nodeName) }, - Stages: conf.NodeStages, + Lifecycle: nodeLifecycleGetter, PlayStageParallelism: conf.NodePlayStageParallelism, FuncMap: defaultFuncMap, Recorder: recorder, ReadOnlyFunc: readOnlyFunc, }) if err != nil { - return nil, fmt.Errorf("failed to create nodes controller: %w", err) + return fmt.Errorf("failed to create nodes controller: %w", err) } nodeHasMetric := func(nodeName string) bool { @@ -222,7 +319,7 @@ func NewController(conf Config) (*Controller, error) { CIDR: conf.CIDR, DisregardStatusWithAnnotationSelector: conf.DisregardStatusWithAnnotationSelector, DisregardStatusWithLabelSelector: conf.DisregardStatusWithLabelSelector, - Stages: conf.PodStages, + Lifecycle: podLifecycleGetter, PlayStageParallelism: conf.PodPlayStageParallelism, Namespace: corev1.NamespaceAll, NodeGetFunc: nodes.Get, @@ -232,7 +329,7 @@ func NewController(conf Config) (*Controller, error) { ReadOnlyFunc: readOnlyFunc, }) if err != nil { - return nil, fmt.Errorf("failed to create pods controller: 
%w", err) + return fmt.Errorf("failed to create pods controller: %w", err) } if nodeLeases != nil { @@ -260,34 +357,25 @@ func NewController(conf Config) (*Controller, error) { } } - n := &Controller{ - pods: pods, - nodes: nodes, - nodeLeases: nodeLeases, - broadcaster: eventBroadcaster, - typedClient: conf.TypedClient, - } - - return n, nil -} - -// Start starts the controller -func (c *Controller) Start(ctx context.Context) error { c.broadcaster.StartRecordingToSink(&clientcorev1.EventSinkImpl{Interface: c.typedClient.CoreV1().Events("")}) - if c.nodeLeases != nil { - err := c.nodeLeases.Start(ctx) + if nodeLeases != nil { + err := nodeLeases.Start(ctx) if err != nil { return fmt.Errorf("failed to start node leases controller: %w", err) } } - err := c.pods.Start(ctx) + err = pods.Start(ctx) if err != nil { return fmt.Errorf("failed to start pods controller: %w", err) } - err = c.nodes.Start(ctx) + err = nodes.Start(ctx) if err != nil { return fmt.Errorf("failed to start nodes controller: %w", err) } + + c.pods = pods + c.nodes = nodes + c.nodeLeases = nodeLeases return nil } diff --git a/pkg/kwok/controllers/lifecycle.go b/pkg/kwok/controllers/lifecycle.go index f36107e8f..3780fb16d 100644 --- a/pkg/kwok/controllers/lifecycle.go +++ b/pkg/kwok/controllers/lifecycle.go @@ -61,7 +61,7 @@ func NewStagesFromYaml(data []byte) ([]*internalversion.Stage, error) { func NewLifecycle(stages []*internalversion.Stage) (Lifecycle, error) { lcs := Lifecycle{} for _, stage := range stages { - lc, err := newLifecycleFromStage(stage) + lc, err := NewLifecycleStage(stage) if err != nil { return nil, fmt.Errorf("lifecycle stage: %w", err) } @@ -129,7 +129,8 @@ func (s Lifecycle) Match(label, annotation labels.Set, data interface{}) (*Lifec return stages[len(stages)-1], nil } -func newLifecycleFromStage(s *internalversion.Stage) (*LifecycleStage, error) { +// NewLifecycleStage returns a new LifecycleStage. 
+func NewLifecycleStage(s *internalversion.Stage) (*LifecycleStage, error) { stage := &LifecycleStage{ name: s.Name, } diff --git a/pkg/kwok/controllers/node_controller.go b/pkg/kwok/controllers/node_controller.go index c60da033e..24e37e5a6 100644 --- a/pkg/kwok/controllers/node_controller.go +++ b/pkg/kwok/controllers/node_controller.go @@ -41,6 +41,7 @@ import ( netutils "k8s.io/utils/net" "sigs.k8s.io/kwok/pkg/apis/internalversion" + "sigs.k8s.io/kwok/pkg/config/resources" "sigs.k8s.io/kwok/pkg/log" "sigs.k8s.io/kwok/pkg/utils/expression" "sigs.k8s.io/kwok/pkg/utils/gotpl" @@ -104,7 +105,7 @@ type NodeController struct { preprocessChan chan *corev1.Node playStageChan chan resourceStageJob[*corev1.Node] playStageParallelism uint - lifecycle Lifecycle + lifecycle resources.Getter[Lifecycle] cronjob *cron.Cron delayJobs jobInfoMap recorder record.EventRecorder @@ -124,7 +125,7 @@ type NodeControllerConfig struct { NodeIP string NodeName string NodePort int - Stages []*internalversion.Stage + Lifecycle resources.Getter[Lifecycle] PlayStageParallelism uint FuncMap gotpl.FuncMap Recorder record.EventRecorder @@ -156,11 +157,6 @@ func NewNodeController(conf NodeControllerConfig) (*NodeController, error) { return nil, err } - lifecycles, err := NewLifecycle(conf.Stages) - if err != nil { - return nil, err - } - if conf.Clock == nil { conf.Clock = clock.RealClock{} } @@ -177,7 +173,7 @@ func NewNodeController(conf NodeControllerConfig) (*NodeController, error) { nodeName: conf.NodeName, nodePort: conf.NodePort, cronjob: cron.NewCron(), - lifecycle: lifecycles, + lifecycle: conf.Lifecycle, playStageParallelism: conf.PlayStageParallelism, preprocessChan: make(chan *corev1.Node), triggerPreprocessChan: make(chan string, 16), @@ -474,7 +470,8 @@ func (c *NodeController) preprocess(ctx context.Context, node *corev1.Node) erro return err } - stage, err := c.lifecycle.Match(node.Labels, node.Annotations, data) + lifecycle := c.lifecycle.Get() + stage, err := 
lifecycle.Match(node.Labels, node.Annotations, data) if err != nil { return fmt.Errorf("stage match: %w", err) } diff --git a/pkg/kwok/controllers/node_controller_test.go b/pkg/kwok/controllers/node_controller_test.go index 2619cfd56..d37dbbecb 100644 --- a/pkg/kwok/controllers/node_controller_test.go +++ b/pkg/kwok/controllers/node_controller_test.go @@ -29,6 +29,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes/fake" + "sigs.k8s.io/kwok/pkg/config/resources" "sigs.k8s.io/kwok/pkg/log" "sigs.k8s.io/kwok/pkg/utils/wait" "sigs.k8s.io/kwok/stages" @@ -71,11 +72,12 @@ func TestNodeController(t *testing.T) { nodeStages, _ := NewStagesFromYaml([]byte(stages.DefaultNodeStages)) nodeHeartbeatStages, _ := NewStagesFromYaml([]byte(stages.DefaultNodeHeartbeatStages)) nodeStages = append(nodeStages, nodeHeartbeatStages...) + lifecycle, _ := NewLifecycle(nodeStages) nodes, err := NewNodeController(NodeControllerConfig{ TypedClient: clientset, NodeIP: "10.0.0.1", NodeSelectorFunc: nodeSelectorFunc, - Stages: nodeStages, + Lifecycle: resources.NewStaticGetter(lifecycle), FuncMap: defaultFuncMap, PlayStageParallelism: 2, }) diff --git a/pkg/kwok/controllers/pod_controller.go b/pkg/kwok/controllers/pod_controller.go index 9b97da137..a9ce45b5f 100644 --- a/pkg/kwok/controllers/pod_controller.go +++ b/pkg/kwok/controllers/pod_controller.go @@ -39,6 +39,7 @@ import ( "k8s.io/utils/clock" "sigs.k8s.io/kwok/pkg/apis/internalversion" + "sigs.k8s.io/kwok/pkg/config/resources" "sigs.k8s.io/kwok/pkg/kwok/cni" "sigs.k8s.io/kwok/pkg/log" "sigs.k8s.io/kwok/pkg/utils/expression" @@ -68,7 +69,7 @@ type PodController struct { preprocessChan chan *corev1.Pod playStageChan chan resourceStageJob[*corev1.Pod] playStageParallelism uint - lifecycle Lifecycle + lifecycle resources.Getter[Lifecycle] cronjob *cron.Cron delayJobs jobInfoMap recorder record.EventRecorder @@ -88,7 +89,7 @@ type PodControllerConfig struct { Namespace string NodeGetFunc func(nodeName 
string) (*NodeInfo, bool) NodeHasMetric func(nodeName string) bool - Stages []*internalversion.Stage + Lifecycle resources.Getter[Lifecycle] PlayStageParallelism uint FuncMap gotpl.FuncMap Recorder record.EventRecorder @@ -111,11 +112,6 @@ func NewPodController(conf PodControllerConfig) (*PodController, error) { return nil, err } - lifecycles, err := NewLifecycle(conf.Stages) - if err != nil { - return nil, err - } - if conf.Clock == nil { conf.Clock = clock.RealClock{} } @@ -131,7 +127,7 @@ func NewPodController(conf PodControllerConfig) (*PodController, error) { namespace: conf.Namespace, nodeGetFunc: conf.NodeGetFunc, cronjob: cron.NewCron(), - lifecycle: lifecycles, + lifecycle: conf.Lifecycle, playStageParallelism: conf.PlayStageParallelism, preprocessChan: make(chan *corev1.Pod), triggerPreprocessChan: make(chan string, 16), @@ -294,7 +290,8 @@ func (c *PodController) preprocess(ctx context.Context, pod *corev1.Pod) error { return err } - stage, err := c.lifecycle.Match(pod.Labels, pod.Annotations, data) + lifecycle := c.lifecycle.Get() + stage, err := lifecycle.Match(pod.Labels, pod.Annotations, data) if err != nil { return fmt.Errorf("stage match: %w", err) } diff --git a/pkg/kwok/controllers/pod_controller_test.go b/pkg/kwok/controllers/pod_controller_test.go index 3aad94cd2..6004166e9 100644 --- a/pkg/kwok/controllers/pod_controller_test.go +++ b/pkg/kwok/controllers/pod_controller_test.go @@ -29,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/kubernetes/fake" + "sigs.k8s.io/kwok/pkg/config/resources" "sigs.k8s.io/kwok/pkg/log" "sigs.k8s.io/kwok/pkg/utils/slices" "sigs.k8s.io/kwok/pkg/utils/wait" @@ -165,13 +166,14 @@ func TestPodController(t *testing.T) { return false } podStages, _ := NewStagesFromYaml([]byte(stages.DefaultPodStages)) + lifecycle, _ := NewLifecycle(podStages) annotationSelector, _ := labels.Parse("fake=custom") pods, err := NewPodController(PodControllerConfig{ TypedClient: clientset, NodeIP: defaultNodeIP, CIDR: 
defaultPodCIDR, DisregardStatusWithAnnotationSelector: annotationSelector.String(), - Stages: podStages, + Lifecycle: resources.NewStaticGetter(lifecycle), NodeGetFunc: nodeGetFunc, NodeHasMetric: nodeHasMetric, FuncMap: defaultFuncMap, diff --git a/pkg/kwok/server/server.go b/pkg/kwok/server/server.go index 953d27b87..71a348297 100644 --- a/pkg/kwok/server/server.go +++ b/pkg/kwok/server/server.go @@ -35,6 +35,7 @@ import ( "sigs.k8s.io/kwok/pkg/kwok/controllers" "sigs.k8s.io/kwok/pkg/log" "sigs.k8s.io/kwok/pkg/utils/pools" + "sigs.k8s.io/kwok/pkg/utils/slices" ) const ( @@ -134,7 +135,7 @@ func (s *Server) initWatchCRD(ctx context.Context) ([]resources.Starter, error) ]( cli.KwokV1alpha1().ClusterPortForwards(), func(objs []*v1alpha1.ClusterPortForward) []*internalversion.ClusterPortForward { - return convertList(objs, func(obj *v1alpha1.ClusterPortForward) (*internalversion.ClusterPortForward, bool) { + return slices.FilterAndMap(objs, func(obj *v1alpha1.ClusterPortForward) (*internalversion.ClusterPortForward, bool) { r, err := internalversion.ConvertToInternalClusterPortForward(obj) if err != nil { logger.Error("failed to convert to internal cluster port forward", err, "obj", obj) @@ -157,7 +158,7 @@ func (s *Server) initWatchCRD(ctx context.Context) ([]resources.Starter, error) ]( cli.KwokV1alpha1().PortForwards(""), func(objs []*v1alpha1.PortForward) []*internalversion.PortForward { - return convertList(objs, func(obj *v1alpha1.PortForward) (*internalversion.PortForward, bool) { + return slices.FilterAndMap(objs, func(obj *v1alpha1.PortForward) (*internalversion.PortForward, bool) { r, err := internalversion.ConvertToInternalPortForward(obj) if err != nil { logger.Error("failed to convert to internal port forward", err, "obj", obj) @@ -180,7 +181,7 @@ func (s *Server) initWatchCRD(ctx context.Context) ([]resources.Starter, error) ]( cli.KwokV1alpha1().ClusterExecs(), func(objs []*v1alpha1.ClusterExec) []*internalversion.ClusterExec { - return 
convertList(objs, func(obj *v1alpha1.ClusterExec) (*internalversion.ClusterExec, bool) { + return slices.FilterAndMap(objs, func(obj *v1alpha1.ClusterExec) (*internalversion.ClusterExec, bool) { r, err := internalversion.ConvertToInternalClusterExec(obj) if err != nil { logger.Error("failed to convert to internal cluster exec", err, "obj", obj) @@ -203,7 +204,7 @@ func (s *Server) initWatchCRD(ctx context.Context) ([]resources.Starter, error) ]( cli.KwokV1alpha1().Execs(""), func(objs []*v1alpha1.Exec) []*internalversion.Exec { - return convertList(objs, func(obj *v1alpha1.Exec) (*internalversion.Exec, bool) { + return slices.FilterAndMap(objs, func(obj *v1alpha1.Exec) (*internalversion.Exec, bool) { r, err := internalversion.ConvertToInternalExec(obj) if err != nil { logger.Error("failed to convert to internal exec", err, "obj", obj) @@ -226,7 +227,7 @@ func (s *Server) initWatchCRD(ctx context.Context) ([]resources.Starter, error) ]( cli.KwokV1alpha1().ClusterLogs(), func(objs []*v1alpha1.ClusterLogs) []*internalversion.ClusterLogs { - return convertList(objs, func(obj *v1alpha1.ClusterLogs) (*internalversion.ClusterLogs, bool) { + return slices.FilterAndMap(objs, func(obj *v1alpha1.ClusterLogs) (*internalversion.ClusterLogs, bool) { r, err := internalversion.ConvertToInternalClusterLogs(obj) if err != nil { logger.Error("failed to convert to internal cluster logs", err, "obj", obj) @@ -249,7 +250,7 @@ func (s *Server) initWatchCRD(ctx context.Context) ([]resources.Starter, error) ]( cli.KwokV1alpha1().Logs(""), func(objs []*v1alpha1.Logs) []*internalversion.Logs { - return convertList(objs, func(obj *v1alpha1.Logs) (*internalversion.Logs, bool) { + return slices.FilterAndMap(objs, func(obj *v1alpha1.Logs) (*internalversion.Logs, bool) { r, err := internalversion.ConvertToInternalLogs(obj) if err != nil { logger.Error("failed to convert to internal logs", err, "obj", obj) @@ -272,7 +273,7 @@ func (s *Server) initWatchCRD(ctx context.Context) ([]resources.Starter, 
error) ]( cli.KwokV1alpha1().ClusterAttaches(), func(objs []*v1alpha1.ClusterAttach) []*internalversion.ClusterAttach { - return convertList(objs, func(obj *v1alpha1.ClusterAttach) (*internalversion.ClusterAttach, bool) { + return slices.FilterAndMap(objs, func(obj *v1alpha1.ClusterAttach) (*internalversion.ClusterAttach, bool) { r, err := internalversion.ConvertToInternalClusterAttach(obj) if err != nil { logger.Error("failed to convert to internal cluster attach", err, "obj", obj) @@ -295,7 +296,7 @@ func (s *Server) initWatchCRD(ctx context.Context) ([]resources.Starter, error) ]( cli.KwokV1alpha1().Attaches(""), func(objs []*v1alpha1.Attach) []*internalversion.Attach { - return convertList(objs, func(obj *v1alpha1.Attach) (*internalversion.Attach, bool) { + return slices.FilterAndMap(objs, func(obj *v1alpha1.Attach) (*internalversion.Attach, bool) { r, err := internalversion.ConvertToInternalAttach(obj) if err != nil { logger.Error("failed to convert to internal attach", err, "obj", obj) @@ -306,23 +307,12 @@ func (s *Server) initWatchCRD(ctx context.Context) ([]resources.Starter, error) }, ) starters = append(starters, attaches) + s.attaches = attaches } } return starters, nil } -func convertList[S ~[]T, T any, O any](s S, f func(T) (O, bool)) []O { - out := make([]O, 0, len(s)) - for _, d := range s { - o, ok := f(d) - if !ok { - continue - } - out = append(out, o) - } - return out -} - func getHandlerForDisabledEndpoint(errorMessage string) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { http.Error(w, errorMessage, http.StatusMethodNotAllowed) diff --git a/pkg/kwokctl/runtime/cluster.go b/pkg/kwokctl/runtime/cluster.go index ebfae86ad..b5a6dd3f3 100644 --- a/pkg/kwokctl/runtime/cluster.go +++ b/pkg/kwokctl/runtime/cluster.go @@ -181,41 +181,54 @@ func (c *Cluster) Save(ctx context.Context) error { others := config.FilterWithoutTypeFromContext[*internalversion.KwokctlConfiguration](ctx) objs = append(objs, others...) 
- if conf.Options.NodeLeaseDurationSeconds == 0 { - if updateFrequency := conf.Options.NodeStatusUpdateFrequencyMilliseconds; updateFrequency > 0 && - conf.Options.Runtime != consts.RuntimeTypeKind && - conf.Options.Runtime != consts.RuntimeTypeKindPodman && - len(config.FilterWithTypeFromContext[*internalversion.Stage](ctx)) == 0 { - nodeStages, err := controllers.NewStagesFromYaml([]byte(stages.DefaultNodeStages)) - if err != nil { - return err - } - for _, stage := range nodeStages { - objs = append(objs, stage) - } - - nodeHeartbeatStages, err := controllers.NewStagesFromYaml([]byte(stages.DefaultNodeHeartbeatStages)) - if err != nil { - return err - } - hasUpdate := false - for _, stage := range nodeHeartbeatStages { - if stage.Name == "node-heartbeat" { - stage.Spec.Delay.DurationMilliseconds = format.Ptr(updateFrequency) - stage.Spec.Delay.JitterDurationMilliseconds = format.Ptr(updateFrequency + updateFrequency/10) - hasUpdate = true - } - objs = append(objs, stage) - } - if !hasUpdate { - return fmt.Errorf("failed to update node heartbeat stage") - } + kwokConfigs := config.FilterWithTypeFromContext[*internalversion.KwokConfiguration](ctx) + if (len(kwokConfigs) == 0 || !slices.Contains(kwokConfigs[0].Options.EnableCRDs, v1alpha1.StageKind)) && + conf.Options.NodeLeaseDurationSeconds == 0 && + conf.Options.NodeStatusUpdateFrequencyMilliseconds > 0 && + conf.Options.Runtime != consts.RuntimeTypeKind && + conf.Options.Runtime != consts.RuntimeTypeKindPodman && + len(config.FilterWithTypeFromContext[*internalversion.Stage](ctx)) == 0 { + defaultStages, err := c.getDefaultStages(conf.Options.NodeStatusUpdateFrequencyMilliseconds, conf.Options.NodeLeaseDurationSeconds == 0) + if err != nil { + return err } + objs = append(objs, defaultStages...) 
} return config.Save(ctx, c.GetWorkdirPath(ConfigName), objs) } +func (c *Cluster) getDefaultStages(updateFrequency int64, nodeHeartbeat bool) ([]config.InternalObject, error) { + objs := []config.InternalObject{} + nodeStages, err := controllers.NewStagesFromYaml([]byte(stages.DefaultNodeStages)) + if err != nil { + return nil, err + } + for _, stage := range nodeStages { + objs = append(objs, stage) + } + + if nodeHeartbeat { + nodeHeartbeatStages, err := controllers.NewStagesFromYaml([]byte(stages.DefaultNodeHeartbeatStages)) + if err != nil { + return nil, err + } + hasUpdate := false + for _, stage := range nodeHeartbeatStages { + if stage.Name == "node-heartbeat" { + stage.Spec.Delay.DurationMilliseconds = format.Ptr(updateFrequency) + stage.Spec.Delay.JitterDurationMilliseconds = format.Ptr(updateFrequency + updateFrequency/10) + hasUpdate = true + } + objs = append(objs, stage) + } + if !hasUpdate { + return nil, fmt.Errorf("failed to update node heartbeat stage") + } + } + return objs, nil +} + func (c *Cluster) kubectlPath(ctx context.Context) (string, error) { config, err := c.Config(ctx) if err != nil { diff --git a/pkg/utils/slices/slices.go b/pkg/utils/slices/slices.go index ada3f0295..71a24b958 100644 --- a/pkg/utils/slices/slices.go +++ b/pkg/utils/slices/slices.go @@ -46,6 +46,18 @@ func Filter[S ~[]T, T any](s S, f func(T) bool) []T { return out } +// FilterAndMap returns a new slice containing the value f produces for each element of s, +// keeping only the elements for which f also reports true. +func FilterAndMap[S ~[]T, T any, O any](s S, f func(T) (O, bool)) []O { + out := make([]O, 0, len(s)) + for _, v := range s { + if o, ok := f(v); ok { + out = append(out, o) + } + } + return out +} + // Contains returns true if the slice contains the given element.
func Contains[S ~[]T, T comparable](s S, t T) bool { for _, v := range s { diff --git a/site/content/en/docs/generated/kwok.md b/site/content/en/docs/generated/kwok.md index 6b4d54383..1c1e705dd 100644 --- a/site/content/en/docs/generated/kwok.md +++ b/site/content/en/docs/generated/kwok.md @@ -13,6 +13,7 @@ kwok [flags] -c, --config stringArray config path (default [~/.kwok/kwok.yaml]) --disregard-status-with-annotation-selector string All node/pod status excluding the ones that match the annotation selector will be watched and managed. --disregard-status-with-label-selector string All node/pod status excluding the ones that match the label selector will be watched and managed. + --enable-crd stringArray List of CRDs to enable --experimental-enable-cni Experimental support for getting pod ip from CNI, for CNI-related components, Only works with Linux -h, --help help for kwok --kubeconfig string Path to the kubeconfig file to use (default "~/.kube/config") diff --git a/stages/kustomization.yaml b/stages/kustomization.yaml new file mode 100644 index 000000000..d8fe0f505 --- /dev/null +++ b/stages/kustomization.yaml @@ -0,0 +1,5 @@ +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: Kustomization +resources: +- node-fast.yaml +- pod-fast.yaml diff --git a/test/kwok-with-cni/kwok-with-cni.test.sh b/test/kwok-with-cni/kwok-with-cni.test.sh index 133741b3c..daa84aa1f 100755 --- a/test/kwok-with-cni/kwok-with-cni.test.sh +++ b/test/kwok-with-cni/kwok-with-cni.test.sh @@ -35,6 +35,7 @@ function start_cluster() { kind load docker-image --name="${CLUSTER_NAME}" "${KWOK_IMAGE}:${KWOK_VERSION}" kubectl kustomize "${DIR}" | kubectl apply -f - + kubectl kustomize "${ROOT_DIR}/stages" | kubectl apply -f - } # Check for normal heartbeat diff --git a/test/kwok/kwok.test.sh b/test/kwok/kwok.test.sh index a832e5271..a6022734e 100755 --- a/test/kwok/kwok.test.sh +++ b/test/kwok/kwok.test.sh @@ -34,6 +34,7 @@ function start_cluster() { kind load docker-image --name="${CLUSTER_NAME}" 
"${KWOK_IMAGE}:${KWOK_VERSION}" kubectl kustomize "${DIR}" | kubectl apply -f - + kubectl kustomize "${ROOT_DIR}/stages" | kubectl apply -f - } # Check for normal heartbeat