Skip to content

Commit

Permalink
Support return values from Kanister functions - Phase 1 (#3984)
Browse files Browse the repository at this point in the history
* Support return values from Kanister functions

* Change all Kanister functions to return output

* Change function signature in all the tests

* Move RenderArgs logic into Exec

* Add output function unit test

* Fix phase unit test

* Minor improvement to func unit test
  • Loading branch information
pavannd1 authored and Ilya Kislenko committed Oct 2, 2018
1 parent 0ae887c commit 0d05fec
Show file tree
Hide file tree
Showing 24 changed files with 178 additions and 122 deletions.
8 changes: 8 additions & 0 deletions pkg/apis/cr/v1alpha1/deepcopy.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,11 @@ func (in *BlueprintPhase) DeepCopyInto(out *BlueprintPhase) {
// TODO: Handle 'Args'
return
}

// DeepCopyInto handles the Phase deep copies, copying the receiver, writing into out. in must be non-nil.
// This is a workaround to handle the map[string]interface{} output type
func (in *Phase) DeepCopyInto(out *Phase) {
*out = *in
// TODO: Handle 'Output' map[string]interface{}
return
}
5 changes: 3 additions & 2 deletions pkg/apis/cr/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,9 @@ const (

// Phase is subcomponent of an action.
type Phase struct {
Name string `json:"name"`
State State `json:"state"`
Name string `json:"name"`
State State `json:"state"`
Output map[string]interface{} `json:"output"`
}

// k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down
10 changes: 3 additions & 7 deletions pkg/apis/cr/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 8 additions & 3 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,9 +361,13 @@ func (c *Controller) runAction(ctx context.Context, as *crv1alpha1.ActionSet, aI
for i, p := range phases {
c.logAndSuccessEvent(fmt.Sprintf("Executing phase %s", p.Name()), "Started Phase", as)
err = param.InitPhaseParams(ctx, c.clientset, tp, p.Name(), p.Objects())
if err == nil {
err = p.Exec(ctx, *tp)
if err != nil {
reason := fmt.Sprintf("ActionSetFailed Action: %s", as.Spec.Actions[aIDX].Name)
msg := fmt.Sprintf("Failed to execute phase: %#v:", as.Status.Actions[aIDX].Phases[i])
c.logAndErrorEvent(msg, reason, err, as, bp)
return
}
output, err := p.Exec(ctx, *bp, action.Name, *tp)
var rf func(*crv1alpha1.ActionSet) error
if err != nil {
rf = func(ras *crv1alpha1.ActionSet) error {
Expand All @@ -375,6 +379,7 @@ func (c *Controller) runAction(ctx context.Context, as *crv1alpha1.ActionSet, aI
rf = func(ras *crv1alpha1.ActionSet) error {
ras.Status.Actions[aIDX].Artifacts = arts
ras.Status.Actions[aIDX].Phases[i].State = crv1alpha1.StateComplete
ras.Status.Actions[aIDX].Phases[i].Output = output
return nil
}
}
Expand All @@ -390,7 +395,7 @@ func (c *Controller) runAction(ctx context.Context, as *crv1alpha1.ActionSet, aI
c.logAndErrorEvent(msg, reason, err, as, bp)
return
}
param.UpdatePhaseParams(ctx, tp, p.Name(), nil)
param.UpdatePhaseParams(ctx, tp, p.Name(), output)
c.logAndSuccessEvent(fmt.Sprintf("Completed phase %s", p.Name()), "Ended Phase", as)
}
}()
Expand Down
5 changes: 5 additions & 0 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,9 @@ func (s *ControllerSuite) TestExecActionSet(c *C) {
{
funcNames: []string{testutil.ArgFuncName, testutil.FailFuncName},
},
{
funcNames: []string{testutil.OutputFuncName},
},
} {
var err error
// Add a blueprint with a mocked kanister function.
Expand Down Expand Up @@ -259,6 +262,8 @@ func (s *ControllerSuite) TestExecActionSet(c *C) {
testutil.ReleaseWaitFunc()
case testutil.ArgFuncName:
c.Assert(testutil.ArgFuncArgs(), DeepEquals, map[string]interface{}{"key": "myValue"})
case testutil.OutputFuncName:
c.Assert(testutil.OutputFuncOut(), DeepEquals, map[string]interface{}{"key": "myValue"})
}
}

Expand Down
24 changes: 12 additions & 12 deletions pkg/function/backup_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,34 +69,34 @@ func validateProfile(profile *param.Profile) error {
return nil
}

func (*backupDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) error {
func (*backupDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) {
var namespace, pod, container, includePath, backupArtifactPrefix, backupIdentifier string
var err error
if err = Arg(args, BackupDataNamespaceArg, &namespace); err != nil {
return err
return nil, err
}
if err = Arg(args, BackupDataPodArg, &pod); err != nil {
return err
return nil, err
}
if err = Arg(args, BackupDataContainerArg, &container); err != nil {
return err
return nil, err
}
if err = Arg(args, BackupDataIncludePathArg, &includePath); err != nil {
return err
return nil, err
}
if err = Arg(args, BackupDataBackupArtifactPrefixArg, &backupArtifactPrefix); err != nil {
return err
return nil, err
}
if err = Arg(args, BackupDataBackupIdentifierArg, &backupIdentifier); err != nil {
return err
return nil, err
}
// Validate the Profile
if err = validateProfile(tp.Profile); err != nil {
return errors.Wrapf(err, "Failed to validate Profile")
return nil, errors.Wrapf(err, "Failed to validate Profile")
}
cli, err := kube.NewClient()
if err != nil {
return errors.Wrapf(err, "Failed to create Kubernetes client")
return nil, errors.Wrapf(err, "Failed to create Kubernetes client")
}
// Use the snapshots command to check if the repository exists
cmd := generateSnapshotsCommand(backupArtifactPrefix, tp.Profile)
Expand All @@ -110,7 +110,7 @@ func (*backupDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args m
formatAndLog(pod, container, stdout)
formatAndLog(pod, container, stderr)
if err != nil {
return errors.Wrapf(err, "Failed to create object store backup location")
return nil, errors.Wrapf(err, "Failed to create object store backup location")
}
}
// Create backup and dump it on the object store
Expand All @@ -119,9 +119,9 @@ func (*backupDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args m
formatAndLog(pod, container, stdout)
formatAndLog(pod, container, stderr)
if err != nil {
return errors.Wrapf(err, "Failed to create and upload backup")
return nil, errors.Wrapf(err, "Failed to create and upload backup")
}
return nil
return nil, nil
}

func (*backupDataFunc) RequiredArgs() []string {
Expand Down
2 changes: 1 addition & 1 deletion pkg/function/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (s *DataSuite) TestBackupRestoreData(c *C) {
phases, err := kanister.GetPhases(bp, actionName, *tp)
c.Assert(err, IsNil)
for _, p := range phases {
err := p.Exec(context.Background(), *tp)
_, err = p.Exec(context.Background(), bp, actionName, *tp)
c.Assert(err, IsNil)
}
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/function/delete_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,23 +47,23 @@ func generateDeleteCommand(artifact string, profile *param.Profile) []string {
return []string{"bash", "-o", "errexit", "-o", "pipefail", "-c", command}
}

func (*deleteDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) error {
func (*deleteDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) {
var artifact, namespace string
var err error
if err = Arg(args, DeleteDataArtifactArg, &artifact); err != nil {
return err
return nil, err
}
if err = OptArg(args, DeleteDataNamespaceArg, &namespace, ""); err != nil {
return err
return nil, err
}
// Validate the Profile
if err = validateProfile(tp.Profile); err != nil {
return errors.Wrapf(err, "Failed to validate Profile")
return nil, errors.Wrapf(err, "Failed to validate Profile")
}
// Generate delete command
cmd := generateDeleteCommand(artifact, tp.Profile)
// Use KubeTask to delete the artifact
return kubeTask(ctx, namespace, "kanisterio/kanister-tools:0.12.0", cmd)
return nil, kubeTask(ctx, namespace, "kanisterio/kanister-tools:0.12.0", cmd)
}

func (*deleteDataFunc) RequiredArgs() []string {
Expand Down
14 changes: 7 additions & 7 deletions pkg/function/kube_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,24 +33,24 @@ func (*kubeExecFunc) Name() string {
return "KubeExec"
}

func (kef *kubeExecFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) error {
func (kef *kubeExecFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) {
cli, err := kube.NewClient()
if err != nil {
return err
return nil, err
}
var namespace, pod, container string
var cmd []string
if err = Arg(args, KubeExecNamespaceArg, &namespace); err != nil {
return err
return nil, err
}
if err = Arg(args, KubeExecPodNameArg, &pod); err != nil {
return err
return nil, err
}
if err = Arg(args, KubeExecContainerNameArg, &container); err != nil {
return err
return nil, err
}
if err = Arg(args, KubeExecCommandArg, &cmd); err != nil {
return err
return nil, err
}

stdout, stderr, err := kube.Exec(cli, namespace, pod, container, cmd)
Expand All @@ -70,7 +70,7 @@ func (kef *kubeExecFunc) Exec(ctx context.Context, tp param.TemplateParams, args
}
}
}
return err
return nil, err
}

func (*kubeExecFunc) RequiredArgs() []string {
Expand Down
14 changes: 7 additions & 7 deletions pkg/function/kube_exec_all.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,28 +35,28 @@ func (*kubeExecAllFunc) Name() string {
return "KubeExecAll"
}

func (*kubeExecAllFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) error {
func (*kubeExecAllFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) {
cli, err := kube.NewClient()
if err != nil {
return err
return nil, err
}
var namespace, pods, containers string
var cmd []string
if err = Arg(args, KubeExecAllNamespaceArg, &namespace); err != nil {
return err
return nil, err
}
if err = Arg(args, KubeExecAllPodsNameArg, &pods); err != nil {
return err
return nil, err
}
if err = Arg(args, KubeExecAllContainersNameArg, &containers); err != nil {
return err
return nil, err
}
if err = Arg(args, KubeExecAllCommandArg, &cmd); err != nil {
return err
return nil, err
}
ps := strings.Fields(pods)
cs := strings.Fields(containers)
return execAll(cli, namespace, ps, cs, cmd)
return nil, execAll(cli, namespace, ps, cs, cmd)
}

func (*kubeExecAllFunc) RequiredArgs() []string {
Expand Down
10 changes: 6 additions & 4 deletions pkg/function/kube_exec_all_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,11 @@ func (s *KubeExecAllTest) TestKubeExecAllDeployment(c *C) {
c.Assert(err, IsNil)

action := "echo"
phases, err := kanister.GetPhases(*newExecAllBlueprint(kind), action, *tp)
bp := newExecAllBlueprint(kind)
phases, err := kanister.GetPhases(*bp, action, *tp)
c.Assert(err, IsNil)
for _, p := range phases {
err = p.Exec(ctx, *tp)
_, err = p.Exec(ctx, *bp, action, *tp)
c.Assert(err, IsNil)
}
}
Expand Down Expand Up @@ -145,10 +146,11 @@ func (s *KubeExecAllTest) TestKubeExecAllStatefulSet(c *C) {
c.Assert(err, IsNil)

action := "echo"
phases, err := kanister.GetPhases(*newExecAllBlueprint(kind), action, *tp)
bp := newExecAllBlueprint(kind)
phases, err := kanister.GetPhases(*bp, action, *tp)
c.Assert(err, IsNil)
for _, p := range phases {
err = p.Exec(ctx, *tp)
_, err = p.Exec(ctx, *bp, action, *tp)
c.Assert(err, IsNil)
}
}
5 changes: 3 additions & 2 deletions pkg/function/kube_exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,11 @@ func (s *KubeExecTest) TestKubeExec(c *C) {
c.Assert(err, IsNil)

action := "echo"
phases, err := kanister.GetPhases(*newKubeExecBlueprint(), action, *tp)
bp := newKubeExecBlueprint()
phases, err := kanister.GetPhases(*bp, action, *tp)
c.Assert(err, IsNil)
for _, p := range phases {
err := p.Exec(context.Background(), *tp)
_, err = p.Exec(context.Background(), *bp, action, *tp)
c.Assert(err, IsNil)
}
}
10 changes: 5 additions & 5 deletions pkg/function/kube_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,20 +67,20 @@ func kubeTask(ctx context.Context, namespace, image string, command []string) er
return nil
}

func (ktf *kubeTaskFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) error {
func (ktf *kubeTaskFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) {
var namespace, image string
var command []string
var err error
if err = Arg(args, KubeTaskImageArg, &image); err != nil {
return err
return nil, err
}
if err = Arg(args, KubeTaskCommandArg, &command); err != nil {
return err
return nil, err
}
if err = OptArg(args, KubeTaskNamespaceArg, &namespace, ""); err != nil {
return err
return nil, err
}
return kubeTask(ctx, namespace, image, command)
return nil, kubeTask(ctx, namespace, image, command)
}

func (*kubeTaskFunc) RequiredArgs() []string {
Expand Down
2 changes: 1 addition & 1 deletion pkg/function/kube_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (s *KubeTaskSuite) TestKubeTask(c *C) {
phases, err := kanister.GetPhases(*bp, action, tp)
c.Assert(err, IsNil)
for _, p := range phases {
err := p.Exec(ctx, tp)
_, err = p.Exec(ctx, *bp, action, tp)
c.Assert(err, IsNil)
}
}
Loading

0 comments on commit 0d05fec

Please sign in to comment.