Skip to content

Commit

Permalink
ActionSet handling perf tweaks (#1650)
Browse files Browse the repository at this point in the history
* ActionSet handling perf tweaks

* Fixed test

* Fixed unit tests

* Fixed unit tests

* Addressed review comments

* Addressed review comments

* Addressed review comments

* Addressed review comments

* Addressed review comments
  • Loading branch information
ed-shilo committed Oct 5, 2022
1 parent 3d8e59a commit c2a8e32
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 52 deletions.
117 changes: 73 additions & 44 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"strings"
"sync"

customresource "github.com/kanisterio/kanister/pkg/customresource"
"github.com/kanisterio/kanister/pkg/customresource"
"github.com/pkg/errors"
"gopkg.in/tomb.v2"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -146,9 +146,14 @@ func (c *Controller) onAdd(obj interface{}) {
o = o.DeepCopyObject()
switch v := o.(type) {
case *crv1alpha1.ActionSet:
if err := c.onAddActionSet(v); err != nil {
log.Error().WithError(err).Print("Callback onAddActionSet() failed")
}
t, ctx := c.LoadOrStoreTomb(context.Background(), v.Name)
t.Go(func() error {
if err := c.onAddActionSet(ctx, t, v); err != nil {
log.Error().WithError(err).Print("Callback onAddActionSet() failed")
}
return nil
})

case *crv1alpha1.Blueprint:
c.onAddBlueprint(v)
default:
Expand Down Expand Up @@ -191,25 +196,43 @@ func (c *Controller) onDelete(obj interface{}) {
}
}

func (c *Controller) onAddActionSet(as *crv1alpha1.ActionSet) error {
as, err := c.crClient.CrV1alpha1().ActionSets(as.GetNamespace()).Get(context.TODO(), as.GetName(), v1.GetOptions{})
func (c *Controller) createBpCache(ctx context.Context, ns string) (map[string]*crv1alpha1.Blueprint, error) {
bps, err := c.crClient.CrV1alpha1().Blueprints(ns).List(ctx, v1.ListOptions{})
if err != nil {
return nil, err
}

bpMap := make(map[string]*crv1alpha1.Blueprint)
for _, bp := range bps.Items {
bpMap[bp.Name] = bp
}

return bpMap, nil
}

func (c *Controller) onAddActionSet(ctx context.Context, t *tomb.Tomb, as *crv1alpha1.ActionSet) error {
as, err := c.crClient.CrV1alpha1().ActionSets(as.GetNamespace()).Get(ctx, as.GetName(), v1.GetOptions{})
if err != nil {
return errors.WithStack(err)
}
if err := validate.ActionSet(as); err != nil {
return err
}
bps, err := c.createBpCache(ctx, as.GetNamespace())
if err != nil {
return errors.WithStack(err)
}
if as.Status == nil {
c.initActionSetStatus(as)
c.initActionSetStatus(ctx, as, bps)
}
as, err = c.crClient.CrV1alpha1().ActionSets(as.GetNamespace()).Get(context.TODO(), as.GetName(), v1.GetOptions{})
as, err = c.crClient.CrV1alpha1().ActionSets(as.GetNamespace()).Get(ctx, as.GetName(), v1.GetOptions{})
if err != nil {
return errors.WithStack(err)
}
if err := validate.ActionSet(as); err != nil {
return err
}
return c.handleActionSet(as)
return c.handleActionSet(ctx, t, as, bps)
}

func (c *Controller) onAddBlueprint(bp *crv1alpha1.Blueprint) {
Expand Down Expand Up @@ -283,8 +306,7 @@ func (c *Controller) onDeleteBlueprint(bp *crv1alpha1.Blueprint) {
log.Print("Deleted Blueprint ", field.M{"BlueprintName": bp.GetName()})
}

func (c *Controller) initActionSetStatus(as *crv1alpha1.ActionSet) {
ctx := context.Background()
func (c *Controller) initActionSetStatus(ctx context.Context, as *crv1alpha1.ActionSet, bps map[string]*crv1alpha1.Blueprint) {
ctx = field.Context(ctx, consts.ActionsetNameKey, as.GetName())
if as.Spec == nil {
log.Error().WithContext(ctx).Print("Cannot initialize an ActionSet without a spec.")
Expand All @@ -294,10 +316,20 @@ func (c *Controller) initActionSetStatus(as *crv1alpha1.ActionSet) {
actions := make([]crv1alpha1.ActionStatus, 0, len(as.Spec.Actions))
var err error
for _, a := range as.Spec.Actions {
var actionStatus *crv1alpha1.ActionStatus
actionStatus, err = c.initialActionStatus(as.GetNamespace(), a)
if a.Blueprint == "" {
// TODO: If no blueprint is specified, we should consider a default.
err = errors.New("Blueprint is not specified for action")
c.logAndErrorEvent(ctx, "Could not get blueprint:", "Blueprint not specified", err, as)
break
}
bp, ok := bps[a.Blueprint]
if !ok {
err = errors.Errorf("Failed to retrieve blueprint %s", a.Blueprint)
c.logAndErrorEvent(ctx, "Could not get blueprint:", "Blueprint not found", err, as)
break
}
actionStatus, err := c.initialActionStatus(a, bp)
if err != nil {
bp, _ := c.crClient.CrV1alpha1().Blueprints(as.GetNamespace()).Get(ctx, a.Blueprint, v1.GetOptions{})
reason := fmt.Sprintf("ActionSetFailed Action: %s", a.Name)
c.logAndErrorEvent(ctx, "Could not get initial action:", reason, err, as, bp)
break
Expand All @@ -318,15 +350,7 @@ func (c *Controller) initActionSetStatus(as *crv1alpha1.ActionSet) {
}
}

func (c *Controller) initialActionStatus(namespace string, a crv1alpha1.ActionSpec) (*crv1alpha1.ActionStatus, error) {
if a.Blueprint == "" {
// TODO: If no blueprint is specified, we should consider a default.
return nil, errors.New("Blueprint not specified")
}
bp, err := c.crClient.CrV1alpha1().Blueprints(namespace).Get(context.TODO(), a.Blueprint, v1.GetOptions{})
if err != nil {
return nil, errors.Wrap(err, "Failed to query blueprint")
}
func (c *Controller) initialActionStatus(a crv1alpha1.ActionSpec, bp *crv1alpha1.Blueprint) (*crv1alpha1.ActionStatus, error) {
bpa, ok := bp.Actions[a.Name]
if !ok {
return nil, errors.Errorf("Action %s for object kind %s not found in blueprint %s", a.Name, a.Object.Kind, a.Blueprint)
Expand Down Expand Up @@ -357,18 +381,17 @@ func (c *Controller) initialActionStatus(namespace string, a crv1alpha1.ActionSp
return actionStatus, nil
}

func (c *Controller) handleActionSet(as *crv1alpha1.ActionSet) (err error) {
func (c *Controller) handleActionSet(ctx context.Context, t *tomb.Tomb, as *crv1alpha1.ActionSet, bps map[string]*crv1alpha1.Blueprint) (err error) {
if as.Status == nil {
return errors.New("ActionSet was not initialized")
}
if as.Status.State != crv1alpha1.StatePending {
return nil
}
as.Status.State = crv1alpha1.StateRunning
if as, err = c.crClient.CrV1alpha1().ActionSets(as.GetNamespace()).Update(context.TODO(), as, v1.UpdateOptions{}); err != nil {
if as, err = c.crClient.CrV1alpha1().ActionSets(as.GetNamespace()).Update(ctx, as, v1.UpdateOptions{}); err != nil {
return errors.WithStack(err)
}
ctx := context.Background()
ctx = field.Context(ctx, consts.ActionsetNameKey, as.GetName())
// adding labels with prefix "kanister.io/" in the context as field for better logging
for key, value := range as.GetLabels() {
Expand All @@ -386,11 +409,15 @@ func (c *Controller) handleActionSet(as *crv1alpha1.ActionSet) (err error) {
}()

for i := range as.Status.Actions {
if err = c.runAction(ctx, as, i); err != nil {
bp, ok := bps[as.Spec.Actions[i].Blueprint]
if !ok {
err = errors.Errorf("Failed to retrieve blueprint %s", as.Spec.Actions[i].Blueprint)
c.logAndErrorEvent(ctx, "Could not get blueprint:", "Blueprint not found", err, as)
return err
}
if err = c.runAction(ctx, t, as, i, bp); err != nil {
// If runAction returns an error, it is a failure in the synchronous
// part of running the action.
bpName := as.Spec.Actions[i].Blueprint
bp, _ := c.crClient.CrV1alpha1().Blueprints(as.GetNamespace()).Get(ctx, bpName, v1.GetOptions{})
reason := fmt.Sprintf("ActionSetFailed Action: %s", as.Status.Actions[i].Name)
c.logAndErrorEvent(ctx, fmt.Sprintf("Failed to launch Action %s:", as.GetName()), reason, err, as, bp)
as.Status.State = crv1alpha1.StateFailed
Expand All @@ -406,15 +433,21 @@ func (c *Controller) handleActionSet(as *crv1alpha1.ActionSet) (err error) {
return nil
}

func (c *Controller) LoadOrStoreTomb(ctx context.Context, asName string) (*tomb.Tomb, context.Context) {
var t *tomb.Tomb
if v, ok := c.actionSetTombMap.Load(asName); ok {
t = v.(*tomb.Tomb)
return t, ctx
}
t, ctx = tomb.WithContext(ctx)
c.actionSetTombMap.Store(asName, t)
return t, ctx
}

// nolint:gocognit
func (c *Controller) runAction(ctx context.Context, as *crv1alpha1.ActionSet, aIDX int) error {
func (c *Controller) runAction(ctx context.Context, t *tomb.Tomb, as *crv1alpha1.ActionSet, aIDX int, bp *crv1alpha1.Blueprint) error {
action := as.Spec.Actions[aIDX]
c.logAndSuccessEvent(ctx, fmt.Sprintf("Executing action %s", action.Name), "Started Action", as)
bpName := as.Spec.Actions[aIDX].Blueprint
bp, err := c.crClient.CrV1alpha1().Blueprints(as.GetNamespace()).Get(ctx, bpName, v1.GetOptions{})
if err != nil {
return errors.WithStack(err)
}
tp, err := param.New(ctx, c.clientset, c.dynClient, c.crClient, c.osClient, action)
if err != nil {
return err
Expand All @@ -431,10 +464,6 @@ func (c *Controller) runAction(ctx context.Context, as *crv1alpha1.ActionSet, aI
return err
}

ns, name := as.GetNamespace(), as.GetName()
var t *tomb.Tomb
t, ctx = tomb.WithContext(ctx)
c.actionSetTombMap.Store(as.Name, t)
ctx = field.Context(ctx, consts.ActionsetNameKey, as.GetName())
t.Go(func() error {
var coreErr error
Expand All @@ -445,7 +474,7 @@ func (c *Controller) runAction(ctx context.Context, as *crv1alpha1.ActionSet, aI
}
// render artifacts only if all the phases are run successfully
if deferErr == nil && coreErr == nil {
c.renderActionsetArtifacts(ctx, as, aIDX, ns, name, action.Name, bp, tp, coreErr, deferErr)
c.renderActionsetArtifacts(ctx, as, aIDX, as.Namespace, as.Name, action.Name, bp, tp, coreErr, deferErr)
}
}()

Expand Down Expand Up @@ -481,7 +510,7 @@ func (c *Controller) runAction(ctx context.Context, as *crv1alpha1.ActionSet, aI
}
}

if rErr := reconcile.ActionSet(ctx, c.crClient.CrV1alpha1(), ns, name, rf); rErr != nil {
if rErr := reconcile.ActionSet(ctx, c.crClient.CrV1alpha1(), as.Namespace, as.Name, rf); rErr != nil {
reason := fmt.Sprintf("ActionSetFailed Action: %s", as.Spec.Actions[aIDX].Name)
msg := fmt.Sprintf("Failed to update phase: %#v:", as.Status.Actions[aIDX].Phases[i])
c.logAndErrorEvent(ctx, msg, reason, rErr, as, bp)
Expand Down Expand Up @@ -524,7 +553,7 @@ func (c *Controller) executeDeferPhase(ctx context.Context,
ctx = field.Context(ctx, consts.PhaseNameKey, as.Status.Actions[aIDX].DeferPhase.Name)
c.logAndSuccessEvent(ctx, fmt.Sprintf("Executing deferPhase %s", as.Status.Actions[aIDX].DeferPhase.Name), "Started deferPhase", as)

output, err := deferPhase.Exec(context.Background(), *bp, actionName, *tp)
output, err := deferPhase.Exec(ctx, *bp, actionName, *tp)
var rf func(*crv1alpha1.ActionSet) error
if err != nil {
rf = func(as *crv1alpha1.ActionSet) error {
Expand All @@ -543,7 +572,7 @@ func (c *Controller) executeDeferPhase(ctx context.Context,
}
}
var msg string
if rErr := reconcile.ActionSet(context.Background(), c.crClient.CrV1alpha1(), actionsetNS, actionsetName, rf); rErr != nil {
if rErr := reconcile.ActionSet(ctx, c.crClient.CrV1alpha1(), actionsetNS, actionsetName, rf); rErr != nil {
reason := fmt.Sprintf("ActionSetFailed Action: %s", as.Spec.Actions[aIDX].Name)
msg := fmt.Sprintf("Failed to update defer phase: %#v:", as.Status.Actions[aIDX].DeferPhase)
c.logAndErrorEvent(ctx, msg, reason, rErr, as, bp)
Expand All @@ -560,7 +589,7 @@ func (c *Controller) executeDeferPhase(ctx context.Context,
}

c.logAndSuccessEvent(ctx, fmt.Sprintf("Completed deferPhase %s", as.Status.Actions[aIDX].DeferPhase.Name), "Ended deferPhase", as)
param.UpdateDeferPhaseParams(context.Background(), tp, output)
param.UpdateDeferPhaseParams(ctx, tp, output)
return nil
}

Expand Down
4 changes: 1 addition & 3 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,9 +542,6 @@ func (s *ControllerSuite) TestExecActionSet(c *C) {
as, err = s.crCli.ActionSets(s.namespace).Create(ctx, as, metav1.CreateOptions{})
c.Assert(err, IsNil, Commentf("Failed case: %s", tc.name))

err = s.waitOnActionSetState(c, as, crv1alpha1.StateRunning)
c.Assert(err, IsNil, Commentf("Failed case: %s", tc.name))

final := crv1alpha1.StateComplete
cancel := false
Loop:
Expand All @@ -561,6 +558,7 @@ func (s *ControllerSuite) TestExecActionSet(c *C) {
case testutil.OutputFuncName:
c.Assert(testutil.OutputFuncOut(), DeepEquals, map[string]interface{}{"key": "myValue"}, Commentf("Failed case: %s", tc.name))
case testutil.CancelFuncName:
testutil.CancelFuncStarted()
err = s.crCli.ActionSets(s.namespace).Delete(context.TODO(), as.GetName(), metav1.DeleteOptions{})
c.Assert(err, IsNil)
c.Assert(testutil.CancelFuncOut().Error(), DeepEquals, "context canceled")
Expand Down
17 changes: 12 additions & 5 deletions pkg/testutil/func.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,12 @@ const (
)

var (
failFuncCh chan error
waitFuncCh chan struct{}
argFuncCh chan map[string]interface{}
outputFuncCh chan map[string]interface{}
cancelFuncCh chan error
failFuncCh chan error
waitFuncCh chan struct{}
argFuncCh chan map[string]interface{}
outputFuncCh chan map[string]interface{}
cancelFuncStartedCh chan struct{}
cancelFuncCh chan error
)

func failFunc(context.Context, param.TemplateParams, map[string]interface{}) (map[string]interface{}, error) {
Expand All @@ -63,6 +64,7 @@ func outputFunc(ctx context.Context, tp param.TemplateParams, args map[string]in
}

func cancelFunc(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) {
cancelFuncStartedCh <- struct{}{}
<-ctx.Done()
cancelFuncCh <- ctx.Err()
return nil, ctx.Err()
Expand All @@ -77,6 +79,7 @@ func init() {
waitFuncCh = make(chan struct{})
argFuncCh = make(chan map[string]interface{})
outputFuncCh = make(chan map[string]interface{})
cancelFuncStartedCh = make(chan struct{})
cancelFuncCh = make(chan error)
registerMockKanisterFunc(FailFuncName, failFunc)
registerMockKanisterFunc(WaitFuncName, waitFunc)
Expand Down Expand Up @@ -134,6 +137,10 @@ func (mf *mockKanisterFunc) Arguments() []string {
return []string{testBPArg}
}

func CancelFuncStarted() struct{} {
return <-cancelFuncStartedCh
}

func CancelFuncOut() error {
return <-cancelFuncCh
}
1 change: 1 addition & 0 deletions pkg/testutil/func_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ func (s *FuncSuite) TestCancelFunc(c *C) {
c.Assert(strings.Contains(err.Error(), "context canceled"), Equals, true)
close(done)
}()
c.Assert(CancelFuncStarted(), NotNil)
select {
case <-done:
c.FailNow()
Expand Down

0 comments on commit c2a8e32

Please sign in to comment.