Skip to content

Commit

Permalink
feat: Allow parallel HTTP requests (#7113)
Browse files Browse the repository at this point in the history
* feat: Allow parallel HTTP requests
  • Loading branch information
simster7 committed Nov 23, 2021
1 parent 9ac6b3d commit 160bdc6
Show file tree
Hide file tree
Showing 5 changed files with 230 additions and 54 deletions.
12 changes: 1 addition & 11 deletions cmd/argoexec/commands/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
restclient "k8s.io/client-go/rest"

"github.com/argoproj/argo-workflows/v3"
workflow "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned"
"github.com/argoproj/argo-workflows/v3/util/logs"
"github.com/argoproj/argo-workflows/v3/workflow/common"
"github.com/argoproj/argo-workflows/v3/workflow/executor"
Expand Down Expand Up @@ -49,14 +48,5 @@ func initAgentExecutor() *executor.AgentExecutor {
if !ok {
log.Fatalf("Unable to determine workflow name from environment variable %s", common.EnvVarWorkflowName)
}
agentExecutor := executor.AgentExecutor{
ClientSet: clientSet,
RESTClient: restClient,
Namespace: namespace,
WorkflowName: workflowName,
WorkflowInterface: workflow.NewForConfigOrDie(config),
CompleteTask: make(map[string]struct{}),
}
return &agentExecutor

return executor.NewAgentExecutor(clientSet, restClient, config, namespace, workflowName)
}
1 change: 1 addition & 0 deletions docs/environment-variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ Note that these environment variables may be removed at any time.

| Name | Type | Default | Description |
|------|------|---------|-------------|
| `ARGO_AGENT_TASK_WORKERS` | `int` | `16` | The number of task workers for the agent pod. |
| `ALL_POD_CHANGES_SIGNIFICANT` | `bool` | `false` | Whether to consider all pod changes as significant during pod reconciliation. |
| `ALWAYS_OFFLOAD_NODE_STATUS` | `bool` | `false` | Whether to always offload the node status. |
| `ARCHIVED_WORKFLOW_GC_PERIOD` | `time.Duration` | `24h` | The periodicity for GC of archived workflows. |
Expand Down
85 changes: 85 additions & 0 deletions test/e2e/agent_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
//go:build functional
// +build functional

package e2e

import (
"sort"
"testing"
"time"

"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/test/e2e/fixtures"
)

type AgentSuite struct {
fixtures.E2ESuite
}

func (s *AgentSuite) TestParallel() {
s.Given().
Workflow(`apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: http-template-par
workflowMetadata:
labels:
workflows.argoproj.io/test: "true"
spec:
entrypoint: main
templates:
- name: main
steps:
- - name: one
template: http
arguments:
parameters: [{name: url, value: "http://httpstat.us/200?sleep=5000"}]
- name: two
template: http
arguments:
parameters: [{name: url, value: "http://httpstat.us/200?sleep=5000"}]
- name: three
template: http
arguments:
parameters: [{name: url, value: "http://httpstat.us/200?sleep=5000"}]
- name: four
template: http
arguments:
parameters: [{name: url, value: "http://httpstat.us/200?sleep=5000"}]
- name: http
inputs:
parameters:
- name: url
http:
url: "{{inputs.parameters.url}}"
`).
When().
SubmitWorkflow().
WaitForWorkflow(time.Minute).
Then().
ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.WorkflowSucceeded, status.Phase)
// Ensure that the workflow ran for less than 20 seconds (5 seconds per task, 4 tasks)
assert.True(t, status.FinishedAt.Sub(status.StartedAt.Time) < time.Duration(20)*time.Second)

var finishedTimes []time.Time
for _, node := range status.Nodes {
if node.Type != wfv1.NodeTypeHTTP {
continue
}
finishedTimes = append(finishedTimes, node.FinishedAt.Time)
}

if assert.Len(t, finishedTimes, 4) {
sort.Slice(finishedTimes, func(i, j int) bool {
return finishedTimes[i].Before(finishedTimes[j])
})

// Everything finished with a two second window
assert.True(t, finishedTimes[3].Sub(finishedTimes[0]) < time.Duration(2)*time.Second)
}
})
}
18 changes: 15 additions & 3 deletions workflow/controller/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package controller
import (
"context"
"fmt"
"os"

log "github.com/sirupsen/logrus"
apiv1 "k8s.io/api/core/v1"
Expand All @@ -14,6 +15,7 @@ import (
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow"
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/workflow/common"
"github.com/argoproj/argo-workflows/v3/workflow/executor"
)

func (woc *wfOperationCtx) getAgentPodName() string {
Expand Down Expand Up @@ -80,6 +82,18 @@ func (woc *wfOperationCtx) createAgentPod(ctx context.Context) (*apiv1.Pod, erro
}
}

envVars := []apiv1.EnvVar{
{Name: common.EnvVarWorkflowName, Value: woc.wf.Name},
}

// If the default number of task workers is overridden, then pass it to the agent pod.
if taskWorkers, exists := os.LookupEnv(executor.EnvAgentTaskWorkers); exists {
envVars = append(envVars, apiv1.EnvVar{
Name: executor.EnvAgentTaskWorkers,
Value: taskWorkers,
})
}

pod := &apiv1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Expand All @@ -101,9 +115,7 @@ func (woc *wfOperationCtx) createAgentPod(ctx context.Context) (*apiv1.Pod, erro
Command: []string{"argoexec"},
Args: []string{"agent"},
Image: woc.controller.executorImage(),
Env: []apiv1.EnvVar{
{Name: common.EnvVarWorkflowName, Value: woc.wf.Name},
},
Env: envVars,
},
},
},
Expand Down
Loading

0 comments on commit 160bdc6

Please sign in to comment.