diff --git a/cmd/argoexec/commands/agent.go b/cmd/argoexec/commands/agent.go
index 84c2c716e5a1..ead89fedc599 100644
--- a/cmd/argoexec/commands/agent.go
+++ b/cmd/argoexec/commands/agent.go
@@ -11,7 +11,6 @@ import (
 	restclient "k8s.io/client-go/rest"
 
 	"github.com/argoproj/argo-workflows/v3"
-	workflow "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned"
 	"github.com/argoproj/argo-workflows/v3/util/logs"
 	"github.com/argoproj/argo-workflows/v3/workflow/common"
 	"github.com/argoproj/argo-workflows/v3/workflow/executor"
@@ -49,14 +48,5 @@ func initAgentExecutor() *executor.AgentExecutor {
 	if !ok {
 		log.Fatalf("Unable to determine workflow name from environment variable %s", common.EnvVarWorkflowName)
 	}
-	agentExecutor := executor.AgentExecutor{
-		ClientSet:         clientSet,
-		RESTClient:        restClient,
-		Namespace:         namespace,
-		WorkflowName:      workflowName,
-		WorkflowInterface: workflow.NewForConfigOrDie(config),
-		CompleteTask:      make(map[string]struct{}),
-	}
-	return &agentExecutor
-
+	return executor.NewAgentExecutor(clientSet, restClient, config, namespace, workflowName)
 }
diff --git a/docs/environment-variables.md b/docs/environment-variables.md
index bcbbcfc93b39..1e3931a5c56c 100644
--- a/docs/environment-variables.md
+++ b/docs/environment-variables.md
@@ -8,6 +8,7 @@ Note that these environment variables may be removed at any time.
 
 | Name | Type | Default | Description |
 |------|------|---------|-------------|
 | `ALL_POD_CHANGES_SIGNIFICANT` | `bool` | `false` | Whether to consider all pod changes as significant during pod reconciliation. |
 | `ALWAYS_OFFLOAD_NODE_STATUS` | `bool` | `false` | Whether to always offload the node status. |
 | `ARCHIVED_WORKFLOW_GC_PERIOD` | `time.Duration` | `24h` | The periodicity for GC of archived workflows. |
+| `ARGO_AGENT_TASK_WORKERS` | `int` | `16` | The number of task workers for the agent pod. |
diff --git a/test/e2e/agent_test.go b/test/e2e/agent_test.go
new file mode 100644
index 000000000000..dc29f5bf22ce
--- /dev/null
+++ b/test/e2e/agent_test.go
@@ -0,0 +1,85 @@
+//go:build functional
+// +build functional
+
+package e2e
+
+import (
+	"sort"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/suite"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+	wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
+	"github.com/argoproj/argo-workflows/v3/test/e2e/fixtures"
+)
+
+type AgentSuite struct {
+	fixtures.E2ESuite
+}
+
+func (s *AgentSuite) TestParallel() {
+	s.Given().
+		Workflow(`apiVersion: argoproj.io/v1alpha1
+kind: Workflow
+metadata:
+  name: http-template-par
+  labels:
+    workflows.argoproj.io/test: "true"
+spec:
+  entrypoint: main
+  templates:
+    - name: main
+      steps:
+        - - name: one
+            template: http
+            arguments:
+              parameters: [{name: url, value: "http://httpstat.us/200?sleep=5000"}]
+          - name: two
+            template: http
+            arguments:
+              parameters: [{name: url, value: "http://httpstat.us/200?sleep=5000"}]
+          - name: three
+            template: http
+            arguments:
+              parameters: [{name: url, value: "http://httpstat.us/200?sleep=5000"}]
+          - name: four
+            template: http
+            arguments:
+              parameters: [{name: url, value: "http://httpstat.us/200?sleep=5000"}]
+    - name: http
+      inputs:
+        parameters:
+          - name: url
+      http:
+        url: "{{inputs.parameters.url}}"
+`).
+		When().
+		SubmitWorkflow().
+		WaitForWorkflow(time.Minute).
+		Then().
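+		// Four HTTP tasks that each sleep five seconds would take at least twenty seconds in
+		// series, so the assertions below hold only if the agent ran them in parallel.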
+		ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
+			assert.Equal(t, wfv1.WorkflowSucceeded, status.Phase)
+			// Ensure that the workflow ran for less than 20 seconds (5 seconds per task, 4 tasks)
+			assert.True(t, status.FinishedAt.Sub(status.StartedAt.Time) < time.Duration(20)*time.Second)
+
+			var finishedTimes []time.Time
+			for _, node := range status.Nodes {
+				if node.Type != wfv1.NodeTypeHTTP {
+					continue
+				}
+				finishedTimes = append(finishedTimes, node.FinishedAt.Time)
+			}
+
+			if assert.Len(t, finishedTimes, 4) {
+				sort.Slice(finishedTimes, func(i, j int) bool {
+					return finishedTimes[i].Before(finishedTimes[j])
+				})
+
+				// Everything finished within a two-second window
+				assert.True(t, finishedTimes[3].Sub(finishedTimes[0]) < time.Duration(2)*time.Second)
+			}
+		})
+}
+
+func TestAgentSuite(t *testing.T) {
+	suite.Run(t, new(AgentSuite))
+}
diff --git a/workflow/controller/agent.go b/workflow/controller/agent.go
index 459c512e9f98..f112dd216fd6 100644
--- a/workflow/controller/agent.go
+++ b/workflow/controller/agent.go
@@ -3,6 +3,7 @@ package controller
 import (
 	"context"
 	"fmt"
+	"os"
 
 	log "github.com/sirupsen/logrus"
 	apiv1 "k8s.io/api/core/v1"
@@ -14,6 +15,7 @@ import (
 	"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow"
 	wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
 	"github.com/argoproj/argo-workflows/v3/workflow/common"
+	"github.com/argoproj/argo-workflows/v3/workflow/executor"
 )
 
 func (woc *wfOperationCtx) getAgentPodName() string {
@@ -80,6 +82,18 @@ func (woc *wfOperationCtx) createAgentPod(ctx context.Context) (*apiv1.Pod, erro
 		}
 	}
 
+	envVars := []apiv1.EnvVar{
+		{Name: common.EnvVarWorkflowName, Value: woc.wf.Name},
+	}
+
+	// If the default number of task workers is overridden, then pass it to the agent pod.
+	if taskWorkers, exists := os.LookupEnv(executor.EnvAgentTaskWorkers); exists {
+		envVars = append(envVars, apiv1.EnvVar{
+			Name:  executor.EnvAgentTaskWorkers,
+			Value: taskWorkers,
+		})
+	}
+
 	pod := &apiv1.Pod{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      podName,
@@ -101,9 +115,7 @@ func (woc *wfOperationCtx) createAgentPod(ctx context.Context) (*apiv1.Pod, erro
 						Command: []string{"argoexec"},
 						Args:    []string{"agent"},
 						Image:   woc.controller.executorImage(),
-						Env: []apiv1.EnvVar{
-							{Name: common.EnvVarWorkflowName, Value: woc.wf.Name},
-						},
+						Env:     envVars,
 					},
 				},
 			},
diff --git a/workflow/executor/agent.go b/workflow/executor/agent.go
index 8f8cab934104..d66268b45f81 100644
--- a/workflow/executor/agent.go
+++ b/workflow/executor/agent.go
@@ -6,7 +6,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"net/http"
-	"os"
+	"sync"
+	"time"
 
 	log "github.com/sirupsen/logrus"
 	apierr "k8s.io/apimachinery/pkg/api/errors"
@@ -17,10 +17,12 @@ import (
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
 
-	"github.com/argoproj/argo-workflows/v3/errors"
 	wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
 	workflow "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned"
+	"github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
 	"github.com/argoproj/argo-workflows/v3/util"
+	"github.com/argoproj/argo-workflows/v3/util/env"
+	"github.com/argoproj/argo-workflows/v3/util/errors"
 	"github.com/argoproj/argo-workflows/v3/workflow/common"
 	argohttp "github.com/argoproj/argo-workflows/v3/workflow/executor/http"
 )
@@ -31,13 +33,47 @@ type AgentExecutor struct {
 	WorkflowInterface workflow.Interface
 	RESTClient        rest.Interface
 	Namespace         string
-	CompleteTask      map[string]struct{}
+	consideredTasks   *sync.Map
 }
 
+func NewAgentExecutor(clientSet kubernetes.Interface, restClient rest.Interface, config *rest.Config, namespace, workflowName string) *AgentExecutor {
+	return &AgentExecutor{
+		ClientSet:         clientSet,
+		RESTClient:        restClient,
+		Namespace:         namespace,
+		WorkflowName:      workflowName,
+		WorkflowInterface: workflow.NewForConfigOrDie(config),
+		consideredTasks:   &sync.Map{},
+	}
+}
+
+type task struct {
+	NodeId   string
+	Template wfv1.Template
+}
+
+type response struct {
+	NodeId string
+	Result *wfv1.NodeResult
+}
+
+const EnvAgentTaskWorkers = "ARGO_AGENT_TASK_WORKERS"
+
 func (ae *AgentExecutor) Agent(ctx context.Context) error {
 	defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)
 
+	taskWorkers := env.LookupEnvIntOr(EnvAgentTaskWorkers, 16)
+	log.WithField("task_workers", taskWorkers).Info("Starting Agent")
+
+	taskQueue := make(chan task)
+	responseQueue := make(chan response)
 	taskSetInterface := ae.WorkflowInterface.ArgoprojV1alpha1().WorkflowTaskSets(ae.Namespace)
+
+	go ae.patchWorker(ctx, taskSetInterface, responseQueue)
+	for i := 0; i < taskWorkers; i++ {
+		go ae.taskWorker(ctx, taskQueue, responseQueue)
+	}
+
 	for {
 		wfWatch, err := taskSetInterface.Watch(ctx, metav1.ListOptions{FieldSelector: "metadata.name=" + ae.WorkflowName})
 		if err != nil {
@@ -45,65 +81,117 @@ func (ae *AgentExecutor) Agent(ctx context.Context) error {
 		}
 
 		for event := range wfWatch.ResultChan() {
-			log.WithField("taskset", ae.WorkflowName).Infof("watching taskset, %v", event)
+			log.WithFields(log.Fields{"workflow": ae.WorkflowName, "event_type": event.Type}).Info("TaskSet Event")
 			if event.Type == watch.Deleted {
 				// We're done if the task set is deleted
 				return nil
 			}
 
-			obj, ok := event.Object.(*wfv1.WorkflowTaskSet)
+			taskSet, ok := event.Object.(*wfv1.WorkflowTaskSet)
 			if !ok {
 				return apierr.FromObject(event.Object)
 			}
-			if IsWorkflowCompleted(obj) {
-				log.WithField("taskset", ae.WorkflowName).Info("stopped agent")
-				os.Exit(0)
+			if IsWorkflowCompleted(taskSet) {
+				log.WithField("workflow", ae.WorkflowName).Info("Workflow completed... stopping agent")
+				return nil
 			}
 
-			tasks := obj.Spec.Tasks
-			for nodeID, tmpl := range tasks {
-
-				if _, ok := ae.CompleteTask[nodeID]; ok {
-					continue
-				}
-				switch {
-				case tmpl.HTTP != nil:
-					result := wfv1.NodeResult{}
-					if outputs, err := ae.executeHTTPTemplate(ctx, tmpl); err != nil {
-						result.Phase = wfv1.NodeFailed
-						result.Message = err.Error()
-					} else {
-						result.Phase = wfv1.NodeSucceeded
-						result.Outputs = outputs
-					}
+			for nodeID, tmpl := range taskSet.Spec.Tasks {
+				taskQueue <- task{NodeId: nodeID, Template: tmpl}
+			}
+		}
+	}
+}
 
-					nodeResults := map[string]wfv1.NodeResult{}
+func (ae *AgentExecutor) taskWorker(ctx context.Context, taskQueue chan task, responseQueue chan response) {
+	for task := range taskQueue {
+		nodeID, tmpl := task.NodeId, task.Template
+		log.WithFields(log.Fields{"nodeID": nodeID}).Info("Attempting task")
 
-					nodeResults[nodeID] = result
+		// Do not work on tasks that have already been considered once, to prevent calling an endpoint more
+		// than once unintentionally.
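+		// For example, Agent() re-queues every task in the TaskSet on each watch event, so a task
+		// that is still in flight would otherwise be executed a second time.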
+		if _, ok := ae.consideredTasks.LoadOrStore(nodeID, true); ok {
+			log.WithFields(log.Fields{"nodeID": nodeID}).Info("Task is already considered")
+			continue
+		}
 
-					patch, err := json.Marshal(map[string]interface{}{"status": wfv1.WorkflowTaskSetStatus{Nodes: nodeResults}})
-
-					if err != nil {
-						return errors.InternalWrapError(err)
-					}
+		log.WithFields(log.Fields{"nodeID": nodeID}).Info("Processing task")
+		result, err := ae.processTask(ctx, tmpl)
+		if err != nil {
+			log.WithFields(log.Fields{"error": err, "nodeID": nodeID}).Error("Error in agent task")
+			return
+		}
 
-					log.WithFields(log.Fields{"taskset": obj, "workflow": ae.WorkflowName, "namespace": ae.Namespace}).Infof("Patch content, %s", patch)
+		log.WithFields(log.Fields{"nodeID": nodeID}).Info("Sending result")
+		responseQueue <- response{NodeId: nodeID, Result: result}
+	}
+}
 
-					obj, err = taskSetInterface.Patch(ctx, ae.WorkflowName, types.MergePatchType, patch, metav1.PatchOptions{})
+func (ae *AgentExecutor) patchWorker(ctx context.Context, taskSetInterface v1alpha1.WorkflowTaskSetInterface, responseQueue chan response) {
+	ticker := time.NewTicker(1 * time.Second)
+	defer ticker.Stop()
+	nodeResults := map[string]wfv1.NodeResult{}
+	for {
+		select {
+		case res := <-responseQueue:
+			nodeResults[res.NodeId] = *res.Result
+		case <-ticker.C:
+			if len(nodeResults) == 0 {
+				continue
+			}
 
-					log.WithField("taskset", obj).Infof("updated content, %s", patch)
+			patch, err := json.Marshal(map[string]interface{}{"status": wfv1.WorkflowTaskSetStatus{Nodes: nodeResults}})
+			if err != nil {
+				log.WithError(err).Error("Generating Patch Failed")
+				continue
+			}
 
-					ae.CompleteTask[nodeID] = struct{}{}
+			log.WithFields(log.Fields{"workflow": ae.WorkflowName}).Info("Processing Patch")
 
-					if err != nil {
-						log.WithError(err).WithField("taskset", obj).Errorf("failed to update the taskset")
+			obj, err := taskSetInterface.Patch(ctx, ae.WorkflowName, types.MergePatchType, patch, metav1.PatchOptions{})
+			if err != nil {
+				isTransientErr := errors.IsTransientErr(err)
+				log.WithError(err).WithFields(log.Fields{"taskset": obj, "is_transient_error": isTransientErr}).Error("TaskSet Patch Failed")
+
+				// If this is not a transient error, then it's likely that the contents of the patch have caused the error.
+				// To avoid a deadlock with the workflow overall, or an infinite loop, fail and propagate the error messages
+				// to the nodes.
+				// If this is a transient error, then simply do nothing and another patch will be retried in the next tick.
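+				// (errors.IsTransientErr classifies failures such as connection resets or API-server
+				// throttling as transient; the accumulated nodeResults are kept and re-patched on a
+				// later tick.)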
+				if !isTransientErr {
+					for node := range nodeResults {
+						nodeResults[node] = wfv1.NodeResult{
+							Phase:   wfv1.NodeError,
+							Message: fmt.Sprintf("HTTP request was executed, but an error occurred when patching its result: %s", err),
+						}
 					}
-				default:
-					return fmt.Errorf("agent cannot execute: unknown task type")
 				}
+				continue
 			}
+
+			// Patch was successful, clear nodeResults for next iteration
+			nodeResults = map[string]wfv1.NodeResult{}
+
+			log.WithField("taskset", obj).Info("Patched TaskSet")
+		}
+	}
+}
+
+func (ae *AgentExecutor) processTask(ctx context.Context, tmpl wfv1.Template) (*wfv1.NodeResult, error) {
+	switch {
+	case tmpl.HTTP != nil:
+		var result wfv1.NodeResult
+		if outputs, err := ae.executeHTTPTemplate(ctx, tmpl); err != nil {
+			result.Phase = wfv1.NodeFailed
+			result.Message = err.Error()
+		} else {
+			result.Phase = wfv1.NodeSucceeded
+			result.Outputs = outputs
 		}
+		return &result, nil
+	default:
+		return nil, fmt.Errorf("agent cannot execute: unknown task type")
 	}
 }
 
@@ -116,6 +204,7 @@ func (ae *AgentExecutor) executeHTTPTemplate(ctx context.Context, tmpl wfv1.Temp
 	if err != nil {
 		return nil, err
 	}
+	request = request.WithContext(ctx)
 
 	for _, header := range httpTemplate.Headers {
 		value := header.Value
@@ -139,6 +228,5 @@ func (ae *AgentExecutor) executeHTTPTemplate(ctx context.Context, tmpl wfv1.Temp
 }
 
 func IsWorkflowCompleted(wts *wfv1.WorkflowTaskSet) bool {
-	value := wts.Labels[common.LabelKeyCompleted]
-	return value == "true"
+	return wts.Labels[common.LabelKeyCompleted] == "true"
 }
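
For reference, the fan-out/fan-in shape that this patch introduces in workflow/executor/agent.go can be reduced to the standalone sketch below; every name and constant in it is illustrative, not part of the patch. A fixed pool of workers drains a task queue while a single collector batches results on a ticker, so that many task completions collapse into one status patch, just as patchWorker folds many NodeResults into a single merge-patch of the TaskSet:

package main

import (
	"fmt"
	"time"
)

type result struct {
	id    string
	phase string
}

func main() {
	tasks := make(chan string)
	results := make(chan result)

	// Fan out: a fixed worker pool, like taskWorker above.
	for i := 0; i < 4; i++ {
		go func() {
			for id := range tasks {
				time.Sleep(100 * time.Millisecond) // stand-in for the HTTP call
				results <- result{id: id, phase: "Succeeded"}
			}
		}()
	}

	// Fan in: collect results and flush them in batches on a ticker, like patchWorker above.
	done := make(chan struct{})
	go func() {
		defer close(done)
		ticker := time.NewTicker(250 * time.Millisecond)
		defer ticker.Stop()
		batch := map[string]string{}
		for seen := 0; seen < 8 || len(batch) > 0; {
			select {
			case r := <-results:
				batch[r.id] = r.phase
				seen++
			case <-ticker.C:
				if len(batch) == 0 {
					continue
				}
				fmt.Println("flush:", batch) // stand-in for one merge-patch of the TaskSet status
				batch = map[string]string{}
			}
		}
	}()

	for i := 0; i < 8; i++ {
		tasks <- fmt.Sprintf("node-%d", i)
	}
	close(tasks)
	<-done
}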