diff --git a/cmd/argo/commands/resubmit.go b/cmd/argo/commands/resubmit.go index 1dfa11baee56..20a437fda33e 100644 --- a/cmd/argo/commands/resubmit.go +++ b/cmd/argo/commands/resubmit.go @@ -1,17 +1,37 @@ package commands import ( - "github.com/argoproj/pkg/errors" + "context" + "github.com/spf13/cobra" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/argoproj/pkg/errors" "github.com/argoproj/argo-workflows/v3/cmd/argo/commands/client" workflowpkg "github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflow" + wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" ) +type resubmitOps struct { + priority int32 // --priority + memoized bool // --memoized + namespace string // --namespace + labelSelector string // --selector + fieldSelector string // --field-selector +} + +// hasSelector returns true if the CLI arguments selects multiple workflows +func (o *resubmitOps) hasSelector() bool { + if o.labelSelector != "" || o.fieldSelector != "" { + return true + } + return false +} + func NewResubmitCommand() *cobra.Command { var ( - memoized bool - priority int32 + resubmitOpts resubmitOps cliSubmitOpts cliSubmitOpts ) command := &cobra.Command{ @@ -21,6 +41,18 @@ func NewResubmitCommand() *cobra.Command { argo resubmit my-wf +# Resubmit multiple workflows: + + argo resubmit my-wf my-other-wf my-third-wf + +# Resubmit multiple workflows by label selector: + + argo resubmit -l workflows.argoproj.io/test=true + +# Resubmit multiple workflows by field selector: + + argo resubmit --field-selector metadata.namespace=argo + # Resubmit and wait for completion: argo resubmit --wait my-wf.yaml @@ -39,31 +71,77 @@ func NewResubmitCommand() *cobra.Command { `, Run: func(cmd *cobra.Command, args []string) { if cmd.Flag("priority").Changed { - cliSubmitOpts.priority = &priority + cliSubmitOpts.priority = &resubmitOpts.priority } ctx, apiClient := client.NewAPIClient() serviceClient := apiClient.NewWorkflowServiceClient() - namespace := client.Namespace() - - for _, name := range args { - created, err := serviceClient.ResubmitWorkflow(ctx, &workflowpkg.WorkflowResubmitRequest{ - Namespace: namespace, - Name: name, - Memoized: memoized, - }) - errors.CheckError(err) - printWorkflow(created, getFlags{output: cliSubmitOpts.output}) - waitWatchOrLog(ctx, serviceClient, namespace, []string{created.Name}, cliSubmitOpts) - } + resubmitOpts.namespace = client.Namespace() + err := resubmitWorkflows(ctx, serviceClient, resubmitOpts, cliSubmitOpts, args) + errors.CheckError(err) }, } - command.Flags().Int32Var(&priority, "priority", 0, "workflow priority") + command.Flags().Int32Var(&resubmitOpts.priority, "priority", 0, "workflow priority") command.Flags().StringVarP(&cliSubmitOpts.output, "output", "o", "", "Output format. One of: name|json|yaml|wide") - command.Flags().BoolVarP(&cliSubmitOpts.wait, "wait", "w", false, "wait for the workflow to complete") - command.Flags().BoolVar(&cliSubmitOpts.watch, "watch", false, "watch the workflow until it completes") + command.Flags().BoolVarP(&cliSubmitOpts.wait, "wait", "w", false, "wait for the workflow to complete, only works when a single workflow is resubmitted") + command.Flags().BoolVar(&cliSubmitOpts.watch, "watch", false, "watch the workflow until it completes, only works when a single workflow is resubmitted") command.Flags().BoolVar(&cliSubmitOpts.log, "log", false, "log the workflow until it completes") - command.Flags().BoolVar(&memoized, "memoized", false, "re-use successful steps & outputs from the previous run (experimental)") + command.Flags().BoolVar(&resubmitOpts.memoized, "memoized", false, "re-use successful steps & outputs from the previous run (experimental)") + command.Flags().StringVarP(&resubmitOpts.labelSelector, "selector", "l", "", "Selector (label query) to filter on, not including uninitialized ones, supports '=', '==', and '!='.(e.g. -l key1=value1,key2=value2)") + command.Flags().StringVar(&resubmitOpts.fieldSelector, "field-selector", "", "Selector (field query) to filter on, supports '=', '==', and '!='.(e.g. --field-selector key1=value1,key2=value2). The server only supports a limited number of field queries per type.") return command } + +// resubmitWorkflows resubmits workflows by given resubmitOpts or workflow names +func resubmitWorkflows(ctx context.Context, serviceClient workflowpkg.WorkflowServiceClient, resubmitOpts resubmitOps, cliSubmitOpts cliSubmitOpts, args []string) error { + var ( + wfs wfv1.Workflows + err error + ) + if resubmitOpts.hasSelector() { + wfs, err = listWorkflows(ctx, serviceClient, listFlags{ + namespace: resubmitOpts.namespace, + fields: resubmitOpts.fieldSelector, + labels: resubmitOpts.labelSelector, + }) + if err != nil { + return err + } + } + + for _, n := range args { + wfs = append(wfs, wfv1.Workflow{ + ObjectMeta: metav1.ObjectMeta{ + Name: n, + Namespace: resubmitOpts.namespace, + }, + }) + } + + var lastResubmitted *wfv1.Workflow + resubmittedNames := make(map[string]bool) + + for _, wf := range wfs { + if _, ok := resubmittedNames[wf.Name]; ok { + // de-duplication in case there is an overlap between the selector and given workflow names + continue + } + resubmittedNames[wf.Name] = true + + lastResubmitted, err = serviceClient.ResubmitWorkflow(ctx, &workflowpkg.WorkflowResubmitRequest{ + Namespace: wf.Namespace, + Name: wf.Name, + Memoized: resubmitOpts.memoized, + }) + if err != nil { + return err + } + printWorkflow(lastResubmitted, getFlags{output: cliSubmitOpts.output}) + } + if len(resubmittedNames) == 1 { + // watch or wait when there is only one workflow retried + waitWatchOrLog(ctx, serviceClient, lastResubmitted.Namespace, []string{lastResubmitted.Name}, cliSubmitOpts) + } + return nil +} diff --git a/cmd/argo/commands/resubmit_test.go b/cmd/argo/commands/resubmit_test.go new file mode 100644 index 000000000000..42783e165d55 --- /dev/null +++ b/cmd/argo/commands/resubmit_test.go @@ -0,0 +1,164 @@ +package commands + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + workflowpkg "github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflow" + workflowmocks "github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflow/mocks" + wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" +) + +func Test_resubmitWorkflows(t *testing.T) { + t.Run("Resubmit workflow by names", func(t *testing.T) { + c := &workflowmocks.WorkflowServiceClient{} + resubmitOpts := resubmitOps{ + namespace: "argo", + } + cliSubmitOpts := cliSubmitOpts{} + + c.On("ResubmitWorkflow", mock.Anything, mock.Anything).Return(&wfv1.Workflow{}, nil) + + err := resubmitWorkflows(context.Background(), c, resubmitOpts, cliSubmitOpts, []string{"foo", "bar"}) + c.AssertNumberOfCalls(t, "ResubmitWorkflow", 2) + + assert.NoError(t, err) + }) + + t.Run("Resubmit workflow with memoization", func(t *testing.T) { + c := &workflowmocks.WorkflowServiceClient{} + resubmitOpts := resubmitOps{ + namespace: "argo", + memoized: true, + } + cliSubmitOpts := cliSubmitOpts{} + + c.On("ResubmitWorkflow", mock.Anything, mock.Anything).Return(&wfv1.Workflow{}, nil) + + err := resubmitWorkflows(context.Background(), c, resubmitOpts, cliSubmitOpts, []string{"foo"}) + c.AssertNumberOfCalls(t, "ResubmitWorkflow", 1) + c.AssertCalled(t, "ResubmitWorkflow", mock.Anything, &workflowpkg.WorkflowResubmitRequest{ + Name: "foo", + Namespace: "argo", + Memoized: true, + }) + + assert.NoError(t, err) + }) + + t.Run("Resubmit workflow by selector", func(t *testing.T) { + c := &workflowmocks.WorkflowServiceClient{} + resubmitOpts := resubmitOps{ + namespace: "argo", + labelSelector: "custom-label=true", + } + cliSubmitOpts := cliSubmitOpts{} + + wfListReq := &workflowpkg.WorkflowListRequest{ + Namespace: "argo", + ListOptions: &metav1.ListOptions{ + LabelSelector: resubmitOpts.labelSelector, + }, + } + + wfList := &wfv1.WorkflowList{Items: wfv1.Workflows{ + {ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "argo"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "bar", Namespace: "argo"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "baz", Namespace: "argo"}}, + }} + + c.On("ListWorkflows", mock.Anything, wfListReq).Return(wfList, nil) + c.On("ResubmitWorkflow", mock.Anything, mock.Anything).Return(&wfv1.Workflow{}, nil) + + err := resubmitWorkflows(context.Background(), c, resubmitOpts, cliSubmitOpts, []string{}) + + c.AssertNumberOfCalls(t, "ResubmitWorkflow", 3) + for _, wf := range wfList.Items { + resubmitReq := &workflowpkg.WorkflowResubmitRequest{ + Name: wf.Name, + Namespace: wf.Namespace, + Memoized: false, + } + c.AssertCalled(t, "ResubmitWorkflow", mock.Anything, resubmitReq) + } + + assert.NoError(t, err) + }) + + t.Run("Resubmit workflow by selector and name", func(t *testing.T) { + c := &workflowmocks.WorkflowServiceClient{} + resubmitOpts := resubmitOps{ + namespace: "argo", + labelSelector: "custom-label=true", + } + cliSubmitOpts := cliSubmitOpts{} + + wfListReq := &workflowpkg.WorkflowListRequest{ + Namespace: "argo", + ListOptions: &metav1.ListOptions{ + LabelSelector: resubmitOpts.labelSelector, + }, + } + + wfList := &wfv1.WorkflowList{Items: wfv1.Workflows{ + {ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "baz"}}, + }} + + c.On("ListWorkflows", mock.Anything, wfListReq).Return(wfList, nil) + + c.On("ResubmitWorkflow", mock.Anything, mock.Anything).Return(&wfv1.Workflow{}, nil) + + err := resubmitWorkflows(context.Background(), c, resubmitOpts, cliSubmitOpts, []string{"foo", "qux"}) + // after de-duplication, there will be 4 workflows to resubmit + c.AssertNumberOfCalls(t, "ResubmitWorkflow", 4) + + // the 3 workflows from the selectors: "foo", "bar", "baz" + for _, wf := range wfList.Items { + resubmitReq := &workflowpkg.WorkflowResubmitRequest{ + Name: wf.Name, + Namespace: wf.Namespace, + Memoized: false, + } + c.AssertCalled(t, "ResubmitWorkflow", mock.Anything, resubmitReq) + } + + // the 1 workflow by the given name "qux + c.AssertCalled(t, "ResubmitWorkflow", mock.Anything, &workflowpkg.WorkflowResubmitRequest{ + Name: "qux", + Namespace: "argo", + Memoized: false, + }) + + assert.NoError(t, err) + }) + + t.Run("Resubmit workflow list error", func(t *testing.T) { + c := &workflowmocks.WorkflowServiceClient{} + resubmitOpts := resubmitOps{ + namespace: "argo", + labelSelector: "custom-label=true", + } + cliSubmitOpts := cliSubmitOpts{} + c.On("ListWorkflows", mock.Anything, mock.Anything).Return(nil, fmt.Errorf("mock error")) + err := resubmitWorkflows(context.Background(), c, resubmitOpts, cliSubmitOpts, []string{}) + assert.Errorf(t, err, "mock error") + }) + + t.Run("Resubmit workflow error", func(t *testing.T) { + c := &workflowmocks.WorkflowServiceClient{} + resubmitOpts := resubmitOps{ + namespace: "argo", + } + cliSubmitOpts := cliSubmitOpts{} + c.On("ResubmitWorkflow", mock.Anything, mock.Anything).Return(nil, fmt.Errorf("mock error")) + err := resubmitWorkflows(context.Background(), c, resubmitOpts, cliSubmitOpts, []string{"foo"}) + assert.Errorf(t, err, "mock error") + }) +} diff --git a/docs/cli/argo_resubmit.md b/docs/cli/argo_resubmit.md index 12da92d94c05..f2809c00e34f 100644 --- a/docs/cli/argo_resubmit.md +++ b/docs/cli/argo_resubmit.md @@ -17,6 +17,18 @@ argo resubmit [WORKFLOW...] [flags] argo resubmit my-wf +# Resubmit multiple workflows: + + argo resubmit my-wf my-other-wf my-third-wf + +# Resubmit multiple workflows by label selector: + + argo resubmit -l workflows.argoproj.io/test=true + +# Resubmit multiple workflows by field selector: + + argo resubmit --field-selector metadata.namespace=argo + # Resubmit and wait for completion: argo resubmit --wait my-wf.yaml @@ -38,13 +50,15 @@ argo resubmit [WORKFLOW...] [flags] ### Options ``` - -h, --help help for resubmit - --log log the workflow until it completes - --memoized re-use successful steps & outputs from the previous run (experimental) - -o, --output string Output format. One of: name|json|yaml|wide - --priority int32 workflow priority - -w, --wait wait for the workflow to complete - --watch watch the workflow until it completes + --field-selector string Selector (field query) to filter on, supports '=', '==', and '!='.(e.g. --field-selector key1=value1,key2=value2). The server only supports a limited number of field queries per type. + -h, --help help for resubmit + --log log the workflow until it completes + --memoized re-use successful steps & outputs from the previous run (experimental) + -o, --output string Output format. One of: name|json|yaml|wide + --priority int32 workflow priority + -l, --selector string Selector (label query) to filter on, not including uninitialized ones, supports '=', '==', and '!='.(e.g. -l key1=value1,key2=value2) + -w, --wait wait for the workflow to complete, only works when a single workflow is resubmitted + --watch watch the workflow until it completes, only works when a single workflow is resubmitted ``` ### Options inherited from parent commands diff --git a/test/e2e/cli_test.go b/test/e2e/cli_test.go index 85e77236d3c3..685ac27b3efc 100644 --- a/test/e2e/cli_test.go +++ b/test/e2e/cli_test.go @@ -990,6 +990,42 @@ func (s *CLISuite) TestWorkflowResubmit() { }) } +func (s *CLISuite) TestWorkflowResubmitByLabelSelector() { + s.Given(). + Workflow("@testdata/exit-1.yaml"). + When(). + SubmitWorkflow(). + WaitForWorkflow(). + Given(). + RunCli([]string{"resubmit", "--selector", "workflows.argoproj.io/test=true"}, func(t *testing.T, output string, err error) { + if assert.NoError(t, err) { + assert.Contains(t, output, "Name:") + assert.Contains(t, output, "Namespace:") + assert.Contains(t, output, "ServiceAccount:") + assert.Contains(t, output, "Status:") + assert.Contains(t, output, "Created:") + } + }) +} + +func (s *CLISuite) TestWorkflowResubmitByFieldSelector() { + s.Given(). + Workflow("@testdata/exit-1.yaml"). + When(). + SubmitWorkflow(). + WaitForWorkflow(). + Given(). + RunCli([]string{"resubmit", "--field-selector", "metadata.namespace=argo"}, func(t *testing.T, output string, err error) { + if assert.NoError(t, err) { + assert.Contains(t, output, "Name:") + assert.Contains(t, output, "Namespace:") + assert.Contains(t, output, "ServiceAccount:") + assert.Contains(t, output, "Status:") + assert.Contains(t, output, "Created:") + } + }) +} + func (s *CLISuite) TestCron() { s.Run("Lint", func() { s.Given().RunCli([]string{"cron", "lint", "cron/basic.yaml"}, func(t *testing.T, output string, err error) {