Skip to content

Commit

Permalink
Merge pull request #644 from wzshiming/flake/ut
Browse files Browse the repository at this point in the history
Fix flake unit test
  • Loading branch information
wzshiming authored Jun 12, 2023
2 parents 48fbf26 + 6ca01be commit 25f4d02
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 45 deletions.
65 changes: 42 additions & 23 deletions pkg/kwok/controllers/node_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,34 +409,46 @@ func (c *NodeController) deleteResource(ctx context.Context, node *corev1.Node)
// preprocessWorker receives the resource from the preprocessChan and preprocess it
func (c *NodeController) preprocessWorker(ctx context.Context) {
logger := log.FromContext(ctx)
for node := range c.preprocessChan {
err := c.preprocess(ctx, node)
if err != nil {
logger.Error("Failed to preprocess node", err,
"node", node.Name,
)
for {
select {
case <-ctx.Done():
logger.Debug("Stop preprocess worker")
return
case node := <-c.preprocessChan:
err := c.preprocess(ctx, node)
if err != nil {
logger.Error("Failed to preprocess node", err,
"node", node.Name,
)
}
}
}
}

// triggerPreprocessWorker receives the resource from the triggerPreprocessChan and preprocess it
func (c *NodeController) triggerPreprocessWorker(ctx context.Context) {
logger := log.FromContext(ctx)
for nodeName := range c.triggerPreprocessChan {
nodeInfo, has := c.nodesSets.Load(nodeName)
if !has || nodeInfo.Node == nil {
logger.Warn("Node not found",
"node", nodeName,
)
continue
}
if c.readOnly(nodeInfo.Node.Name) {
logger.Debug("Skip node",
"node", nodeInfo.Node.Name,
"reason", "read only",
)
} else {
c.preprocessChan <- nodeInfo.Node
for {
select {
case <-ctx.Done():
logger.Debug("Stop trigger preprocess worker")
return
case nodeName := <-c.triggerPreprocessChan:
nodeInfo, has := c.nodesSets.Load(nodeName)
if !has || nodeInfo.Node == nil {
logger.Warn("Node not found",
"node", nodeName,
)
continue
}
if c.readOnly(nodeInfo.Node.Name) {
logger.Debug("Skip node",
"node", nodeInfo.Node.Name,
"reason", "read only",
)
} else {
c.preprocessChan <- nodeInfo.Node
}
}
}
}
Expand Down Expand Up @@ -506,8 +518,15 @@ func (c *NodeController) preprocess(ctx context.Context, node *corev1.Node) erro

// playStageWorker receives the resource from the playStageChan and play the stage
func (c *NodeController) playStageWorker(ctx context.Context) {
for node := range c.playStageChan {
c.playStage(ctx, node.Resource, node.Stage)
logger := log.FromContext(ctx)
for {
select {
case <-ctx.Done():
logger.Debug("Stop play stage worker")
return
case node := <-c.playStageChan:
c.playStage(ctx, node.Resource, node.Stage)
}
}
}

Expand Down
11 changes: 9 additions & 2 deletions pkg/kwok/controllers/node_lease_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,15 @@ func (c *NodeLeaseController) listResources(ctx context.Context, opt metav1.List
}

func (c *NodeLeaseController) syncWorker(ctx context.Context) {
for nodeName := range c.leaseChan {
c.sync(ctx, nodeName)
logger := log.FromContext(ctx)
for {
select {
case <-ctx.Done():
logger.Debug("Stop sync worker")
return
case nodeName := <-c.leaseChan:
c.sync(ctx, nodeName)
}
}
}

Expand Down
53 changes: 36 additions & 17 deletions pkg/kwok/controllers/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,28 +234,40 @@ func (c *PodController) deleteResource(ctx context.Context, pod *corev1.Pod) err
// preprocessWorker receives the resource from the preprocessChan and preprocess it
func (c *PodController) preprocessWorker(ctx context.Context) {
logger := log.FromContext(ctx)
for pod := range c.preprocessChan {
err := c.preprocess(ctx, pod)
if err != nil {
logger.Error("Failed to preprocess node", err,
"pod", log.KObj(pod),
"node", pod.Spec.NodeName,
)
for {
select {
case <-ctx.Done():
logger.Debug("Stop preprocess worker")
return
case pod := <-c.preprocessChan:
err := c.preprocess(ctx, pod)
if err != nil {
logger.Error("Failed to preprocess node", err,
"pod", log.KObj(pod),
"node", pod.Spec.NodeName,
)
}
}
}
}

// triggerPreprocessWorker receives the resource from the triggerPreprocessChan and preprocess it
func (c *PodController) triggerPreprocessWorker(ctx context.Context) {
logger := log.FromContext(ctx)
for nodeName := range c.triggerPreprocessChan {
err := c.listResources(ctx, metav1.ListOptions{
FieldSelector: fields.OneTermEqualSelector("spec.nodeName", nodeName).String(),
})
if err != nil {
logger.Error("Failed to preprocess node", err,
"node", nodeName,
)
for {
select {
case <-ctx.Done():
logger.Debug("Stop trigger preprocess worker")
return
case nodeName := <-c.triggerPreprocessChan:
err := c.listResources(ctx, metav1.ListOptions{
FieldSelector: fields.OneTermEqualSelector("spec.nodeName", nodeName).String(),
})
if err != nil {
logger.Error("Failed to preprocess node", err,
"node", nodeName,
)
}
}
}
}
Expand Down Expand Up @@ -327,8 +339,15 @@ func (c *PodController) preprocess(ctx context.Context, pod *corev1.Pod) error {

// playStageWorker receives the resource from the playStageChan and play the stage
func (c *PodController) playStageWorker(ctx context.Context) {
for pod := range c.playStageChan {
c.playStage(ctx, pod.Resource, pod.Stage)
logger := log.FromContext(ctx)
for {
select {
case <-ctx.Done():
logger.Debug("Stop play stage worker")
return
case pod := <-c.playStageChan:
c.playStage(ctx, pod.Resource, pod.Stage)
}
}
}

Expand Down
10 changes: 8 additions & 2 deletions pkg/kwok/controllers/pod_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"k8s.io/client-go/kubernetes/fake"

"sigs.k8s.io/kwok/pkg/log"
"sigs.k8s.io/kwok/pkg/utils/slices"
"sigs.k8s.io/kwok/pkg/utils/wait"
"sigs.k8s.io/kwok/stages"
)
Expand Down Expand Up @@ -298,7 +299,12 @@ func TestPodController(t *testing.T) {
t.Fatal(err)
}

pod := list.Items[0]
pod, ok := slices.Find(list.Items, func(pod corev1.Pod) bool {
return pod.Name == "pod0"
})
if !ok {
t.Fatal(fmt.Errorf("not found pod0"))
}
now := metav1.Now()
pod.DeletionTimestamp = &now
_, err = clientset.CoreV1().Pods("default").Update(ctx, &pod, metav1.UpdateOptions{})
Expand All @@ -315,7 +321,7 @@ func TestPodController(t *testing.T) {
return false, fmt.Errorf("want 4 pods, got %d", len(list.Items))
}
return true, nil
}, wait.WithContinueOnError(5))
}, wait.WithContinueOnError(10))
if err != nil {
t.Fatal(err)
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/utils/version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ func ParseFromOutput(s string) (Version, error) {
if len(matches) == 0 {
return semver.Version{}, fmt.Errorf("failed to parse version from output: %q", s)
}
return semver.Parse(matches[2])
v := matches[2]
if strings.HasPrefix(v, "0.0.0") {
return semver.Version{}, nil
}
return semver.Parse(v)
}

// ParseFromBinary parses the version from the binary.
Expand Down
6 changes: 6 additions & 0 deletions pkg/utils/version/version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ func TestParseFromOutput(t *testing.T) {
},
want: semver.MustParse("3.5.6"),
},
{
args: args{
s: "Kubernetes v0.0.0-master+$Format:%H$",
},
want: semver.MustParse("0.0.0"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down

0 comments on commit 25f4d02

Please sign in to comment.