From 0c59ad29daed32f1b48f7e1f4779b6af2a063dac Mon Sep 17 00:00:00 2001 From: Shiming Zhang Date: Wed, 17 May 2023 18:16:48 +0800 Subject: [PATCH] Add node lease controller --- kustomize/kwok/kwok-clusterrole.yaml | 11 + kustomize/kwok/kwok-deployment.yaml | 1 + .../v1alpha1/kwok_configuration_types.go | 7 + .../v1alpha1/kwokctl_configuration_types.go | 4 + .../config/v1alpha1/zz_generated.defaults.go | 6 + .../kwok_configuration_types.go | 6 + .../kwokctl_configuration_types.go | 3 + .../zz_generated.conversion.go | 6 + pkg/config/vars.go | 8 + pkg/kwok/cmd/root.go | 19 + pkg/kwok/controllers/controller.go | 102 ++++- pkg/kwok/controllers/controller_test.go | 2 + pkg/kwok/controllers/node_controller.go | 119 +++++- pkg/kwok/controllers/node_controller_test.go | 6 +- pkg/kwok/controllers/node_lease_controller.go | 392 ++++++++++++++++++ .../controllers/node_lease_controller_test.go | 274 ++++++++++++ pkg/kwok/controllers/pod_controller.go | 89 +++- pkg/kwokctl/components/kwok_controller.go | 31 +- pkg/kwokctl/runtime/binary/cluster.go | 25 +- pkg/kwokctl/runtime/cluster.go | 46 +- pkg/kwokctl/runtime/compose/cluster.go | 27 +- pkg/kwokctl/runtime/kind/cluster.go | 11 +- .../runtime/kind/kwok_controller_pod.go | 15 +- .../runtime/kind/kwok_controller_pod.yaml.tpl | 1 + pkg/utils/format/pointer.go | 8 + pkg/utils/wait/wait.go | 5 + site/content/en/docs/generated/kwok.md | 1 + stages/embed.go | 4 + stages/node-fast.yaml | 36 -- stages/node-heartbeat.yaml | 35 ++ test/kwok/kustomization.yaml | 9 + test/kwok/kwok.test.sh | 33 ++ 32 files changed, 1203 insertions(+), 139 deletions(-) create mode 100644 pkg/kwok/controllers/node_lease_controller.go create mode 100644 pkg/kwok/controllers/node_lease_controller_test.go create mode 100644 stages/node-heartbeat.yaml diff --git a/kustomize/kwok/kwok-clusterrole.yaml b/kustomize/kwok/kwok-clusterrole.yaml index 64d8e57efb..4da07fbe7b 100644 --- a/kustomize/kwok/kwok-clusterrole.yaml +++ b/kustomize/kwok/kwok-clusterrole.yaml @@ -35,3 +35,14 @@ rules: verbs: - update - patch + - apiGroups: + - coordination.k8s.io + resources: + - leases + verbs: + - create + - update + - patch + - watch + - list + - get diff --git a/kustomize/kwok/kwok-deployment.yaml b/kustomize/kwok/kwok-deployment.yaml index 53e692221b..6413675d2f 100644 --- a/kustomize/kwok/kwok-deployment.yaml +++ b/kustomize/kwok/kwok-deployment.yaml @@ -27,6 +27,7 @@ spec: - --node-ip=$(POD_IP) - --node-port=10247 - --cidr=10.0.0.1/24 + - --node-lease-duration-seconds=40 env: - name: POD_IP valueFrom: diff --git a/pkg/apis/config/v1alpha1/kwok_configuration_types.go b/pkg/apis/config/v1alpha1/kwok_configuration_types.go index 15201756f4..7d7c548566 100644 --- a/pkg/apis/config/v1alpha1/kwok_configuration_types.go +++ b/pkg/apis/config/v1alpha1/kwok_configuration_types.go @@ -118,4 +118,11 @@ type KwokConfigurationOptions struct { // NodePlayStageParallelism is the number of NodePlayStages that are allowed to run in parallel. // +default=4 NodePlayStageParallelism uint `json:"nodePlayStageParallelism,omitempty"` + + // NodeLeaseDurationSeconds is the duration the Kubelet will set on its corresponding Lease. + NodeLeaseDurationSeconds uint `json:"nodeLeaseDurationSeconds,omitempty"` + + // NodeLeaseParallelism is the number of NodeLeases that are allowed to be processed in parallel. 
+ // +default=4 + NodeLeaseParallelism uint `json:"nodeLeaseParallelism,omitempty"` } diff --git a/pkg/apis/config/v1alpha1/kwokctl_configuration_types.go b/pkg/apis/config/v1alpha1/kwokctl_configuration_types.go index 9e17856187..4a457cfb56 100644 --- a/pkg/apis/config/v1alpha1/kwokctl_configuration_types.go +++ b/pkg/apis/config/v1alpha1/kwokctl_configuration_types.go @@ -319,6 +319,10 @@ type KwokctlConfigurationOptions struct { // +default=1200000 NodeStatusUpdateFrequencyMilliseconds int64 `json:"nodeStatusUpdateFrequencyMilliseconds,omitempty"` + // NodeLeaseDurationSeconds is the duration the Kubelet will set on its corresponding Lease. + // +default=1200 + NodeLeaseDurationSeconds uint `json:"nodeLeaseDurationSeconds,omitempty"` + // BindAddress is the address to bind to. // +default="0.0.0.0" BindAddress string `json:"bindAddress,omitempty"` diff --git a/pkg/apis/config/v1alpha1/zz_generated.defaults.go b/pkg/apis/config/v1alpha1/zz_generated.defaults.go index 31da11c0a3..53403772ba 100644 --- a/pkg/apis/config/v1alpha1/zz_generated.defaults.go +++ b/pkg/apis/config/v1alpha1/zz_generated.defaults.go @@ -64,6 +64,9 @@ func SetObjectDefaults_KwokConfiguration(in *KwokConfiguration) { if in.Options.NodePlayStageParallelism == 0 { in.Options.NodePlayStageParallelism = 4 } + if in.Options.NodeLeaseParallelism == 0 { + in.Options.NodeLeaseParallelism = 4 + } } func SetObjectDefaults_KwokctlConfiguration(in *KwokctlConfiguration) { @@ -96,6 +99,9 @@ func SetObjectDefaults_KwokctlConfiguration(in *KwokctlConfiguration) { if in.Options.NodeStatusUpdateFrequencyMilliseconds == 0 { in.Options.NodeStatusUpdateFrequencyMilliseconds = 1200000 } + if in.Options.NodeLeaseDurationSeconds == 0 { + in.Options.NodeLeaseDurationSeconds = 1200 + } if in.Options.BindAddress == "" { in.Options.BindAddress = "0.0.0.0" } diff --git a/pkg/apis/internalversion/kwok_configuration_types.go b/pkg/apis/internalversion/kwok_configuration_types.go index 3261bccd53..a967610603 100644 --- a/pkg/apis/internalversion/kwok_configuration_types.go +++ b/pkg/apis/internalversion/kwok_configuration_types.go @@ -85,4 +85,10 @@ type KwokConfigurationOptions struct { // NodePlayStageParallelism is the number of NodePlayStages that are allowed to run in parallel. NodePlayStageParallelism uint + + // NodeLeaseDurationSeconds is the duration the Kubelet will set on its corresponding Lease. + NodeLeaseDurationSeconds uint + + // NodeLeaseParallelism is the number of NodeLeases that are allowed to be processed in parallel. + NodeLeaseParallelism uint } diff --git a/pkg/apis/internalversion/kwokctl_configuration_types.go b/pkg/apis/internalversion/kwokctl_configuration_types.go index 35cba98308..60a67699f0 100644 --- a/pkg/apis/internalversion/kwokctl_configuration_types.go +++ b/pkg/apis/internalversion/kwokctl_configuration_types.go @@ -204,6 +204,9 @@ type KwokctlConfigurationOptions struct { // NodeStatusUpdateFrequencyMilliseconds is --node-status-update-frequency for kwok like kubelet. NodeStatusUpdateFrequencyMilliseconds int64 + // NodeLeaseDurationSeconds is the duration the Kubelet will set on its corresponding Lease. + NodeLeaseDurationSeconds uint + // BindAddress is the address to bind to. 
BindAddress string } diff --git a/pkg/apis/internalversion/zz_generated.conversion.go b/pkg/apis/internalversion/zz_generated.conversion.go index 92fc844aea..1ed44b9fb1 100644 --- a/pkg/apis/internalversion/zz_generated.conversion.go +++ b/pkg/apis/internalversion/zz_generated.conversion.go @@ -1160,6 +1160,8 @@ func autoConvert_internalversion_KwokConfigurationOptions_To_v1alpha1_KwokConfig } out.PodPlayStageParallelism = in.PodPlayStageParallelism out.NodePlayStageParallelism = in.NodePlayStageParallelism + out.NodeLeaseDurationSeconds = in.NodeLeaseDurationSeconds + out.NodeLeaseParallelism = in.NodeLeaseParallelism return nil } @@ -1197,6 +1199,8 @@ func autoConvert_v1alpha1_KwokConfigurationOptions_To_internalversion_KwokConfig } out.PodPlayStageParallelism = in.PodPlayStageParallelism out.NodePlayStageParallelism = in.NodePlayStageParallelism + out.NodeLeaseDurationSeconds = in.NodeLeaseDurationSeconds + out.NodeLeaseParallelism = in.NodeLeaseParallelism return nil } @@ -1338,6 +1342,7 @@ func autoConvert_internalversion_KwokctlConfigurationOptions_To_v1alpha1_Kwokctl out.KubeControllerManagerNodeMonitorPeriodMilliseconds = in.KubeControllerManagerNodeMonitorPeriodMilliseconds out.KubeControllerManagerNodeMonitorGracePeriodMilliseconds = in.KubeControllerManagerNodeMonitorGracePeriodMilliseconds out.NodeStatusUpdateFrequencyMilliseconds = in.NodeStatusUpdateFrequencyMilliseconds + out.NodeLeaseDurationSeconds = in.NodeLeaseDurationSeconds out.BindAddress = in.BindAddress return nil } @@ -1420,6 +1425,7 @@ func autoConvert_v1alpha1_KwokctlConfigurationOptions_To_internalversion_Kwokctl out.KubeControllerManagerNodeMonitorPeriodMilliseconds = in.KubeControllerManagerNodeMonitorPeriodMilliseconds out.KubeControllerManagerNodeMonitorGracePeriodMilliseconds = in.KubeControllerManagerNodeMonitorGracePeriodMilliseconds out.NodeStatusUpdateFrequencyMilliseconds = in.NodeStatusUpdateFrequencyMilliseconds + out.NodeLeaseDurationSeconds = in.NodeLeaseDurationSeconds out.BindAddress = in.BindAddress return nil } diff --git a/pkg/config/vars.go b/pkg/config/vars.go index 6b573a9971..7baf4df6f3 100644 --- a/pkg/config/vars.go +++ b/pkg/config/vars.go @@ -202,6 +202,14 @@ func setKwokctlConfigurationDefaults(config *configv1alpha1.KwokctlConfiguration } } + // Disable node lease duration seconds for kubernetes < 1.14 + if conf.NodeLeaseDurationSeconds != 0 { + minor := parseRelease(conf.KubeVersion) + if minor < 14 && minor != -1 { + conf.NodeLeaseDurationSeconds = 0 + } + } + setKwokctlKubernetesConfig(conf) setKwokctlKwokConfig(conf) diff --git a/pkg/kwok/cmd/root.go b/pkg/kwok/cmd/root.go index ce72bfbb44..a654178132 100644 --- a/pkg/kwok/cmd/root.go +++ b/pkg/kwok/cmd/root.go @@ -25,6 +25,7 @@ import ( "github.com/spf13/cobra" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" + "k8s.io/utils/clock" "sigs.k8s.io/kwok/pkg/apis/internalversion" "sigs.k8s.io/kwok/pkg/config" @@ -81,6 +82,7 @@ func NewCommand(ctx context.Context) *cobra.Command { cmd.Flags().StringVar(&flags.Kubeconfig, "kubeconfig", flags.Kubeconfig, "Path to the kubeconfig file to use") cmd.Flags().StringVar(&flags.Master, "master", flags.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig).") cmd.Flags().StringVar(&flags.Options.ServerAddress, "server-address", flags.Options.ServerAddress, "Address to expose the server on") + cmd.Flags().UintVar(&flags.Options.NodeLeaseDurationSeconds, "node-lease-duration-seconds", flags.Options.NodeLeaseDurationSeconds, "Duration of 
node lease seconds") cmd.Flags().BoolVar(&flags.Options.EnableCNI, "experimental-enable-cni", flags.Options.EnableCNI, "Experimental support for getting pod ip from CNI, for CNI-related components, Only works with Linux") if config.GOOS != "linux" { @@ -145,6 +147,13 @@ func runE(ctx context.Context, flags *flagpole) error { if err != nil { return err } + if flags.Options.NodeLeaseDurationSeconds == 0 { + nodeHeartbeatStages, err := controllers.NewStagesFromYaml([]byte(stages.DefaultNodeHeartbeatStages)) + if err != nil { + return err + } + nodeStages = append(nodeStages, nodeHeartbeatStages...) + } } podStages := filterStages(stagesData, "v1", "Pod") if len(podStages) == 0 { @@ -154,7 +163,14 @@ func runE(ctx context.Context, flags *flagpole) error { } } + id, err := controllers.Identity() + if err != nil { + return err + } + ctx = log.NewContext(ctx, logger.With("id", id)) + ctr, err := controllers.NewController(controllers.Config{ + Clock: clock.RealClock{}, ClientSet: typedClient, EnableCNI: flags.Options.EnableCNI, ManageAllNodes: flags.Options.ManageAllNodes, @@ -170,6 +186,9 @@ func runE(ctx context.Context, flags *flagpole) error { NodePlayStageParallelism: flags.Options.NodePlayStageParallelism, NodeStages: nodeStages, PodStages: podStages, + NodeLeaseParallelism: flags.Options.NodeLeaseParallelism, + NodeLeaseDurationSeconds: flags.Options.NodeLeaseDurationSeconds, + ID: id, }) if err != nil { return err diff --git a/pkg/kwok/controllers/controller.go b/pkg/kwok/controllers/controller.go index 193432b3af..9265fdd274 100644 --- a/pkg/kwok/controllers/controller.go +++ b/pkg/kwok/controllers/controller.go @@ -19,16 +19,20 @@ package controllers import ( "context" "fmt" + "os" "strings" "text/template" "time" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" clientcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/record" + "k8s.io/utils/clock" "sigs.k8s.io/yaml" "sigs.k8s.io/kwok/pkg/apis/internalversion" @@ -68,12 +72,14 @@ var ( type Controller struct { nodes *NodeController pods *PodController + nodeLeases *NodeLeaseController broadcaster record.EventBroadcaster clientSet kubernetes.Interface } // Config is the configuration for the controller type Config struct { + Clock clock.Clock EnableCNI bool ClientSet kubernetes.Interface ManageAllNodes bool @@ -89,6 +95,9 @@ type Config struct { NodeStages []*internalversion.Stage PodPlayStageParallelism uint NodePlayStageParallelism uint + NodeLeaseDurationSeconds uint + NodeLeaseParallelism uint + ID string } // NewController creates a new fake kubelet controller @@ -121,9 +130,49 @@ func NewController(conf Config) (*Controller, error) { eventBroadcaster := record.NewBroadcaster() recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "kwok_controller"}) - var onNodeManagedFunc func(ctx context.Context, nodeName string) error + var ( + nodeLeases *NodeLeaseController + getNodeOwnerFunc func(nodeName string) []metav1.OwnerReference + onLeaseNodeManageFunc func(nodeName string) + onNodeManagedFunc func(nodeName string) + readOnlyFunc func(nodeName string) bool + ) + + if conf.NodeLeaseDurationSeconds != 0 { + leaseDuration := time.Duration(conf.NodeLeaseDurationSeconds) * time.Second + // https://github.com/kubernetes/kubernetes/blob/02f4d643eae2e225591702e1bbf432efea453a26/pkg/kubelet/kubelet.go#L199-L200 + renewInterval 
:= leaseDuration / 4 + // https://github.com/kubernetes/component-helpers/blob/d17b6f1e84500ee7062a26f5327dc73cb3e9374a/apimachinery/lease/controller.go#L100 + renewIntervalJitter := 0.04 + l, err := NewNodeLeaseController(NodeLeaseControllerConfig{ + Clock: conf.Clock, + ClientSet: conf.ClientSet, + LeaseDurationSeconds: conf.NodeLeaseDurationSeconds, + LeaseParallelism: conf.NodeLeaseParallelism, + RenewInterval: renewInterval, + RenewIntervalJitter: renewIntervalJitter, + LeaseNamespace: corev1.NamespaceNodeLease, + MutateLeaseFunc: setNodeOwnerFunc(func(nodeName string) []metav1.OwnerReference { + return getNodeOwnerFunc(nodeName) + }), + HolderIdentity: conf.ID, + OnNodeManagedFunc: func(nodeName string) { + onLeaseNodeManageFunc(nodeName) + }, + }) + if err != nil { + return nil, fmt.Errorf("failed to create node leases controller: %w", err) + } + nodeLeases = l + + // Not holding the lease means the node is not managed + readOnlyFunc = func(nodeName string) bool { + return !nodeLeases.Held(nodeName) + } + } nodes, err := NewNodeController(NodeControllerConfig{ + Clock: conf.Clock, ClientSet: conf.ClientSet, NodeIP: conf.NodeIP, NodeName: conf.NodeName, @@ -132,19 +181,21 @@ func NewController(conf Config) (*Controller, error) { DisregardStatusWithLabelSelector: conf.DisregardStatusWithLabelSelector, ManageNodesWithLabelSelector: conf.ManageNodesWithLabelSelector, NodeSelectorFunc: nodeSelectorFunc, - OnNodeManagedFunc: func(ctx context.Context, nodeName string) error { - return onNodeManagedFunc(ctx, nodeName) + OnNodeManagedFunc: func(nodeName string) { + onNodeManagedFunc(nodeName) }, Stages: conf.NodeStages, PlayStageParallelism: conf.NodePlayStageParallelism, FuncMap: defaultFuncMap, Recorder: recorder, + ReadOnlyFunc: readOnlyFunc, }) if err != nil { return nil, fmt.Errorf("failed to create nodes controller: %w", err) } pods, err := NewPodController(PodControllerConfig{ + Clock: conf.Clock, EnableCNI: conf.EnableCNI, ClientSet: conf.ClientSet, NodeIP: conf.NodeIP, @@ -153,19 +204,45 @@ func NewController(conf Config) (*Controller, error) { DisregardStatusWithLabelSelector: conf.DisregardStatusWithLabelSelector, Stages: conf.PodStages, PlayStageParallelism: conf.PodPlayStageParallelism, + Namespace: corev1.NamespaceAll, NodeGetFunc: nodes.Get, FuncMap: defaultFuncMap, Recorder: recorder, + ReadOnlyFunc: readOnlyFunc, }) if err != nil { return nil, fmt.Errorf("failed to create pods controller: %w", err) } - onNodeManagedFunc = pods.PlayStagePodsOnNode + if nodeLeases != nil { + getNodeOwnerFunc = func(nodeName string) []metav1.OwnerReference { + nodeInfo, ok := nodes.Get(nodeName) + if !ok || nodeInfo == nil { + return nil + } + return nodeInfo.OwnerReferences + } + onLeaseNodeManageFunc = func(nodeName string) { + // Manage the node and play stage all pods on the node + nodes.Manage(nodeName) + pods.PlayStagePodsOnNode(nodeName) + } + + onNodeManagedFunc = func(nodeName string) { + // Try to hold the lease + nodeLeases.TryHold(nodeName) + } + } else { + onNodeManagedFunc = func(nodeName string) { + // Play stage all pods on the node + pods.PlayStagePodsOnNode(nodeName) + } + } n := &Controller{ pods: pods, nodes: nodes, + nodeLeases: nodeLeases, broadcaster: eventBroadcaster, clientSet: conf.ClientSet, } @@ -176,7 +253,12 @@ func NewController(conf Config) (*Controller, error) { // Start starts the controller func (c *Controller) Start(ctx context.Context) error { c.broadcaster.StartRecordingToSink(&clientcorev1.EventSinkImpl{Interface: c.clientSet.CoreV1().Events("")}) - + if 
c.nodeLeases != nil { + err := c.nodeLeases.Start(ctx) + if err != nil { + return fmt.Errorf("failed to start node leases controller: %w", err) + } + } err := c.pods.Start(ctx) if err != nil { return fmt.Errorf("failed to start pods controller: %w", err) @@ -187,3 +269,13 @@ func (c *Controller) Start(ctx context.Context) error { } return nil } + +// Identity returns a unique identifier for this controller +func Identity() (string, error) { + hostname, err := os.Hostname() + if err != nil { + return "", fmt.Errorf("unable to get hostname: %w", err) + } + // add a uniquifier so that two processes on the same host don't accidentally both become active + return hostname + "_" + string(uuid.NewUUID()), nil +} diff --git a/pkg/kwok/controllers/controller_test.go b/pkg/kwok/controllers/controller_test.go index 3777ec20a0..d21d223d60 100644 --- a/pkg/kwok/controllers/controller_test.go +++ b/pkg/kwok/controllers/controller_test.go @@ -68,6 +68,8 @@ func TestController(t *testing.T) { } nodeStages, _ := NewStagesFromYaml([]byte(stages.DefaultNodeStages)) + nodeHeartbeatStage, _ := NewStagesFromYaml([]byte(stages.DefaultNodeHeartbeatStages)) + nodeStages = append(nodeStages, nodeHeartbeatStage...) podStages, _ := NewStagesFromYaml([]byte(stages.DefaultPodStages)) tests := []struct { diff --git a/pkg/kwok/controllers/node_controller.go b/pkg/kwok/controllers/node_controller.go index d92f7d5903..a70c5513a6 100644 --- a/pkg/kwok/controllers/node_controller.go +++ b/pkg/kwok/controllers/node_controller.go @@ -37,6 +37,7 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/pager" "k8s.io/client-go/tools/record" + "k8s.io/utils/clock" netutils "k8s.io/utils/net" "sigs.k8s.io/kwok/pkg/apis/internalversion" @@ -82,10 +83,12 @@ var ( }, } nodeConditionsData, _ = expression.ToJSONStandard(nodeConditions) + nodeKind = corev1.SchemeGroupVersion.WithKind("Node") ) // NodeController is a fake nodes implementation that can be used to test type NodeController struct { + clock clock.Clock clientSet kubernetes.Interface nodeIP string nodeName string @@ -94,7 +97,7 @@ type NodeController struct { disregardStatusWithLabelSelector labels.Selector manageNodesWithLabelSelector string nodeSelectorFunc func(node *corev1.Node) bool - onNodeManagedFunc func(ctx context.Context, nodeName string) error + onNodeManagedFunc func(nodeName string) nodesSets maps.SyncMap[string, *NodeInfo] renderer *renderer preprocessChan chan *corev1.Node @@ -104,13 +107,16 @@ type NodeController struct { cronjob *cron.Cron delayJobs jobInfoMap recorder record.EventRecorder + readOnlyFunc func(nodeName string) bool + triggerPreprocessChan chan string } // NodeControllerConfig is the configuration for the NodeController type NodeControllerConfig struct { + Clock clock.Clock ClientSet kubernetes.Interface NodeSelectorFunc func(node *corev1.Node) bool - OnNodeManagedFunc func(ctx context.Context, nodeName string) error + OnNodeManagedFunc func(nodeName string) DisregardStatusWithAnnotationSelector string DisregardStatusWithLabelSelector string ManageNodesWithLabelSelector string @@ -121,12 +127,15 @@ type NodeControllerConfig struct { PlayStageParallelism uint FuncMap template.FuncMap Recorder record.EventRecorder + ReadOnlyFunc func(nodeName string) bool } // NodeInfo is the collection of necessary node information type NodeInfo struct { - HostIPs []string - PodCIDRs []string + Node *corev1.Node + HostIPs []string + PodCIDRs []string + OwnerReferences []metav1.OwnerReference } // NewNodeController creates a new fake nodes controller @@ 
-150,7 +159,12 @@ func NewNodeController(conf NodeControllerConfig) (*NodeController, error) { return nil, err } + if conf.Clock == nil { + conf.Clock = clock.RealClock{} + } + c := &NodeController{ + clock: conf.Clock, clientSet: conf.ClientSet, nodeSelectorFunc: conf.NodeSelectorFunc, disregardStatusWithAnnotationSelector: disregardStatusWithAnnotationSelector, @@ -164,8 +178,10 @@ func NewNodeController(conf NodeControllerConfig) (*NodeController, error) { lifecycle: lifecycles, playStageParallelism: conf.PlayStageParallelism, preprocessChan: make(chan *corev1.Node), + triggerPreprocessChan: make(chan string, 16), playStageChan: make(chan resourceStageJob[*corev1.Node]), recorder: conf.Recorder, + readOnlyFunc: conf.ReadOnlyFunc, } funcMap := template.FuncMap{ "NodeIP": c.funcNodeIP, @@ -186,6 +202,7 @@ func NewNodeController(conf NodeControllerConfig) (*NodeController, error) { // if nodeSelectorFunc is not nil, it will use it to determine if the node should be managed func (c *NodeController) Start(ctx context.Context) error { go c.preprocessWorker(ctx) + go c.triggerPreprocessWorker(ctx) for i := uint(0); i < c.playStageParallelism; i++ { go c.playStageWorker(ctx) } @@ -205,7 +222,6 @@ func (c *NodeController) Start(ctx context.Context) error { logger.Error("Failed list nodes", err) } }() - return nil } @@ -254,7 +270,7 @@ func (c *NodeController) watchResources(ctx context.Context, opt metav1.ListOpti select { case <-ctx.Done(): break loop - case <-time.After(time.Second * 5): + case <-c.clock.After(time.Second * 5): } } } @@ -263,21 +279,32 @@ func (c *NodeController) watchResources(ctx context.Context, opt metav1.ListOpti node := event.Object.(*corev1.Node) if c.need(node) { c.putNodeInfo(node) - c.preprocessChan <- node + if c.readOnly(node.Name) { + logger.Debug("Skip node", + "reason", "read only", + "event", event.Type, + "node", node.Name, + ) + } else { + c.preprocessChan <- node + } if c.onNodeManagedFunc != nil { - err = c.onNodeManagedFunc(ctx, node.Name) - if err != nil { - logger.Error("Failed to onNodeManagedFunc", err, - "node", node.Name, - ) - } + c.onNodeManagedFunc(node.Name) } } case watch.Modified: node := event.Object.(*corev1.Node) if c.need(node) { c.putNodeInfo(node) - c.preprocessChan <- node + if c.readOnly(node.Name) { + logger.Debug("Skip node", + "reason", "read only", + "event", event.Type, + "node", node.Name, + ) + } else { + c.preprocessChan <- node + } } case watch.Deleted: node := event.Object.(*corev1.Node) @@ -307,11 +334,21 @@ func (c *NodeController) listResources(ctx context.Context, opt metav1.ListOptio listPager := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { return c.clientSet.CoreV1().Nodes().List(ctx, opts) }) + + logger := log.FromContext(ctx) + return listPager.EachListItem(ctx, opt, func(obj runtime.Object) error { node := obj.(*corev1.Node) if c.need(node) { c.putNodeInfo(node) - c.preprocessChan <- node + if c.readOnly(node.Name) { + logger.Debug("Skip node", + "node", node.Name, + "reason", "read only", + ) + } else { + c.preprocessChan <- node + } } return nil }) @@ -332,6 +369,7 @@ func (c *NodeController) finalizersModify(ctx context.Context, node *corev1.Node logger = logger.With( "node", node.Name, ) + result, err := c.clientSet.CoreV1().Nodes().Patch(ctx, node.Name, types.JSONPatchType, data, metav1.PatchOptions{}) if err != nil { if apierrors.IsNotFound(err) { @@ -352,6 +390,7 @@ func (c *NodeController) deleteResource(ctx context.Context, node *corev1.Node) logger = logger.With( 
"node", node.Name, ) + err := c.clientSet.CoreV1().Nodes().Delete(ctx, node.Name, deleteOpt) if err != nil { if apierrors.IsNotFound(err) { @@ -380,6 +419,26 @@ func (c *NodeController) preprocessWorker(ctx context.Context) { } } +// triggerPreprocessWorker receives the resource from the triggerPreprocessChan and preprocess it +func (c *NodeController) triggerPreprocessWorker(ctx context.Context) { + logger := log.FromContext(ctx) + for nodeName := range c.triggerPreprocessChan { + nodeInfo, has := c.nodesSets.Load(nodeName) + if !has || nodeInfo.Node == nil { + logger.Warn("Node not found", + "node", nodeName, + ) + continue + } + err := c.preprocess(ctx, nodeInfo.Node) + if err != nil { + logger.Error("Failed to preprocess node", err, + "node", nodeName, + ) + } + } +} + // preprocess the pod and send it to the playStageWorker func (c *NodeController) preprocess(ctx context.Context, node *corev1.Node) error { key := node.Name @@ -409,7 +468,8 @@ func (c *NodeController) preprocess(ctx context.Context, node *corev1.Node) erro ) return nil } - now := time.Now() + + now := c.clock.Now() delay, _ := stage.Delay(ctx, data, now) if delay != 0 { @@ -453,6 +513,11 @@ func (c *NodeController) playStageWorker(ctx context.Context) { func (c *NodeController) playStage(ctx context.Context, node *corev1.Node, stage *LifecycleStage) { next := stage.Next() logger := log.FromContext(ctx) + logger = logger.With( + "node", node.Name, + "stage", stage.Name(), + ) + if next.Event != nil && c.recorder != nil { c.recorder.Event(&corev1.ObjectReference{ Kind: "Node", @@ -497,12 +562,20 @@ func (c *NodeController) playStage(ctx context.Context, node *corev1.Node, stage } } +func (c *NodeController) readOnly(nodeName string) bool { + if c.readOnlyFunc == nil { + return false + } + return c.readOnlyFunc(nodeName) +} + // patchResource patches the resource func (c *NodeController) patchResource(ctx context.Context, node *corev1.Node, patch []byte) (*corev1.Node, error) { logger := log.FromContext(ctx) logger = logger.With( "node", node.Name, ) + result, err := c.clientSet.CoreV1().Nodes().Patch(ctx, node.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{}, "status") if err != nil { if apierrors.IsNotFound(err) { @@ -566,12 +639,26 @@ func (c *NodeController) putNodeInfo(node *corev1.Node) { } nodeInfo := &NodeInfo{ + Node: node, HostIPs: hostIps, PodCIDRs: podCIDRs, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: nodeKind.Version, + Kind: nodeKind.Kind, + Name: node.Name, + UID: node.UID, + }, + }, } c.nodesSets.Store(node.Name, nodeInfo) } +// Manage manages the node +func (c *NodeController) Manage(nodeName string) { + c.triggerPreprocessChan <- nodeName +} + // getNodeHostIPs returns the provided node's IP(s); either a single "primary IP" for the // node in a single-stack cluster, or a dual-stack pair of IPs in a dual-stack cluster // (for nodes that actually have dual-stack IPs). 
Among other things, the IPs returned diff --git a/pkg/kwok/controllers/node_controller_test.go b/pkg/kwok/controllers/node_controller_test.go index 82b26485ee..699e330af3 100644 --- a/pkg/kwok/controllers/node_controller_test.go +++ b/pkg/kwok/controllers/node_controller_test.go @@ -68,12 +68,14 @@ func TestNodeController(t *testing.T) { nodeSelectorFunc := func(node *corev1.Node) bool { return strings.HasPrefix(node.Name, "node") } - nodeStageStatus, _ := NewStagesFromYaml([]byte(stages.DefaultNodeStages)) + nodeStages, _ := NewStagesFromYaml([]byte(stages.DefaultNodeStages)) + nodeHeartbeatStage, _ := NewStagesFromYaml([]byte(stages.DefaultNodeHeartbeatStages)) + nodeStages = append(nodeStages, nodeHeartbeatStage...) nodes, err := NewNodeController(NodeControllerConfig{ ClientSet: clientset, NodeIP: "10.0.0.1", NodeSelectorFunc: nodeSelectorFunc, - Stages: nodeStageStatus, + Stages: nodeStages, FuncMap: defaultFuncMap, PlayStageParallelism: 2, }) diff --git a/pkg/kwok/controllers/node_lease_controller.go b/pkg/kwok/controllers/node_lease_controller.go new file mode 100644 index 0000000000..4e7d880f74 --- /dev/null +++ b/pkg/kwok/controllers/node_lease_controller.go @@ -0,0 +1,392 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package controllers + +import ( + "context" + "fmt" + "time" + + "github.com/wzshiming/cron" + coordinationv1 "k8s.io/api/coordination/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/pager" + "k8s.io/utils/clock" + + "sigs.k8s.io/kwok/pkg/log" + "sigs.k8s.io/kwok/pkg/utils/format" + "sigs.k8s.io/kwok/pkg/utils/maps" + "sigs.k8s.io/kwok/pkg/utils/wait" +) + +// NodeLeaseController is responsible for creating and renewing a lease object +type NodeLeaseController struct { + clientSet clientset.Interface + leaseNamespace string + leaseDurationSeconds uint + leaseParallelism uint + renewInterval time.Duration + renewIntervalJitter float64 + clock clock.Clock + + // latestLease is the latest lease which the NodeLeaseController updated or created + latestLease maps.SyncMap[string, *coordinationv1.Lease] + + // mutateLeaseFunc allows customizing a lease object + mutateLeaseFunc func(*coordinationv1.Lease) error + + cronjob *cron.Cron + cancelJob maps.SyncMap[string, cron.DoFunc] + + leaseChan chan string + + holderIdentity string + onNodeManagedFunc func(nodeName string) +} + +// NodeLeaseControllerConfig is the configuration for NodeLeaseController +type NodeLeaseControllerConfig struct { + Clock clock.Clock + HolderIdentity string + ClientSet clientset.Interface + LeaseDurationSeconds uint + LeaseParallelism uint + RenewInterval time.Duration + RenewIntervalJitter float64 + LeaseNamespace string + MutateLeaseFunc func(*coordinationv1.Lease) error + OnNodeManagedFunc func(nodeName string) +} + +// NewNodeLeaseController constructs and returns a NodeLeaseController +func NewNodeLeaseController(conf NodeLeaseControllerConfig) (*NodeLeaseController, error) { + if conf.LeaseParallelism <= 0 { + return nil, fmt.Errorf("node leases parallelism must be greater than 0") + } + + if conf.Clock == nil { + conf.Clock = clock.RealClock{} + } + + c := &NodeLeaseController{ + clock: conf.Clock, + clientSet: conf.ClientSet, + leaseNamespace: conf.LeaseNamespace, + leaseDurationSeconds: conf.LeaseDurationSeconds, + leaseParallelism: conf.LeaseParallelism, + renewInterval: conf.RenewInterval, + renewIntervalJitter: conf.RenewIntervalJitter, + mutateLeaseFunc: conf.MutateLeaseFunc, + cronjob: cron.NewCron(), + leaseChan: make(chan string), + holderIdentity: conf.HolderIdentity, + onNodeManagedFunc: conf.OnNodeManagedFunc, + } + + return c, nil +} + +// Start starts the NodeLeaseController +func (c *NodeLeaseController) Start(ctx context.Context) error { + for i := uint(0); i < c.leaseParallelism; i++ { + go c.syncWorker(ctx) + } + + opt := metav1.ListOptions{} + err := c.watchResources(ctx, opt) + if err != nil { + return fmt.Errorf("failed watch node leases: %w", err) + } + + logger := log.FromContext(ctx) + go func() { + err = c.listResources(ctx, opt) + if err != nil { + logger.Error("Failed list node leases", err) + } + }() + return nil +} + +// watchResources watch resources and send to preprocessChan +func (c *NodeLeaseController) watchResources(ctx context.Context, opt metav1.ListOptions) error { + // Watch node leases in the cluster + watcher, err := c.clientSet.CoordinationV1().Leases(c.leaseNamespace).Watch(ctx, opt) + if err != nil { + return err + } + + logger := log.FromContext(ctx) + go func() { + rc := watcher.ResultChan() + loop: + for { + select { + case event, ok := <-rc: + if !ok { + for { + watcher, err := 
c.clientSet.CoordinationV1().Leases(c.leaseNamespace).Watch(ctx, opt) + if err == nil { + rc = watcher.ResultChan() + continue loop + } + + logger.Error("Failed to watch node leases", err) + select { + case <-ctx.Done(): + break loop + case <-c.clock.After(time.Second * 5): + } + } + } + switch event.Type { + case watch.Added, watch.Modified: + lease := event.Object.(*coordinationv1.Lease) + c.latestLease.Store(lease.Name, lease) + case watch.Deleted: + lease := event.Object.(*coordinationv1.Lease) + c.remove(lease.Name) + } + case <-ctx.Done(): + watcher.Stop() + break loop + } + } + logger.Info("Stop watch node leases") + }() + return nil +} + +// listResources lists all resources and sends to preprocessChan +func (c *NodeLeaseController) listResources(ctx context.Context, opt metav1.ListOptions) error { + listPager := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { + return c.clientSet.CoordinationV1().Leases(c.leaseNamespace).List(ctx, opts) + }) + + return listPager.EachListItem(ctx, opt, func(obj runtime.Object) error { + lease := obj.(*coordinationv1.Lease) + c.latestLease.Store(lease.Name, lease) + return nil + }) +} + +func (c *NodeLeaseController) syncWorker(ctx context.Context) { + for nodeName := range c.leaseChan { + c.sync(ctx, nodeName) + } +} + +func (c *NodeLeaseController) nextTryTime(name string, now time.Time) time.Time { + next := now.Add(wait.Jitter(c.renewInterval, c.renewIntervalJitter)) + lease, ok := c.latestLease.Load(name) + if !ok || lease == nil || + lease.Spec.HolderIdentity == nil || + lease.Spec.LeaseDurationSeconds == nil || + lease.Spec.RenewTime == nil { + return next + } + return nextTryTime(lease, c.holderIdentity, next) +} + +// TryHold tries to hold a lease for the NodeLeaseController +func (c *NodeLeaseController) TryHold(name string) { + // trigger a sync immediately + c.leaseChan <- name + + // add a cron job to sync the lease periodically + cancel, ok := c.cronjob.AddWithCancel( + func(now time.Time) (time.Time, bool) { + return c.nextTryTime(name, now), true + }, + func() { + c.leaseChan <- name + }, + ) + if ok { + old, ok := c.cancelJob.LoadOrStore(name, cancel) + if ok { + old() + } + } +} + +// remove removes a lease from the NodeLeaseController +func (c *NodeLeaseController) remove(name string) { + cancel, ok := c.cancelJob.LoadAndDelete(name) + if ok { + cancel() + c.latestLease.Delete(name) + } +} + +// Held returns true if the NodeLeaseController holds the lease +func (c *NodeLeaseController) Held(name string) bool { + lease, ok := c.latestLease.Load(name) + if !ok || lease == nil || lease.Spec.HolderIdentity == nil { + return false + } + + return *lease.Spec.HolderIdentity == c.holderIdentity +} + +// sync syncs a lease for a node +func (c *NodeLeaseController) sync(ctx context.Context, nodeName string) { + logger := log.FromContext(ctx) + logger = logger.With("node", nodeName) + + latestLease, ok := c.latestLease.Load(nodeName) + if ok && latestLease != nil { + if !tryAcquireOrRenew(latestLease, c.holderIdentity, c.clock.Now()) { + logger.Debug("Lease already acquired by another holder") + return + } + logger.Info("Syncing lease") + lease, transitions, err := c.renewLease(ctx, latestLease) + if err != nil { + logger.Error("failed to update lease using latest lease", err) + return + } + c.latestLease.Store(nodeName, lease) + if transitions { + logger.Debug("Lease transitioned", + "transitions", transitions, + ) + if c.onNodeManagedFunc != nil { + if c.Held(nodeName) { + 
c.onNodeManagedFunc(nodeName) + } else { + logger.Warn("Lease not held") + } + } + } + } else { + logger.Info("Creating lease") + latestLease, err := c.ensureLease(ctx, nodeName) + if err != nil { + logger.Error("failed to create lease", err) + return + } + + c.latestLease.Store(nodeName, latestLease) + if c.onNodeManagedFunc != nil { + if c.Held(nodeName) { + c.onNodeManagedFunc(nodeName) + } else { + logger.Warn("Lease not held") + } + } + } +} + +// ensureLease creates a lease if it does not exist +func (c *NodeLeaseController) ensureLease(ctx context.Context, leaseName string) (*coordinationv1.Lease, error) { + lease := &coordinationv1.Lease{ + ObjectMeta: metav1.ObjectMeta{ + Name: leaseName, + Namespace: c.leaseNamespace, + }, + Spec: coordinationv1.LeaseSpec{ + HolderIdentity: &c.holderIdentity, + LeaseDurationSeconds: format.Ptr(int32(c.leaseDurationSeconds)), + RenewTime: format.Ptr(metav1.NewMicroTime(c.clock.Now())), + }, + } + if c.mutateLeaseFunc != nil { + err := c.mutateLeaseFunc(lease) + if err != nil { + return nil, err + } + } + + lease, err := c.clientSet.CoordinationV1().Leases(c.leaseNamespace).Create(ctx, lease, metav1.CreateOptions{}) + if err != nil { + return nil, err + } + return lease, nil +} + +// renewLease attempts to update the lease for maxUpdateRetries, call this once you're sure the lease has been created +func (c *NodeLeaseController) renewLease(ctx context.Context, base *coordinationv1.Lease) (*coordinationv1.Lease, bool, error) { + lease := base.DeepCopy() + + transitions := format.ElemOrDefault(lease.Spec.HolderIdentity) != c.holderIdentity + if transitions { + lease.Spec.HolderIdentity = &c.holderIdentity + lease.Spec.LeaseDurationSeconds = format.Ptr(int32(c.leaseDurationSeconds)) + lease.Spec.LeaseTransitions = format.Ptr(format.ElemOrDefault(lease.Spec.LeaseTransitions) + 1) + } + lease.Spec.RenewTime = format.Ptr(metav1.NewMicroTime(c.clock.Now())) + + if c.mutateLeaseFunc != nil { + err := c.mutateLeaseFunc(lease) + if err != nil { + return nil, false, err + } + } + + lease, err := c.clientSet.CoordinationV1().Leases(c.leaseNamespace).Update(ctx, lease, metav1.UpdateOptions{}) + if err != nil { + return nil, false, err + } + return lease, transitions, nil +} + +// setNodeOwnerFunc helps construct a mutateLeaseFunc which sets a node OwnerReference to the given lease object +// https://github.com/kubernetes/kubernetes/blob/1f22a173d9538e01c92529d02e4c95f77f5ea823/pkg/kubelet/util/nodelease.go#L32 +func setNodeOwnerFunc(nodeOwnerFunc func(nodeName string) []metav1.OwnerReference) func(lease *coordinationv1.Lease) error { + return func(lease *coordinationv1.Lease) error { + // Setting owner reference needs node's UID. Note that it is different from + // kubelet.nodeRef.UID. When lease is initially created, it is possible that + // the connection between master and node is not ready yet. So try to set + // owner reference every time when renewing the lease, until successful. 
+ if len(lease.OwnerReferences) == 0 { + lease.OwnerReferences = nodeOwnerFunc(lease.Name) + } + return nil + } +} + +// tryAcquireOrRenew returns true if the lease is held by the given holderIdentity, +func tryAcquireOrRenew(lease *coordinationv1.Lease, holderIdentity string, now time.Time) bool { + if lease.Spec.HolderIdentity == nil || + *lease.Spec.HolderIdentity == holderIdentity { + return true + } + + if lease.Spec.RenewTime == nil || + lease.Spec.LeaseDurationSeconds == nil { + return true + } + + expireTime := lease.Spec.RenewTime.Add(time.Duration(*lease.Spec.LeaseDurationSeconds) * time.Second) + return expireTime.Before(now) +} + +// nextTryTime returns the next time to try to acquire or renew the lease +func nextTryTime(lease *coordinationv1.Lease, holderIdentity string, next time.Time) time.Time { + expireTime := lease.Spec.RenewTime.Add(time.Duration(*lease.Spec.LeaseDurationSeconds) * time.Second) + if *lease.Spec.HolderIdentity == holderIdentity { + if next.Before(expireTime) { + return next + } + } + + return expireTime +} diff --git a/pkg/kwok/controllers/node_lease_controller_test.go b/pkg/kwok/controllers/node_lease_controller_test.go new file mode 100644 index 0000000000..08a77437da --- /dev/null +++ b/pkg/kwok/controllers/node_lease_controller_test.go @@ -0,0 +1,274 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package controllers + +import ( + "context" + "fmt" + "os" + "reflect" + "testing" + "time" + + coordinationv1 "k8s.io/api/coordination/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" + + "sigs.k8s.io/kwok/pkg/log" + "sigs.k8s.io/kwok/pkg/utils/format" +) + +func TestNodeLeaseController(t *testing.T) { + clientset := fake.NewSimpleClientset( + &coordinationv1.Lease{ + ObjectMeta: metav1.ObjectMeta{ + Name: "lease1", + Namespace: corev1.NamespaceNodeLease, + }, + Spec: coordinationv1.LeaseSpec{ + HolderIdentity: format.Ptr("lease1"), + RenewTime: format.Ptr(metav1.NewMicroTime(time.Now().Add(-61 * time.Second))), + LeaseDurationSeconds: format.Ptr(int32(60)), + }, + }, + &coordinationv1.Lease{ + ObjectMeta: metav1.ObjectMeta{ + Name: "lease2", + Namespace: corev1.NamespaceNodeLease, + }, + Spec: coordinationv1.LeaseSpec{ + HolderIdentity: format.Ptr("lease2"), + RenewTime: format.Ptr(metav1.NewMicroTime(time.Now())), + LeaseDurationSeconds: format.Ptr(int32(60)), + }, + }, + &coordinationv1.Lease{ + ObjectMeta: metav1.ObjectMeta{ + Name: "lease3", + Namespace: corev1.NamespaceNodeLease, + }, + Spec: coordinationv1.LeaseSpec{ + HolderIdentity: format.Ptr("lease3"), + RenewTime: format.Ptr(metav1.NewMicroTime(time.Now().Add(-61 * time.Second))), + LeaseDurationSeconds: format.Ptr(int32(60)), + }, + }, + ) + + nodeLeases, err := NewNodeLeaseController(NodeLeaseControllerConfig{ + ClientSet: clientset, + HolderIdentity: "test", + LeaseDurationSeconds: 40, + LeaseParallelism: 2, + RenewInterval: 10 * time.Second, + RenewIntervalJitter: 0.04, + LeaseNamespace: corev1.NamespaceNodeLease, + }) + if err != nil { + t.Fatal(fmt.Errorf("new node leases controller error: %w", err)) + } + + ctx := context.Background() + ctx = log.NewContext(ctx, log.NewLogger(os.Stderr, log.LevelDebug)) + ctx, cancel := context.WithTimeout(ctx, 20*time.Second) + t.Cleanup(func() { + cancel() + time.Sleep(time.Second) + }) + err = nodeLeases.Start(ctx) + if err != nil { + t.Fatal(fmt.Errorf("start node leases controller error: %w", err)) + } + + nodeLeases.TryHold("lease0") + nodeLeases.TryHold("lease1") + nodeLeases.TryHold("lease2") + + time.Sleep(2 * time.Second) + + if !nodeLeases.Held("lease0") { + t.Fatal("lease0 not held") + } + + if !nodeLeases.Held("lease1") { + t.Fatal("lease1 not held") + } + + if nodeLeases.Held("lease2") { + t.Fatal("lease2 held") + } + + if nodeLeases.Held("lease3") { + t.Fatal("lease3 held") + } + + if nodeLeases.Held("lease4") { + t.Fatal("lease4 held") + } + + _ = clientset.CoordinationV1().Leases(corev1.NamespaceNodeLease).Delete(ctx, "lease1", metav1.DeleteOptions{}) + time.Sleep(2 * time.Second) + + if !nodeLeases.Held("lease0") { + t.Fatal("lease0 not held") + } + + if nodeLeases.Held("lease1") { + t.Fatal("lease1 held") + } + + if nodeLeases.Held("lease3") { + t.Fatal("lease3 held") + } + + if nodeLeases.Held("lease4") { + t.Fatal("lease4 held") + } +} + +func Test_tryAcquireOrRenew(t *testing.T) { + now := time.Now() + type args struct { + lease *coordinationv1.Lease + holderIdentity string + now time.Time + } + tests := []struct { + name string + args args + want bool + }{ + { + name: "holder self", + args: args{ + lease: &coordinationv1.Lease{ + Spec: coordinationv1.LeaseSpec{ + HolderIdentity: format.Ptr("test"), + }, + }, + holderIdentity: "test", + now: now, + }, + want: true, + }, + { + name: "holder not self", + args: args{ + lease: &coordinationv1.Lease{ + Spec: coordinationv1.LeaseSpec{ + 
HolderIdentity: format.Ptr("test"), + LeaseDurationSeconds: format.Ptr(int32(10)), + RenewTime: format.Ptr(metav1.NewMicroTime(now.Add(-5 * time.Second))), + }, + }, + holderIdentity: "test-new", + now: now, + }, + want: false, + }, + { + name: "holder not self but expired", + args: args{ + lease: &coordinationv1.Lease{ + Spec: coordinationv1.LeaseSpec{ + HolderIdentity: format.Ptr("test"), + LeaseDurationSeconds: format.Ptr(int32(10)), + RenewTime: format.Ptr(metav1.NewMicroTime(now.Add(-11 * time.Second))), + }, + }, + holderIdentity: "test-new", + now: now, + }, + want: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tryAcquireOrRenew(tt.args.lease, tt.args.holderIdentity, tt.args.now); got != tt.want { + t.Errorf("tryAcquireOrRenew() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_nextTryTime(t *testing.T) { + now := time.Now() + type args struct { + lease *coordinationv1.Lease + holderIdentity string + next time.Time + } + tests := []struct { + name string + args args + want time.Time + }{ + { + name: "holder self and not expired", + args: args{ + lease: &coordinationv1.Lease{ + Spec: coordinationv1.LeaseSpec{ + HolderIdentity: format.Ptr("test"), + LeaseDurationSeconds: format.Ptr(int32(40)), + RenewTime: format.Ptr(metav1.NewMicroTime(now)), + }, + }, + holderIdentity: "test", + next: now.Add(39 * time.Second), + }, + want: now.Add(39 * time.Second), + }, + { + name: "holder self and expired", + args: args{ + lease: &coordinationv1.Lease{ + Spec: coordinationv1.LeaseSpec{ + HolderIdentity: format.Ptr("test"), + LeaseDurationSeconds: format.Ptr(int32(40)), + RenewTime: format.Ptr(metav1.NewMicroTime(now)), + }, + }, + holderIdentity: "test", + next: now.Add(41 * time.Second), + }, + want: now.Add(40 * time.Second), + }, + { + name: "holder not self and not expired", + args: args{ + lease: &coordinationv1.Lease{ + Spec: coordinationv1.LeaseSpec{ + HolderIdentity: format.Ptr("test"), + LeaseDurationSeconds: format.Ptr(int32(40)), + RenewTime: format.Ptr(metav1.NewMicroTime(now)), + }, + }, + holderIdentity: "test-new", + next: now.Add(39 * time.Second), + }, + want: now.Add(40 * time.Second), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := nextTryTime(tt.args.lease, tt.args.holderIdentity, tt.args.next); !reflect.DeepEqual(got, tt.want) { + t.Errorf("nextTryTime() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/kwok/controllers/pod_controller.go b/pkg/kwok/controllers/pod_controller.go index 20659e64a2..5c8d5d1149 100644 --- a/pkg/kwok/controllers/pod_controller.go +++ b/pkg/kwok/controllers/pod_controller.go @@ -37,6 +37,7 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/pager" "k8s.io/client-go/tools/record" + "k8s.io/utils/clock" "sigs.k8s.io/kwok/pkg/apis/internalversion" "sigs.k8s.io/kwok/pkg/kwok/cni" @@ -52,12 +53,14 @@ var ( // PodController is a fake pods implementation that can be used to test type PodController struct { + clock clock.Clock enableCNI bool clientSet kubernetes.Interface disregardStatusWithAnnotationSelector labels.Selector disregardStatusWithLabelSelector labels.Selector nodeIP string defaultCIDR string + namespace string nodeGetFunc func(nodeName string) (*NodeInfo, bool) ipPools maps.SyncMap[string, *ipPool] renderer *renderer @@ -68,21 +71,26 @@ type PodController struct { cronjob *cron.Cron delayJobs jobInfoMap recorder record.EventRecorder + readOnlyFunc func(nodeName string) bool + triggerPreprocessChan chan string } // 
PodControllerConfig is the configuration for the PodController type PodControllerConfig struct { + Clock clock.Clock EnableCNI bool ClientSet kubernetes.Interface DisregardStatusWithAnnotationSelector string DisregardStatusWithLabelSelector string NodeIP string CIDR string + Namespace string NodeGetFunc func(nodeName string) (*NodeInfo, bool) Stages []*internalversion.Stage PlayStageParallelism uint FuncMap template.FuncMap Recorder record.EventRecorder + ReadOnlyFunc func(nodeName string) bool } // NewPodController creates a new fake pods controller @@ -106,20 +114,28 @@ func NewPodController(conf PodControllerConfig) (*PodController, error) { return nil, err } + if conf.Clock == nil { + conf.Clock = clock.RealClock{} + } + c := &PodController{ + clock: conf.Clock, enableCNI: conf.EnableCNI, clientSet: conf.ClientSet, disregardStatusWithAnnotationSelector: disregardStatusWithAnnotationSelector, disregardStatusWithLabelSelector: disregardStatusWithLabelSelector, nodeIP: conf.NodeIP, defaultCIDR: conf.CIDR, + namespace: conf.Namespace, nodeGetFunc: conf.NodeGetFunc, cronjob: cron.NewCron(), lifecycle: lifecycles, playStageParallelism: conf.PlayStageParallelism, preprocessChan: make(chan *corev1.Pod), + triggerPreprocessChan: make(chan string, 16), playStageChan: make(chan resourceStageJob[*corev1.Pod]), recorder: conf.Recorder, + readOnlyFunc: conf.ReadOnlyFunc, } funcMap := template.FuncMap{ "NodeIP": c.funcNodeIP, @@ -138,6 +154,7 @@ func NewPodController(conf PodControllerConfig) (*PodController, error) { // It will modify the pods status to we want func (c *PodController) Start(ctx context.Context) error { go c.preprocessWorker(ctx) + go c.triggerPreprocessWorker(ctx) for i := uint(0); i < c.playStageParallelism; i++ { go c.playStageWorker(ctx) } @@ -176,6 +193,7 @@ func (c *PodController) finalizersModify(ctx context.Context, pod *corev1.Pod, f "pod", log.KObj(pod), "node", pod.Spec.NodeName, ) + result, err := c.clientSet.CoreV1().Pods(pod.Namespace).Patch(ctx, pod.Name, types.JSONPatchType, data, metav1.PatchOptions{}) if err != nil { if apierrors.IsNotFound(err) { @@ -197,6 +215,7 @@ func (c *PodController) deleteResource(ctx context.Context, pod *corev1.Pod) err "pod", log.KObj(pod), "node", pod.Spec.NodeName, ) + err := c.clientSet.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, deleteOpt) if err != nil { if apierrors.IsNotFound(err) { @@ -226,6 +245,21 @@ func (c *PodController) preprocessWorker(ctx context.Context) { } } +// triggerPreprocessWorker receives the resource from the triggerPreprocessChan and preprocess it +func (c *PodController) triggerPreprocessWorker(ctx context.Context) { + logger := log.FromContext(ctx) + for nodeName := range c.triggerPreprocessChan { + err := c.listResources(ctx, metav1.ListOptions{ + FieldSelector: fields.OneTermEqualSelector("spec.nodeName", nodeName).String(), + }) + if err != nil { + logger.Error("Failed to preprocess node", err, + "node", nodeName, + ) + } + } +} + // preprocess the pod and send it to the playStageWorker func (c *PodController) preprocess(ctx context.Context, pod *corev1.Pod) error { key := log.KObj(pod).String() @@ -256,7 +290,8 @@ func (c *PodController) preprocess(ctx context.Context, pod *corev1.Pod) error { ) return nil } - now := time.Now() + + now := c.clock.Now() delay, _ := stage.Delay(ctx, data, now) if delay != 0 { @@ -301,6 +336,12 @@ func (c *PodController) playStageWorker(ctx context.Context) { func (c *PodController) playStage(ctx context.Context, pod *corev1.Pod, stage *LifecycleStage) { next := 
stage.Next() logger := log.FromContext(ctx) + logger = logger.With( + "pod", log.KObj(pod), + "node", pod.Spec.NodeName, + "stage", stage.Name(), + ) + if next.Event != nil && c.recorder != nil { c.recorder.Event(&corev1.ObjectReference{ Kind: "Pod", @@ -345,6 +386,13 @@ func (c *PodController) playStage(ctx context.Context, pod *corev1.Pod, stage *L } } +func (c *PodController) readOnly(nodeName string) bool { + if c.readOnlyFunc == nil { + return false + } + return c.readOnlyFunc(nodeName) +} + // patchResource patches the resource func (c *PodController) patchResource(ctx context.Context, pod *corev1.Pod, patch []byte) (*corev1.Pod, error) { logger := log.FromContext(ctx) @@ -352,6 +400,7 @@ func (c *PodController) patchResource(ctx context.Context, pod *corev1.Pod, patc "pod", log.KObj(pod), "node", pod.Spec.NodeName, ) + result, err := c.clientSet.CoreV1().Pods(pod.Namespace).Patch(ctx, pod.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{}, "status") if err != nil { if apierrors.IsNotFound(err) { @@ -387,7 +436,7 @@ func (c *PodController) need(pod *corev1.Pod) bool { // watchResources watch resources and send to preprocessChan func (c *PodController) watchResources(ctx context.Context, opt metav1.ListOptions) error { - watcher, err := c.clientSet.CoreV1().Pods(corev1.NamespaceAll).Watch(ctx, opt) + watcher, err := c.clientSet.CoreV1().Pods(c.namespace).Watch(ctx, opt) if err != nil { return err } @@ -401,7 +450,7 @@ func (c *PodController) watchResources(ctx context.Context, opt metav1.ListOptio case event, ok := <-rc: if !ok { for { - watcher, err := c.clientSet.CoreV1().Pods(corev1.NamespaceAll).Watch(ctx, opt) + watcher, err := c.clientSet.CoreV1().Pods(c.namespace).Watch(ctx, opt) if err == nil { rc = watcher.ResultChan() continue loop @@ -411,7 +460,7 @@ func (c *PodController) watchResources(ctx context.Context, opt metav1.ListOptio select { case <-ctx.Done(): break loop - case <-time.After(time.Second * 5): + case <-c.clock.After(time.Second * 5): } } } @@ -420,7 +469,16 @@ func (c *PodController) watchResources(ctx context.Context, opt metav1.ListOptio case watch.Added, watch.Modified: pod := event.Object.(*corev1.Pod) if c.need(pod) { - c.preprocessChan <- pod.DeepCopy() + if c.readOnly(pod.Spec.NodeName) { + logger.Debug("Skip pod", + "reason", "read only", + "event", event.Type, + "pod", log.KObj(pod), + "node", pod.Spec.NodeName, + ) + } else { + c.preprocessChan <- pod.DeepCopy() + } } else { logger.Debug("Skip pod", "reason", "not managed", @@ -458,22 +516,31 @@ func (c *PodController) watchResources(ctx context.Context, opt metav1.ListOptio // listResources lists all resources and sends to preprocessChan func (c *PodController) listResources(ctx context.Context, opt metav1.ListOptions) error { listPager := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { - return c.clientSet.CoreV1().Pods(corev1.NamespaceAll).List(ctx, opts) + return c.clientSet.CoreV1().Pods(c.namespace).List(ctx, opts) }) + + logger := log.FromContext(ctx) + return listPager.EachListItem(ctx, opt, func(obj runtime.Object) error { pod := obj.(*corev1.Pod) if c.need(pod) { - c.preprocessChan <- pod.DeepCopy() + if c.readOnly(pod.Spec.NodeName) { + logger.Debug("Skip pod", + "pod", log.KObj(pod), + "node", pod.Spec.NodeName, + "reason", "read only", + ) + } else { + c.preprocessChan <- pod.DeepCopy() + } } return nil }) } // PlayStagePodsOnNode plays stage pods on node -func (c *PodController) PlayStagePodsOnNode(ctx context.Context, nodeName string) 
error { - return c.listResources(ctx, metav1.ListOptions{ - FieldSelector: fields.OneTermEqualSelector("spec.nodeName", nodeName).String(), - }) +func (c *PodController) PlayStagePodsOnNode(nodeName string) { + c.triggerPreprocessChan <- nodeName } // ipPool returns the ipPool for the given cidr diff --git a/pkg/kwokctl/components/kwok_controller.go b/pkg/kwokctl/components/kwok_controller.go index 42add92ead..977f8bb278 100644 --- a/pkg/kwokctl/components/kwok_controller.go +++ b/pkg/kwokctl/components/kwok_controller.go @@ -25,20 +25,21 @@ import ( // BuildKwokControllerComponentConfig is the configuration for building a kwok controller component. type BuildKwokControllerComponentConfig struct { - Binary string - Image string - Version version.Version - Workdir string - BindAddress string - Port uint32 - ConfigPath string - KubeconfigPath string - AdminCertPath string - AdminKeyPath string - NodeName string - Verbosity log.Level - ExtraArgs []internalversion.ExtraArgs - ExtraVolumes []internalversion.Volume + Binary string + Image string + Version version.Version + Workdir string + BindAddress string + Port uint32 + ConfigPath string + KubeconfigPath string + AdminCertPath string + AdminKeyPath string + NodeName string + Verbosity log.Level + NodeLeaseDurationSeconds uint + ExtraArgs []internalversion.ExtraArgs + ExtraVolumes []internalversion.Volume } // BuildKwokControllerComponent builds a kwok controller component. @@ -93,6 +94,7 @@ func BuildKwokControllerComponent(conf BuildKwokControllerComponentConfig) (comp "--node-name="+conf.NodeName, "--node-port=10247", "--server-address="+conf.BindAddress+":10247", + "--node-lease-duration-seconds="+format.String(conf.NodeLeaseDurationSeconds), ) } else { kwokControllerArgs = append(kwokControllerArgs, @@ -103,6 +105,7 @@ func BuildKwokControllerComponent(conf BuildKwokControllerComponentConfig) (comp "--node-name=localhost", "--node-port="+format.String(conf.Port), "--server-address="+conf.BindAddress+":"+format.String(conf.Port), + "--node-lease-duration-seconds="+format.String(conf.NodeLeaseDurationSeconds), ) } diff --git a/pkg/kwokctl/runtime/binary/cluster.go b/pkg/kwokctl/runtime/binary/cluster.go index 77df8e428b..d657464d58 100644 --- a/pkg/kwokctl/runtime/binary/cluster.go +++ b/pkg/kwokctl/runtime/binary/cluster.go @@ -386,18 +386,19 @@ func (c *Cluster) Install(ctx context.Context) error { kwokControllerComponentPatches := runtime.GetComponentPatches(config, "kwok-controller") kwokControllerComponent := components.BuildKwokControllerComponent(components.BuildKwokControllerComponentConfig{ - Workdir: workdir, - Binary: kwokControllerPath, - Version: kwokControllerVersion, - BindAddress: conf.BindAddress, - Port: conf.KwokControllerPort, - ConfigPath: kwokConfigPath, - KubeconfigPath: kubeconfigPath, - AdminCertPath: adminCertPath, - AdminKeyPath: adminKeyPath, - NodeName: "localhost", - Verbosity: verbosity, - ExtraArgs: kwokControllerComponentPatches.ExtraArgs, + Workdir: workdir, + Binary: kwokControllerPath, + Version: kwokControllerVersion, + BindAddress: conf.BindAddress, + Port: conf.KwokControllerPort, + ConfigPath: kwokConfigPath, + KubeconfigPath: kubeconfigPath, + AdminCertPath: adminCertPath, + AdminKeyPath: adminKeyPath, + NodeName: "localhost", + Verbosity: verbosity, + NodeLeaseDurationSeconds: conf.NodeLeaseDurationSeconds, + ExtraArgs: kwokControllerComponentPatches.ExtraArgs, }) if err != nil { return err diff --git a/pkg/kwokctl/runtime/cluster.go b/pkg/kwokctl/runtime/cluster.go index 87914e5aaf..c19bed5dac 
100644 --- a/pkg/kwokctl/runtime/cluster.go +++ b/pkg/kwokctl/runtime/cluster.go @@ -128,25 +128,35 @@ func (c *Cluster) Save(ctx context.Context) error { others := config.FilterWithoutTypeFromContext[*internalversion.KwokctlConfiguration](ctx) objs = append(objs, others...) - if updateFrequency := c.conf.Options.NodeStatusUpdateFrequencyMilliseconds; updateFrequency > 0 && - c.conf.Options.Runtime != consts.RuntimeTypeKind && - c.conf.Options.Runtime != consts.RuntimeTypeKindPodman && - len(config.FilterWithTypeFromContext[*internalversion.Stage](ctx)) == 0 { - nodeStages, err := controllers.NewStagesFromYaml([]byte(stages.DefaultNodeStages)) - if err != nil { - return err - } - hasUpdate := false - for _, stage := range nodeStages { - if stage.Name == "node-heartbeat" { - stage.Spec.Delay.DurationMilliseconds = format.Ptr(updateFrequency) - stage.Spec.Delay.JitterDurationMilliseconds = format.Ptr(updateFrequency + updateFrequency/10) - hasUpdate = true + if c.conf.Options.NodeLeaseDurationSeconds == 0 { + if updateFrequency := c.conf.Options.NodeStatusUpdateFrequencyMilliseconds; updateFrequency > 0 && + c.conf.Options.Runtime != consts.RuntimeTypeKind && + c.conf.Options.Runtime != consts.RuntimeTypeKindPodman && + len(config.FilterWithTypeFromContext[*internalversion.Stage](ctx)) == 0 { + nodeStages, err := controllers.NewStagesFromYaml([]byte(stages.DefaultNodeStages)) + if err != nil { + return err + } + for _, stage := range nodeStages { + objs = append(objs, stage) + } + + nodeHeartbeatStages, err := controllers.NewStagesFromYaml([]byte(stages.DefaultNodeHeartbeatStages)) + if err != nil { + return err + } + hasUpdate := false + for _, stage := range nodeHeartbeatStages { + if stage.Name == "node-heartbeat" { + stage.Spec.Delay.DurationMilliseconds = format.Ptr(updateFrequency) + stage.Spec.Delay.JitterDurationMilliseconds = format.Ptr(updateFrequency + updateFrequency/10) + hasUpdate = true + } + objs = append(objs, stage) + } + if !hasUpdate { + return fmt.Errorf("failed to update node heartbeat stage") } - objs = append(objs, stage) - } - if !hasUpdate { - return fmt.Errorf("failed to update node heartbeat stage") } } diff --git a/pkg/kwokctl/runtime/compose/cluster.go b/pkg/kwokctl/runtime/compose/cluster.go index 0a56736213..901d6cbb97 100644 --- a/pkg/kwokctl/runtime/compose/cluster.go +++ b/pkg/kwokctl/runtime/compose/cluster.go @@ -366,19 +366,20 @@ func (c *Cluster) Install(ctx context.Context) error { kwokControllerExtraVolumes = append(kwokControllerExtraVolumes, logVolumes...) 
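+	// Thread the configured NodeLeaseDurationSeconds into the component config below;
+	// BuildKwokControllerComponent renders it as the --node-lease-duration-seconds flag
+	// passed to the kwok controller.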
kwokControllerComponent := components.BuildKwokControllerComponent(components.BuildKwokControllerComponentConfig{ - Workdir: workdir, - Image: conf.KwokControllerImage, - Version: kwokControllerVersion, - BindAddress: net.PublicAddress, - Port: conf.KwokControllerPort, - ConfigPath: kwokConfigPath, - KubeconfigPath: inClusterOnHostKubeconfigPath, - AdminCertPath: adminCertPath, - AdminKeyPath: adminKeyPath, - NodeName: c.Name() + "-kwok-controller", - Verbosity: verbosity, - ExtraArgs: kwokControllerComponentPatches.ExtraArgs, - ExtraVolumes: kwokControllerExtraVolumes, + Workdir: workdir, + Image: conf.KwokControllerImage, + Version: kwokControllerVersion, + BindAddress: net.PublicAddress, + Port: conf.KwokControllerPort, + ConfigPath: kwokConfigPath, + KubeconfigPath: inClusterOnHostKubeconfigPath, + AdminCertPath: adminCertPath, + AdminKeyPath: adminKeyPath, + NodeName: c.Name() + "-kwok-controller", + Verbosity: verbosity, + NodeLeaseDurationSeconds: conf.NodeLeaseDurationSeconds, + ExtraArgs: kwokControllerComponentPatches.ExtraArgs, + ExtraVolumes: kwokControllerExtraVolumes, }) config.Components = append(config.Components, kwokControllerComponent) diff --git a/pkg/kwokctl/runtime/kind/cluster.go b/pkg/kwokctl/runtime/kind/cluster.go index 2f5e6d1c73..ae14ea6273 100644 --- a/pkg/kwokctl/runtime/kind/cluster.go +++ b/pkg/kwokctl/runtime/kind/cluster.go @@ -172,11 +172,12 @@ func (c *Cluster) Install(ctx context.Context) error { } kwokControllerPod, err := BuildKwokControllerPod(BuildKwokControllerPodConfig{ - KwokControllerImage: conf.KwokControllerImage, - Name: c.Name(), - Verbosity: verbosity, - ExtraArgs: kwokControllerComponentPatches.ExtraArgs, - ExtraVolumes: kwokControllerExtraVolumes, + KwokControllerImage: conf.KwokControllerImage, + Name: c.Name(), + Verbosity: verbosity, + NodeLeaseDurationSeconds: 40, + ExtraArgs: kwokControllerComponentPatches.ExtraArgs, + ExtraVolumes: kwokControllerExtraVolumes, }) if err != nil { return err diff --git a/pkg/kwokctl/runtime/kind/kwok_controller_pod.go b/pkg/kwokctl/runtime/kind/kwok_controller_pod.go index 11ec2f7492..24604686f0 100644 --- a/pkg/kwokctl/runtime/kind/kwok_controller_pod.go +++ b/pkg/kwokctl/runtime/kind/kwok_controller_pod.go @@ -56,11 +56,12 @@ func BuildKwokControllerPod(conf BuildKwokControllerPodConfig) (string, error) { // BuildKwokControllerPodConfig is the configuration for building the kwok controller pod type BuildKwokControllerPodConfig struct { - KwokControllerImage string - KwokControllerImageName string - KwokControllerImageTag string - Name string - Verbosity log.Level - ExtraArgs []internalversion.ExtraArgs - ExtraVolumes []internalversion.Volume + KwokControllerImage string + KwokControllerImageName string + KwokControllerImageTag string + Name string + Verbosity log.Level + NodeLeaseDurationSeconds uint + ExtraArgs []internalversion.ExtraArgs + ExtraVolumes []internalversion.Volume } diff --git a/pkg/kwokctl/runtime/kind/kwok_controller_pod.yaml.tpl b/pkg/kwokctl/runtime/kind/kwok_controller_pod.yaml.tpl index a47492224c..edc2c9d68c 100644 --- a/pkg/kwokctl/runtime/kind/kwok_controller_pod.yaml.tpl +++ b/pkg/kwokctl/runtime/kind/kwok_controller_pod.yaml.tpl @@ -21,6 +21,7 @@ spec: - --node-ip=$(POD_IP) - --node-name=kwok-controller.kube-system.svc - --node-port=10247 + - --node-lease-duration-seconds={{ .NodeLeaseDurationSeconds }} {{ range .ExtraArgs }} - --{{ .Key }}={{ .Value }} {{ end }} diff --git a/pkg/utils/format/pointer.go b/pkg/utils/format/pointer.go index d81207bea1..111ef9a936 100644 --- 
a/pkg/utils/format/pointer.go +++ b/pkg/utils/format/pointer.go @@ -20,3 +20,11 @@ package format func Ptr[T any](v T) *T { return &v } + +// ElemOrDefault returns the element of v or default. +func ElemOrDefault[T any](v *T) (t T) { + if v == nil { + return t + } + return *v +} diff --git a/pkg/utils/wait/wait.go b/pkg/utils/wait/wait.go index b9ef4bdc4e..80fe1c8d50 100644 --- a/pkg/utils/wait/wait.go +++ b/pkg/utils/wait/wait.go @@ -120,3 +120,8 @@ func Poll(ctx context.Context, conditionFunc ConditionWithContextFunc, opts ...O } return wait.PollUntilContextTimeout(ctx, options.Interval, options.Timeout, options.Immediate, cf) } + +// Jitter returns a time.Duration between duration and duration + maxFactor * duration. +func Jitter(duration time.Duration, maxFactor float64) time.Duration { + return wait.Jitter(duration, maxFactor) +} diff --git a/site/content/en/docs/generated/kwok.md b/site/content/en/docs/generated/kwok.md index 47d91ff28f..6b4d543838 100644 --- a/site/content/en/docs/generated/kwok.md +++ b/site/content/en/docs/generated/kwok.md @@ -21,6 +21,7 @@ kwok [flags] --manage-nodes-with-label-selector string Nodes that match the label selector will be watched and managed. It's conflicted with manage-all-nodes. --master string The address of the Kubernetes API server (overrides any value in kubeconfig). --node-ip string IP of the node + --node-lease-duration-seconds uint Duration of node lease seconds --node-name string Name of the node --node-port int Port of the node --server-address string Address to expose the server on diff --git a/stages/embed.go b/stages/embed.go index 9a13eaec68..e076127aed 100644 --- a/stages/embed.go +++ b/stages/embed.go @@ -26,6 +26,10 @@ var ( //go:embed node-fast.yaml DefaultNodeStages string + // DefaultNodeHeartbeatStages is the default node heartbeat stages. + //go:embed node-heartbeat.yaml + DefaultNodeHeartbeatStages string + // DefaultPodStages is the default pod stages. 
//go:embed pod-fast.yaml DefaultPodStages string diff --git a/stages/node-fast.yaml b/stages/node-fast.yaml index a1bed4a6a0..8f8a7042d7 100644 --- a/stages/node-fast.yaml +++ b/stages/node-fast.yaml @@ -77,39 +77,3 @@ spec: {{ end }} phase: Running immediateNextStage: true ---- -kind: Stage -apiVersion: kwok.x-k8s.io/v1alpha1 -metadata: - name: node-heartbeat -spec: - resourceRef: - apiGroup: v1 - kind: Node - selector: - matchExpressions: - - key: '.status.phase' - operator: 'In' - values: - - Running - - key: '.status.conditions.[] | select( .type == "Ready" ) | .status' - operator: 'In' - values: - - 'True' - delay: - durationMilliseconds: 20000 - jitterDurationMilliseconds: 25000 - next: - statusTemplate: | - {{ $now := Now }} - {{ $lastTransitionTime := or .creationTimestamp $now }} - conditions: - {{ range NodeConditions }} - - lastHeartbeatTime: "{{ $now }}" - lastTransitionTime: "{{ $lastTransitionTime }}" - message: "{{ .message }}" - reason: "{{ .reason }}" - status: "{{ .status }}" - type: "{{ .type }}" - {{ end }} - immediateNextStage: true diff --git a/stages/node-heartbeat.yaml b/stages/node-heartbeat.yaml new file mode 100644 index 0000000000..dd1768797a --- /dev/null +++ b/stages/node-heartbeat.yaml @@ -0,0 +1,35 @@ +kind: Stage +apiVersion: kwok.x-k8s.io/v1alpha1 +metadata: + name: node-heartbeat +spec: + resourceRef: + apiGroup: v1 + kind: Node + selector: + matchExpressions: + - key: '.status.phase' + operator: 'In' + values: + - Running + - key: '.status.conditions.[] | select( .type == "Ready" ) | .status' + operator: 'In' + values: + - 'True' + delay: + durationMilliseconds: 20000 + jitterDurationMilliseconds: 25000 + next: + statusTemplate: | + {{ $now := Now }} + {{ $lastTransitionTime := or .creationTimestamp $now }} + conditions: + {{ range NodeConditions }} + - lastHeartbeatTime: "{{ $now }}" + lastTransitionTime: "{{ $lastTransitionTime }}" + message: "{{ .message }}" + reason: "{{ .reason }}" + status: "{{ .status }}" + type: "{{ .type }}" + {{ end }} + immediateNextStage: true diff --git a/test/kwok/kustomization.yaml b/test/kwok/kustomization.yaml index 472cca97ce..ff83080f29 100644 --- a/test/kwok/kustomization.yaml +++ b/test/kwok/kustomization.yaml @@ -6,6 +6,15 @@ images: newName: kwok newTag: test +patchesStrategicMerge: + - |- + apiVersion: apps/v1 + kind: Deployment + metadata: + name: kwok-controller + spec: + replicas: 2 + resources: - ../../kustomize/kwok - fake-deployment.yaml diff --git a/test/kwok/kwok.test.sh b/test/kwok/kwok.test.sh index 9fdec50f03..a832e52715 100755 --- a/test/kwok/kwok.test.sh +++ b/test/kwok/kwok.test.sh @@ -110,6 +110,7 @@ function test_modify_node_status() { kubectl get node fake-node return 1 fi + kubectl annotate node fake-node --overwrite kwok.x-k8s.io/status- } # Check for the status of the Pod is modified by Kubectl @@ -127,6 +128,29 @@ function test_modify_pod_status() { kubectl get pod "${first_pod}" -o wide return 1 fi + + kubectl annotate pod "${first_pod}" --overwrite kwok.x-k8s.io/status- +} + +function test_check_node_lease_transitions() { + local want="${1}" + local node_leases_transitions + node_leases_transitions="$(kubectl get leases fake-node -n kube-node-lease -ojson | jq -r '.spec.leaseTransitions // 0')" + if [[ "${node_leases_transitions}" != "${want}" ]]; then + echo "Error: fake-node lease transitions is not ${want}, got ${node_leases_transitions}" + return 1 + fi +} + +function recreate_kwok() { + kubectl scale deployment/kwok-controller -n kube-system --replicas=0 + kubectl wait --for=delete pod 
-l app=kwok-controller -n kube-system --timeout=60s + + kubectl scale deployment/kwok-controller -n kube-system --replicas=2 +} + +function recreate_pods() { + kubectl delete pod --all -n default } # Cleanup @@ -148,6 +172,15 @@ function main() { test_modify_node_status || failed+=("modify_node_status") test_modify_pod_status || failed+=("modify_pod_status") + test_check_node_lease_transitions 0 || failed+=("check_node_lease_transitions") + + recreate_kwok || failed+=("recreate_kwok") + recreate_pods || failed+=("recreate_pods") + test_node_ready || failed+=("node_ready_again") + test_pod_running || failed+=("pod_running_again") + test_check_pod_status || failed+=("check_pod_status_again") + + test_check_node_lease_transitions 1 || failed+=("check_node_lease_transitions_again") if [[ "${#failed[@]}" -ne 0 ]]; then echo "Error: Some tests failed"
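+    # The *_again tests run after recreate_kwok/recreate_pods: the rebuilt controller
+    # must mark the node Ready and the pods Running again, and the fake-node lease is
+    # expected to record exactly one holder change (leaseTransitions goes from 0 to 1).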