Skip to content

Commit

Permalink
fix: fix missing artifacts for stopped workflows. Fixes #12401 (#12402)
Browse files Browse the repository at this point in the history
Signed-off-by: Garett MacGowan <garettsoftware@gmail.com>
  • Loading branch information
Garett-MacGowan authored and sarabala1979 committed Jan 8, 2024
1 parent 852f8a3 commit 5568a25
Show file tree
Hide file tree
Showing 21 changed files with 910 additions and 771 deletions.
4 changes: 2 additions & 2 deletions api/jsonschema/schema.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions api/openapi-spec/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions cmd/argoexec/commands/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ func waitContainer(ctx context.Context) error {
defer stats.LogStats()
stats.StartStatsTicker(5 * time.Minute)

// Create a new empty (placeholder) task result with LabelKeyReportOutputsCompleted set to false.
wfExecutor.InitializeOutput(bgCtx)

// Wait for main container to complete
err := wfExecutor.Wait(ctx)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion docs/fields.md
Original file line number Diff line number Diff line change
Expand Up @@ -853,7 +853,7 @@ WorkflowStatus contains overall status information about a workflow
|`storedTemplates`|[`Template`](#template)|StoredTemplates is a mapping between a template ref and the node's status.|
|`storedWorkflowTemplateSpec`|[`WorkflowSpec`](#workflowspec)|StoredWorkflowSpec stores the WorkflowTemplate spec for future execution.|
|`synchronization`|[`SynchronizationStatus`](#synchronizationstatus)|Synchronization stores the status of synchronization locks|
|`taskResultsCompleted`|`Map< boolean , string >`|Have task results been completed? (mapped by Pod name) used to prevent premature garbage collection of artifacts.|
|`taskResultsCompletionStatus`|`Map< boolean , string >`|TaskResultsCompletionStatus tracks task result completion status (mapped by pod name). Used to prevent premature archiving and garbage collection.|

## CronWorkflowSpec

Expand Down
2 changes: 1 addition & 1 deletion manifests/base/crds/full/argoproj.io_workflows.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1,416 changes: 708 additions & 708 deletions pkg/apis/workflow/v1alpha1/generated.pb.go

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions pkg/apis/workflow/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pkg/apis/workflow/v1alpha1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 19 additions & 17 deletions pkg/apis/workflow/v1alpha1/workflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1940,29 +1940,31 @@ type WorkflowStatus struct {
// ArtifactGCStatus maintains the status of Artifact Garbage Collection
ArtifactGCStatus *ArtGCStatus `json:"artifactGCStatus,omitempty" protobuf:"bytes,19,opt,name=artifactGCStatus"`

// Have task results been completed? (mapped by Pod name) used to prevent premature garbage collection of artifacts.
TaskResultsCompleted map[string]bool `json:"taskResultsCompleted,omitempty" protobuf:"bytes,20,opt,name=taskResultsCompleted"`
// TaskResultsCompletionStatus tracks task result completion status (mapped by pod name). Used to prevent premature archiving and garbage collection.
TaskResultsCompletionStatus map[string]bool `json:"taskResultsCompletionStatus,omitempty" protobuf:"bytes,20,opt,name=taskResultsCompletionStatus"`
}

func (ws *WorkflowStatus) InitializeTaskResultIncomplete(resultName string) {
if ws.TaskResultsCompleted == nil {
ws.TaskResultsCompleted = make(map[string]bool)
}
if _, ok := ws.TaskResultsCompleted[resultName]; !ok {
ws.MarkTaskResultIncomplete(resultName)
func (ws *WorkflowStatus) MarkTaskResultIncomplete(name string) {
if ws.TaskResultsCompletionStatus == nil {
ws.TaskResultsCompletionStatus = make(map[string]bool)
}
ws.TaskResultsCompletionStatus[name] = false
}

func (ws *WorkflowStatus) MarkTaskResultComplete(name string) {
ws.TaskResultsCompleted[name] = true
}
func (ws *WorkflowStatus) MarkTaskResultIncomplete(name string) {
ws.TaskResultsCompleted[name] = false
}
func (ws *WorkflowStatus) GetTaskResultCompleted(name string) bool {
return ws.TaskResultsCompleted[name]
if ws.TaskResultsCompletionStatus == nil {
ws.TaskResultsCompletionStatus = make(map[string]bool)
}
ws.TaskResultsCompletionStatus[name] = true
}
func (ws *WorkflowStatus) GetTaskResultsCompleted() map[string]bool {
return ws.TaskResultsCompleted

func (ws *WorkflowStatus) TaskResultsInProgress() bool {
for _, value := range ws.TaskResultsCompletionStatus {
if !value {
return true
}
}
return false
}

func (ws *WorkflowStatus) IsOffloadNodeStatus() bool {
Expand Down
4 changes: 2 additions & 2 deletions pkg/apis/workflow/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions sdks/python/client/docs/WorkflowServiceApi.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 23 additions & 0 deletions test/e2e/argo_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1037,6 +1037,29 @@ spec:
})
}

func (s *ArgoServerSuite) TestArtifactServerArchivedStoppedWorkflow() {
var uid types.UID
var nodeID string
s.Given().
Workflow(`@testdata/artifact-workflow-stopped.yaml`).
When().
SubmitWorkflow().
WaitForWorkflow(fixtures.ToBeArchived).
Then().
ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
uid = metadata.UID
nodeID = status.Nodes.FindByDisplayName("create-artifact").ID
})

s.Run("GetArtifactByNodeID", func() {
s.e().GET("/artifact-files/argo/archived-workflows/{uid}/{nodeID}/outputs/artifact-creator", uid, nodeID).
Expect().
Status(200).
Body().
Contains("testing")
})
}

// make sure we can download an artifact
func (s *ArgoServerSuite) TestArtifactServer() {
var uid types.UID
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/daemon_pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ spec:
`).
When().
SubmitWorkflow().
WaitForWorkflow(fixtures.ToBeSucceeded).
WaitForWorkflow(fixtures.ToBeCompleted).
Then().
ExpectWorkflow(func(t *testing.T, metadata *v1.ObjectMeta, status *v1alpha1.WorkflowStatus) {
assert.False(t, status.FinishedAt.IsZero())
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -898,7 +898,7 @@ func (s *FunctionalSuite) TestDataTransformation() {
Workflow("@testdata/data-transformation.yaml").
When().
SubmitWorkflow().
WaitForWorkflow(90 * time.Second).
WaitForWorkflow(fixtures.ToBeSucceeded).
Then().
ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.WorkflowSucceeded, status.Phase)
Expand Down
99 changes: 99 additions & 0 deletions test/e2e/testdata/artifact-workflow-stopped.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: wf-stopped-
spec:
workflowMetadata:
labels:
workflows.argoproj.io/test: "true"
workflows.argoproj.io/workflow: "wf-stopped"
entrypoint: wf-stopped-main
serviceAccountName: argo
executor:
serviceAccountName: default
volumeClaimTemplates:
- metadata:
name: artifacts
spec:
accessModes: [ "ReadWriteOnce" ]
resources:
requests:
storage: 1Gi
templates:
- name: wf-stopped-main
dag:
tasks:
- name: create-artifact
template: artifact-creator
- name: delay-stop-workflow
template: delay-stop
- name: stop-workflow
template: workflow-stopper
dependencies: [delay-stop-workflow]
- name: delay-stop
container:
image: alpine:latest
volumeMounts:
- name: artifacts
mountPath: /mnt/vol
command: [sh, -c]
args:
- |
echo "Delaying workflow stop"
ls /mnt
x=0
while [ $x -le 60 ]
do
sleep 1
if [ -f "/mnt/vol/test.txt" ]; then
echo "Artifact found in shared volume"
break
fi
x=$(( $x + 1 ))
done
- name: workflow-stopper
container:
image: argoproj/argocli:latest
args:
- stop
- -l
- workflows.argoproj.io/workflow=wf-stopped
- --namespace=argo
- --loglevel=debug
- name: artifact-creator
metadata:
labels:
template: "artifact-creator"
container:
image: alpine:latest
volumeMounts:
- name: artifacts
mountPath: /mnt/vol
command: [sh, -c]
args:
- |
echo 'testing' > /mnt/vol/test.txt
echo "Artifact saved to /mnt/vol/test.txt"
echo "Pretending to continue to do work."
ls /mnt
while :
do
sleep 1
done
outputs:
artifacts:
- name: artifact-creator
path: /mnt/vol/test.txt
s3:
key: artifact-creator
bucket: my-bucket-3
endpoint: minio:9000
insecure: true
accessKeySecret:
name: my-minio-cred
key: accesskey
secretKeySecret:
name: my-minio-cred
key: secretkey
archive:
none: {}
Loading

0 comments on commit 5568a25

Please sign in to comment.