diff --git a/pkg/function/kube_task.go b/pkg/function/kube_task.go index fa95fd4025..74b10c4cf8 100644 --- a/pkg/function/kube_task.go +++ b/pkg/function/kube_task.go @@ -16,6 +16,7 @@ package function import ( "context" + "path" "time" "github.com/pkg/errors" @@ -33,7 +34,9 @@ import ( ) const ( - jobPrefix = "kanister-job-" + jobPrefix = "kanister-job-" + jobIDSuffix = "JobID" + // KubeTaskFuncName gives the function name KubeTaskFuncName = "KubeTask" KubeTaskNamespaceArg = "namespace" @@ -64,7 +67,8 @@ func kubeTask(ctx context.Context, cli kubernetes.Interface, namespace, image st Command: command, PodOverride: podOverride, } - + // Mark pod with label having key `kanister.io/JobID`, the value of which is a reference to the origin of the pod. + kube.AddLabelsToPodOptionsFromContext(ctx, options, path.Join(consts.LabelPrefix, jobIDSuffix)) pr := kube.NewPodRunner(cli, options) podFunc := kubeTaskPodFunc() return pr.Run(ctx, podFunc) diff --git a/pkg/kube/pod_runner_test.go b/pkg/kube/pod_runner_test.go index b072abb1ee..85d250de7a 100644 --- a/pkg/kube/pod_runner_test.go +++ b/pkg/kube/pod_runner_test.go @@ -17,12 +17,17 @@ package kube import ( "context" "os" + "path" + "github.com/pkg/errors" . "gopkg.in/check.v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/testing" + + "github.com/kanisterio/kanister/pkg/consts" + "github.com/kanisterio/kanister/pkg/field" ) type PodRunnerTestSuite struct{} @@ -109,9 +114,87 @@ func (s *PodRunnerTestSuite) TestPodRunnerForSuccessCase(c *C) { cancel() } +// TestPodRunnerWithDebugLabelForSuccessCase adds a debug entry into the context and verifies the +// pod got created with corresponding label using the entry or not. +func (s *PodRunnerTestSuite) TestPodRunnerWithDebugLabelForSuccessCase(c *C) { + jobIDSuffix := "JobID" + for _, tc := range []struct { + name string + targetKey string + contextKey string + contextValue string + isLabelExpected bool + }{ + { + name: "target key (kanister.io/JobID) present in pod labels", + targetKey: path.Join(consts.LabelPrefix, jobIDSuffix), + contextKey: path.Join(consts.LabelPrefix, jobIDSuffix), + contextValue: "xyz123", + isLabelExpected: true, + }, + { + name: "target key (kanister.io/JobID) not present in pod labels", + targetKey: path.Join(consts.LabelPrefix, jobIDSuffix), + contextKey: path.Join(consts.LabelPrefix, "NonJobID"), + contextValue: "some-other-value", + isLabelExpected: false, + }, + } { + ctx, cancel := context.WithCancel(context.Background()) + ctx = field.Context(ctx, tc.contextKey, tc.contextValue) + cli := fake.NewSimpleClientset() + cli.PrependReactor("create", "pods", func(action testing.Action) (handled bool, ret runtime.Object, err error) { + return false, nil, nil + }) + cli.PrependReactor("get", "pods", func(action testing.Action) (handled bool, ret runtime.Object, err error) { + p := &corev1.Pod{ + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + } + return true, p, nil + }) + po := &PodOptions{ + Namespace: podRunnerNS, + Name: podName, + Command: []string{"sh", "-c", "tail -f /dev/null"}, + } + deleted := make(chan struct{}) + cli.PrependReactor("delete", "pods", func(action testing.Action) (handled bool, ret runtime.Object, err error) { + c.Log("Pod deleted due to Context Cancelled") + close(deleted) + return true, nil, nil + }) + AddLabelsToPodOptionsFromContext(ctx, po, tc.targetKey) + pr := NewPodRunner(cli, po) + errorCh := make(chan error) + go func() { + _, err := pr.Run(ctx, afterPodRunTestKeyPresentFunc(tc.targetKey, tc.contextValue, tc.isLabelExpected, deleted)) + errorCh <- err + }() + deleted <- struct{}{} + c.Assert(<-errorCh, IsNil) + cancel() + } +} + func makePodRunnerTestFunc(ch chan struct{}) func(ctx context.Context, pc PodController) (map[string]interface{}, error) { return func(ctx context.Context, pc PodController) (map[string]interface{}, error) { <-ch return nil, nil } } + +func afterPodRunTestKeyPresentFunc(labelKey, expectedLabelValue string, isLabelExpected bool, ch chan struct{}) func(ctx context.Context, pc PodController) (map[string]interface{}, error) { + return func(ctx context.Context, pc PodController) (map[string]interface{}, error) { + <-ch + labelValue, found := pc.Pod().Labels[labelKey] + if found != isLabelExpected { + return nil, errors.New("Got different label than expected") + } + if isLabelExpected && labelValue != expectedLabelValue { + return nil, errors.New("Found label doesn't match with expected label") + } + return nil, nil + } +} diff --git a/pkg/kube/utils.go b/pkg/kube/utils.go index 98112cbcab..854e16683c 100644 --- a/pkg/kube/utils.go +++ b/pkg/kube/utils.go @@ -21,6 +21,8 @@ import ( osversioned "github.com/openshift/client-go/apps/clientset/versioned" corev1 "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes" + + "github.com/kanisterio/kanister/pkg/field" ) const ( @@ -171,3 +173,25 @@ func PVCContainsReadOnlyAccessMode(pvc *corev1.PersistentVolumeClaim) bool { return false } + +// AddLabelsToPodOptionsFromContext adds a label to `PodOptions`. It extracts the value from the context +// if targetKey is present and assigns to the options. +func AddLabelsToPodOptionsFromContext( + ctx context.Context, + options *PodOptions, + targetKey string, +) { + fields := field.FromContext(ctx) + if fields == nil { + return + } + if options.Labels == nil { + options.Labels = make(map[string]string) + } + for _, f := range fields.Fields() { + if f.Key() == targetKey { + options.Labels[targetKey] = f.Value().(string) + return + } + } +}