diff --git a/pkg/kwok/controllers/node_controller.go b/pkg/kwok/controllers/node_controller.go index 7dcbbe21e..781964249 100644 --- a/pkg/kwok/controllers/node_controller.go +++ b/pkg/kwok/controllers/node_controller.go @@ -409,12 +409,18 @@ func (c *NodeController) deleteResource(ctx context.Context, node *corev1.Node) // preprocessWorker receives the resource from the preprocessChan and preprocess it func (c *NodeController) preprocessWorker(ctx context.Context) { logger := log.FromContext(ctx) - for node := range c.preprocessChan { - err := c.preprocess(ctx, node) - if err != nil { - logger.Error("Failed to preprocess node", err, - "node", node.Name, - ) + for { + select { + case <-ctx.Done(): + logger.Debug("Stop preprocess worker") + return + case node := <-c.preprocessChan: + err := c.preprocess(ctx, node) + if err != nil { + logger.Error("Failed to preprocess node", err, + "node", node.Name, + ) + } } } } @@ -422,21 +428,27 @@ func (c *NodeController) preprocessWorker(ctx context.Context) { // triggerPreprocessWorker receives the resource from the triggerPreprocessChan and preprocess it func (c *NodeController) triggerPreprocessWorker(ctx context.Context) { logger := log.FromContext(ctx) - for nodeName := range c.triggerPreprocessChan { - nodeInfo, has := c.nodesSets.Load(nodeName) - if !has || nodeInfo.Node == nil { - logger.Warn("Node not found", - "node", nodeName, - ) - continue - } - if c.readOnly(nodeInfo.Node.Name) { - logger.Debug("Skip node", - "node", nodeInfo.Node.Name, - "reason", "read only", - ) - } else { - c.preprocessChan <- nodeInfo.Node + for { + select { + case <-ctx.Done(): + logger.Debug("Stop trigger preprocess worker") + return + case nodeName := <-c.triggerPreprocessChan: + nodeInfo, has := c.nodesSets.Load(nodeName) + if !has || nodeInfo.Node == nil { + logger.Warn("Node not found", + "node", nodeName, + ) + continue + } + if c.readOnly(nodeInfo.Node.Name) { + logger.Debug("Skip node", + "node", nodeInfo.Node.Name, + "reason", "read only", + ) + } else { + c.preprocessChan <- nodeInfo.Node + } } } } @@ -506,8 +518,15 @@ func (c *NodeController) preprocess(ctx context.Context, node *corev1.Node) erro // playStageWorker receives the resource from the playStageChan and play the stage func (c *NodeController) playStageWorker(ctx context.Context) { - for node := range c.playStageChan { - c.playStage(ctx, node.Resource, node.Stage) + logger := log.FromContext(ctx) + for { + select { + case <-ctx.Done(): + logger.Debug("Stop play stage worker") + return + case node := <-c.playStageChan: + c.playStage(ctx, node.Resource, node.Stage) + } } } diff --git a/pkg/kwok/controllers/node_lease_controller.go b/pkg/kwok/controllers/node_lease_controller.go index 4e7d880f7..84094edb0 100644 --- a/pkg/kwok/controllers/node_lease_controller.go +++ b/pkg/kwok/controllers/node_lease_controller.go @@ -188,8 +188,15 @@ func (c *NodeLeaseController) listResources(ctx context.Context, opt metav1.List } func (c *NodeLeaseController) syncWorker(ctx context.Context) { - for nodeName := range c.leaseChan { - c.sync(ctx, nodeName) + logger := log.FromContext(ctx) + for { + select { + case <-ctx.Done(): + logger.Debug("Stop sync worker") + return + case nodeName := <-c.leaseChan: + c.sync(ctx, nodeName) + } } } diff --git a/pkg/kwok/controllers/pod_controller.go b/pkg/kwok/controllers/pod_controller.go index 5c8d5d114..b147c95f2 100644 --- a/pkg/kwok/controllers/pod_controller.go +++ b/pkg/kwok/controllers/pod_controller.go @@ -234,13 +234,19 @@ func (c *PodController) deleteResource(ctx context.Context, pod *corev1.Pod) err // preprocessWorker receives the resource from the preprocessChan and preprocess it func (c *PodController) preprocessWorker(ctx context.Context) { logger := log.FromContext(ctx) - for pod := range c.preprocessChan { - err := c.preprocess(ctx, pod) - if err != nil { - logger.Error("Failed to preprocess node", err, - "pod", log.KObj(pod), - "node", pod.Spec.NodeName, - ) + for { + select { + case <-ctx.Done(): + logger.Debug("Stop preprocess worker") + return + case pod := <-c.preprocessChan: + err := c.preprocess(ctx, pod) + if err != nil { + logger.Error("Failed to preprocess node", err, + "pod", log.KObj(pod), + "node", pod.Spec.NodeName, + ) + } } } } @@ -248,14 +254,20 @@ func (c *PodController) preprocessWorker(ctx context.Context) { // triggerPreprocessWorker receives the resource from the triggerPreprocessChan and preprocess it func (c *PodController) triggerPreprocessWorker(ctx context.Context) { logger := log.FromContext(ctx) - for nodeName := range c.triggerPreprocessChan { - err := c.listResources(ctx, metav1.ListOptions{ - FieldSelector: fields.OneTermEqualSelector("spec.nodeName", nodeName).String(), - }) - if err != nil { - logger.Error("Failed to preprocess node", err, - "node", nodeName, - ) + for { + select { + case <-ctx.Done(): + logger.Debug("Stop trigger preprocess worker") + return + case nodeName := <-c.triggerPreprocessChan: + err := c.listResources(ctx, metav1.ListOptions{ + FieldSelector: fields.OneTermEqualSelector("spec.nodeName", nodeName).String(), + }) + if err != nil { + logger.Error("Failed to preprocess node", err, + "node", nodeName, + ) + } } } } @@ -327,8 +339,15 @@ func (c *PodController) preprocess(ctx context.Context, pod *corev1.Pod) error { // playStageWorker receives the resource from the playStageChan and play the stage func (c *PodController) playStageWorker(ctx context.Context) { - for pod := range c.playStageChan { - c.playStage(ctx, pod.Resource, pod.Stage) + logger := log.FromContext(ctx) + for { + select { + case <-ctx.Done(): + logger.Debug("Stop play stage worker") + return + case pod := <-c.playStageChan: + c.playStage(ctx, pod.Resource, pod.Stage) + } } } diff --git a/pkg/kwok/controllers/pod_controller_test.go b/pkg/kwok/controllers/pod_controller_test.go index 8358eeeb5..c3003633d 100644 --- a/pkg/kwok/controllers/pod_controller_test.go +++ b/pkg/kwok/controllers/pod_controller_test.go @@ -30,6 +30,7 @@ import ( "k8s.io/client-go/kubernetes/fake" "sigs.k8s.io/kwok/pkg/log" + "sigs.k8s.io/kwok/pkg/utils/slices" "sigs.k8s.io/kwok/pkg/utils/wait" "sigs.k8s.io/kwok/stages" ) @@ -298,7 +299,12 @@ func TestPodController(t *testing.T) { t.Fatal(err) } - pod := list.Items[0] + pod, ok := slices.Find(list.Items, func(pod corev1.Pod) bool { + return pod.Name == "pod0" + }) + if !ok { + t.Fatal(fmt.Errorf("not found pod0")) + } now := metav1.Now() pod.DeletionTimestamp = &now _, err = clientset.CoreV1().Pods("default").Update(ctx, &pod, metav1.UpdateOptions{}) @@ -315,7 +321,7 @@ func TestPodController(t *testing.T) { return false, fmt.Errorf("want 4 pods, got %d", len(list.Items)) } return true, nil - }, wait.WithContinueOnError(5)) + }, wait.WithContinueOnError(10)) if err != nil { t.Fatal(err) } diff --git a/pkg/utils/version/version.go b/pkg/utils/version/version.go index 1a69db593..c5122b7d8 100644 --- a/pkg/utils/version/version.go +++ b/pkg/utils/version/version.go @@ -58,7 +58,11 @@ func ParseFromOutput(s string) (Version, error) { if len(matches) == 0 { return semver.Version{}, fmt.Errorf("failed to parse version from output: %q", s) } - return semver.Parse(matches[2]) + v := matches[2] + if strings.HasPrefix(v, "0.0.0") { + return semver.Version{}, nil + } + return semver.Parse(v) } // ParseFromBinary parses the version from the binary. diff --git a/pkg/utils/version/version_test.go b/pkg/utils/version/version_test.go index 99d809bc9..52b77e01d 100644 --- a/pkg/utils/version/version_test.go +++ b/pkg/utils/version/version_test.go @@ -57,6 +57,12 @@ func TestParseFromOutput(t *testing.T) { }, want: semver.MustParse("3.5.6"), }, + { + args: args{ + s: "Kubernetes v0.0.0-master+$Format:%H$", + }, + want: semver.MustParse("0.0.0"), + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) {