Skip to content

Commit

Permalink
Add new kanister function Wait (#1091)
Browse files Browse the repository at this point in the history
* Add helper function to resolve jsonpath

Signed-off-by: Prasad Ghangal <prasad.ghangal@gmail.com>

* New jsonpath pkg

Signed-off-by: Prasad Ghangal <prasad.ghangal@gmail.com>

* Add new kanister function Wait

Signed-off-by: Prasad Ghangal <prasad.ghangal@gmail.com>

* Fix wait unit tests

Signed-off-by: Prasad Ghangal <prasad.ghangal@gmail.com>

* Increase timeout in tests

Signed-off-by: Prasad Ghangal <prasad.ghangal@gmail.com>

* Improve error message

Signed-off-by: Prasad Ghangal <prasad.ghangal@gmail.com>

* Happy golint

Signed-off-by: Prasad Ghangal <prasad.ghangal@gmail.com>

* Use kanister-tools image for test deploy

Signed-off-by: Prasad Ghangal <prasad.ghangal@gmail.com>

Co-authored-by: Pavan Navarathna <pavan@kasten.io>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
3 people committed Sep 22, 2021
1 parent d78a543 commit 049fcaf
Show file tree
Hide file tree
Showing 3 changed files with 374 additions and 0 deletions.
176 changes: 176 additions & 0 deletions pkg/function/wait.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
// Copyright 2021 The Kanister Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package function

import (
"bytes"
"context"
"fmt"
"strings"
"text/template"
"time"

"github.com/Masterminds/sprig"
"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"

kanister "github.com/kanisterio/kanister/pkg"
crv1alpha1 "github.com/kanisterio/kanister/pkg/apis/cr/v1alpha1"
"github.com/kanisterio/kanister/pkg/jsonpath"
"github.com/kanisterio/kanister/pkg/kube"
"github.com/kanisterio/kanister/pkg/log"
"github.com/kanisterio/kanister/pkg/param"
"github.com/kanisterio/kanister/pkg/poll"
)

type WaitConditions struct {
AnyOf []Condition
AllOf []Condition
}

type Condition struct {
ObjectReference crv1alpha1.ObjectReference
Condition string
}

const (
// WaitFuncName specifies the function name
WaitFuncName = "Wait"
WaitTimeoutArg = "timeout"
WaitConditionsArg = "conditions"
)

func init() {
_ = kanister.Register(&waitFunc{})
}

var _ kanister.Func = (*waitFunc)(nil)

type waitFunc struct{}

func (*waitFunc) Name() string {
return WaitFuncName
}

func (ktf *waitFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) {
var timeout string
var conditions WaitConditions
var err error
if err = Arg(args, WaitTimeoutArg, &timeout); err != nil {
return nil, err
}
if err = Arg(args, WaitConditionsArg, &conditions); err != nil {
return nil, err
}
dynCli, err := kube.NewDynamicClient()
if err != nil {
return nil, err
}
timeoutDur, err := time.ParseDuration(timeout)
if err != nil {
return nil, errors.Wrap(err, "Failed to parse timeout")
}
err = waitForCondition(ctx, dynCli, conditions, timeoutDur)
return nil, err
}

func (*waitFunc) RequiredArgs() []string {
return []string{WaitTimeoutArg, WaitConditionsArg}
}

// waitForCondition wait till the condition satisfies within the timeout duration
func waitForCondition(ctx context.Context, dynCli dynamic.Interface, waitCond WaitConditions, timeout time.Duration) error {
ctxTimeout, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
var evalErr error
result := false
err := poll.Wait(ctxTimeout, func(ctx context.Context) (bool, error) {
for _, cond := range waitCond.AnyOf {
result, evalErr = evaluateCondition(ctx, dynCli, cond)
if evalErr != nil {
// TODO: Fail early if the error is due to jsonpath syntax
log.Debug().WithError(evalErr).Print("Failed to evaluate the condition")
return false, nil
}
if result {
return true, nil
}
}
for _, cond := range waitCond.AllOf {
result, evalErr = evaluateCondition(ctx, dynCli, cond)
if evalErr != nil {
// TODO: Fail early if the error is due to jsonpath syntax
log.Debug().WithError(evalErr).Print("Failed to evaluate the condition")
return false, nil
}
if !result {
return false, nil
}
}
return false, nil
})
err = errors.Wrap(err, "Failed to wait for the condition to be met")
if evalErr != nil {
return errors.Wrap(err, evalErr.Error())
}
return err
}

// evaluateCondition evaluate the go template condition
func evaluateCondition(ctx context.Context, dynCli dynamic.Interface, cond Condition) (bool, error) {
obj, err := fetchObjectFromRef(ctx, dynCli, cond.ObjectReference)
if err != nil {
return false, err
}
rcondition, err := resolveJsonpath(obj, cond.Condition)
if err != nil {
return false, err
}
log.Debug().Print(fmt.Sprintf("Resolved jsonpath: %s", rcondition))
t, err := template.New("config").Option("missingkey=zero").Funcs(sprig.TxtFuncMap()).Parse(rcondition)
if err != nil {
return false, errors.WithStack(err)
}
buf := bytes.NewBuffer(nil)
if err = t.Execute(buf, nil); err != nil {
return false, errors.WithStack(err)
}
return strings.TrimSpace(buf.String()) == "true", nil
}

func fetchObjectFromRef(ctx context.Context, dynCli dynamic.Interface, objRef crv1alpha1.ObjectReference) (runtime.Object, error) {
gvr := schema.GroupVersionResource{Group: objRef.Group, Version: objRef.APIVersion, Resource: objRef.Resource}
if objRef.Namespace != "" {
return dynCli.Resource(gvr).Namespace(objRef.Namespace).Get(ctx, objRef.Name, metav1.GetOptions{})
}
return dynCli.Resource(gvr).Get(ctx, objRef.Name, metav1.GetOptions{})
}

// resolveJsonpath resolves jsonpath fields and replaces the occurrences with actual values
func resolveJsonpath(obj runtime.Object, condStr string) (string, error) {
resolvedCondStr := condStr
for s, match := range jsonpath.FindJsonpathArgs(condStr) {
transCond := fmt.Sprintf("{%s}", strings.TrimSpace(match))
value, err := jsonpath.ResolveJsonpathToString(obj, transCond)
if err != nil {
return "", err
}
resolvedCondStr = strings.ReplaceAll(resolvedCondStr, s, value)
}
return resolvedCondStr, nil
}
192 changes: 192 additions & 0 deletions pkg/function/wait_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
// Copyright 2021 The Kanister Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package function

import (
"context"

. "gopkg.in/check.v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

kanister "github.com/kanisterio/kanister/pkg"
crv1alpha1 "github.com/kanisterio/kanister/pkg/apis/cr/v1alpha1"
"github.com/kanisterio/kanister/pkg/kube"
"github.com/kanisterio/kanister/pkg/param"
"github.com/kanisterio/kanister/pkg/testutil"
)

var _ = Suite(&WaitSuite{})

type WaitSuite struct {
cli kubernetes.Interface
namespace string
deploy string
}

func (s *WaitSuite) SetUpSuite(c *C) {
cli, err := kube.NewClient()
c.Assert(err, IsNil)
s.cli = cli

ns := &v1.Namespace{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "kanisterwaittest-",
},
}
cns, err := s.cli.CoreV1().Namespaces().Create(context.TODO(), ns, metav1.CreateOptions{})
c.Assert(err, IsNil)

d, err := s.cli.AppsV1().Deployments(cns.Name).Create(context.TODO(), testutil.NewTestDeployment(int32(1)), metav1.CreateOptions{})
c.Assert(err, IsNil)
s.namespace = cns.Name
s.deploy = d.Name
}

