Skip to content

Commit

Permalink
fix: Fix active pods count in node pending status with pod deleted. (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
sarabala1979 authored May 13, 2021
1 parent 0207105 commit d9e583a
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 9 deletions.
15 changes: 15 additions & 0 deletions workflow/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,21 @@ func createRunningPods(ctx context.Context, woc *wfOperationCtx) {
}
}

func syncPodsInformer(ctx context.Context, woc *wfOperationCtx, podObjs ...apiv1.Pod) {
podcs := woc.controller.kubeclientset.CoreV1().Pods(woc.wf.GetNamespace())
pods, err := podcs.List(ctx, metav1.ListOptions{})
if err != nil {
panic(err)
}
podObjs = append(podObjs, pods.Items...)
for _, pod := range podObjs {
err = woc.controller.podInformer.GetIndexer().Add(&pod)
if err != nil {
panic(err)
}
}
}

// makePodsPhase acts like a pod controller and simulates the transition of pods transitioning into a specified state
func makePodsPhase(ctx context.Context, woc *wfOperationCtx, phase apiv1.PodPhase, with ...with) {
podcs := woc.controller.kubeclientset.CoreV1().Pods(woc.wf.GetNamespace())
Expand Down
11 changes: 9 additions & 2 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -979,7 +979,7 @@ func (woc *wfOperationCtx) podReconciliation(ctx context.Context) error {
// If the node is pending and the pod does not exist, it could be the case that we want to try to submit it
// again instead of marking it as an error. Check if that's the case.
// Node will be in pending state without Pod create if Node is waiting for Synchronize lock
if node.Pending() && node.GetReason() == wfv1.WaitingForSyncLock {
if node.Pending() {
continue
}

Expand Down Expand Up @@ -1059,12 +1059,19 @@ func (woc *wfOperationCtx) countActivePods(boundaryIDs ...string) int64 {
// Do not include pending nodes that are waiting for a lock
continue
}
activePods++
if woc.nodePodExist(node) {
activePods++
}
}
}
return activePods
}

func (woc *wfOperationCtx) nodePodExist(node wfv1.NodeStatus) bool {
_, podExist, _ := woc.controller.podInformer.GetIndexer().GetByKey(fmt.Sprintf("%s/%s", woc.wf.Namespace, node.ID))
return podExist
}

// countActiveChildren counts the number of active (Pending/Running) children nodes of parent parentName
func (woc *wfOperationCtx) countActiveChildren(boundaryIDs ...string) int64 {
boundaryID := ""
Expand Down
11 changes: 4 additions & 7 deletions workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1299,6 +1299,7 @@ apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: parallelism-limit
namespace: default
spec:
entrypoint: parallelism-limit
parallelism: 2
Expand Down Expand Up @@ -1331,7 +1332,7 @@ func TestWorkflowParallelismLimit(t *testing.T) {
defer cancel()

ctx := context.Background()
wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("")
wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("default")
wf := unmarshalWF(workflowParallelismLimit)
wf, err := wfcset.Create(ctx, wf, metav1.CreateOptions{})
assert.NoError(t, err)
Expand All @@ -1345,6 +1346,7 @@ func TestWorkflowParallelismLimit(t *testing.T) {
assert.Equal(t, 2, len(pods.Items))
// operate again and make sure we don't schedule any more pods
makePodsPhase(ctx, woc, apiv1.PodRunning)
syncPodsInformer(ctx, woc)
wf, err = wfcset.Get(ctx, wf.ObjectMeta.Name, metav1.GetOptions{})
assert.NoError(t, err)
// wfBytes, _ := json.MarshalIndent(wf, "", " ")
Expand Down Expand Up @@ -6300,10 +6302,7 @@ metadata:
name: hello-world-4srt7
namespace: argo
spec:
activeDeadlineSeconds: 300
entrypoint: whalesay
podSpecPatch: |
terminationGracePeriodSeconds: 3
templates:
- container:
args:
Expand All @@ -6313,8 +6312,6 @@ spec:
image: docker/whalesay:latest
name: ""
name: whalesay
ttlStrategy:
secondsAfterCompletion: 600
status:
artifactRepositoryRef:
configMap: artifact-repositories
Expand Down Expand Up @@ -6346,7 +6343,7 @@ func TestWfPendingWithNoPod(t *testing.T) {
ctx := context.Background()
woc := newWorkflowOperationCtx(wf, controller)
woc.operate(ctx)
assert.Equal(t, wfv1.WorkflowError, woc.wf.Status.Phase)
assert.Equal(t, wfv1.WorkflowRunning, woc.wf.Status.Phase)
}

var wfPendingWithSync = `apiVersion: argoproj.io/v1alpha1
Expand Down

0 comments on commit d9e583a

Please sign in to comment.