From 0d05fecb3beef092edf37e7a712aaee06bfc3fac Mon Sep 17 00:00:00 2001 From: Pavan Navarathna Devaraj Date: Tue, 2 Oct 2018 14:47:09 -0700 Subject: [PATCH 01/18] Support return values from Kanister functions - Phase 1 (#3984) * Support return values from Kanister functions * Change all Kanister functions to return output * Change function signature in all the tests * Move RenderArgs logic into Exec * Add output function unit test * Fix phase unit test * Minor improvement to func unit test --- pkg/apis/cr/v1alpha1/deepcopy.go | 8 ++++ pkg/apis/cr/v1alpha1/types.go | 5 ++- pkg/apis/cr/v1alpha1/zz_generated.deepcopy.go | 10 ++--- pkg/controller/controller.go | 11 +++-- pkg/controller/controller_test.go | 5 +++ pkg/function/backup_data.go | 24 +++++------ pkg/function/data_test.go | 2 +- pkg/function/delete_data.go | 10 ++--- pkg/function/kube_exec.go | 14 +++---- pkg/function/kube_exec_all.go | 14 +++---- pkg/function/kube_exec_all_test.go | 10 +++-- pkg/function/kube_exec_test.go | 5 ++- pkg/function/kube_task.go | 10 ++--- pkg/function/kube_task_test.go | 2 +- pkg/function/prepare_data.go | 18 ++++---- pkg/function/prepare_data_test.go | 2 +- pkg/function/restore_data.go | 28 ++++++------- pkg/function/scale_test.go | 12 +++--- pkg/function/scale_workload.go | 12 +++--- pkg/kanister.go | 2 +- pkg/phase.go | 32 +++++++++++---- pkg/phase_test.go | 6 +-- pkg/testutil/func.go | 41 ++++++++++++------- pkg/testutil/func_test.go | 17 ++++++-- 24 files changed, 178 insertions(+), 122 deletions(-) diff --git a/pkg/apis/cr/v1alpha1/deepcopy.go b/pkg/apis/cr/v1alpha1/deepcopy.go index e15fb6318e..77d2007e78 100644 --- a/pkg/apis/cr/v1alpha1/deepcopy.go +++ b/pkg/apis/cr/v1alpha1/deepcopy.go @@ -7,3 +7,11 @@ func (in *BlueprintPhase) DeepCopyInto(out *BlueprintPhase) { // TODO: Handle 'Args' return } + +// DeepCopyInto handles the Phase deep copies, copying the receiver, writing into out. in must be non-nil. +// This is a workaround to handle the map[string]interface{} output type +func (in *Phase) DeepCopyInto(out *Phase) { + *out = *in + // TODO: Handle 'Output' map[string]interface{} + return +} diff --git a/pkg/apis/cr/v1alpha1/types.go b/pkg/apis/cr/v1alpha1/types.go index 3d4e08c078..d0e825b94a 100644 --- a/pkg/apis/cr/v1alpha1/types.go +++ b/pkg/apis/cr/v1alpha1/types.go @@ -137,8 +137,9 @@ const ( // Phase is subcomponent of an action. type Phase struct { - Name string `json:"name"` - State State `json:"state"` + Name string `json:"name"` + State State `json:"state"` + Output map[string]interface{} `json:"output"` } // k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/apis/cr/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/cr/v1alpha1/zz_generated.deepcopy.go index 662a3353bd..e8d8956f82 100644 --- a/pkg/apis/cr/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/cr/v1alpha1/zz_generated.deepcopy.go @@ -200,7 +200,9 @@ func (in *ActionStatus) DeepCopyInto(out *ActionStatus) { if in.Phases != nil { in, out := &in.Phases, &out.Phases *out = make([]Phase, len(*in)) - copy(*out, *in) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } } if in.Artifacts != nil { in, out := &in.Artifacts, &out.Artifacts @@ -470,12 +472,6 @@ func (in *ObjectReference) DeepCopy() *ObjectReference { return out } -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *Phase) DeepCopyInto(out *Phase) { - *out = *in - return -} - // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Phase. func (in *Phase) DeepCopy() *Phase { if in == nil { diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 7d64a06936..9603186fea 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -361,9 +361,13 @@ func (c *Controller) runAction(ctx context.Context, as *crv1alpha1.ActionSet, aI for i, p := range phases { c.logAndSuccessEvent(fmt.Sprintf("Executing phase %s", p.Name()), "Started Phase", as) err = param.InitPhaseParams(ctx, c.clientset, tp, p.Name(), p.Objects()) - if err == nil { - err = p.Exec(ctx, *tp) + if err != nil { + reason := fmt.Sprintf("ActionSetFailed Action: %s", as.Spec.Actions[aIDX].Name) + msg := fmt.Sprintf("Failed to execute phase: %#v:", as.Status.Actions[aIDX].Phases[i]) + c.logAndErrorEvent(msg, reason, err, as, bp) + return } + output, err := p.Exec(ctx, *bp, action.Name, *tp) var rf func(*crv1alpha1.ActionSet) error if err != nil { rf = func(ras *crv1alpha1.ActionSet) error { @@ -375,6 +379,7 @@ func (c *Controller) runAction(ctx context.Context, as *crv1alpha1.ActionSet, aI rf = func(ras *crv1alpha1.ActionSet) error { ras.Status.Actions[aIDX].Artifacts = arts ras.Status.Actions[aIDX].Phases[i].State = crv1alpha1.StateComplete + ras.Status.Actions[aIDX].Phases[i].Output = output return nil } } @@ -390,7 +395,7 @@ func (c *Controller) runAction(ctx context.Context, as *crv1alpha1.ActionSet, aI c.logAndErrorEvent(msg, reason, err, as, bp) return } - param.UpdatePhaseParams(ctx, tp, p.Name(), nil) + param.UpdatePhaseParams(ctx, tp, p.Name(), output) c.logAndSuccessEvent(fmt.Sprintf("Completed phase %s", p.Name()), "Ended Phase", as) } }() diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index 7f364c18a8..e27b448bb9 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -221,6 +221,9 @@ func (s *ControllerSuite) TestExecActionSet(c *C) { { funcNames: []string{testutil.ArgFuncName, testutil.FailFuncName}, }, + { + funcNames: []string{testutil.OutputFuncName}, + }, } { var err error // Add a blueprint with a mocked kanister function. @@ -259,6 +262,8 @@ func (s *ControllerSuite) TestExecActionSet(c *C) { testutil.ReleaseWaitFunc() case testutil.ArgFuncName: c.Assert(testutil.ArgFuncArgs(), DeepEquals, map[string]interface{}{"key": "myValue"}) + case testutil.OutputFuncName: + c.Assert(testutil.OutputFuncOut(), DeepEquals, map[string]interface{}{"key": "myValue"}) } } diff --git a/pkg/function/backup_data.go b/pkg/function/backup_data.go index 0895ca091d..476aa4dc2f 100644 --- a/pkg/function/backup_data.go +++ b/pkg/function/backup_data.go @@ -69,34 +69,34 @@ func validateProfile(profile *param.Profile) error { return nil } -func (*backupDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) error { +func (*backupDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) { var namespace, pod, container, includePath, backupArtifactPrefix, backupIdentifier string var err error if err = Arg(args, BackupDataNamespaceArg, &namespace); err != nil { - return err + return nil, err } if err = Arg(args, BackupDataPodArg, &pod); err != nil { - return err + return nil, err } if err = Arg(args, BackupDataContainerArg, &container); err != nil { - return err + return nil, err } if err = Arg(args, BackupDataIncludePathArg, &includePath); err != nil { - return err + return nil, err } if err = Arg(args, BackupDataBackupArtifactPrefixArg, &backupArtifactPrefix); err != nil { - return err + return nil, err } if err = Arg(args, BackupDataBackupIdentifierArg, &backupIdentifier); err != nil { - return err + return nil, err } // Validate the Profile if err = validateProfile(tp.Profile); err != nil { - return errors.Wrapf(err, "Failed to validate Profile") + return nil, errors.Wrapf(err, "Failed to validate Profile") } cli, err := kube.NewClient() if err != nil { - return errors.Wrapf(err, "Failed to create Kubernetes client") + return nil, errors.Wrapf(err, "Failed to create Kubernetes client") } // Use the snapshots command to check if the repository exists cmd := generateSnapshotsCommand(backupArtifactPrefix, tp.Profile) @@ -110,7 +110,7 @@ func (*backupDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args m formatAndLog(pod, container, stdout) formatAndLog(pod, container, stderr) if err != nil { - return errors.Wrapf(err, "Failed to create object store backup location") + return nil, errors.Wrapf(err, "Failed to create object store backup location") } } // Create backup and dump it on the object store @@ -119,9 +119,9 @@ func (*backupDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args m formatAndLog(pod, container, stdout) formatAndLog(pod, container, stderr) if err != nil { - return errors.Wrapf(err, "Failed to create and upload backup") + return nil, errors.Wrapf(err, "Failed to create and upload backup") } - return nil + return nil, nil } func (*backupDataFunc) RequiredArgs() []string { diff --git a/pkg/function/data_test.go b/pkg/function/data_test.go index 5852e24b9c..0d03f17f15 100644 --- a/pkg/function/data_test.go +++ b/pkg/function/data_test.go @@ -147,7 +147,7 @@ func (s *DataSuite) TestBackupRestoreData(c *C) { phases, err := kanister.GetPhases(bp, actionName, *tp) c.Assert(err, IsNil) for _, p := range phases { - err := p.Exec(context.Background(), *tp) + _, err = p.Exec(context.Background(), bp, actionName, *tp) c.Assert(err, IsNil) } } diff --git a/pkg/function/delete_data.go b/pkg/function/delete_data.go index 24a0cb44b0..ce9d90b52a 100644 --- a/pkg/function/delete_data.go +++ b/pkg/function/delete_data.go @@ -47,23 +47,23 @@ func generateDeleteCommand(artifact string, profile *param.Profile) []string { return []string{"bash", "-o", "errexit", "-o", "pipefail", "-c", command} } -func (*deleteDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) error { +func (*deleteDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) { var artifact, namespace string var err error if err = Arg(args, DeleteDataArtifactArg, &artifact); err != nil { - return err + return nil, err } if err = OptArg(args, DeleteDataNamespaceArg, &namespace, ""); err != nil { - return err + return nil, err } // Validate the Profile if err = validateProfile(tp.Profile); err != nil { - return errors.Wrapf(err, "Failed to validate Profile") + return nil, errors.Wrapf(err, "Failed to validate Profile") } // Generate delete command cmd := generateDeleteCommand(artifact, tp.Profile) // Use KubeTask to delete the artifact - return kubeTask(ctx, namespace, "kanisterio/kanister-tools:0.12.0", cmd) + return nil, kubeTask(ctx, namespace, "kanisterio/kanister-tools:0.12.0", cmd) } func (*deleteDataFunc) RequiredArgs() []string { diff --git a/pkg/function/kube_exec.go b/pkg/function/kube_exec.go index f5708246a4..5413664e5f 100644 --- a/pkg/function/kube_exec.go +++ b/pkg/function/kube_exec.go @@ -33,24 +33,24 @@ func (*kubeExecFunc) Name() string { return "KubeExec" } -func (kef *kubeExecFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) error { +func (kef *kubeExecFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) { cli, err := kube.NewClient() if err != nil { - return err + return nil, err } var namespace, pod, container string var cmd []string if err = Arg(args, KubeExecNamespaceArg, &namespace); err != nil { - return err + return nil, err } if err = Arg(args, KubeExecPodNameArg, &pod); err != nil { - return err + return nil, err } if err = Arg(args, KubeExecContainerNameArg, &container); err != nil { - return err + return nil, err } if err = Arg(args, KubeExecCommandArg, &cmd); err != nil { - return err + return nil, err } stdout, stderr, err := kube.Exec(cli, namespace, pod, container, cmd) @@ -70,7 +70,7 @@ func (kef *kubeExecFunc) Exec(ctx context.Context, tp param.TemplateParams, args } } } - return err + return nil, err } func (*kubeExecFunc) RequiredArgs() []string { diff --git a/pkg/function/kube_exec_all.go b/pkg/function/kube_exec_all.go index 859f2531d7..c034627f86 100644 --- a/pkg/function/kube_exec_all.go +++ b/pkg/function/kube_exec_all.go @@ -35,28 +35,28 @@ func (*kubeExecAllFunc) Name() string { return "KubeExecAll" } -func (*kubeExecAllFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) error { +func (*kubeExecAllFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) { cli, err := kube.NewClient() if err != nil { - return err + return nil, err } var namespace, pods, containers string var cmd []string if err = Arg(args, KubeExecAllNamespaceArg, &namespace); err != nil { - return err + return nil, err } if err = Arg(args, KubeExecAllPodsNameArg, &pods); err != nil { - return err + return nil, err } if err = Arg(args, KubeExecAllContainersNameArg, &containers); err != nil { - return err + return nil, err } if err = Arg(args, KubeExecAllCommandArg, &cmd); err != nil { - return err + return nil, err } ps := strings.Fields(pods) cs := strings.Fields(containers) - return execAll(cli, namespace, ps, cs, cmd) + return nil, execAll(cli, namespace, ps, cs, cmd) } func (*kubeExecAllFunc) RequiredArgs() []string { diff --git a/pkg/function/kube_exec_all_test.go b/pkg/function/kube_exec_all_test.go index 71ea248813..3a2a68633d 100644 --- a/pkg/function/kube_exec_all_test.go +++ b/pkg/function/kube_exec_all_test.go @@ -112,10 +112,11 @@ func (s *KubeExecAllTest) TestKubeExecAllDeployment(c *C) { c.Assert(err, IsNil) action := "echo" - phases, err := kanister.GetPhases(*newExecAllBlueprint(kind), action, *tp) + bp := newExecAllBlueprint(kind) + phases, err := kanister.GetPhases(*bp, action, *tp) c.Assert(err, IsNil) for _, p := range phases { - err = p.Exec(ctx, *tp) + _, err = p.Exec(ctx, *bp, action, *tp) c.Assert(err, IsNil) } } @@ -145,10 +146,11 @@ func (s *KubeExecAllTest) TestKubeExecAllStatefulSet(c *C) { c.Assert(err, IsNil) action := "echo" - phases, err := kanister.GetPhases(*newExecAllBlueprint(kind), action, *tp) + bp := newExecAllBlueprint(kind) + phases, err := kanister.GetPhases(*bp, action, *tp) c.Assert(err, IsNil) for _, p := range phases { - err = p.Exec(ctx, *tp) + _, err = p.Exec(ctx, *bp, action, *tp) c.Assert(err, IsNil) } } diff --git a/pkg/function/kube_exec_test.go b/pkg/function/kube_exec_test.go index 0229e807a9..76ac9a08c9 100644 --- a/pkg/function/kube_exec_test.go +++ b/pkg/function/kube_exec_test.go @@ -141,10 +141,11 @@ func (s *KubeExecTest) TestKubeExec(c *C) { c.Assert(err, IsNil) action := "echo" - phases, err := kanister.GetPhases(*newKubeExecBlueprint(), action, *tp) + bp := newKubeExecBlueprint() + phases, err := kanister.GetPhases(*bp, action, *tp) c.Assert(err, IsNil) for _, p := range phases { - err := p.Exec(context.Background(), *tp) + _, err = p.Exec(context.Background(), *bp, action, *tp) c.Assert(err, IsNil) } } diff --git a/pkg/function/kube_task.go b/pkg/function/kube_task.go index 47279196bb..0500d74963 100644 --- a/pkg/function/kube_task.go +++ b/pkg/function/kube_task.go @@ -67,20 +67,20 @@ func kubeTask(ctx context.Context, namespace, image string, command []string) er return nil } -func (ktf *kubeTaskFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) error { +func (ktf *kubeTaskFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) { var namespace, image string var command []string var err error if err = Arg(args, KubeTaskImageArg, &image); err != nil { - return err + return nil, err } if err = Arg(args, KubeTaskCommandArg, &command); err != nil { - return err + return nil, err } if err = OptArg(args, KubeTaskNamespaceArg, &namespace, ""); err != nil { - return err + return nil, err } - return kubeTask(ctx, namespace, image, command) + return nil, kubeTask(ctx, namespace, image, command) } func (*kubeTaskFunc) RequiredArgs() []string { diff --git a/pkg/function/kube_task_test.go b/pkg/function/kube_task_test.go index c20919d336..1f5d93face 100644 --- a/pkg/function/kube_task_test.go +++ b/pkg/function/kube_task_test.go @@ -95,7 +95,7 @@ func (s *KubeTaskSuite) TestKubeTask(c *C) { phases, err := kanister.GetPhases(*bp, action, tp) c.Assert(err, IsNil) for _, p := range phases { - err := p.Exec(ctx, tp) + _, err = p.Exec(ctx, *bp, action, tp) c.Assert(err, IsNil) } } diff --git a/pkg/function/prepare_data.go b/pkg/function/prepare_data.go index a4adf09e90..e950381a6c 100644 --- a/pkg/function/prepare_data.go +++ b/pkg/function/prepare_data.go @@ -79,36 +79,36 @@ func prepareData(ctx context.Context, cli kubernetes.Interface, namespace, servi return nil } -func (*prepareDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) error { +func (*prepareDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) { var namespace, image, serviceAccount string var command []string var vols map[string]string var err error if err = Arg(args, PrepareDataNamespaceArg, &namespace); err != nil { - return err + return nil, err } if err = Arg(args, PrepareDataImageArg, &image); err != nil { - return err + return nil, err } if err = Arg(args, PrepareDataCommandArg, &command); err != nil { - return err + return nil, err } if err = OptArg(args, PrepareDataVolumes, &vols, nil); err != nil { - return err + return nil, err } if err = OptArg(args, PrepareDataServiceAccount, &serviceAccount, ""); err != nil { - return err + return nil, err } cli, err := kube.NewClient() if err != nil { - return errors.Wrapf(err, "Failed to create Kubernetes client") + return nil, errors.Wrapf(err, "Failed to create Kubernetes client") } if len(vols) == 0 { if vols, err = getVolumes(tp); err != nil { - return err + return nil, err } } - return prepareData(ctx, cli, namespace, serviceAccount, image, vols, command...) + return nil, prepareData(ctx, cli, namespace, serviceAccount, image, vols, command...) } func (*prepareDataFunc) RequiredArgs() []string { diff --git a/pkg/function/prepare_data_test.go b/pkg/function/prepare_data_test.go index 502c13d420..f9f06b0708 100644 --- a/pkg/function/prepare_data_test.go +++ b/pkg/function/prepare_data_test.go @@ -133,7 +133,7 @@ func (s *PrepareDataSuite) TestPrepareData(c *C) { phases, err := kanister.GetPhases(*bp, action, tp) c.Assert(err, IsNil) for _, p := range phases { - err := p.Exec(ctx, tp) + _, err = p.Exec(ctx, *bp, action, tp) c.Assert(err, IsNil) } } diff --git a/pkg/function/restore_data.go b/pkg/function/restore_data.go index 09c08591c3..9669df98ca 100644 --- a/pkg/function/restore_data.go +++ b/pkg/function/restore_data.go @@ -72,56 +72,56 @@ func generateRestoreCommand(backupArtifactPrefix, restorePath, id string, profil return []string{"bash", "-o", "errexit", "-o", "pipefail", "-c", command}, nil } -func (*restoreDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) error { +func (*restoreDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) { var namespace, pod, image, backupArtifactPrefix, restorePath, backupIdentifier string var vols map[string]string var err error if err = Arg(args, RestoreDataNamespaceArg, &namespace); err != nil { - return err + return nil, err } if err = Arg(args, RestoreDataImageArg, &image); err != nil { - return err + return nil, err } if err = Arg(args, RestoreDataBackupArtifactPrefixArg, &backupArtifactPrefix); err != nil { - return err + return nil, err } if err = Arg(args, RestoreDataBackupIdentifierArg, &backupIdentifier); err != nil { - return err + return nil, err } if err = OptArg(args, RestoreDataRestorePathArg, &restorePath, "/"); err != nil { - return err + return nil, err } if err = OptArg(args, RestoreDataPodArg, &pod, ""); err != nil { - return err + return nil, err } if err = OptArg(args, RestoreDataVolsArg, &vols, nil); err != nil { - return err + return nil, err } if err = validateOptArgs(pod, vols); err != nil { - return err + return nil, err } // Validate profile if err = validateProfile(tp.Profile); err != nil { - return err + return nil, err } // Generate restore command cmd, err := generateRestoreCommand(backupArtifactPrefix, restorePath, backupIdentifier, tp.Profile) if err != nil { - return err + return nil, err } if len(vols) == 0 { // Fetch Volumes vols, err = fetchPodVolumes(pod, tp) if err != nil { - return err + return nil, err } } // Call PrepareData with generated command cli, err := kube.NewClient() if err != nil { - return errors.Wrapf(err, "Failed to create Kubernetes client") + return nil, errors.Wrapf(err, "Failed to create Kubernetes client") } - return prepareData(ctx, cli, namespace, "", image, vols, cmd...) + return nil, prepareData(ctx, cli, namespace, "", image, vols, cmd...) } func (*restoreDataFunc) RequiredArgs() []string { diff --git a/pkg/function/scale_test.go b/pkg/function/scale_test.go index c60c3e8846..901349f2ba 100644 --- a/pkg/function/scale_test.go +++ b/pkg/function/scale_test.go @@ -142,11 +142,11 @@ func (s *ScaleSuite) TestScaleDeployment(c *C) { for _, action := range []string{"scaleUp", "echoHello", "scaleDown"} { tp, err := param.New(ctx, s.cli, s.crCli, as) c.Assert(err, IsNil) - - phases, err := kanister.GetPhases(*newScaleBlueprint(kind), action, *tp) + bp := newScaleBlueprint(kind) + phases, err := kanister.GetPhases(*bp, action, *tp) c.Assert(err, IsNil) for _, p := range phases { - err := p.Exec(context.Background(), *tp) + _, err = p.Exec(context.Background(), *bp, action, *tp) c.Assert(err, IsNil) } ok, err := kube.DeploymentReady(ctx, s.cli, d.GetNamespace(), d.GetName()) @@ -191,11 +191,11 @@ func (s *ScaleSuite) TestScaleStatefulSet(c *C) { for _, action := range []string{"scaleUp", "echoHello", "scaleDown"} { tp, err := param.New(ctx, s.cli, s.crCli, as) c.Assert(err, IsNil) - - phases, err := kanister.GetPhases(*newScaleBlueprint(kind), action, *tp) + bp := newScaleBlueprint(kind) + phases, err := kanister.GetPhases(*bp, action, *tp) c.Assert(err, IsNil) for _, p := range phases { - err := p.Exec(context.Background(), *tp) + _, err = p.Exec(context.Background(), *bp, action, *tp) c.Assert(err, IsNil) } diff --git a/pkg/function/scale_workload.go b/pkg/function/scale_workload.go index 646515243c..cc3ea932d0 100644 --- a/pkg/function/scale_workload.go +++ b/pkg/function/scale_workload.go @@ -32,25 +32,25 @@ func (*scaleWorkloadFunc) Name() string { return "ScaleWorkload" } -func (*scaleWorkloadFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) error { +func (*scaleWorkloadFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) { var namespace, kind, name string var replicas int32 namespace, kind, name, replicas, err := getArgs(tp, args) if err != nil { - return err + return nil, err } cli, err := kube.NewClient() if err != nil { - return errors.Wrapf(err, "Failed to create Kubernetes client") + return nil, errors.Wrapf(err, "Failed to create Kubernetes client") } switch strings.ToLower(kind) { case param.StatefulSetKind: - return kube.ScaleStatefulSet(ctx, cli, namespace, name, replicas) + return nil, kube.ScaleStatefulSet(ctx, cli, namespace, name, replicas) case param.DeploymentKind: - return kube.ScaleDeployment(ctx, cli, namespace, name, replicas) + return nil, kube.ScaleDeployment(ctx, cli, namespace, name, replicas) default: - return errors.New("Workload type not supported " + kind) + return nil, errors.New("Workload type not supported " + kind) } } diff --git a/pkg/kanister.go b/pkg/kanister.go index 14d82497a8..d9e7947a8f 100644 --- a/pkg/kanister.go +++ b/pkg/kanister.go @@ -18,7 +18,7 @@ var ( type Func interface { Name() string RequiredArgs() []string - Exec(context.Context, param.TemplateParams, map[string]interface{}) error + Exec(context.Context, param.TemplateParams, map[string]interface{}) (map[string]interface{}, error) } // Register allows Funcs to be references by User Defined YAMLs diff --git a/pkg/phase.go b/pkg/phase.go index b69488bbc9..b296cf5beb 100644 --- a/pkg/phase.go +++ b/pkg/phase.go @@ -29,7 +29,29 @@ func (p *Phase) Objects() map[string]crv1alpha1.ObjectReference { // Exec renders the argument templates in this Phase's Func and executes with // those arguments. -func (p *Phase) Exec(ctx context.Context, tp param.TemplateParams) error { +func (p *Phase) Exec(ctx context.Context, bp crv1alpha1.Blueprint, action string, tp param.TemplateParams) (map[string]interface{}, error) { + if p.args == nil { + // Get the action from Blueprint + a, ok := bp.Actions[action] + if !ok { + return nil, errors.Errorf("Action {%s} not found in action map", action) + } + // Render the argument templates for the Phase's function + for _, ap := range a.Phases { + if ap.Name != p.name { + continue + } + args, err := param.RenderArgs(ap.Args, tp) + if err != nil { + return nil, err + } + if err = checkRequiredArgs(funcs[ap.Func].RequiredArgs(), args); err != nil { + return nil, errors.Wrapf(err, "Reqired args missing for function %s", funcs[ap.Func].Name()) + } + p.args = args + } + } + // Execute the function return p.f.Exec(ctx, tp, p.args) } @@ -53,16 +75,8 @@ func GetPhases(bp crv1alpha1.Blueprint, action string, tp param.TemplateParams) if err != nil { return nil, err } - args, err := param.RenderArgs(p.Args, tp) - if err != nil { - return nil, err - } - if err = checkRequiredArgs(funcs[p.Func].RequiredArgs(), args); err != nil { - return nil, errors.Wrapf(err, "Reqired args missing for function %s", funcs[p.Func].Name()) - } phases = append(phases, &Phase{ name: p.Name, - args: args, objects: objs, f: funcs[p.Func], }) diff --git a/pkg/phase_test.go b/pkg/phase_test.go index 4f7850e9c8..70415c30d9 100644 --- a/pkg/phase_test.go +++ b/pkg/phase_test.go @@ -25,9 +25,9 @@ func (*testFunc) Name() string { return "mock" } -func (tf *testFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) error { +func (tf *testFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) { *tf.output = args["testKey"].(string) - return tf.err + return nil, tf.err } func (tf *testFunc) RequiredArgs() []string { @@ -68,7 +68,7 @@ func (s *PhaseSuite) TestExec(c *C) { args, err := param.RenderArgs(rawArgs, tp) c.Assert(err, IsNil) p := Phase{args: args, f: tf} - err = p.Exec(context.Background(), tp) + _, err = p.Exec(context.Background(), crv1alpha1.Blueprint{}, "", tp) c.Assert(err, IsNil) c.Assert(output, Equals, tc.expected) } diff --git a/pkg/testutil/func.go b/pkg/testutil/func.go index 66c138c335..290244883f 100644 --- a/pkg/testutil/func.go +++ b/pkg/testutil/func.go @@ -10,38 +10,47 @@ import ( ) const ( - FailFuncName = "FailFunc" - WaitFuncName = "WaitFunc" - ArgFuncName = "ArgFunc" + FailFuncName = "FailFunc" + WaitFuncName = "WaitFunc" + ArgFuncName = "ArgFunc" + OutputFuncName = "OutputFunc" ) var ( - waitFuncCh chan struct{} - argFuncCh chan map[string]interface{} + waitFuncCh chan struct{} + argFuncCh chan map[string]interface{} + outputFuncCh chan map[string]interface{} ) -func failFunc(context.Context, param.TemplateParams, map[string]interface{}) error { - return errors.New("Kanister Function Failed") +func failFunc(context.Context, param.TemplateParams, map[string]interface{}) (map[string]interface{}, error) { + return nil, errors.New("Kanister Function Failed") } -func waitFunc(context.Context, param.TemplateParams, map[string]interface{}) error { +func waitFunc(context.Context, param.TemplateParams, map[string]interface{}) (map[string]interface{}, error) { <-waitFuncCh - return nil + return nil, nil } -func argsFunc(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) error { +func argsFunc(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) { argFuncCh <- args - return nil + return nil, nil +} + +func outputFunc(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) { + outputFuncCh <- args + return args, nil } func init() { waitFuncCh = make(chan struct{}) argFuncCh = make(chan map[string]interface{}) + outputFuncCh = make(chan map[string]interface{}) registerMockKanisterFunc(FailFuncName, failFunc) registerMockKanisterFunc(WaitFuncName, waitFunc) registerMockKanisterFunc(ArgFuncName, argsFunc) + registerMockKanisterFunc(OutputFuncName, outputFunc) } -func registerMockKanisterFunc(name string, f func(context.Context, param.TemplateParams, map[string]interface{}) error) { +func registerMockKanisterFunc(name string, f func(context.Context, param.TemplateParams, map[string]interface{}) (map[string]interface{}, error)) { kanister.Register(&mockKanisterFunc{name: name, f: f}) } @@ -49,10 +58,10 @@ var _ kanister.Func = (*mockKanisterFunc)(nil) type mockKanisterFunc struct { name string - f func(context.Context, param.TemplateParams, map[string]interface{}) error + f func(context.Context, param.TemplateParams, map[string]interface{}) (map[string]interface{}, error) } -func (mf *mockKanisterFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) error { +func (mf *mockKanisterFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) { return mf.f(ctx, tp, args) } @@ -68,6 +77,10 @@ func ArgFuncArgs() map[string]interface{} { return <-argFuncCh } +func OutputFuncOut() map[string]interface{} { + return <-outputFuncCh +} + func (mf *mockKanisterFunc) RequiredArgs() []string { return nil } diff --git a/pkg/testutil/func_test.go b/pkg/testutil/func_test.go index 375a9caadb..3d5ac6b99a 100644 --- a/pkg/testutil/func_test.go +++ b/pkg/testutil/func_test.go @@ -21,7 +21,7 @@ func (s *FuncSuite) TearDownSuite(c *C) { func (s *FuncSuite) TestFailFunc(c *C) { ctx := context.Background() - err := failFunc(ctx, param.TemplateParams{}, nil) + _, err := failFunc(ctx, param.TemplateParams{}, nil) c.Assert(err, NotNil) } @@ -29,7 +29,7 @@ func (s *FuncSuite) TestWaitFunc(c *C) { ctx := context.Background() done := make(chan bool) go func() { - err := waitFunc(ctx, param.TemplateParams{}, nil) + _, err := waitFunc(ctx, param.TemplateParams{}, nil) c.Assert(err, IsNil) close(done) }() @@ -46,8 +46,19 @@ func (s *FuncSuite) TestArgsFunc(c *C) { ctx := context.Background() args := map[string]interface{}{"arg1": []string{"foo", "bar"}} go func() { - err := argsFunc(ctx, param.TemplateParams{}, args) + _, err := argsFunc(ctx, param.TemplateParams{}, args) c.Assert(err, IsNil) }() c.Assert(ArgFuncArgs(), DeepEquals, args) } + +func (s *FuncSuite) TestOutputFunc(c *C) { + ctx := context.Background() + args := map[string]interface{}{"arg1": []string{"foo", "bar"}} + go func() { + output, err := outputFunc(ctx, param.TemplateParams{}, args) + c.Assert(err, IsNil) + c.Assert(output, DeepEquals, args) + }() + c.Assert(OutputFuncOut(), DeepEquals, args) +} From a6933804db0c475e005353a93833c187cf43bfff Mon Sep 17 00:00:00 2001 From: Supriya Kharade Date: Fri, 5 Oct 2018 11:09:01 -0700 Subject: [PATCH 02/18] move code to get/CreateRepo to new func (#4017) --- pkg/function/backup_data.go | 43 +++++++++++++++++++++++-------------- 1 file changed, 27 insertions(+), 16 deletions(-) diff --git a/pkg/function/backup_data.go b/pkg/function/backup_data.go index 476aa4dc2f..3182a7b653 100644 --- a/pkg/function/backup_data.go +++ b/pkg/function/backup_data.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/pkg/errors" + "k8s.io/client-go/kubernetes" kanister "github.com/kanisterio/kanister/pkg" crv1alpha1 "github.com/kanisterio/kanister/pkg/apis/cr/v1alpha1" @@ -69,7 +70,27 @@ func validateProfile(profile *param.Profile) error { return nil } +func getOrCreateRepository(cli kubernetes.Interface, namespace, pod, container, artifactPrefix string, profile *param.Profile) error { + // Use the snapshots command to check if the repository exists + cmd := generateSnapshotsCommand(artifactPrefix, profile) + stdout, stderr, err := kube.Exec(cli, namespace, pod, container, cmd) + formatAndLog(pod, container, stdout) + formatAndLog(pod, container, stderr) + if err != nil { + // Create a repository + cmd := generateInitCommand(artifactPrefix, profile) + stdout, stderr, err := kube.Exec(cli, namespace, pod, container, cmd) + formatAndLog(pod, container, stdout) + formatAndLog(pod, container, stderr) + if err != nil { + return errors.Wrapf(err, "Failed to create object store backup location") + } + } + return nil +} + func (*backupDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) { + var namespace, pod, container, includePath, backupArtifactPrefix, backupIdentifier string var err error if err = Arg(args, BackupDataNamespaceArg, &namespace); err != nil { @@ -98,24 +119,14 @@ func (*backupDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args m if err != nil { return nil, errors.Wrapf(err, "Failed to create Kubernetes client") } - // Use the snapshots command to check if the repository exists - cmd := generateSnapshotsCommand(backupArtifactPrefix, tp.Profile) - stdout, stderr, err := kube.Exec(cli, namespace, pod, container, cmd) - formatAndLog(pod, container, stdout) - formatAndLog(pod, container, stderr) - if err != nil { - // Create a repository - cmd := generateInitCommand(backupArtifactPrefix, tp.Profile) - stdout, stderr, err := kube.Exec(cli, namespace, pod, container, cmd) - formatAndLog(pod, container, stdout) - formatAndLog(pod, container, stderr) - if err != nil { - return nil, errors.Wrapf(err, "Failed to create object store backup location") - } + + if err = getOrCreateRepository(cli, namespace, pod, container, backupArtifactPrefix, tp.Profile); err != nil { + return nil, err } + // Create backup and dump it on the object store - cmd = generateBackupCommand(includePath, backupArtifactPrefix, backupIdentifier, tp.Profile) - stdout, stderr, err = kube.Exec(cli, namespace, pod, container, cmd) + cmd := generateBackupCommand(includePath, backupArtifactPrefix, backupIdentifier, tp.Profile) + stdout, stderr, err := kube.Exec(cli, namespace, pod, container, cmd) formatAndLog(pod, container, stdout) formatAndLog(pod, container, stderr) if err != nil { From f691ba2daa752cce40103b6b80ee928ecb24df9d Mon Sep 17 00:00:00 2001 From: Supriya Kharade Date: Mon, 8 Oct 2018 11:39:06 -0700 Subject: [PATCH 03/18] Add helper functions to objectstore (#4031) Add get/put/delete data helper functions to objectstore #K10-1536 --- pkg/objectstore/helper.go | 102 +++++++++++++++++++++++++++++++++++++- 1 file changed, 101 insertions(+), 1 deletion(-) diff --git a/pkg/objectstore/helper.go b/pkg/objectstore/helper.go index 579d192c74..403df91d21 100644 --- a/pkg/objectstore/helper.go +++ b/pkg/objectstore/helper.go @@ -1,6 +1,15 @@ package objectstore -import "context" +import ( + "context" + "os" + + "github.com/pkg/errors" + "golang.org/x/oauth2/google" + "google.golang.org/api/compute/v1" + + "github.com/kanisterio/kanister/pkg/param" +) // GetOrCreateBucket is a helper function to access the package level getOrCreateBucket func GetOrCreateBucket(ctx context.Context, p Provider, bucketName string, region string) (Directory, error) { @@ -14,3 +23,94 @@ func IsS3Provider(p Provider) bool { } return false } +func GetBucket(ctx context.Context, profile *param.Profile, osType ProviderType, bucketName string) (Bucket, error) { + pc := ProviderConfig{Type: osType} + if osType == ProviderTypeS3 { + if profile.Credential.Type != param.CredentialTypeKeyPair { + return nil, errors.New("Unsupported Credential type") + } + } + secret, err := GetSecret(osType, profile) + if err != nil { + return nil, errors.Wrapf(err, "Failed to get provider credentials") + } + provider, err := NewProvider(ctx, pc, secret) + if err != nil { + return nil, errors.Wrapf(err, "Failed to get new provider") + } + return provider.GetBucket(ctx, bucketName) +} + +func PutData(ctx context.Context, profile *param.Profile, osType ProviderType, bucketName, dirName, fileName string, data []byte) error { + bucket, err := GetBucket(ctx, profile, osType, bucketName) + if err != nil { + return errors.Wrapf(err, "Failed to get bucket") + } + directory, err := bucket.GetDirectory(ctx, dirName) + if err != nil { + directory, err = bucket.CreateDirectory(ctx, dirName) + if err != nil { + return errors.Wrapf(err, "Failed to create directory") + } + } + return directory.PutBytes(ctx, fileName, data, nil) +} + +func GetData(ctx context.Context, profile *param.Profile, osType ProviderType, bucketName, dirName, fileName string) ([]byte, error) { + bucket, err := GetBucket(ctx, profile, osType, bucketName) + if err != nil { + return nil, errors.Wrapf(err, "Failed to get bucket") + } + directory, err := bucket.GetDirectory(ctx, dirName) + if err != nil { + return nil, errors.Wrapf(err, "Failed to get directory") + } + data, _, err := directory.GetBytes(ctx, fileName) + if err != nil { + return nil, errors.Wrapf(err, "Failed to get data") + } + return data, nil +} + +func DeleteData(ctx context.Context, profile *param.Profile, osType ProviderType, bucketName, dirName string) error { + bucket, err := GetBucket(ctx, profile, osType, bucketName) + if err != nil { + return errors.Wrapf(err, "Failed to get bucket") + } + directory, err := bucket.GetDirectory(ctx, dirName) + if err != nil { + return errors.Wrapf(err, "Failed to get directory") + } + return errors.Wrapf(directory.DeleteDirectory(ctx), "Failed to delete data") +} + +func GetSecret(osType ProviderType, profile *param.Profile) (*Secret, error) { + secret := &Secret{} + switch osType { + case ProviderTypeS3: + secret.Type = SecretTypeAwsAccessKey + secret.Aws = &SecretAws{ + AccessKeyID: profile.Credential.KeyPair.ID, + SecretAccessKey: profile.Credential.KeyPair.Secret, + } + case ProviderTypeGCS: + creds, err := google.FindDefaultCredentials(context.Background(), compute.ComputeScope) + if err != nil { + return nil, errors.New("Could not get GCS credentials") + } + secret.Type = SecretTypeGcpServiceAccountKey + secret.Gcp = &SecretGcp{ + ServiceKey: string(creds.JSON), + ProjectID: creds.ProjectID, + } + case ProviderTypeAzure: + secret.Type = SecretTypeAzStorageAccount + secret.Azure = &SecretAzure{ + StorageAccount: os.Getenv("AZURE_STORAGE_ACCOUNT_NAME"), + StorageKey: os.Getenv("AZURE_STORAGE_ACCOUNT_KEY"), + } + default: + return nil, errors.New("Unsupported provider " + string(osType)) + } + return secret, nil +} From 2f639d02f507284505c63ae2236a749ee097922a Mon Sep 17 00:00:00 2001 From: Pavan Navarathna Devaraj Date: Mon, 8 Oct 2018 16:39:25 -0700 Subject: [PATCH 04/18] Modify BPs to not use .ArtifactsOut as func args (#4045) --- .../kanister/elasticsearch-blueprint.yaml | 2 +- .../kanister/mongodb-blueprint.yaml | 2 +- .../kanister/kanister-mysql/kanister/mysql-blueprint.yaml | 2 +- .../kanister-postgresql/kanister/postgres-blueprint.yaml | 4 ++-- examples/mongo-sidecar/blueprint.yaml | 2 +- examples/postgres-basic-pgdump/blueprint.yaml | 2 +- examples/time-log/blueprint.yaml | 4 ++-- 7 files changed, 9 insertions(+), 9 deletions(-) diff --git a/examples/helm/kanister/kanister-elasticsearch/kanister/elasticsearch-blueprint.yaml b/examples/helm/kanister/kanister-elasticsearch/kanister/elasticsearch-blueprint.yaml index 5f617d7202..b408df2e8b 100644 --- a/examples/helm/kanister/kanister-elasticsearch/kanister/elasticsearch-blueprint.yaml +++ b/examples/helm/kanister/kanister-elasticsearch/kanister/elasticsearch-blueprint.yaml @@ -21,7 +21,7 @@ actions: - | env_prefix="{{ snakecase .StatefulSet.Name | trimSuffix "_data" | upper}}" cluster_ip=$(env | grep ${env_prefix}_CLIENT_PORT_9200_TCP_ADDR= | sed 's/.*=//') - snapshot_prefix="{{ .ArtifactsOut.esBackup.KeyValue.path }}" + snapshot_prefix="/elasticsearch-backups/{{ toDate "2006-01-02T15:04:05.999999999Z07:00" .Time | date "2006-01-02T15-04-05" }}" list="$(curl -GET ${cluster_ip}:9200/_cat/indices | awk '{ print $3 }')" for index in $list do diff --git a/examples/helm/kanister/kanister-mongodb-replicaset/kanister/mongodb-blueprint.yaml b/examples/helm/kanister/kanister-mongodb-replicaset/kanister/mongodb-blueprint.yaml index 4def11a954..ca75e28115 100644 --- a/examples/helm/kanister/kanister-mongodb-replicaset/kanister/mongodb-blueprint.yaml +++ b/examples/helm/kanister/kanister-mongodb-replicaset/kanister/mongodb-blueprint.yaml @@ -28,7 +28,7 @@ actions: then dump_cmd+=(-u "${MONGO_ADMIN_USER}" -p "${MONGO_ADMIN_PASSWORD}") fi - ${dump_cmd[@]} | kando location push --profile '{{ toJson .Profile }}' --path '{{ .ArtifactsOut.cloudObject.KeyValue.path }}' - + ${dump_cmd[@]} | kando location push --profile '{{ toJson .Profile }}' --path '/mongodb-replicaset-backups/{{ .StatefulSet.Name }}/{{ toDate "2006-01-02T15:04:05.999999999Z07:00" .Time | date "2006-01-02T15-04-05" }}/rs_backup.gz' - restore: type: StatefulSet inputArtifactNames: diff --git a/examples/helm/kanister/kanister-mysql/kanister/mysql-blueprint.yaml b/examples/helm/kanister/kanister-mysql/kanister/mysql-blueprint.yaml index 6c64154aee..c6e020105b 100644 --- a/examples/helm/kanister/kanister-mysql/kanister/mysql-blueprint.yaml +++ b/examples/helm/kanister/kanister-mysql/kanister/mysql-blueprint.yaml @@ -20,7 +20,7 @@ actions: - pipefail - -c - | - s3_path="{{ .ArtifactsOut.mysqlCloudDump.KeyValue.path }}" + s3_path="/mysql-backups/{{ .Deployment.Namespace }}/{{ .Deployment.Name }}/{{ toDate "2006-01-02T15:04:05.999999999Z07:00" .Time | date "2006-01-02T15-04-05" }}/dump.sql.gz" mysqldump -u root --password="${MYSQL_ROOT_PASSWORD}" --single-transaction --all-databases | gzip - | kando location push --profile '{{ toJson .Profile }}' --path ${s3_path} - restore: type: Deployment diff --git a/examples/helm/kanister/kanister-postgresql/kanister/postgres-blueprint.yaml b/examples/helm/kanister/kanister-postgresql/kanister/postgres-blueprint.yaml index 11ec2f8815..64a6243245 100644 --- a/examples/helm/kanister/kanister-postgresql/kanister/postgres-blueprint.yaml +++ b/examples/helm/kanister/kanister-postgresql/kanister/postgres-blueprint.yaml @@ -34,7 +34,7 @@ actions: then # Setup wal-e s3 connection parameters. timeline={{ toDate "2006-01-02T15:04:05.999999999Z07:00" .Time | date "2006-01-02T15-04-05" }} - wale_s3_prefix="s3://{{ .Profile.Location.S3Compliant.Bucket }}/{{ .ArtifactsOut.manifest.KeyValue.prefix }}/${timeline}" + wale_s3_prefix="s3://{{ .Profile.Location.S3Compliant.Bucket }}/postgres-backups/{{ .Deployment.Name }}/${timeline}" echo "${wale_s3_prefix}" > "${env_wal_prefix}" fi @@ -89,7 +89,7 @@ actions: {{- if .Profile.Location.S3Compliant.Region }} s3_cmd+=(--region "{{ .Profile.Location.S3Compliant.Region | quote}}") {{- end }} - s3_path="s3://{{ .Profile.Location.S3Compliant.Bucket }}/{{ .ArtifactsOut.manifest.KeyValue.path }}" + s3_path="s3://{{ .Profile.Location.S3Compliant.Bucket }}/postgres-backups/{{ .Deployment.Name }}/{{ toDate "2006-01-02T15:04:05.999999999Z07:00" .Time | date "2006-01-02T15-04-05" }}/manifest.txt" s3_cmd+=(s3 cp - "${s3_path}") set +o xtrace diff --git a/examples/mongo-sidecar/blueprint.yaml b/examples/mongo-sidecar/blueprint.yaml index faa68561e7..5936063d7f 100644 --- a/examples/mongo-sidecar/blueprint.yaml +++ b/examples/mongo-sidecar/blueprint.yaml @@ -39,7 +39,7 @@ actions: then cmd=(aws --endpoint https://storage.googleapis.com s3 cp) fi - ${cmd[@]} ${file} "{{ .ArtifactsOut.cloudObject.KeyValue.path }}" + ${cmd[@]} ${file} "{{ .ConfigMaps.location.Data.bucket }}/backups/{{ .StatefulSet.Name }}/{{ toDate "2006-01-02T15:04:05.999999999Z07:00" .Time | date "2006-01-02T15-04-05" }}/rs0.tar" restore: type: StatefulSet inputArtifactNames: diff --git a/examples/postgres-basic-pgdump/blueprint.yaml b/examples/postgres-basic-pgdump/blueprint.yaml index f174e7156e..c7048c6e82 100644 --- a/examples/postgres-basic-pgdump/blueprint.yaml +++ b/examples/postgres-basic-pgdump/blueprint.yaml @@ -32,7 +32,7 @@ actions: export PGPORT=${{ .StatefulSet.Name | upper | replace "-" "_" }}_PORT_5432_TCP_PORT export PGPASSWORD={{ .Secrets.postgres.Data.password_superuser | toString }} pg_dumpall -U postgres -c -f backup.tar - aws s3 cp backup.tar "{{ .ArtifactsOut.cloudObject.KeyValue.path }}" + aws s3 cp backup.tar "{{ .ConfigMaps.location.Data.bucket }}/backups/{{ .StatefulSet.Namespace }}/{{ .StatefulSet.Name }}/{{ toDate "2006-01-02T15:04:05.999999999Z07:00" .Time | date "2006-01-02T15-04-05" }}/pg_backup.tar" restore: type: StatefulSet secretNames: diff --git a/examples/time-log/blueprint.yaml b/examples/time-log/blueprint.yaml index 6c0d41d7ae..863e3aa2bd 100644 --- a/examples/time-log/blueprint.yaml +++ b/examples/time-log/blueprint.yaml @@ -21,8 +21,8 @@ actions: pod: "{{ index .Deployment.Pods 0 }}" container: test-container includePath: /var/log - backupArtifactPrefix: "{{ .ArtifactsOut.timeLog.KeyValue.path }}" - backupIdentifier: "{{ .ArtifactsOut.backupIdentifier.KeyValue.id }}" + backupArtifactPrefix: "{{ .Profile.Location.S3Compliant.Bucket }}/time-log" + backupIdentifier: "{{ toDate "2006-01-02T15:04:05.999999999Z07:00" .Time | date "2006-01-02" }}" restore: type: Deployment inputArtifactNames: From 4534e872c9495b46dda573a44230468f546f5a06 Mon Sep 17 00:00:00 2001 From: Pavan Navarathna Devaraj Date: Mon, 8 Oct 2018 17:26:33 -0700 Subject: [PATCH 05/18] Kando command for creating phase output (#4021) * Kando command for creating phase output * Fix typo * Minor improvements --- pkg/kando/kando.go | 1 + pkg/kando/output.go | 34 ++++++++++++++++++++++++ pkg/output/output.go | 54 +++++++++++++++++++++++++++++++++++++++ pkg/output/output_test.go | 31 ++++++++++++++++++++++ 4 files changed, 120 insertions(+) create mode 100644 pkg/kando/output.go create mode 100644 pkg/output/output.go create mode 100644 pkg/output/output_test.go diff --git a/pkg/kando/kando.go b/pkg/kando/kando.go index 350e8c13b6..e6020c8302 100644 --- a/pkg/kando/kando.go +++ b/pkg/kando/kando.go @@ -27,5 +27,6 @@ func newRootCommand() *cobra.Command { Version: version.VersionString(), } rootCmd.AddCommand(newLocationCommand()) + rootCmd.AddCommand(newOutputCommand()) return rootCmd } diff --git a/pkg/kando/output.go b/pkg/kando/output.go new file mode 100644 index 0000000000..51474769ec --- /dev/null +++ b/pkg/kando/output.go @@ -0,0 +1,34 @@ +package kando + +import ( + "github.com/pkg/errors" + "github.com/spf13/cobra" + + "github.com/kanisterio/kanister/pkg/output" +) + +func newOutputCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "output ", + Short: "Create phase output with given key:value", + Args: func(c *cobra.Command, args []string) error { + return validateArguments(c, args) + }, + // TODO: Example invocations + RunE: func(c *cobra.Command, args []string) error { + return runOutputCommand(c, args) + }, + } + return cmd +} + +func validateArguments(c *cobra.Command, args []string) error { + if err := cobra.ExactArgs(2); err != nil { + return errors.New("Command requires exactly two arguments") + } + return output.ValidateKey(args[0]) +} + +func runOutputCommand(c *cobra.Command, args []string) error { + return output.PrintOutput(args[0], args[1]) +} diff --git a/pkg/output/output.go b/pkg/output/output.go new file mode 100644 index 0000000000..c5c9feaf8d --- /dev/null +++ b/pkg/output/output.go @@ -0,0 +1,54 @@ +package output + +import ( + "encoding/json" + "fmt" + "regexp" + + "github.com/pkg/errors" +) + +const ( + phaseOpString = "###Phase-output###:" +) + +type Output struct { + Key string `json:"key"` + Value string `json:"value"` +} + +func marshalOutput(key, value string) (string, error) { + out := &Output{ + Key: key, + Value: value, + } + outString, err := json.Marshal(out) + if err != nil { + return "", errors.Wrapf(err, "Failed to marshall key-value pair") + } + return string(outString), nil +} + +// ValidateKey validates the key argument +func ValidateKey(key string) error { + // key should be non-empty + if key == "" { + return errors.New("Key should not be empty") + } + // key can contain only alpha numeric characters and underscore + valid := regexp.MustCompile("^[a-zA-Z0-9_]*$").MatchString + if !valid(key) { + return errors.New("Key should contain only alphanumeric characters and underscore") + } + return nil +} + +// PrintOutput runs the `kando output` command +func PrintOutput(key, value string) error { + outString, err := marshalOutput(key, value) + if err != nil { + return err + } + fmt.Println(phaseOpString, outString) + return nil +} diff --git a/pkg/output/output_test.go b/pkg/output/output_test.go new file mode 100644 index 0000000000..2be719a470 --- /dev/null +++ b/pkg/output/output_test.go @@ -0,0 +1,31 @@ +package output + +import ( + "testing" + + . "gopkg.in/check.v1" +) + +// Hook up gocheck into the "go test" runner. +func Test(t *testing.T) { TestingT(t) } + +type OutputSuite struct{} + +var _ = Suite(&OutputSuite{}) + +func (s *OutputSuite) TestValidateKey(c *C) { + for _, tc := range []struct { + key string + checker Checker + }{ + {"validKey", IsNil}, + {"validKey2", IsNil}, + {"valid_key", IsNil}, + {"invalid-key", NotNil}, + {"invalid.key", NotNil}, + {"`invalidKey", NotNil}, + } { + err := ValidateKey(tc.key) + c.Check(err, tc.checker, Commentf("Key (%s) failed!", tc.key)) + } +} From f988a34972bae1d7a0651fecba43fdd5cf647ab6 Mon Sep 17 00:00:00 2001 From: Supriya Kharade Date: Tue, 9 Oct 2018 11:50:48 -0700 Subject: [PATCH 06/18] modified controller dockerfile (#4056) --- Dockerfile.in | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Dockerfile.in b/Dockerfile.in index f15531de00..88a437a8e5 100644 --- a/Dockerfile.in +++ b/Dockerfile.in @@ -1,4 +1,6 @@ FROM ARG_FROM MAINTAINER Tom Manville ADD ARG_SOURCE_BIN /ARG_BIN +RUN apk -v --update add --no-cache ca-certificates && \ + rm -f /var/cache/apk/* ENTRYPOINT ["/ARG_BIN"] From 3eb8275901cb1e753ffbe14094dc3c96f2005ceb Mon Sep 17 00:00:00 2001 From: Vaibhav Kamra Date: Tue, 9 Oct 2018 14:28:57 -0700 Subject: [PATCH 07/18] Refactor Restic helpers (#4060) Move command generation into the `restic` pkg. Allows for re-use in other functions --- pkg/function/backup_data.go | 26 +++----------------------- pkg/function/restore_data.go | 13 +------------ pkg/restic/restic.go | 30 ++++++++++++++++++------------ 3 files changed, 22 insertions(+), 47 deletions(-) diff --git a/pkg/function/backup_data.go b/pkg/function/backup_data.go index 3182a7b653..d817c67e97 100644 --- a/pkg/function/backup_data.go +++ b/pkg/function/backup_data.go @@ -2,7 +2,6 @@ package function import ( "context" - "fmt" "github.com/pkg/errors" "k8s.io/client-go/kubernetes" @@ -41,25 +40,6 @@ func (*backupDataFunc) Name() string { return "BackupData" } -func generateSnapshotsCommand(destArtifact string, profile *param.Profile) []string { - // Restic Snapshots command - command := restic.SnapshotsCommand(profile, destArtifact) - return []string{"sh", "-o", "errexit", "-o", "pipefail", "-c", command} -} - -func generateInitCommand(destArtifact string, profile *param.Profile) []string { - // Restic Repository Init command - command := restic.InitCommand(profile, destArtifact) - return []string{"sh", "-o", "errexit", "-o", "pipefail", "-c", command} -} - -func generateBackupCommand(includePath, destArtifact, id string, profile *param.Profile) []string { - // Restic Backup command - command := restic.BackupCommand(profile, destArtifact) - command = fmt.Sprintf("%s --tag %s %s", command, id, includePath) - return []string{"sh", "-o", "errexit", "-o", "pipefail", "-c", command} -} - func validateProfile(profile *param.Profile) error { if profile == nil { return errors.New("Profile must be non-nil") @@ -72,13 +52,13 @@ func validateProfile(profile *param.Profile) error { func getOrCreateRepository(cli kubernetes.Interface, namespace, pod, container, artifactPrefix string, profile *param.Profile) error { // Use the snapshots command to check if the repository exists - cmd := generateSnapshotsCommand(artifactPrefix, profile) + cmd := restic.SnapshotsCommand(profile, artifactPrefix) stdout, stderr, err := kube.Exec(cli, namespace, pod, container, cmd) formatAndLog(pod, container, stdout) formatAndLog(pod, container, stderr) if err != nil { // Create a repository - cmd := generateInitCommand(artifactPrefix, profile) + cmd := restic.InitCommand(profile, artifactPrefix) stdout, stderr, err := kube.Exec(cli, namespace, pod, container, cmd) formatAndLog(pod, container, stdout) formatAndLog(pod, container, stderr) @@ -125,7 +105,7 @@ func (*backupDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args m } // Create backup and dump it on the object store - cmd := generateBackupCommand(includePath, backupArtifactPrefix, backupIdentifier, tp.Profile) + cmd := restic.BackupCommand(tp.Profile, backupArtifactPrefix, backupIdentifier, includePath) stdout, stderr, err := kube.Exec(cli, namespace, pod, container, cmd) formatAndLog(pod, container, stdout) formatAndLog(pod, container, stderr) diff --git a/pkg/function/restore_data.go b/pkg/function/restore_data.go index 9669df98ca..74f03af591 100644 --- a/pkg/function/restore_data.go +++ b/pkg/function/restore_data.go @@ -2,7 +2,6 @@ package function import ( "context" - "fmt" "github.com/pkg/errors" @@ -65,13 +64,6 @@ func fetchPodVolumes(pod string, tp param.TemplateParams) (map[string]string, er } } -func generateRestoreCommand(backupArtifactPrefix, restorePath, id string, profile *param.Profile) ([]string, error) { - // Restic restore command - command := restic.RestoreCommand(profile, backupArtifactPrefix) - command = fmt.Sprintf("%s --tag %s latest --target %s", command, id, restorePath) - return []string{"bash", "-o", "errexit", "-o", "pipefail", "-c", command}, nil -} - func (*restoreDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) { var namespace, pod, image, backupArtifactPrefix, restorePath, backupIdentifier string var vols map[string]string @@ -105,10 +97,7 @@ func (*restoreDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args return nil, err } // Generate restore command - cmd, err := generateRestoreCommand(backupArtifactPrefix, restorePath, backupIdentifier, tp.Profile) - if err != nil { - return nil, err - } + cmd := restic.RestoreCommand(tp.Profile, backupArtifactPrefix, backupIdentifier, restorePath) if len(vols) == 0 { // Fetch Volumes vols, err = fetchPodVolumes(pod, tp) diff --git a/pkg/restic/restic.go b/pkg/restic/restic.go index 8ecc926c86..64fdf96b33 100644 --- a/pkg/restic/restic.go +++ b/pkg/restic/restic.go @@ -8,52 +8,58 @@ import ( "github.com/kanisterio/kanister/pkg/param" ) +func shCommand(command string) []string { + return []string{"sh", "-o", "errexit", "-o", "pipefail", "-c", command} +} + // BackupCommand returns restic backup command -func BackupCommand(profile *param.Profile, repository string) string { +func BackupCommand(profile *param.Profile, repository, id, includePath string) []string { cmd := resticArgs(profile, repository) cmd = append(cmd, "backup") command := strings.Join(cmd, " ") - return command + command = fmt.Sprintf("%s --tag %s %s", command, id, includePath) + return shCommand(command) } // RestoreCommand returns restic restore command -func RestoreCommand(profile *param.Profile, repository string) string { +func RestoreCommand(profile *param.Profile, repository, id, restorePath string) []string { cmd := resticArgs(profile, repository) cmd = append(cmd, "restore") command := strings.Join(cmd, " ") - return command + command = fmt.Sprintf("%s --tag %s latest --target %s", command, id, restorePath) + return shCommand(command) } // SnapshotsCommand returns restic snapshots command -func SnapshotsCommand(profile *param.Profile, repository string) string { +func SnapshotsCommand(profile *param.Profile, repository string) []string { cmd := resticArgs(profile, repository) cmd = append(cmd, "snapshots") command := strings.Join(cmd, " ") - return command + return shCommand(command) } // InitCommand returns restic init command -func InitCommand(profile *param.Profile, repository string) string { +func InitCommand(profile *param.Profile, repository string) []string { cmd := resticArgs(profile, repository) cmd = append(cmd, "init") command := strings.Join(cmd, " ") - return command + return shCommand(command) } // ForgetCommand returns restic forget command -func ForgetCommand(profile *param.Profile, repository string) string { +func ForgetCommand(profile *param.Profile, repository string) []string { cmd := resticArgs(profile, repository) cmd = append(cmd, "forget") command := strings.Join(cmd, " ") - return command + return shCommand(command) } // PruneCommand returns restic prune command -func PruneCommand(profile *param.Profile, repository string) string { +func PruneCommand(profile *param.Profile, repository string) []string { cmd := resticArgs(profile, repository) cmd = append(cmd, "prune") command := strings.Join(cmd, " ") - return command + return shCommand(command) } const ( From 352e5871913f9ec3c84d2103a156dd22d5b01a71 Mon Sep 17 00:00:00 2001 From: Pavan Navarathna Devaraj Date: Tue, 9 Oct 2018 20:40:02 -0700 Subject: [PATCH 08/18] Fix kando output command (#4064) --- pkg/kando/output.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/kando/output.go b/pkg/kando/output.go index 51474769ec..28369ea90d 100644 --- a/pkg/kando/output.go +++ b/pkg/kando/output.go @@ -23,8 +23,8 @@ func newOutputCommand() *cobra.Command { } func validateArguments(c *cobra.Command, args []string) error { - if err := cobra.ExactArgs(2); err != nil { - return errors.New("Command requires exactly two arguments") + if len(args) != 2 { + return errors.Errorf("Command accepts 2 arguments, received %d arguments", len(args)) } return output.ValidateKey(args[0]) } From 9b6902847f4850c3263cc13b2382645486cddc35 Mon Sep 17 00:00:00 2001 From: Vaibhav Kamra Date: Wed, 10 Oct 2018 11:08:04 -0700 Subject: [PATCH 09/18] Add support for running a pod with specified PVCs attached (#4067) * Add helpers to run a pod * Address review comments --- pkg/kube/pod.go | 69 +++++++++++++++++++++++++++++++++++ pkg/kube/pod_test.go | 87 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 156 insertions(+) create mode 100644 pkg/kube/pod.go create mode 100644 pkg/kube/pod_test.go diff --git a/pkg/kube/pod.go b/pkg/kube/pod.go new file mode 100644 index 0000000000..f19ebe9444 --- /dev/null +++ b/pkg/kube/pod.go @@ -0,0 +1,69 @@ +package kube + +import ( + "context" + + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + + "github.com/kanisterio/kanister/pkg/poll" +) + +// PodOptions specifies options for `CreatePod` +type PodOptions struct { + Namespace string + GenerateName string + Image string + Command []string + Volumes map[string]string +} + +// CreatePod creates a pod with a single container based on the specified image +func CreatePod(ctx context.Context, cli kubernetes.Interface, opts *PodOptions) (*v1.Pod, error) { + volumeMounts, podVolumes := createVolumeSpecs(opts.Volumes) + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: opts.GenerateName, + Namespace: opts.Namespace, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + v1.Container{ + Name: "container", + Image: opts.Image, + Command: opts.Command, + ImagePullPolicy: v1.PullPolicy(v1.PullIfNotPresent), + VolumeMounts: volumeMounts, + }, + }, + Volumes: podVolumes, + }, + } + pod, err := cli.Core().Pods(opts.Namespace).Create(pod) + if err != nil { + return nil, errors.Wrapf(err, "Failed to create pod. Namespace: %s, NameFmt: %s", opts.Namespace, opts.GenerateName) + } + err = poll.Wait(ctx, func(ctx context.Context) (bool, error) { + p, err := cli.Core().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{}) + if err != nil { + return true, err + } + return (p.Status.Phase == v1.PodRunning), nil + }) + if err != nil { + defer DeletePod(context.Background(), cli, pod) + return nil, errors.Wrapf(err, "Pod did not transition to running state. Namespace:%s, Name:%s", pod.Namespace, pod.Name) + } + return pod, nil +} + +// DeletePod deletes the specified pod +func DeletePod(ctx context.Context, cli kubernetes.Interface, pod *v1.Pod) error { + if err := cli.Core().Pods(pod.Namespace).Delete(pod.Name, nil); err != nil { + log.Errorf("DeletePod failed: %v", err) + } + return nil +} diff --git a/pkg/kube/pod_test.go b/pkg/kube/pod_test.go new file mode 100644 index 0000000000..ed068f3f77 --- /dev/null +++ b/pkg/kube/pod_test.go @@ -0,0 +1,87 @@ +// +build !unit + +package kube + +import ( + "context" + "fmt" + "time" + + . "gopkg.in/check.v1" + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/testing" +) + +type PodSuite struct { + cli kubernetes.Interface + namespace string +} + +var _ = Suite(&PodSuite{}) + +func (s *PodSuite) SetUpSuite(c *C) { + var err error + s.cli, err = NewClient() + c.Assert(err, IsNil) + ns := &v1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "podtest-", + }, + } + ns, err = s.cli.Core().Namespaces().Create(ns) + c.Assert(err, IsNil) + s.namespace = ns.Name +} + +func (s *PodSuite) TearDownSuite(c *C) { + if s.namespace != "" { + err := s.cli.Core().Namespaces().Delete(s.namespace, nil) + c.Assert(err, IsNil) + } +} + +func (s *PodSuite) TestPod(c *C) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + pod, err := CreatePod(ctx, s.cli, &PodOptions{ + Namespace: s.namespace, + GenerateName: "test-", + Image: "kanisterio/kanister-tools:0.12.0", + Command: []string{"sh", "-c", "tail -f /dev/null"}, + }) + c.Assert(err, IsNil) + c.Assert(DeletePod(context.Background(), s.cli, pod), IsNil) +} + +func (s *PodSuite) TestPodWithVolumes(c *C) { + cli := fake.NewSimpleClientset() + vols := map[string]string{"pvc-test": "/mnt/data1"} + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + var p *v1.Pod + cli.PrependReactor("create", "pods", func(action testing.Action) (handled bool, ret runtime.Object, err error) { + fmt.Println("found pod") + ca := action.(testing.CreateAction) + p = ca.GetObject().(*v1.Pod) + return false, nil, nil + }) + cli.PrependReactor("get", "pods", func(action testing.Action) (handled bool, ret runtime.Object, err error) { + p.Status.Phase = v1.PodRunning + return true, p, nil + }) + pod, err := CreatePod(ctx, cli, &PodOptions{ + Namespace: s.namespace, + GenerateName: "test-", + Image: "kanisterio/kanister-tools:0.12.0", + Command: []string{"sh", "-c", "tail -f /dev/null"}, + Volumes: vols, + }) + c.Assert(err, IsNil) + c.Assert(pod.Spec.Volumes, HasLen, 1) + c.Assert(pod.Spec.Volumes[0].VolumeSource.PersistentVolumeClaim.ClaimName, Equals, "pvc-test") + c.Assert(pod.Spec.Containers[0].VolumeMounts[0].MountPath, Equals, "/mnt/data1") +} From cfa7d632d4886030f913fddd641434f7cf31872c Mon Sep 17 00:00:00 2001 From: Vaibhav Kamra Date: Wed, 10 Oct 2018 11:17:35 -0700 Subject: [PATCH 10/18] Move restic repo init into restic pkg (#4068) * Move restic repo init into restic pkg * Address review comments --- pkg/format/format.go | 19 +++++++++++++++++++ pkg/function/backup_data.go | 28 ++++------------------------ pkg/function/kube_exec_all.go | 18 +++--------------- pkg/restic/restic.go | 23 +++++++++++++++++++++++ 4 files changed, 49 insertions(+), 39 deletions(-) create mode 100644 pkg/format/format.go diff --git a/pkg/format/format.go b/pkg/format/format.go new file mode 100644 index 0000000000..d30014a36e --- /dev/null +++ b/pkg/format/format.go @@ -0,0 +1,19 @@ +package format + +import ( + "regexp" + "strings" + + log "github.com/sirupsen/logrus" +) + +func Log(podName string, containerName string, output string) { + if output != "" { + logs := regexp.MustCompile("[\r\n]").Split(output, -1) + for _, l := range logs { + if strings.TrimSpace(l) != "" { + log.Info("Pod: ", podName, " Container: ", containerName, " Out: ", l) + } + } + } +} diff --git a/pkg/function/backup_data.go b/pkg/function/backup_data.go index d817c67e97..a76c9a38da 100644 --- a/pkg/function/backup_data.go +++ b/pkg/function/backup_data.go @@ -4,10 +4,10 @@ import ( "context" "github.com/pkg/errors" - "k8s.io/client-go/kubernetes" kanister "github.com/kanisterio/kanister/pkg" crv1alpha1 "github.com/kanisterio/kanister/pkg/apis/cr/v1alpha1" + "github.com/kanisterio/kanister/pkg/format" "github.com/kanisterio/kanister/pkg/kube" "github.com/kanisterio/kanister/pkg/param" "github.com/kanisterio/kanister/pkg/restic" @@ -50,27 +50,7 @@ func validateProfile(profile *param.Profile) error { return nil } -func getOrCreateRepository(cli kubernetes.Interface, namespace, pod, container, artifactPrefix string, profile *param.Profile) error { - // Use the snapshots command to check if the repository exists - cmd := restic.SnapshotsCommand(profile, artifactPrefix) - stdout, stderr, err := kube.Exec(cli, namespace, pod, container, cmd) - formatAndLog(pod, container, stdout) - formatAndLog(pod, container, stderr) - if err != nil { - // Create a repository - cmd := restic.InitCommand(profile, artifactPrefix) - stdout, stderr, err := kube.Exec(cli, namespace, pod, container, cmd) - formatAndLog(pod, container, stdout) - formatAndLog(pod, container, stderr) - if err != nil { - return errors.Wrapf(err, "Failed to create object store backup location") - } - } - return nil -} - func (*backupDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) { - var namespace, pod, container, includePath, backupArtifactPrefix, backupIdentifier string var err error if err = Arg(args, BackupDataNamespaceArg, &namespace); err != nil { @@ -100,15 +80,15 @@ func (*backupDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args m return nil, errors.Wrapf(err, "Failed to create Kubernetes client") } - if err = getOrCreateRepository(cli, namespace, pod, container, backupArtifactPrefix, tp.Profile); err != nil { + if err = restic.GetOrCreateRepository(cli, namespace, pod, container, backupArtifactPrefix, tp.Profile); err != nil { return nil, err } // Create backup and dump it on the object store cmd := restic.BackupCommand(tp.Profile, backupArtifactPrefix, backupIdentifier, includePath) stdout, stderr, err := kube.Exec(cli, namespace, pod, container, cmd) - formatAndLog(pod, container, stdout) - formatAndLog(pod, container, stderr) + format.Log(pod, container, stdout) + format.Log(pod, container, stderr) if err != nil { return nil, errors.Wrapf(err, "Failed to create and upload backup") } diff --git a/pkg/function/kube_exec_all.go b/pkg/function/kube_exec_all.go index c034627f86..61b046143e 100644 --- a/pkg/function/kube_exec_all.go +++ b/pkg/function/kube_exec_all.go @@ -2,14 +2,13 @@ package function import ( "context" - "regexp" "strings" "github.com/pkg/errors" - log "github.com/sirupsen/logrus" "k8s.io/client-go/kubernetes" kanister "github.com/kanisterio/kanister/pkg" + "github.com/kanisterio/kanister/pkg/format" "github.com/kanisterio/kanister/pkg/kube" "github.com/kanisterio/kanister/pkg/param" ) @@ -71,8 +70,8 @@ func execAll(cli kubernetes.Interface, namespace string, ps []string, cs []strin for _, c := range cs { go func(p string, c string) { stdout, stderr, err := kube.Exec(cli, namespace, p, c, cmd) - formatAndLog(p, c, stdout) - formatAndLog(p, c, stderr) + format.Log(p, c, stdout) + format.Log(p, c, stderr) errChan <- err }(p, c) } @@ -89,14 +88,3 @@ func execAll(cli kubernetes.Interface, namespace string, ps []string, cs []strin } return nil } - -func formatAndLog(podName string, containerName string, output string) { - if output != "" { - logs := regexp.MustCompile("[\r\n]").Split(output, -1) - for _, l := range logs { - if strings.TrimSpace(l) != "" { - log.Info("Pod: ", podName, " Container: ", containerName, " Out: ", l) - } - } - } -} diff --git a/pkg/restic/restic.go b/pkg/restic/restic.go index 64fdf96b33..b268a03725 100644 --- a/pkg/restic/restic.go +++ b/pkg/restic/restic.go @@ -4,6 +4,11 @@ import ( "fmt" "strings" + "github.com/pkg/errors" + "k8s.io/client-go/kubernetes" + + "github.com/kanisterio/kanister/pkg/format" + "github.com/kanisterio/kanister/pkg/kube" "github.com/kanisterio/kanister/pkg/location" "github.com/kanisterio/kanister/pkg/param" ) @@ -82,3 +87,21 @@ func resticArgs(profile *param.Profile, repository string) []string { ResticCommand, } } + +// GetOrCreateRepository will check if the repository already exists and initialize one if not +func GetOrCreateRepository(cli kubernetes.Interface, namespace, pod, container, artifactPrefix string, profile *param.Profile) error { + // Use the snapshots command to check if the repository exists + cmd := SnapshotsCommand(profile, artifactPrefix) + stdout, stderr, err := kube.Exec(cli, namespace, pod, container, cmd) + format.Log(pod, container, stdout) + format.Log(pod, container, stderr) + if err == nil { + return nil + } + // Create a repository + cmd = InitCommand(profile, artifactPrefix) + stdout, stderr, err = kube.Exec(cli, namespace, pod, container, cmd) + format.Log(pod, container, stdout) + format.Log(pod, container, stderr) + return errors.Wrapf(err, "Failed to create object store backup location") +} From a3c277a266d201d8a6eca4902ac653f633910587 Mon Sep 17 00:00:00 2001 From: Vaibhav Kamra Date: Wed, 10 Oct 2018 11:54:59 -0700 Subject: [PATCH 11/18] Add a CopyVolumeData function (#4074) * Add a CopyVolumeData function This Kanister function can be used to copy data from a PVC to an objectstore location. It leverages the restic pkg and the artifacts should be compatible with the `RestoreData` function * Address review comments --- pkg/function/copy_volume_data.go | 97 ++++++++++++++++++++++++++++++++ pkg/function/data_test.go | 52 +++++++++++++++++ 2 files changed, 149 insertions(+) create mode 100644 pkg/function/copy_volume_data.go diff --git a/pkg/function/copy_volume_data.go b/pkg/function/copy_volume_data.go new file mode 100644 index 0000000000..c181101ec9 --- /dev/null +++ b/pkg/function/copy_volume_data.go @@ -0,0 +1,97 @@ +package function + +import ( + "context" + "fmt" + + "github.com/pkg/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/rand" + "k8s.io/client-go/kubernetes" + + kanister "github.com/kanisterio/kanister/pkg" + "github.com/kanisterio/kanister/pkg/format" + "github.com/kanisterio/kanister/pkg/kube" + "github.com/kanisterio/kanister/pkg/param" + "github.com/kanisterio/kanister/pkg/restic" +) + +const ( + kanisterToolsImage = "kanisterio/kanister-tools:0.12.0" + copyVolumeDataMountPoint = "/mnt/vol_data/%s" + copyVolumeDataJobPrefix = "copy-vol-data-" + CopyVolumeDataNamespaceArg = "namespace" + CopyVolumeDataVolumeArg = "volume" + CopyVolumeDataArtifactPrefixArg = "dataArtifactPrefix" +) + +func init() { + kanister.Register(©VolumeDataFunc{}) +} + +var _ kanister.Func = (*copyVolumeDataFunc)(nil) + +type copyVolumeDataFunc struct{} + +func (*copyVolumeDataFunc) Name() string { + return "CopyVolumeData" +} + +func copyVolumeData(ctx context.Context, cli kubernetes.Interface, tp param.TemplateParams, namespace, pvc, targetPath string) (map[string]interface{}, error) { + // Validate PVC exists + if _, err := cli.CoreV1().PersistentVolumeClaims(namespace).Get(pvc, metav1.GetOptions{}); err != nil { + return nil, errors.Wrapf(err, "Failed to retrieve PVC. Namespace %s, Name %s", namespace, pvc) + } + // Create a pod with PVCs attached + mountPoint := fmt.Sprintf(copyVolumeDataMountPoint, pvc) + pod, err := kube.CreatePod(ctx, cli, &kube.PodOptions{ + Namespace: namespace, + GenerateName: copyVolumeDataJobPrefix, + Image: kanisterToolsImage, + Command: []string{"sh", "-c", "tail -f /dev/null"}, + Volumes: map[string]string{pvc: mountPoint}, + }) + if err != nil { + return nil, errors.Wrapf(err, "Failed to create pod to copy volume data") + } + defer kube.DeletePod(context.Background(), cli, pod) + + // Get restic repository + if err = restic.GetOrCreateRepository(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, targetPath, tp.Profile); err != nil { + return nil, err + } + + // Copy data to object store + backupIdentifier := rand.String(10) + cmd := restic.BackupCommand(tp.Profile, targetPath, backupIdentifier, mountPoint) + stdout, stderr, err := kube.Exec(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, cmd) + format.Log(pod.Name, pod.Spec.Containers[0].Name, stdout) + format.Log(pod.Name, pod.Spec.Containers[0].Name, stderr) + if err != nil { + return nil, errors.Wrapf(err, "Failed to create and upload backup") + } + return map[string]interface{}{"backupID": backupIdentifier}, nil +} + +func (*copyVolumeDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) { + var namespace, vol, targetPath string + var err error + if err = Arg(args, CopyVolumeDataNamespaceArg, &namespace); err != nil { + return nil, err + } + if err = Arg(args, CopyVolumeDataVolumeArg, &vol); err != nil { + return nil, err + } + if err = Arg(args, CopyVolumeDataArtifactPrefixArg, &targetPath); err != nil { + return nil, err + } + cli, err := kube.NewClient() + if err != nil { + return nil, errors.Wrapf(err, "Failed to create Kubernetes client") + } + return copyVolumeData(ctx, cli, tp, namespace, vol, targetPath) +} + +func (*copyVolumeDataFunc) RequiredArgs() []string { + return []string{CopyVolumeDataNamespaceArg, CopyVolumeDataVolumeArg, CopyVolumeDataArtifactPrefixArg} +} diff --git a/pkg/function/data_test.go b/pkg/function/data_test.go index 0d03f17f15..c8506471b3 100644 --- a/pkg/function/data_test.go +++ b/pkg/function/data_test.go @@ -40,6 +40,7 @@ func (s *DataSuite) SetUpSuite(c *C) { s.crCli = crCli ns := testutil.NewTestNamespace() + ns.GenerateName = "kanister-datatest-" cns, err := s.cli.Core().Namespaces().Create(ns) c.Assert(err, IsNil) @@ -152,3 +153,54 @@ func (s *DataSuite) TestBackupRestoreData(c *C) { } } } + +func newCopyDataBlueprint() crv1alpha1.Blueprint { + return crv1alpha1.Blueprint{ + Actions: map[string]*crv1alpha1.BlueprintAction{ + "copy": &crv1alpha1.BlueprintAction{ + Phases: []crv1alpha1.BlueprintPhase{ + crv1alpha1.BlueprintPhase{ + Name: "testCopy", + Func: "CopyVolumeData", + Args: map[string]interface{}{ + CopyVolumeDataNamespaceArg: "{{ .PVC.Namespace }}", + CopyVolumeDataVolumeArg: "{{ .PVC.Name }}", + CopyVolumeDataArtifactPrefixArg: "{{ .Profile.Location.S3Compliant.Bucket }}/{{ .Profile.Location.S3Compliant.Prefix }}/{{ .PVC.Namespace }}/{{ .PVC.Name }}", + }, + }, + }, + }, + }, + } +} +func (s *DataSuite) TestCopyData(c *C) { + ctx := context.Background() + pvc := testutil.NewTestPVC() + pvc, err := s.cli.CoreV1().PersistentVolumeClaims(s.namespace).Create(pvc) + c.Assert(err, IsNil) + + as := crv1alpha1.ActionSpec{ + Object: crv1alpha1.ObjectReference{ + Kind: param.PVCKind, + Name: pvc.Name, + Namespace: pvc.Namespace, + }, + Profile: &crv1alpha1.ObjectReference{ + Name: testutil.TestProfileName, + Namespace: s.namespace, + }, + } + + tp, err := param.New(ctx, s.cli, s.crCli, as) + c.Assert(err, IsNil) + + tp.Profile = testutil.ObjectStoreProfileOrSkip(c) + + bp := newCopyDataBlueprint() + phases, err := kanister.GetPhases(bp, "copy", *tp) + c.Assert(err, IsNil) + for _, p := range phases { + _, err = p.Exec(context.Background(), bp, "copy", *tp) + c.Assert(err, IsNil) + } +} From 58dbe1aee74bfd32253447cb5375686bba63c18b Mon Sep 17 00:00:00 2001 From: Pavan Navarathna Devaraj Date: Thu, 11 Oct 2018 19:20:25 -0700 Subject: [PATCH 12/18] Render ArtifactsOut after execution of all phases (#4043) * WIP Render ArtifactsOut after all the phases * Render arts inside go routine * Remove ArtifactsOut from TemplateParams * Add Unit test to check artifacts update * Fix ActionSet update --- pkg/controller/controller.go | 41 +++++++++++++++++----- pkg/controller/controller_test.go | 57 +++++++++++++++++++++++++++++++ pkg/param/param.go | 25 +++++++------- pkg/param/render_test.go | 48 +++++++++----------------- pkg/phase_test.go | 12 +++---- 5 files changed, 121 insertions(+), 62 deletions(-) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 9603186fea..5bb05b22f1 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -210,13 +210,20 @@ func (c *Controller) onUpdateActionSet(oldAS, newAS *crv1alpha1.ActionSet) error } return nil } - for _, as := range newAS.Status.Actions { + for i, as := range newAS.Status.Actions { for _, p := range as.Phases { if p.State != crv1alpha1.StateComplete { log.Infof("Updated ActionSet '%s' Status->%s, Phase: %s->%s", newAS.Name, newAS.Status.State, p.Name, p.State) return nil } } + if len(as.Artifacts) > 0 { + if reflect.DeepEqual(oldAS.Status.Actions[i].Artifacts, as.Artifacts) { + return nil + } + log.Infof("Added Output Artifacts for action %s", as.Name) + c.logAndSuccessEvent(fmt.Sprintf("Added Output Artifacts for action %s", as.Name), "Add Artifacts", newAS) + } } newAS.Status.State = crv1alpha1.StateComplete c.logAndSuccessEvent(fmt.Sprintf("Updated ActionSet '%s' Status->%s", newAS.Name, newAS.Status.State), "Update Complete", newAS) @@ -346,12 +353,6 @@ func (c *Controller) runAction(ctx context.Context, as *crv1alpha1.ActionSet, aI if err != nil { return err } - artTpls := as.Status.Actions[aIDX].Artifacts - arts, err := param.RenderArtifacts(artTpls, *tp) - if err != nil { - return err - } - tp.ArtifactsOut = arts phases, err := kanister.GetPhases(*bp, action.Name, *tp) if err != nil { return err @@ -363,7 +364,7 @@ func (c *Controller) runAction(ctx context.Context, as *crv1alpha1.ActionSet, aI err = param.InitPhaseParams(ctx, c.clientset, tp, p.Name(), p.Objects()) if err != nil { reason := fmt.Sprintf("ActionSetFailed Action: %s", as.Spec.Actions[aIDX].Name) - msg := fmt.Sprintf("Failed to execute phase: %#v:", as.Status.Actions[aIDX].Phases[i]) + msg := fmt.Sprintf("Failed to init phase params: %#v:", as.Status.Actions[aIDX].Phases[i]) c.logAndErrorEvent(msg, reason, err, as, bp) return } @@ -377,7 +378,6 @@ func (c *Controller) runAction(ctx context.Context, as *crv1alpha1.ActionSet, aI } } else { rf = func(ras *crv1alpha1.ActionSet) error { - ras.Status.Actions[aIDX].Artifacts = arts ras.Status.Actions[aIDX].Phases[i].State = crv1alpha1.StateComplete ras.Status.Actions[aIDX].Phases[i].Output = output return nil @@ -398,6 +398,29 @@ func (c *Controller) runAction(ctx context.Context, as *crv1alpha1.ActionSet, aI param.UpdatePhaseParams(ctx, tp, p.Name(), output) c.logAndSuccessEvent(fmt.Sprintf("Completed phase %s", p.Name()), "Ended Phase", as) } + // Render output artifacts if present + artTpls := as.Status.Actions[aIDX].Artifacts + if len(artTpls) == 0 { + return + } + arts, err := param.RenderArtifacts(artTpls, *tp) + if err != nil { + reason := fmt.Sprintf("ActionSetFailed Action: %s", as.Spec.Actions[aIDX].Name) + msg := fmt.Sprintf("Failed to render Output Artifacts") + c.logAndErrorEvent(msg, reason, err, as, bp) + return + } + // Update action set with artifacts + rf := func(ras *crv1alpha1.ActionSet) error { + ras.Status.Actions[aIDX].Artifacts = arts + return nil + } + if rErr := reconcile.ActionSet(ctx, c.crClient.CrV1alpha1(), ns, name, rf); rErr != nil { + reason := fmt.Sprintf("ActionSetFailed Action: %s", as.Spec.Actions[aIDX].Name) + msg := fmt.Sprintf("Failed to update Output Artifacts") + c.logAndErrorEvent(msg, reason, rErr, as, bp) + return + } }() return nil } diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index e27b448bb9..23b81dbd0b 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -146,6 +146,32 @@ func (s *ControllerSuite) waitOnActionSetState(c *C, as *crv1alpha1.ActionSet, s return errors.Wrapf(err, "State '%s' never reached", state) } +func newBPWithOutputArtifact() *crv1alpha1.Blueprint { + return &crv1alpha1.Blueprint{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "test-blueprint-", + }, + Actions: map[string]*crv1alpha1.BlueprintAction{ + "myAction": &crv1alpha1.BlueprintAction{ + OutputArtifacts: map[string]crv1alpha1.Artifact{ + "myArt": crv1alpha1.Artifact{ + KeyValue: map[string]string{ + "key": "{{ .Phases.myPhase0.Output.key }}", + }, + }, + }, + Kind: "Deployment", + Phases: []crv1alpha1.BlueprintPhase{ + { + Name: "myPhase0", + Func: testutil.OutputFuncName, + }, + }, + }, + }, + } +} + func (s *ControllerSuite) TestEmptyActionSetStatus(c *C) { as := &crv1alpha1.ActionSet{ ObjectMeta: metav1.ObjectMeta{ @@ -338,3 +364,34 @@ func (s *ControllerSuite) TestRuntimeObjEventLogs(c *C) { c.Assert(err, NotNil) c.Assert(len(events.Items), Equals, 0) } + +func (s *ControllerSuite) TestPhaseOutputAsArtifact(c *C) { + // Create a blueprint that uses func output as artifact + bp := newBPWithOutputArtifact() + bp = testutil.BlueprintWithConfigMap(bp) + bp, err := s.crCli.Blueprints(s.namespace).Create(bp) + c.Assert(err, IsNil) + + // Add an actionset that references that blueprint. + as := testutil.NewTestActionSet(s.namespace, bp.GetName(), "Deployment", s.deployment.GetName(), s.namespace) + as = testutil.ActionSetWithConfigMap(as, s.confimap.GetName()) + as, err = s.crCli.ActionSets(s.namespace).Create(as) + c.Assert(err, IsNil) + + err = s.waitOnActionSetState(c, as, crv1alpha1.StateRunning) + c.Assert(err, IsNil) + + // Check if the func returned expected output + c.Assert(testutil.OutputFuncOut(), DeepEquals, map[string]interface{}{"key": "myValue"}) + + err = s.waitOnActionSetState(c, as, crv1alpha1.StateComplete) + c.Assert(err, IsNil) + + // Check if the artifacts got updated correctly + as, err = s.crCli.ActionSets(as.GetNamespace()).Get(as.GetName(), metav1.GetOptions{}) + arts := as.Status.Actions[0].Artifacts + c.Assert(arts, NotNil) + c.Assert(arts, HasLen, 1) + keyVal := arts["myArt"].KeyValue + c.Assert(keyVal, DeepEquals, map[string]string{"key": "myValue"}) +} diff --git a/pkg/param/param.go b/pkg/param/param.go index b020009e82..d68409afd2 100644 --- a/pkg/param/param.go +++ b/pkg/param/param.go @@ -20,19 +20,18 @@ const timeFormat = time.RFC3339Nano // TemplateParams are the values that will change between separate runs of Phases. type TemplateParams struct { - StatefulSet *StatefulSetParams - Deployment *DeploymentParams - PVC *PVCParams - Namespace *NamespaceParams - ArtifactsIn map[string]crv1alpha1.Artifact - ArtifactsOut map[string]crv1alpha1.Artifact - ConfigMaps map[string]v1.ConfigMap - Secrets map[string]v1.Secret - Time string - Profile *Profile - Options map[string]string - Object map[string]interface{} - Phases map[string]*Phase + StatefulSet *StatefulSetParams + Deployment *DeploymentParams + PVC *PVCParams + Namespace *NamespaceParams + ArtifactsIn map[string]crv1alpha1.Artifact + ConfigMaps map[string]v1.ConfigMap + Secrets map[string]v1.Secret + Time string + Profile *Profile + Options map[string]string + Object map[string]interface{} + Phases map[string]*Phase } // StatefulSetParams are params for stateful sets. diff --git a/pkg/param/render_test.go b/pkg/param/render_test.go index ad796e0164..215426f526 100644 --- a/pkg/param/render_test.go +++ b/pkg/param/render_test.go @@ -36,24 +36,20 @@ func (s *RenderSuite) TestRender(c *C) { checker: IsNil, }, { - arg: "{{ .ArtifactsOut.hello.KeyValue }}", + arg: "{{ .Options.hello }}", tp: TemplateParams{ - ArtifactsOut: map[string]crv1alpha1.Artifact{ - "hello": crv1alpha1.Artifact{}, + Options: map[string]string{ + "hello": "", }, }, - out: "map[]", + out: "", checker: IsNil, }, { - arg: "{{ .ArtifactsOut.hello.KeyValue.someKey }}", + arg: "{{ .Options.hello }}", tp: TemplateParams{ - ArtifactsOut: map[string]crv1alpha1.Artifact{ - "hello": crv1alpha1.Artifact{ - KeyValue: map[string]string{ - "someKey": "someValue", - }, - }, + Options: map[string]string{ + "hello": "someValue", }, }, out: "someValue", @@ -61,14 +57,10 @@ func (s *RenderSuite) TestRender(c *C) { }, { // `-` cannot be used in a template path. - arg: "{{ .ArtifactsOut.hello-world.KeyValue.someKey }}", + arg: "{{ .Options.hello-world }}", tp: TemplateParams{ - ArtifactsOut: map[string]crv1alpha1.Artifact{ - "hello-world": crv1alpha1.Artifact{ - KeyValue: map[string]string{ - "someKey": "someValue", - }, - }, + Options: map[string]string{ + "hello-world": "someValue", }, }, out: "", @@ -76,19 +68,11 @@ func (s *RenderSuite) TestRender(c *C) { }, { // `-` can exist in artifact keys, it just cannot be used in path. - arg: "{{ .ArtifactsOut.hello.KeyValue.someKey }}", + arg: "{{ .Options.hello }}", tp: TemplateParams{ - ArtifactsOut: map[string]crv1alpha1.Artifact{ - "hello": crv1alpha1.Artifact{ - KeyValue: map[string]string{ - "someKey": "someValue", - }, - }, - "hello-world": crv1alpha1.Artifact{ - KeyValue: map[string]string{ - "someKey": "someValue", - }, - }, + Options: map[string]string{ + "hello": "someValue", + "hello-world": "someValue", }, }, out: "someValue", @@ -120,9 +104,9 @@ func (s *RenderSuite) TestRender(c *C) { }, { // Render should fail if referenced key doesn't exist - arg: "{{ .ArtifactsOut.hello.KeyValue.someKey }}", + arg: "{{ .Options.hello }}", tp: TemplateParams{ - ArtifactsOut: map[string]crv1alpha1.Artifact{}, + Options: map[string]string{}, }, checker: NotNil, }, diff --git a/pkg/phase_test.go b/pkg/phase_test.go index 70415c30d9..17445d24dd 100644 --- a/pkg/phase_test.go +++ b/pkg/phase_test.go @@ -42,24 +42,20 @@ func (s *PhaseSuite) TestExec(c *C) { }{ { artifact: "hello", - argument: "{{ .ArtifactsOut.test.KeyValue.in }} world", + argument: "{{ .Options.test }} world", expected: "hello world", }, { artifact: "HELLO", - argument: "{{ .ArtifactsOut.test.KeyValue.in | lower}} world", + argument: "{{ .Options.test | lower}} world", expected: "hello world", }, } { var output string tf := &testFunc{output: &output} tp := param.TemplateParams{ - ArtifactsOut: map[string]crv1alpha1.Artifact{ - "test": crv1alpha1.Artifact{ - KeyValue: map[string]string{ - "in": tc.artifact, - }, - }, + Options: map[string]string{ + "test": tc.artifact, }, } rawArgs := map[string]interface{}{ From 119a69178f26e394d5d673e1b48b0c3a064ab4b7 Mon Sep 17 00:00:00 2001 From: Vaibhav Kamra Date: Fri, 12 Oct 2018 08:00:13 -0700 Subject: [PATCH 13/18] Revert "Render ArtifactsOut after execution of all phases (#4043)" (#4110) This reverts commit 7ccf63251ac301eac88cd896883292732fdf7dcd. --- pkg/controller/controller.go | 41 +++++----------------- pkg/controller/controller_test.go | 57 ------------------------------- pkg/param/param.go | 25 +++++++------- pkg/param/render_test.go | 48 +++++++++++++++++--------- pkg/phase_test.go | 12 ++++--- 5 files changed, 62 insertions(+), 121 deletions(-) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 5bb05b22f1..9603186fea 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -210,20 +210,13 @@ func (c *Controller) onUpdateActionSet(oldAS, newAS *crv1alpha1.ActionSet) error } return nil } - for i, as := range newAS.Status.Actions { + for _, as := range newAS.Status.Actions { for _, p := range as.Phases { if p.State != crv1alpha1.StateComplete { log.Infof("Updated ActionSet '%s' Status->%s, Phase: %s->%s", newAS.Name, newAS.Status.State, p.Name, p.State) return nil } } - if len(as.Artifacts) > 0 { - if reflect.DeepEqual(oldAS.Status.Actions[i].Artifacts, as.Artifacts) { - return nil - } - log.Infof("Added Output Artifacts for action %s", as.Name) - c.logAndSuccessEvent(fmt.Sprintf("Added Output Artifacts for action %s", as.Name), "Add Artifacts", newAS) - } } newAS.Status.State = crv1alpha1.StateComplete c.logAndSuccessEvent(fmt.Sprintf("Updated ActionSet '%s' Status->%s", newAS.Name, newAS.Status.State), "Update Complete", newAS) @@ -353,6 +346,12 @@ func (c *Controller) runAction(ctx context.Context, as *crv1alpha1.ActionSet, aI if err != nil { return err } + artTpls := as.Status.Actions[aIDX].Artifacts + arts, err := param.RenderArtifacts(artTpls, *tp) + if err != nil { + return err + } + tp.ArtifactsOut = arts phases, err := kanister.GetPhases(*bp, action.Name, *tp) if err != nil { return err @@ -364,7 +363,7 @@ func (c *Controller) runAction(ctx context.Context, as *crv1alpha1.ActionSet, aI err = param.InitPhaseParams(ctx, c.clientset, tp, p.Name(), p.Objects()) if err != nil { reason := fmt.Sprintf("ActionSetFailed Action: %s", as.Spec.Actions[aIDX].Name) - msg := fmt.Sprintf("Failed to init phase params: %#v:", as.Status.Actions[aIDX].Phases[i]) + msg := fmt.Sprintf("Failed to execute phase: %#v:", as.Status.Actions[aIDX].Phases[i]) c.logAndErrorEvent(msg, reason, err, as, bp) return } @@ -378,6 +377,7 @@ func (c *Controller) runAction(ctx context.Context, as *crv1alpha1.ActionSet, aI } } else { rf = func(ras *crv1alpha1.ActionSet) error { + ras.Status.Actions[aIDX].Artifacts = arts ras.Status.Actions[aIDX].Phases[i].State = crv1alpha1.StateComplete ras.Status.Actions[aIDX].Phases[i].Output = output return nil @@ -398,29 +398,6 @@ func (c *Controller) runAction(ctx context.Context, as *crv1alpha1.ActionSet, aI param.UpdatePhaseParams(ctx, tp, p.Name(), output) c.logAndSuccessEvent(fmt.Sprintf("Completed phase %s", p.Name()), "Ended Phase", as) } - // Render output artifacts if present - artTpls := as.Status.Actions[aIDX].Artifacts - if len(artTpls) == 0 { - return - } - arts, err := param.RenderArtifacts(artTpls, *tp) - if err != nil { - reason := fmt.Sprintf("ActionSetFailed Action: %s", as.Spec.Actions[aIDX].Name) - msg := fmt.Sprintf("Failed to render Output Artifacts") - c.logAndErrorEvent(msg, reason, err, as, bp) - return - } - // Update action set with artifacts - rf := func(ras *crv1alpha1.ActionSet) error { - ras.Status.Actions[aIDX].Artifacts = arts - return nil - } - if rErr := reconcile.ActionSet(ctx, c.crClient.CrV1alpha1(), ns, name, rf); rErr != nil { - reason := fmt.Sprintf("ActionSetFailed Action: %s", as.Spec.Actions[aIDX].Name) - msg := fmt.Sprintf("Failed to update Output Artifacts") - c.logAndErrorEvent(msg, reason, rErr, as, bp) - return - } }() return nil } diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index 23b81dbd0b..e27b448bb9 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -146,32 +146,6 @@ func (s *ControllerSuite) waitOnActionSetState(c *C, as *crv1alpha1.ActionSet, s return errors.Wrapf(err, "State '%s' never reached", state) } -func newBPWithOutputArtifact() *crv1alpha1.Blueprint { - return &crv1alpha1.Blueprint{ - ObjectMeta: metav1.ObjectMeta{ - GenerateName: "test-blueprint-", - }, - Actions: map[string]*crv1alpha1.BlueprintAction{ - "myAction": &crv1alpha1.BlueprintAction{ - OutputArtifacts: map[string]crv1alpha1.Artifact{ - "myArt": crv1alpha1.Artifact{ - KeyValue: map[string]string{ - "key": "{{ .Phases.myPhase0.Output.key }}", - }, - }, - }, - Kind: "Deployment", - Phases: []crv1alpha1.BlueprintPhase{ - { - Name: "myPhase0", - Func: testutil.OutputFuncName, - }, - }, - }, - }, - } -} - func (s *ControllerSuite) TestEmptyActionSetStatus(c *C) { as := &crv1alpha1.ActionSet{ ObjectMeta: metav1.ObjectMeta{ @@ -364,34 +338,3 @@ func (s *ControllerSuite) TestRuntimeObjEventLogs(c *C) { c.Assert(err, NotNil) c.Assert(len(events.Items), Equals, 0) } - -func (s *ControllerSuite) TestPhaseOutputAsArtifact(c *C) { - // Create a blueprint that uses func output as artifact - bp := newBPWithOutputArtifact() - bp = testutil.BlueprintWithConfigMap(bp) - bp, err := s.crCli.Blueprints(s.namespace).Create(bp) - c.Assert(err, IsNil) - - // Add an actionset that references that blueprint. - as := testutil.NewTestActionSet(s.namespace, bp.GetName(), "Deployment", s.deployment.GetName(), s.namespace) - as = testutil.ActionSetWithConfigMap(as, s.confimap.GetName()) - as, err = s.crCli.ActionSets(s.namespace).Create(as) - c.Assert(err, IsNil) - - err = s.waitOnActionSetState(c, as, crv1alpha1.StateRunning) - c.Assert(err, IsNil) - - // Check if the func returned expected output - c.Assert(testutil.OutputFuncOut(), DeepEquals, map[string]interface{}{"key": "myValue"}) - - err = s.waitOnActionSetState(c, as, crv1alpha1.StateComplete) - c.Assert(err, IsNil) - - // Check if the artifacts got updated correctly - as, err = s.crCli.ActionSets(as.GetNamespace()).Get(as.GetName(), metav1.GetOptions{}) - arts := as.Status.Actions[0].Artifacts - c.Assert(arts, NotNil) - c.Assert(arts, HasLen, 1) - keyVal := arts["myArt"].KeyValue - c.Assert(keyVal, DeepEquals, map[string]string{"key": "myValue"}) -} diff --git a/pkg/param/param.go b/pkg/param/param.go index d68409afd2..b020009e82 100644 --- a/pkg/param/param.go +++ b/pkg/param/param.go @@ -20,18 +20,19 @@ const timeFormat = time.RFC3339Nano // TemplateParams are the values that will change between separate runs of Phases. type TemplateParams struct { - StatefulSet *StatefulSetParams - Deployment *DeploymentParams - PVC *PVCParams - Namespace *NamespaceParams - ArtifactsIn map[string]crv1alpha1.Artifact - ConfigMaps map[string]v1.ConfigMap - Secrets map[string]v1.Secret - Time string - Profile *Profile - Options map[string]string - Object map[string]interface{} - Phases map[string]*Phase + StatefulSet *StatefulSetParams + Deployment *DeploymentParams + PVC *PVCParams + Namespace *NamespaceParams + ArtifactsIn map[string]crv1alpha1.Artifact + ArtifactsOut map[string]crv1alpha1.Artifact + ConfigMaps map[string]v1.ConfigMap + Secrets map[string]v1.Secret + Time string + Profile *Profile + Options map[string]string + Object map[string]interface{} + Phases map[string]*Phase } // StatefulSetParams are params for stateful sets. diff --git a/pkg/param/render_test.go b/pkg/param/render_test.go index 215426f526..ad796e0164 100644 --- a/pkg/param/render_test.go +++ b/pkg/param/render_test.go @@ -36,20 +36,24 @@ func (s *RenderSuite) TestRender(c *C) { checker: IsNil, }, { - arg: "{{ .Options.hello }}", + arg: "{{ .ArtifactsOut.hello.KeyValue }}", tp: TemplateParams{ - Options: map[string]string{ - "hello": "", + ArtifactsOut: map[string]crv1alpha1.Artifact{ + "hello": crv1alpha1.Artifact{}, }, }, - out: "", + out: "map[]", checker: IsNil, }, { - arg: "{{ .Options.hello }}", + arg: "{{ .ArtifactsOut.hello.KeyValue.someKey }}", tp: TemplateParams{ - Options: map[string]string{ - "hello": "someValue", + ArtifactsOut: map[string]crv1alpha1.Artifact{ + "hello": crv1alpha1.Artifact{ + KeyValue: map[string]string{ + "someKey": "someValue", + }, + }, }, }, out: "someValue", @@ -57,10 +61,14 @@ func (s *RenderSuite) TestRender(c *C) { }, { // `-` cannot be used in a template path. - arg: "{{ .Options.hello-world }}", + arg: "{{ .ArtifactsOut.hello-world.KeyValue.someKey }}", tp: TemplateParams{ - Options: map[string]string{ - "hello-world": "someValue", + ArtifactsOut: map[string]crv1alpha1.Artifact{ + "hello-world": crv1alpha1.Artifact{ + KeyValue: map[string]string{ + "someKey": "someValue", + }, + }, }, }, out: "", @@ -68,11 +76,19 @@ func (s *RenderSuite) TestRender(c *C) { }, { // `-` can exist in artifact keys, it just cannot be used in path. - arg: "{{ .Options.hello }}", + arg: "{{ .ArtifactsOut.hello.KeyValue.someKey }}", tp: TemplateParams{ - Options: map[string]string{ - "hello": "someValue", - "hello-world": "someValue", + ArtifactsOut: map[string]crv1alpha1.Artifact{ + "hello": crv1alpha1.Artifact{ + KeyValue: map[string]string{ + "someKey": "someValue", + }, + }, + "hello-world": crv1alpha1.Artifact{ + KeyValue: map[string]string{ + "someKey": "someValue", + }, + }, }, }, out: "someValue", @@ -104,9 +120,9 @@ func (s *RenderSuite) TestRender(c *C) { }, { // Render should fail if referenced key doesn't exist - arg: "{{ .Options.hello }}", + arg: "{{ .ArtifactsOut.hello.KeyValue.someKey }}", tp: TemplateParams{ - Options: map[string]string{}, + ArtifactsOut: map[string]crv1alpha1.Artifact{}, }, checker: NotNil, }, diff --git a/pkg/phase_test.go b/pkg/phase_test.go index 17445d24dd..70415c30d9 100644 --- a/pkg/phase_test.go +++ b/pkg/phase_test.go @@ -42,20 +42,24 @@ func (s *PhaseSuite) TestExec(c *C) { }{ { artifact: "hello", - argument: "{{ .Options.test }} world", + argument: "{{ .ArtifactsOut.test.KeyValue.in }} world", expected: "hello world", }, { artifact: "HELLO", - argument: "{{ .Options.test | lower}} world", + argument: "{{ .ArtifactsOut.test.KeyValue.in | lower}} world", expected: "hello world", }, } { var output string tf := &testFunc{output: &output} tp := param.TemplateParams{ - Options: map[string]string{ - "test": tc.artifact, + ArtifactsOut: map[string]crv1alpha1.Artifact{ + "test": crv1alpha1.Artifact{ + KeyValue: map[string]string{ + "in": tc.artifact, + }, + }, }, } rawArgs := map[string]interface{}{ From f7fb375c207d8aa660ea17c9312f9cac4c6687bc Mon Sep 17 00:00:00 2001 From: Pavan Navarathna Devaraj Date: Fri, 12 Oct 2018 08:32:41 -0700 Subject: [PATCH 14/18] Remove ArtifactsOut from docs (#4109) --- docs/templates.rst | 1 - docs/tooling.rst | 4 ++-- docs/tutorial.rst | 4 ++-- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/docs/templates.rst b/docs/templates.rst index 69af8d971e..e9697b3bf1 100644 --- a/docs/templates.rst +++ b/docs/templates.rst @@ -16,7 +16,6 @@ The TemplateParam struct is defined as: Deployment DeploymentParams PVC PVCParams ArtifactsIn map[string]crv1alpha1.Artifact // A Kanister Artifact - ArtifactsOut map[string]crv1alpha1.Artifact Profile *Profile ConfigMaps map[string]v1.ConfigMap Secrets map[string]v1.Secret diff --git a/docs/tooling.rst b/docs/tooling.rst index 15333ff1a5..6c019f5366 100644 --- a/docs/tooling.rst +++ b/docs/tooling.rst @@ -354,9 +354,9 @@ The following snippet is an example of using kando from inside a Blueprint. .. code-block:: console - kando location push --profile '{{ .Profile }}' --path '{{ .ArtifactsOut }}' - + kando location push --profile '{{ .Profile }}' --path '/backup/path' - - kando location delete --profile '{{ .Profile }}' --path '{{ .ArtifactsOut }}' + kando location delete --profile '{{ .Profile }}' --path '/backup/path' Docker Image diff --git a/docs/tutorial.rst b/docs/tutorial.rst index a0559e9f57..6d7645317b 100644 --- a/docs/tutorial.rst +++ b/docs/tutorial.rst @@ -435,7 +435,7 @@ ConfigMap. - | AWS_ACCESS_KEY_ID={{ .Secrets.aws.Data.aws_access_key_id | toString }} \ AWS_SECRET_ACCESS_KEY={{ .Secrets.aws.Data.aws_secret_access_key | toString }} \ - aws s3 cp /var/log/time.log {{ .ArtifactsOut.timeLog.KeyValue.path | quote }} + aws s3 cp /var/log/time.log {{ .ConfigMaps.location.Data.path }}/time-log/ EOF If you re-execute this Kanister Action, you'll be able to see the Artifact in the @@ -516,7 +516,7 @@ ConfigMap because the `inputArtifact` contains the fully specified path. - | AWS_ACCESS_KEY_ID={{ .Secrets.aws.Data.aws_access_key_id | toString }} \ AWS_SECRET_ACCESS_KEY={{ .Secrets.aws.Data.aws_secret_access_key | toString }} \ - aws s3 cp /var/log/time.log {{ .ArtifactsOut.timeLog.KeyValue.path | quote }} + aws s3 cp /var/log/time.log {{ .ConfigMaps.location.Data.path }}/time-log/ restore: type: Deployment secretNames: From b107e8acc3333699c690d858f68088b38143815c Mon Sep 17 00:00:00 2001 From: Pavan Navarathna Devaraj Date: Fri, 12 Oct 2018 14:05:24 -0700 Subject: [PATCH 15/18] KubeExec: parse logs and return output (#4066) * KubeExec: parse logs and return output * Incorporated review comments --- pkg/function/kube_exec.go | 51 ++++++++++++++++++++++------------ pkg/function/kube_exec_test.go | 24 ++++++++++++++++ pkg/output/output.go | 13 +++++++-- 3 files changed, 68 insertions(+), 20 deletions(-) diff --git a/pkg/function/kube_exec.go b/pkg/function/kube_exec.go index 5413664e5f..7376e18623 100644 --- a/pkg/function/kube_exec.go +++ b/pkg/function/kube_exec.go @@ -5,10 +5,12 @@ import ( "regexp" "strings" - log "github.com/sirupsen/logrus" + "github.com/pkg/errors" kanister "github.com/kanisterio/kanister/pkg" + "github.com/kanisterio/kanister/pkg/format" "github.com/kanisterio/kanister/pkg/kube" + "github.com/kanisterio/kanister/pkg/output" "github.com/kanisterio/kanister/pkg/param" ) @@ -33,6 +35,30 @@ func (*kubeExecFunc) Name() string { return "KubeExec" } +func parseLogAndCreateOutput(out string) (map[string]interface{}, error) { + if out == "" { + return nil, nil + } + var op map[string]interface{} + logs := regexp.MustCompile("[\n]").Split(out, -1) + for _, l := range logs { + // Log should contain "###Phase-output###:" string + if strings.Contains(l, output.PhaseOpString) { + if op == nil { + op = make(map[string]interface{}) + } + pattern := regexp.MustCompile(`###Phase-output###:(.*?)*$`) + match := pattern.FindAllStringSubmatch(l, 1) + opObj, err := output.UnmarshalOutput(match[0][1]) + if err != nil { + return nil, err + } + op[opObj.Key] = opObj.Value + } + } + return op, nil +} + func (kef *kubeExecFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) { cli, err := kube.NewClient() if err != nil { @@ -54,23 +80,14 @@ func (kef *kubeExecFunc) Exec(ctx context.Context, tp param.TemplateParams, args } stdout, stderr, err := kube.Exec(cli, namespace, pod, container, cmd) - if stdout != "" { - logs := regexp.MustCompile("[\r\n]").Split(stdout, -1) - for _, stdoutLog := range logs { - if strings.TrimSpace(stdoutLog) != "" { - log.Info(stdoutLog) - } - } - } - if stderr != "" { - logs := regexp.MustCompile("[\r\n]").Split(stderr, -1) - for _, stderrLog := range logs { - if strings.TrimSpace(stderrLog) != "" { - log.Info(stderrLog) - } - } + format.Log(pod, container, stdout) + format.Log(pod, container, stderr) + if err != nil { + return nil, err } - return nil, err + + out, err := parseLogAndCreateOutput(stdout) + return out, errors.Wrap(err, "Failed to generate output") } func (*kubeExecFunc) RequiredArgs() []string { diff --git a/pkg/function/kube_exec_test.go b/pkg/function/kube_exec_test.go index 76ac9a08c9..4ec9f0f3fa 100644 --- a/pkg/function/kube_exec_test.go +++ b/pkg/function/kube_exec_test.go @@ -149,3 +149,27 @@ func (s *KubeExecTest) TestKubeExec(c *C) { c.Assert(err, IsNil) } } + +func (s *KubeExecTest) TestParseLogAndCreateOutput(c *C) { + for _, tc := range []struct { + log string + expected map[string]interface{} + errChecker Checker + outChecker Checker + }{ + {"###Phase-output###: {\"key\":\"version\",\"value\":\"0.12.0\"}", map[string]interface{}{"version": "0.12.0"}, IsNil, NotNil}, + {"###Phase-output###: {\"key\":\"version\",\"value\":\"0.12.0\"}\n###Phase-output###: {\"key\":\"path\",\"value\":\"/backup/path\"}", + map[string]interface{}{"version": "0.12.0", "path": "/backup/path"}, IsNil, NotNil}, + {"Random message ###Phase-output###: {\"key\":\"version\",\"value\":\"0.12.0\"}", map[string]interface{}{"version": "0.12.0"}, IsNil, NotNil}, + {"Random message with newline \n###Phase-output###: {\"key\":\"version\",\"value\":\"0.12.0\"}", map[string]interface{}{"version": "0.12.0"}, IsNil, NotNil}, + {"###Phase-output###: Invalid message", nil, NotNil, IsNil}, + {"Random message", nil, IsNil, IsNil}, + } { + out, err := parseLogAndCreateOutput(tc.log) + c.Check(err, tc.errChecker) + c.Check(out, tc.outChecker) + if out != nil { + c.Check(out, DeepEquals, tc.expected) + } + } +} diff --git a/pkg/output/output.go b/pkg/output/output.go index c5c9feaf8d..a2251e8a92 100644 --- a/pkg/output/output.go +++ b/pkg/output/output.go @@ -9,7 +9,7 @@ import ( ) const ( - phaseOpString = "###Phase-output###:" + PhaseOpString = "###Phase-output###:" ) type Output struct { @@ -24,11 +24,18 @@ func marshalOutput(key, value string) (string, error) { } outString, err := json.Marshal(out) if err != nil { - return "", errors.Wrapf(err, "Failed to marshall key-value pair") + return "", errors.Wrap(err, "Failed to marshal key-value pair") } return string(outString), nil } +// UnmarshalOutput unmarshals output json into Output struct +func UnmarshalOutput(opString string) (*Output, error) { + p := &Output{} + err := json.Unmarshal([]byte(opString), p) + return p, errors.Wrap(err, "Failed to unmarshal key-value pair") +} + // ValidateKey validates the key argument func ValidateKey(key string) error { // key should be non-empty @@ -49,6 +56,6 @@ func PrintOutput(key, value string) error { if err != nil { return err } - fmt.Println(phaseOpString, outString) + fmt.Println(PhaseOpString, outString) return nil } From 61cc8a299a33bcff119a3a09d689d8ef30e85603 Mon Sep 17 00:00:00 2001 From: Pavan Navarathna Devaraj Date: Sun, 14 Oct 2018 21:13:22 -0700 Subject: [PATCH 16/18] Render Artifacts after execution of all phases (#4125) * WIP Render ArtifactsOut after all the phases * Render arts inside go routine * Remove ArtifactsOut from TemplateParams * Add Unit test to check artifacts update * Fix ActionSet update * Restructure actionset update * Minor log update --- pkg/controller/controller.go | 54 ++++++++++++++++++++++++----- pkg/controller/controller_test.go | 57 +++++++++++++++++++++++++++++++ pkg/param/param.go | 25 +++++++------- pkg/param/render_test.go | 48 +++++++++----------------- pkg/phase_test.go | 12 +++---- pkg/validate/validate.go | 3 -- 6 files changed, 135 insertions(+), 64 deletions(-) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 9603186fea..ee295a21d3 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -205,6 +205,8 @@ func (c *Controller) onUpdateActionSet(oldAS, newAS *crv1alpha1.ActionSet) error if newAS.Status == nil || newAS.Status.State != crv1alpha1.StateRunning { if newAS.Status == nil { log.Infof("Updated ActionSet '%s' Status->nil", newAS.Name) + } else if newAS.Status.State == crv1alpha1.StateComplete { + c.logAndSuccessEvent(fmt.Sprintf("Updated ActionSet '%s' Status->%s", newAS.Name, newAS.Status.State), "Update Complete", newAS) } else { log.Infof("Updated ActionSet '%s' Status->%s", newAS.Name, newAS.Status.State) } @@ -346,12 +348,6 @@ func (c *Controller) runAction(ctx context.Context, as *crv1alpha1.ActionSet, aI if err != nil { return err } - artTpls := as.Status.Actions[aIDX].Artifacts - arts, err := param.RenderArtifacts(artTpls, *tp) - if err != nil { - return err - } - tp.ArtifactsOut = arts phases, err := kanister.GetPhases(*bp, action.Name, *tp) if err != nil { return err @@ -363,7 +359,7 @@ func (c *Controller) runAction(ctx context.Context, as *crv1alpha1.ActionSet, aI err = param.InitPhaseParams(ctx, c.clientset, tp, p.Name(), p.Objects()) if err != nil { reason := fmt.Sprintf("ActionSetFailed Action: %s", as.Spec.Actions[aIDX].Name) - msg := fmt.Sprintf("Failed to execute phase: %#v:", as.Status.Actions[aIDX].Phases[i]) + msg := fmt.Sprintf("Failed to init phase params: %#v:", as.Status.Actions[aIDX].Phases[i]) c.logAndErrorEvent(msg, reason, err, as, bp) return } @@ -377,7 +373,6 @@ func (c *Controller) runAction(ctx context.Context, as *crv1alpha1.ActionSet, aI } } else { rf = func(ras *crv1alpha1.ActionSet) error { - ras.Status.Actions[aIDX].Artifacts = arts ras.Status.Actions[aIDX].Phases[i].State = crv1alpha1.StateComplete ras.Status.Actions[aIDX].Phases[i].Output = output return nil @@ -398,6 +393,49 @@ func (c *Controller) runAction(ctx context.Context, as *crv1alpha1.ActionSet, aI param.UpdatePhaseParams(ctx, tp, p.Name(), output) c.logAndSuccessEvent(fmt.Sprintf("Completed phase %s", p.Name()), "Ended Phase", as) } + // Check if output artifacts are present + artTpls := as.Status.Actions[aIDX].Artifacts + if len(artTpls) == 0 { + // No artifacts, set ActionSetStatus to complete + if rErr := reconcile.ActionSet(ctx, c.crClient.CrV1alpha1(), ns, name, func(ras *crv1alpha1.ActionSet) error { + ras.Status.State = crv1alpha1.StateComplete + return nil + }); rErr != nil { + reason := fmt.Sprintf("ActionSetFailed Action: %s", action.Name) + msg := fmt.Sprintf("Failed to update ActionSet: %s", name) + c.logAndErrorEvent(msg, reason, err, as, bp) + } + return + } + // Render the artifacts + arts, err := param.RenderArtifacts(artTpls, *tp) + var af func(*crv1alpha1.ActionSet) error + if err != nil { + af = func(ras *crv1alpha1.ActionSet) error { + ras.Status.State = crv1alpha1.StateFailed + return nil + } + } else { + af = func(ras *crv1alpha1.ActionSet) error { + ras.Status.Actions[aIDX].Artifacts = arts + ras.Status.State = crv1alpha1.StateComplete + return nil + } + } + // Update ActionSet with artifacts + if aErr := reconcile.ActionSet(ctx, c.crClient.CrV1alpha1(), ns, name, af); aErr != nil { + reason := fmt.Sprintf("ActionSetFailed Action: %s", action.Name) + msg := fmt.Sprintf("Failed to update Output Artifacts: %#v:", artTpls) + c.logAndErrorEvent(msg, reason, aErr, as, bp) + return + } + // Failed to render artifacts + if err != nil { + reason := fmt.Sprintf("ActionSetFailed Action: %s", action.Name) + msg := fmt.Sprintf("Failed to render Output Artifacts: %#v:", artTpls) + c.logAndErrorEvent(msg, reason, err, as, bp) + return + } }() return nil } diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index e27b448bb9..23b81dbd0b 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -146,6 +146,32 @@ func (s *ControllerSuite) waitOnActionSetState(c *C, as *crv1alpha1.ActionSet, s return errors.Wrapf(err, "State '%s' never reached", state) } +func newBPWithOutputArtifact() *crv1alpha1.Blueprint { + return &crv1alpha1.Blueprint{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "test-blueprint-", + }, + Actions: map[string]*crv1alpha1.BlueprintAction{ + "myAction": &crv1alpha1.BlueprintAction{ + OutputArtifacts: map[string]crv1alpha1.Artifact{ + "myArt": crv1alpha1.Artifact{ + KeyValue: map[string]string{ + "key": "{{ .Phases.myPhase0.Output.key }}", + }, + }, + }, + Kind: "Deployment", + Phases: []crv1alpha1.BlueprintPhase{ + { + Name: "myPhase0", + Func: testutil.OutputFuncName, + }, + }, + }, + }, + } +} + func (s *ControllerSuite) TestEmptyActionSetStatus(c *C) { as := &crv1alpha1.ActionSet{ ObjectMeta: metav1.ObjectMeta{ @@ -338,3 +364,34 @@ func (s *ControllerSuite) TestRuntimeObjEventLogs(c *C) { c.Assert(err, NotNil) c.Assert(len(events.Items), Equals, 0) } + +func (s *ControllerSuite) TestPhaseOutputAsArtifact(c *C) { + // Create a blueprint that uses func output as artifact + bp := newBPWithOutputArtifact() + bp = testutil.BlueprintWithConfigMap(bp) + bp, err := s.crCli.Blueprints(s.namespace).Create(bp) + c.Assert(err, IsNil) + + // Add an actionset that references that blueprint. + as := testutil.NewTestActionSet(s.namespace, bp.GetName(), "Deployment", s.deployment.GetName(), s.namespace) + as = testutil.ActionSetWithConfigMap(as, s.confimap.GetName()) + as, err = s.crCli.ActionSets(s.namespace).Create(as) + c.Assert(err, IsNil) + + err = s.waitOnActionSetState(c, as, crv1alpha1.StateRunning) + c.Assert(err, IsNil) + + // Check if the func returned expected output + c.Assert(testutil.OutputFuncOut(), DeepEquals, map[string]interface{}{"key": "myValue"}) + + err = s.waitOnActionSetState(c, as, crv1alpha1.StateComplete) + c.Assert(err, IsNil) + + // Check if the artifacts got updated correctly + as, err = s.crCli.ActionSets(as.GetNamespace()).Get(as.GetName(), metav1.GetOptions{}) + arts := as.Status.Actions[0].Artifacts + c.Assert(arts, NotNil) + c.Assert(arts, HasLen, 1) + keyVal := arts["myArt"].KeyValue + c.Assert(keyVal, DeepEquals, map[string]string{"key": "myValue"}) +} diff --git a/pkg/param/param.go b/pkg/param/param.go index b020009e82..d68409afd2 100644 --- a/pkg/param/param.go +++ b/pkg/param/param.go @@ -20,19 +20,18 @@ const timeFormat = time.RFC3339Nano // TemplateParams are the values that will change between separate runs of Phases. type TemplateParams struct { - StatefulSet *StatefulSetParams - Deployment *DeploymentParams - PVC *PVCParams - Namespace *NamespaceParams - ArtifactsIn map[string]crv1alpha1.Artifact - ArtifactsOut map[string]crv1alpha1.Artifact - ConfigMaps map[string]v1.ConfigMap - Secrets map[string]v1.Secret - Time string - Profile *Profile - Options map[string]string - Object map[string]interface{} - Phases map[string]*Phase + StatefulSet *StatefulSetParams + Deployment *DeploymentParams + PVC *PVCParams + Namespace *NamespaceParams + ArtifactsIn map[string]crv1alpha1.Artifact + ConfigMaps map[string]v1.ConfigMap + Secrets map[string]v1.Secret + Time string + Profile *Profile + Options map[string]string + Object map[string]interface{} + Phases map[string]*Phase } // StatefulSetParams are params for stateful sets. diff --git a/pkg/param/render_test.go b/pkg/param/render_test.go index ad796e0164..215426f526 100644 --- a/pkg/param/render_test.go +++ b/pkg/param/render_test.go @@ -36,24 +36,20 @@ func (s *RenderSuite) TestRender(c *C) { checker: IsNil, }, { - arg: "{{ .ArtifactsOut.hello.KeyValue }}", + arg: "{{ .Options.hello }}", tp: TemplateParams{ - ArtifactsOut: map[string]crv1alpha1.Artifact{ - "hello": crv1alpha1.Artifact{}, + Options: map[string]string{ + "hello": "", }, }, - out: "map[]", + out: "", checker: IsNil, }, { - arg: "{{ .ArtifactsOut.hello.KeyValue.someKey }}", + arg: "{{ .Options.hello }}", tp: TemplateParams{ - ArtifactsOut: map[string]crv1alpha1.Artifact{ - "hello": crv1alpha1.Artifact{ - KeyValue: map[string]string{ - "someKey": "someValue", - }, - }, + Options: map[string]string{ + "hello": "someValue", }, }, out: "someValue", @@ -61,14 +57,10 @@ func (s *RenderSuite) TestRender(c *C) { }, { // `-` cannot be used in a template path. - arg: "{{ .ArtifactsOut.hello-world.KeyValue.someKey }}", + arg: "{{ .Options.hello-world }}", tp: TemplateParams{ - ArtifactsOut: map[string]crv1alpha1.Artifact{ - "hello-world": crv1alpha1.Artifact{ - KeyValue: map[string]string{ - "someKey": "someValue", - }, - }, + Options: map[string]string{ + "hello-world": "someValue", }, }, out: "", @@ -76,19 +68,11 @@ func (s *RenderSuite) TestRender(c *C) { }, { // `-` can exist in artifact keys, it just cannot be used in path. - arg: "{{ .ArtifactsOut.hello.KeyValue.someKey }}", + arg: "{{ .Options.hello }}", tp: TemplateParams{ - ArtifactsOut: map[string]crv1alpha1.Artifact{ - "hello": crv1alpha1.Artifact{ - KeyValue: map[string]string{ - "someKey": "someValue", - }, - }, - "hello-world": crv1alpha1.Artifact{ - KeyValue: map[string]string{ - "someKey": "someValue", - }, - }, + Options: map[string]string{ + "hello": "someValue", + "hello-world": "someValue", }, }, out: "someValue", @@ -120,9 +104,9 @@ func (s *RenderSuite) TestRender(c *C) { }, { // Render should fail if referenced key doesn't exist - arg: "{{ .ArtifactsOut.hello.KeyValue.someKey }}", + arg: "{{ .Options.hello }}", tp: TemplateParams{ - ArtifactsOut: map[string]crv1alpha1.Artifact{}, + Options: map[string]string{}, }, checker: NotNil, }, diff --git a/pkg/phase_test.go b/pkg/phase_test.go index 70415c30d9..17445d24dd 100644 --- a/pkg/phase_test.go +++ b/pkg/phase_test.go @@ -42,24 +42,20 @@ func (s *PhaseSuite) TestExec(c *C) { }{ { artifact: "hello", - argument: "{{ .ArtifactsOut.test.KeyValue.in }} world", + argument: "{{ .Options.test }} world", expected: "hello world", }, { artifact: "HELLO", - argument: "{{ .ArtifactsOut.test.KeyValue.in | lower}} world", + argument: "{{ .Options.test | lower}} world", expected: "hello world", }, } { var output string tf := &testFunc{output: &output} tp := param.TemplateParams{ - ArtifactsOut: map[string]crv1alpha1.Artifact{ - "test": crv1alpha1.Artifact{ - KeyValue: map[string]string{ - "in": tc.artifact, - }, - }, + Options: map[string]string{ + "test": tc.artifact, }, } rawArgs := map[string]interface{}{ diff --git a/pkg/validate/validate.go b/pkg/validate/validate.go index b9dfc4fa0f..8cf4602f3b 100644 --- a/pkg/validate/validate.go +++ b/pkg/validate/validate.go @@ -91,9 +91,6 @@ func actionSetStatus(as *crv1alpha1.ActionSetStatus) error { return errorf("ActionSet cannot be complete if any actions are not complete") } } - if saw[crv1alpha1.StateFailed] != (as.State == crv1alpha1.StateFailed) { - return errorf("Iff any action is failed, the whole ActionSet must be failed") - } return nil } From 808802b667b25e1c7f032c7027cc5c3f41a5f7f0 Mon Sep 17 00:00:00 2001 From: Pavan Navarathna Devaraj Date: Mon, 15 Oct 2018 11:06:00 -0700 Subject: [PATCH 17/18] Fix actionset based kube tests (#4127) * Fix actionset based kube tests * Minor readability fix --- pkg/controller/controller.go | 32 ++++++++++++-------------------- pkg/validate/validate.go | 3 +++ 2 files changed, 15 insertions(+), 20 deletions(-) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index ee295a21d3..ce70072478 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -220,8 +220,9 @@ func (c *Controller) onUpdateActionSet(oldAS, newAS *crv1alpha1.ActionSet) error } } } - newAS.Status.State = crv1alpha1.StateComplete - c.logAndSuccessEvent(fmt.Sprintf("Updated ActionSet '%s' Status->%s", newAS.Name, newAS.Status.State), "Update Complete", newAS) + if len(newAS.Status.Actions) != 0 { + return nil + } return reconcile.ActionSet(context.TODO(), c.crClient.CrV1alpha1(), newAS.GetNamespace(), newAS.GetName(), func(ras *crv1alpha1.ActionSet) error { ras.Status.State = crv1alpha1.StateComplete return nil @@ -409,18 +410,16 @@ func (c *Controller) runAction(ctx context.Context, as *crv1alpha1.ActionSet, aI } // Render the artifacts arts, err := param.RenderArtifacts(artTpls, *tp) - var af func(*crv1alpha1.ActionSet) error if err != nil { - af = func(ras *crv1alpha1.ActionSet) error { - ras.Status.State = crv1alpha1.StateFailed - return nil - } - } else { - af = func(ras *crv1alpha1.ActionSet) error { - ras.Status.Actions[aIDX].Artifacts = arts - ras.Status.State = crv1alpha1.StateComplete - return nil - } + reason := fmt.Sprintf("ActionSetFailed Action: %s", action.Name) + msg := fmt.Sprintf("Failed to render Output Artifacts: %#v:", artTpls) + c.logAndErrorEvent(msg, reason, err, as, bp) + return + } + af := func(ras *crv1alpha1.ActionSet) error { + ras.Status.Actions[aIDX].Artifacts = arts + ras.Status.State = crv1alpha1.StateComplete + return nil } // Update ActionSet with artifacts if aErr := reconcile.ActionSet(ctx, c.crClient.CrV1alpha1(), ns, name, af); aErr != nil { @@ -429,13 +428,6 @@ func (c *Controller) runAction(ctx context.Context, as *crv1alpha1.ActionSet, aI c.logAndErrorEvent(msg, reason, aErr, as, bp) return } - // Failed to render artifacts - if err != nil { - reason := fmt.Sprintf("ActionSetFailed Action: %s", action.Name) - msg := fmt.Sprintf("Failed to render Output Artifacts: %#v:", artTpls) - c.logAndErrorEvent(msg, reason, err, as, bp) - return - } }() return nil } diff --git a/pkg/validate/validate.go b/pkg/validate/validate.go index 8cf4602f3b..b9dfc4fa0f 100644 --- a/pkg/validate/validate.go +++ b/pkg/validate/validate.go @@ -91,6 +91,9 @@ func actionSetStatus(as *crv1alpha1.ActionSetStatus) error { return errorf("ActionSet cannot be complete if any actions are not complete") } } + if saw[crv1alpha1.StateFailed] != (as.State == crv1alpha1.StateFailed) { + return errorf("Iff any action is failed, the whole ActionSet must be failed") + } return nil } From 6367d83ad1d25a5b8be1211016e5e72561b863c4 Mon Sep 17 00:00:00 2001 From: DeepikaDixit Date: Mon, 15 Oct 2018 15:03:34 -0700 Subject: [PATCH 18/18] Enum types follow PascalCasing (#4128) --- pkg/objectstore/const.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/objectstore/const.go b/pkg/objectstore/const.go index a33b8d8e62..e41d08c3bc 100644 --- a/pkg/objectstore/const.go +++ b/pkg/objectstore/const.go @@ -22,10 +22,10 @@ const ( type SecretType string const ( - // SecretTypeAwsAccessKey captures enum value "awsAccessKey" - SecretTypeAwsAccessKey SecretType = "awsAccessKey" - // SecretTypeGcpServiceAccountKey captures enum value "gcpServiceAccountKey" - SecretTypeGcpServiceAccountKey SecretType = "gcpServiceAccountKey" - // SecretTypeAzStorageAccount captures enum value "azStorageAccount" - SecretTypeAzStorageAccount SecretType = "azStorageAccount" + // SecretTypeAwsAccessKey captures enum value "AwsAccessKey" + SecretTypeAwsAccessKey SecretType = "AwsAccessKey" + // SecretTypeGcpServiceAccountKey captures enum value "GcpServiceAccountKey" + SecretTypeGcpServiceAccountKey SecretType = "GcpServiceAccountKey" + // SecretTypeAzStorageAccount captures enum value "AzStorageAccount" + SecretTypeAzStorageAccount SecretType = "AzStorageAccount" )