func (s *WaitSuite) TearDownSuite(c *C) {
if s.namespace != "" {
_ = s.cli.CoreV1().Namespaces().Delete(context.TODO(), s.namespace, metav1.DeleteOptions{})
}
}

func waitNsPhase(namespace string) crv1alpha1.BlueprintPhase {
return crv1alpha1.BlueprintPhase{
Name: "waitNsReady",
Func: WaitFuncName,
Args: map[string]interface{}{
WaitTimeoutArg: "1m",
WaitConditionsArg: map[string]interface{}{
"anyOf": []interface{}{
map[string]interface{}{
"condition": `{{ if (eq "{ $.status.phase }" "Invalid")}}true{{ else }}false{{ end }}`,
"objectReference": map[string]interface{}{
"apiVersion": "v1",
"resource": "namespaces",
"name": namespace,
},
},
map[string]interface{}{
"condition": `{{ if (eq "{ $.status.phase }" "Active")}}true{{ else }}false{{ end }}`,
"objectReference": map[string]interface{}{
"apiVersion": "v1",
"resource": "namespaces",
"name": namespace,
},
},
},
},
},
}
}

func waitNsTimeoutPhase(namespace string) crv1alpha1.BlueprintPhase {
return crv1alpha1.BlueprintPhase{
Name: "waitNsReady",
Func: WaitFuncName,
Args: map[string]interface{}{
WaitTimeoutArg: "10s",
WaitConditionsArg: map[string]interface{}{
"allOf": []interface{}{
map[string]interface{}{
"condition": `{{ if (eq "{$.status.phase}" "Inactive")}}true{{ else }}false{{ end }}`,
"objectReference": map[string]interface{}{
"apiVersion": "v1",
"resource": "namespaces",
"name": namespace,
},
},
map[string]interface{}{
"condition": `{{ if (eq "{$.status.phase}" "Invalid")}}true{{ else }}false{{ end }}`,
"objectReference": map[string]interface{}{
"apiVersion": "v1",
"resource": "namespaces",
"name": namespace,
},
},
},
},
},
}
}

