Skip to content

Commit

Permalink
refactor: Remove the need for pod annotations to be mounted as a volu…
Browse files Browse the repository at this point in the history
…me (#6022)

Signed-off-by: Antony Chazapis <chazapis@ics.forth.gr>
  • Loading branch information
chazapis committed Jun 22, 2021
1 parent 0e94283 commit cecc379
Show file tree
Hide file tree
Showing 22 changed files with 411 additions and 704 deletions.
9 changes: 2 additions & 7 deletions .github/workflows/ci-build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,8 @@ jobs:
- run: make install controller cli $(go env GOPATH)/bin/goreman PROFILE=${{matrix.profile}} E2E_EXECUTOR=${{matrix.containerRuntimeExecutor}} AUTH_MODE=client STATIC_FILES=false LOG_LEVEL=info
- run: make start PROFILE=${{matrix.profile}} E2E_EXECUTOR=${{matrix.containerRuntimeExecutor}} AUTH_MODE=client STATIC_FILES=false LOG_LEVEL=info UI=false > /tmp/log/argo-e2e/argo.log 2>&1 &
timeout-minutes: 4
- name: make/pull argoexec-image
run: |
if [ ${{matrix.test}} == test-executor ]; then
make argoexec-image STATIC_FILES=false
else
docker pull argoproj/argoexec:latest
fi
- name: make argoexec-image
run: make argoexec-image STATIC_FILES=false
- run: make wait
timeout-minutes: 4
- run: make ${{matrix.test}} E2E_TIMEOUT=1m STATIC_FILES=false
Expand Down
25 changes: 14 additions & 11 deletions cmd/argoexec/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"encoding/json"
"fmt"
"os"
"time"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"

"github.com/argoproj/pkg/cli"
kubecli "github.com/argoproj/pkg/kube/cli"
Expand Down Expand Up @@ -32,11 +35,10 @@ const (
)

var (
clientConfig clientcmd.ClientConfig
logLevel string // --loglevel
glogLevel int // --gloglevel
logFormat string // --log-format
podAnnotationsPath string // --pod-annotations
clientConfig clientcmd.ClientConfig
logLevel string // --loglevel
glogLevel int // --gloglevel
logFormat string // --log-format
)

func init() {
Expand Down Expand Up @@ -66,7 +68,6 @@ func NewRootCommand() *cobra.Command {
command.AddCommand(cmd.NewVersionCmd(CLIName))

clientConfig = kubecli.AddKubectlFlagsToCmd(&command)
command.PersistentFlags().StringVar(&podAnnotationsPath, "pod-annotations", common.PodMetadataAnnotationsPath, "Pod annotations file from k8s downward API")
command.PersistentFlags().StringVar(&logLevel, "loglevel", "info", "Set the logging level. One of: debug|info|warn|error")
command.PersistentFlags().IntVar(&glogLevel, "gloglevel", 0, "Set the glog logging level")
command.PersistentFlags().StringVar(&logFormat, "log-format", "text", "The formatter to use for logs. One of: text|json")
Expand Down Expand Up @@ -97,10 +98,12 @@ func initExecutor() *executor.WorkflowExecutor {
log.Fatalf("Unable to determine pod name from environment variable %s", common.EnvVarPodName)
}

tmpl, err := executor.LoadTemplate(podAnnotationsPath)
checkErr(err)
tmpl := &wfv1.Template{}
checkErr(json.Unmarshal([]byte(os.Getenv(common.EnvVarTemplate)), tmpl))

includeScriptOutput := os.Getenv(common.EnvVarIncludeScriptOutput) == "true"
deadline, err := time.Parse(time.RFC3339, os.Getenv(common.EnvVarDeadline))
checkErr(err)

var cre executor.ContainerRuntimeExecutor
switch executorType {
Expand All @@ -117,14 +120,14 @@ func initExecutor() *executor.WorkflowExecutor {
}
checkErr(err)

wfExecutor := executor.NewExecutor(clientset, restClient, podName, namespace, podAnnotationsPath, cre, *tmpl, includeScriptOutput)
yamlBytes, _ := json.Marshal(&wfExecutor.Template)
wfExecutor := executor.NewExecutor(clientset, restClient, podName, namespace, cre, *tmpl, includeScriptOutput, deadline)
log.
WithField("version", version.String()).
WithField("namespace", namespace).
WithField("podName", podName).
WithField("template", string(yamlBytes)).
WithField("template", wfv1.MustMarshallJSON(&wfExecutor.Template)).
WithField("includeScriptOutput", includeScriptOutput).
WithField("deadline", deadline).
Info("Executor initialized")
return &wfExecutor
}
Expand Down
30 changes: 4 additions & 26 deletions workflow/common/common.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package common

import (
"time"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow"
Expand All @@ -17,16 +15,6 @@ const (
InitContainerName = "init"
WaitContainerName = "wait"

// PodMetadataVolumeName is the volume name defined in a workflow pod spec to expose pod metadata via downward API
PodMetadataVolumeName = "podmetadata"

// PodMetadataAnnotationsVolumePath is volume path for metadata.annotations in the downward API
PodMetadataAnnotationsVolumePath = "annotations"
// PodMetadataMountPath is the directory mount location for DownwardAPI volume containing pod metadata
PodMetadataMountPath = "/argo/" + PodMetadataVolumeName
// PodMetadataAnnotationsPath is the file path containing pod metadata annotations. Examined by executor
PodMetadataAnnotationsPath = PodMetadataMountPath + "/" + PodMetadataAnnotationsVolumePath

// DockerSockVolumeName is the volume name for the /var/run/docker.sock host path volume
DockerSockVolumeName = "docker-sock"

Expand All @@ -39,14 +27,8 @@ const (
AnnotationKeyRBACRule = workflow.WorkflowFullName + "/rbac-rule"
AnnotationKeyRBACRulePrecedence = workflow.WorkflowFullName + "/rbac-rule-precedence"

// AnnotationKeyTemplate is the pod metadata annotation key containing the container template as JSON
AnnotationKeyTemplate = workflow.WorkflowFullName + "/template"
// AnnotationKeyOutputs is the pod metadata annotation key containing the container outputs
AnnotationKeyOutputs = workflow.WorkflowFullName + "/outputs"
// AnnotationKeyExecutionControl is the pod metadata annotation key containing execution control parameters
// set by the controller and obeyed by the executor. For example, the controller will use this annotation to
// signal the executors of daemoned containers that it should terminate.
AnnotationKeyExecutionControl = workflow.WorkflowFullName + "/execution"
// AnnotationKeyCronWfScheduledTime is the workflow metadata annotation key containing the time when the workflow
// was scheduled to run by CronWorkflow.
AnnotationKeyCronWfScheduledTime = workflow.WorkflowFullName + "/scheduled-time"
Expand Down Expand Up @@ -107,8 +89,12 @@ const (
EnvVarPodName = "ARGO_POD_NAME"
// EnvVarContainerName container the container's name for the current pod
EnvVarContainerName = "ARGO_CONTAINER_NAME"
// EnvVarDeadline is the deadline for the pod
EnvVarDeadline = "ARGO_DEADLINE"
// EnvVarIncludeScriptOutput capture the stdout and stderr
EnvVarIncludeScriptOutput = "ARGO_INCLUDE_SCRIPT_OUTPUT"
// EnvVarTemplate is the template
EnvVarTemplate = "ARGO_TEMPLATE"
// EnvVarContainerRuntimeExecutor contains the name of the container runtime executor to use, empty is equal to "docker"
EnvVarContainerRuntimeExecutor = "ARGO_CONTAINER_RUNTIME_EXECUTOR"
// EnvVarDownwardAPINodeIP is the envvar used to get the `status.hostIP`
Expand Down Expand Up @@ -186,14 +172,6 @@ var AnnotationKeyKillCmd = func(containerName string) string { return workflow.W
// GlobalVarWorkflowRootTags is a list of root tags in workflow which could be used for variable reference
var GlobalVarValidWorkflowVariablePrefix = []string{"item.", "steps.", "inputs.", "outputs.", "pod.", "workflow.", "tasks."}

// ExecutionControl contains execution control parameters for executor to decide how to execute the container
type ExecutionControl struct {
// Deadline is a max timestamp in which an executor can run the container before terminating it
// It is used to signal the executor to terminate a daemoned container. In the future it will be
// used to support workflow or steps/dag level timeouts.
Deadline *time.Time `json:"deadline,omitempty"`
}

func UnstructuredHasCompletedLabel(obj interface{}) bool {
if wf, ok := obj.(*unstructured.Unstructured); ok {
return wf.GetLabels()[LabelKeyCompleted] == "true"
Expand Down
31 changes: 0 additions & 31 deletions workflow/controller/container_set_template_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,6 @@ spec:

socket := corev1.HostPathSocket
assert.ElementsMatch(t, []corev1.Volume{
{
Name: "podmetadata",
VolumeSource: corev1.VolumeSource{DownwardAPI: &corev1.DownwardAPIVolumeSource{
Items: []corev1.DownwardAPIVolumeFile{{
Path: "annotations",
FieldRef: &corev1.ObjectFieldSelector{APIVersion: "v1", FieldPath: "metadata.annotations"},
}},
}},
},
{Name: "docker-sock", VolumeSource: corev1.VolumeSource{HostPath: &corev1.HostPathVolumeSource{Path: "/var/run/docker.sock", Type: &socket}}},
{Name: "workspace", VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}}},
}, pod.Spec.Volumes)
Expand All @@ -64,7 +55,6 @@ spec:
switch c.Name {
case common.WaitContainerName:
assert.ElementsMatch(t, []corev1.VolumeMount{
{Name: "podmetadata", MountPath: "/argo/podmetadata"},
{Name: "docker-sock", MountPath: "/var/run/docker.sock", ReadOnly: true},
}, c.VolumeMounts)
case "ctr-0":
Expand Down Expand Up @@ -120,15 +110,6 @@ spec:

socket := corev1.HostPathSocket
assert.ElementsMatch(t, []corev1.Volume{
{
Name: "podmetadata",
VolumeSource: corev1.VolumeSource{DownwardAPI: &corev1.DownwardAPIVolumeSource{
Items: []corev1.DownwardAPIVolumeFile{{
Path: "annotations",
FieldRef: &corev1.ObjectFieldSelector{APIVersion: "v1", FieldPath: "metadata.annotations"},
}},
}},
},
{Name: "docker-sock", VolumeSource: corev1.VolumeSource{HostPath: &corev1.HostPathVolumeSource{Path: "/var/run/docker.sock", Type: &socket}}},
{Name: "workspace", VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}}},
{Name: "input-artifacts", VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}}},
Expand All @@ -137,7 +118,6 @@ spec:
if assert.Len(t, pod.Spec.InitContainers, 1) {
c := pod.Spec.InitContainers[0]
assert.ElementsMatch(t, []corev1.VolumeMount{
{Name: "podmetadata", MountPath: "/argo/podmetadata"},
{Name: "input-artifacts", MountPath: "/argo/inputs/artifacts"},
{Name: "workspace", MountPath: "/mainctrfs/workspace"},
}, c.VolumeMounts)
Expand All @@ -148,7 +128,6 @@ spec:
switch c.Name {
case common.WaitContainerName:
assert.ElementsMatch(t, []corev1.VolumeMount{
{Name: "podmetadata", MountPath: "/argo/podmetadata"},
{Name: "docker-sock", MountPath: "/var/run/docker.sock", ReadOnly: true},
{Name: "workspace", MountPath: "/mainctrfs/workspace"},
{Name: "input-artifacts", MountPath: "/mainctrfs/in/in-0", SubPath: "in-0"},
Expand Down Expand Up @@ -207,15 +186,6 @@ spec:

socket := corev1.HostPathSocket
assert.ElementsMatch(t, []corev1.Volume{
{
Name: "podmetadata",
VolumeSource: corev1.VolumeSource{DownwardAPI: &corev1.DownwardAPIVolumeSource{
Items: []corev1.DownwardAPIVolumeFile{{
Path: "annotations",
FieldRef: &corev1.ObjectFieldSelector{APIVersion: "v1", FieldPath: "metadata.annotations"},
}},
}},
},
{Name: "docker-sock", VolumeSource: corev1.VolumeSource{HostPath: &corev1.HostPathVolumeSource{Path: "/var/run/docker.sock", Type: &socket}}},
{Name: "workspace", VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}}},
}, pod.Spec.Volumes)
Expand All @@ -227,7 +197,6 @@ spec:
switch c.Name {
case common.WaitContainerName:
assert.ElementsMatch(t, []corev1.VolumeMount{
{Name: "podmetadata", MountPath: "/argo/podmetadata"},
{Name: "docker-sock", MountPath: "/var/run/docker.sock", ReadOnly: true},
{Name: "workspace", MountPath: "/mainctrfs/workspace"},
}, c.VolumeMounts)
Expand Down
35 changes: 31 additions & 4 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,24 @@ func (wfc *WorkflowController) processNextPodCleanupItem(ctx context.Context) bo
err := func() error {
pods := wfc.kubeclientset.CoreV1().Pods(namespace)
switch action {
case shutdownPod:
// to shutdown a pod, we signal the wait container to terminate, the wait container in turn will
// kill the main container (using whatever mechanism the executor uses), and will then exit itself
// once the main container exited
pod, err := wfc.getPod(namespace, podName)
if pod == nil || err != nil {
return err
}
for _, c := range pod.Spec.Containers {
if c.Name == common.WaitContainerName {
if err := signal.SignalContainer(wfc.restConfig, pod, common.WaitContainerName, syscall.SIGTERM); err != nil {
return err
}
return nil // done
}
}
// no wait container found
fallthrough
case terminateContainers:
if terminationGracePeriod, err := wfc.signalContainers(namespace, podName, syscall.SIGTERM); err != nil {
return err
Expand Down Expand Up @@ -509,18 +527,27 @@ func (wfc *WorkflowController) processNextPodCleanupItem(ctx context.Context) bo
return true
}

func (wfc *WorkflowController) signalContainers(namespace string, podName string, sig syscall.Signal) (time.Duration, error) {
func (wfc *WorkflowController) getPod(namespace string, podName string) (*apiv1.Pod, error) {
obj, exists, err := wfc.podInformer.GetStore().GetByKey(namespace + "/" + podName)
if err != nil {
return 0, err
return nil, err
}
if !exists {
return 0, nil
return nil, nil
}
pod, ok := obj.(*apiv1.Pod)
if !ok {
return 0, fmt.Errorf("object is not a pod")
return nil, fmt.Errorf("object is not a pod")
}
return pod, nil
}

func (wfc *WorkflowController) signalContainers(namespace string, podName string, sig syscall.Signal) (time.Duration, error) {
pod, err := wfc.getPod(namespace, podName)
if pod == nil || err != nil {
return 0, err
}

for _, c := range pod.Status.ContainerStatuses {
if c.Name == common.WaitContainerName || c.State.Terminated != nil {
continue
Expand Down
2 changes: 1 addition & 1 deletion workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func (woc *wfOperationCtx) executeDAG(ctx context.Context, nodeName string, tmpl

defer func() {
if woc.wf.Status.Nodes[node.ID].Fulfilled() {
_ = woc.killDaemonedChildren(ctx, node.ID)
woc.killDaemonedChildren(node.ID)
}
}()

Expand Down
Loading

0 comments on commit cecc379

Please sign in to comment.