func waitDeployPhase(namespace, deploy string) crv1alpha1.BlueprintPhase {
return crv1alpha1.BlueprintPhase{
Name: "waitDeployReady",
Func: WaitFuncName,
Args: map[string]interface{}{
WaitTimeoutArg: "5m",
WaitConditionsArg: map[string]interface{}{
"anyOf": []interface{}{
map[string]interface{}{
"condition": `{{ if and (eq {$.spec.replicas} {$.status.availableReplicas} )
(and (eq "{$.status.conditions[?(@.type == "Available")].type}" "Available")
(eq "{$.status.conditions[?(@.type == "Available")].status}" "True"))}}
true
{{ else }}
false
{{ end }}`,
"objectReference": map[string]interface{}{
"apiVersion": "v1",
"group": "apps",
"resource": "deployments",
"name": deploy,
"namespace": namespace,
},
},
},
},
},
}
}

func newWaitBlueprint(phases ...crv1alpha1.BlueprintPhase) *crv1alpha1.Blueprint {
return &crv1alpha1.Blueprint{
Actions: map[string]*crv1alpha1.BlueprintAction{
"test": {
Phases: phases,
},
},
}
}

func (s *WaitSuite) TestWait(c *C) {
tp := param.TemplateParams{}
action := "test"
for _, tc := range []struct {
bp *crv1alpha1.Blueprint
checker Checker
}{
{
bp: newWaitBlueprint(waitDeployPhase(s.namespace, s.deploy)),
checker: IsNil,
},
{
bp: newWaitBlueprint(waitNsPhase(s.namespace)),
checker: IsNil,
},
{
bp: newWaitBlueprint(waitNsTimeoutPhase(s.namespace)),
checker: NotNil,
},
} {
phases, err := kanister.GetPhases(*tc.bp, action, kanister.DefaultVersion, tp)
c.Assert(err, IsNil)
for _, p := range phases {
_, err := p.Exec(context.TODO(), *tc.bp, action, tp)
c.Assert(err, tc.checker)
}
}
}
6 changes: 6 additions & 0 deletions pkg/param/render.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pkg/errors"

crv1alpha1 "github.com/kanisterio/kanister/pkg/apis/cr/v1alpha1"
"github.com/kanisterio/kanister/pkg/jsonpath"
)

const (
Expand Down Expand Up @@ -119,6 +120,11 @@ func RenderArtifacts(arts map[string]crv1alpha1.Artifact, tp TemplateParams) (ma
}

func renderStringArg(arg string, tp TemplateParams) (string, error) {
// Skip render if contains jsonpath arg
matched := jsonpath.FindJsonpathArgs(arg)
if len(matched) != 0 {
return arg, nil
}
t, err := template.New("config").Option("missingkey=error").Funcs(sprig.TxtFuncMap()).Parse(arg)
if err != nil {
return "", errors.WithStack(err)
Expand Down

0 comments on commit 049fcaf

Please sign in to comment.