Skip to content

Commit

Permalink
Add ExecutionProgress() method to function interface to report exec p…
Browse files Browse the repository at this point in the history
…rogress (#2023)

* Add phase level progress fields in actionset object

Signed-off-by: Prasad Ghangal <prasad.ghangal@gmail.com>

* Change weight based progress calc with func implementation

Signed-off-by: Prasad Ghangal <prasad.ghangal@gmail.com>

* Refactor and add unit tests for phase progress

Signed-off-by: Prasad Ghangal <prasad.ghangal@gmail.com>

* Update generated deepcopy methods

Signed-off-by: Prasad Ghangal <prasad.ghangal@gmail.com>

* Fix import cycle issue in test

Signed-off-by: Prasad Ghangal <prasad.ghangal@gmail.com>

* Update pkg/apis/cr/v1alpha1/types.go

Co-authored-by: Pavan Navarathna <6504783+pavannd1@users.noreply.github.com>

* Update pkg/apis/cr/v1alpha1/types.go

Co-authored-by: Pavan Navarathna <6504783+pavannd1@users.noreply.github.com>

* Update pkg/apis/cr/v1alpha1/types.go

Co-authored-by: Pavan Navarathna <6504783+pavannd1@users.noreply.github.com>

* Update pkg/apis/cr/v1alpha1/types.go

Co-authored-by: Pavan Navarathna <6504783+pavannd1@users.noreply.github.com>

* Use correct action index while updating action phase state

Signed-off-by: Prasad Ghangal <prasad.ghangal@gmail.com>

* Implement ExecutionProgress method on Kanister functions (#2117)

* Implement ExecutionProgress method on Kanister functions

Signed-off-by: Prasad Ghangal <prasad.ghangal@gmail.com>

* Set LastTransitionTime in ExecutionProgress return

Signed-off-by: Prasad Ghangal <prasad.ghangal@gmail.com>

* Update pkg/function/backup_data.go

Co-authored-by: Pavan Navarathna <6504783+pavannd1@users.noreply.github.com>

---------

Signed-off-by: Prasad Ghangal <prasad.ghangal@gmail.com>
Co-authored-by: Pavan Navarathna <6504783+pavannd1@users.noreply.github.com>

* Implement ExecProgress on test function

Signed-off-by: Prasad Ghangal <prasad.ganghal@veeam.com>

* Remove unused mockPhase in test

Signed-off-by: Prasad Ghangal <prasad.ganghal@veeam.com>

* Remove unnecessary whitespace in updateActionProgress func

Signed-off-by: Prasad Ghangal <prasad.ganghal@veeam.com>

---------

Signed-off-by: Prasad Ghangal <prasad.ghangal@gmail.com>
Signed-off-by: Prasad Ghangal <prasad.ganghal@veeam.com>
Co-authored-by: Pavan Navarathna <6504783+pavannd1@users.noreply.github.com>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
3 people committed Oct 3, 2023
1 parent b7b6a78 commit f9d0fdc
Show file tree
Hide file tree
Showing 49 changed files with 1,178 additions and 642 deletions.
25 changes: 24 additions & 1 deletion pkg/apis/cr/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ type ActionStatus struct {
DeferPhase Phase `json:"deferPhase,omitempty"`
}

// ActionProgress provides information on the progress of an action.
// ActionProgress provides information on the combined progress
// of all the phases in the action.
type ActionProgress struct {
// RunningPhase represents which phase of the action is being run
RunningPhase string `json:"runningPhase,omitempty"`
Expand Down Expand Up @@ -187,6 +188,28 @@ type Phase struct {
State State `json:"state"`
// Output is the map of output artifacts produced by the Blueprint phase.
Output map[string]interface{} `json:"output,omitempty"`
// Progress represents the phase execution progress.
Progress PhaseProgress `json:"progress,omitempty"`
}

// PhaseProgress represents the execution state of the phase.
type PhaseProgress struct {
// ProgressPercent represents the execution progress in percentage.
ProgressPercent string `json:"progressPercent,omitempty"`
// SizeUploadedB represents the size of data uploaded in Bytes at a given time during phase execution.
// This field will be empty for phases which do not involve data movement.
SizeUploadedB int64 `json:"sizeUploadedB,omitempty"`
// EstimatedUploadSizeB represents the total estimated size of data in Bytes
// that will be uploaded during the phase execution.
// This field will be empty for phases which do not involve data movement.
EstimatedUploadSizeB int64 `json:"estinatedUploadSizeB,omitempty"`
// EstimatedTimeSeconds is the estimated time required in seconds to upload the
// remaining data estimated with EstimatedUploadSizeB.
// This field will be empty for phases which do not involve data movement.
EstimatedTimeSeconds int64 `json:"estinatedTimeSeconds,omitempty"`
// LastTransitionTime represents the last date time when the progress status
// was received.
LastTransitionTime *metav1.Time `json:"lastTransitionTime,omitempty"`
}

// k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down
20 changes: 20 additions & 0 deletions pkg/apis/cr/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion pkg/blueprint/validate/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,9 @@ func blueprint() *crv1alpha1.Blueprint {
}
}

type nonDefaultVersionFunc struct{}
type nonDefaultVersionFunc struct {
progressPercent string
}

func (nd *nonDefaultVersionFunc) Name() string {
return "NonDefaultVersionFunc"
Expand All @@ -453,9 +455,15 @@ func (nd *nonDefaultVersionFunc) Arguments() []string {
}

func (nd *nonDefaultVersionFunc) Exec(context.Context, param.TemplateParams, map[string]interface{}) (map[string]interface{}, error) {
nd.progressPercent = "0"
defer func() { nd.progressPercent = "100" }()
return nil, nil
}

func (nd *nonDefaultVersionFunc) ExecutionProgress() (crv1alpha1.PhaseProgress, error) {
return crv1alpha1.PhaseProgress{ProgressPercent: nd.progressPercent}, nil
}

var _ kanister.Func = (*nonDefaultVersionFunc)(nil)

func init() {
Expand Down
40 changes: 29 additions & 11 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,14 +394,6 @@ func (c *Controller) handleActionSet(ctx context.Context, t *tomb.Tomb, as *crv1
}
}

go func() {
// progress update is computed on a best-effort basis.
// if it exits with error, we will just log it.
if err := progress.TrackActionsProgress(ctx, c.crClient, as.GetName(), as.GetNamespace()); err != nil {
log.Error().WithError(err)
}
}()

for i, a := range as.Status.Actions {
var bp *crv1alpha1.Blueprint
if bp, err = c.crClient.CrV1alpha1().Blueprints(as.GetNamespace()).Get(ctx, a.Blueprint, v1.GetOptions{}); err != nil {
Expand Down Expand Up @@ -471,7 +463,7 @@ func (c *Controller) runAction(ctx context.Context, t *tomb.Tomb, as *crv1alpha1
defer func() {
var deferErr error
if deferPhase != nil {
c.updateActionSetRunningPhase(ctx, as, deferPhase.Name())
c.updateActionSetRunningPhase(ctx, aIDX, as, deferPhase.Name())
deferErr = c.executeDeferPhase(ctx, deferPhase, tp, bp, action.Name, aIDX, as)
}
// render artifacts only if all the phases are run successfully
Expand All @@ -490,11 +482,22 @@ func (c *Controller) runAction(ctx context.Context, t *tomb.Tomb, as *crv1alpha1
var output map[string]interface{}
var msg string
if err == nil {
c.updateActionSetRunningPhase(ctx, as, p.Name())
c.updateActionSetRunningPhase(ctx, aIDX, as, p.Name())
progressTrackCtx, doneProgressTrack := context.WithCancel(ctx)
defer doneProgressTrack()
go func() {
// progress update is computed on a best-effort basis.
// if it exits with error, we will just log it.
if err := progress.UpdateActionSetsProgress(progressTrackCtx, aIDX, c.crClient, as.GetName(), as.GetNamespace(), p); err != nil {
log.Error().WithError(err)
}
}()
output, err = p.Exec(ctx, *bp, action.Name, *tp)
doneProgressTrack()
} else {
msg = fmt.Sprintf("Failed to init phase params: %#v:", as.Status.Actions[aIDX].Phases[i])
}

var rf func(*crv1alpha1.ActionSet) error
if err != nil {
coreErr = err
Expand All @@ -511,8 +514,17 @@ func (c *Controller) runAction(ctx context.Context, t *tomb.Tomb, as *crv1alpha1
coreErr = nil
rf = func(ras *crv1alpha1.ActionSet) error {
ras.Status.Actions[aIDX].Phases[i].State = crv1alpha1.StateComplete
pp, err := p.Progress()
if err != nil {
log.Error().WithError(err)
return nil
}
ras.Status.Actions[aIDX].Phases[i].Progress = pp
// this updates the phase output in the actionset status
ras.Status.Actions[aIDX].Phases[i].Output = output
if err := progress.SetActionSetPercentCompleted(ras); err != nil {
log.Error().WithError(err)
}
return nil
}
}
Expand Down Expand Up @@ -545,9 +557,15 @@ func (c *Controller) runAction(ctx context.Context, t *tomb.Tomb, as *crv1alpha1
// updateActionSetRunningPhase updates the actionset's `status.Progress.RunningPhase` with the phase name
// that is being run currently. It doesn't fail if there was a problem updating the actionset. It just logs
// the failure.
func (c *Controller) updateActionSetRunningPhase(ctx context.Context, as *crv1alpha1.ActionSet, phase string) {
func (c *Controller) updateActionSetRunningPhase(ctx context.Context, aIDX int, as *crv1alpha1.ActionSet, phase string) {
err := reconcile.ActionSet(ctx, c.crClient.CrV1alpha1(), as.Namespace, as.Name, func(as *crv1alpha1.ActionSet) error {
as.Status.Progress.RunningPhase = phase
// Iterate through all the phases and set current phase state to running
for i := 0; i < len(as.Status.Actions[aIDX].Phases); i++ {
if as.Status.Actions[aIDX].Phases[i].Name == phase {
as.Status.Actions[aIDX].Phases[i].State = crv1alpha1.StateRunning
}
}
return nil
})
if err != nil {
Expand Down
28 changes: 28 additions & 0 deletions pkg/customresource/actionset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,20 @@ spec:
type: object
state:
type: string
progress:
properties:
progressPercent:
type: string
sizeUploadedB:
type: integer
estimatedTimeSeconds:
type: integer
estimatedUploadSizeB:
type: integer
lastTransitionTime:
type: string
format: date-time
type: object
type: object
phases:
description: Phases are sub-actions an are executed sequentially.
Expand All @@ -279,6 +293,20 @@ spec:
type: object
state:
type: string
progress:
properties:
progressPercent:
type: string
sizeUploadedB:
type: integer
estimatedTimeSeconds:
type: integer
estimatedUploadSizeB:
type: integer
lastTransitionTime:
type: string
format: date-time
type: object
type: object
type: array
type: object
Expand Down
22 changes: 20 additions & 2 deletions pkg/function/backup_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,22 @@ package function

import (
"context"
"time"

"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/client-go/kubernetes"

kanister "github.com/kanisterio/kanister/pkg"
crv1alpha1 "github.com/kanisterio/kanister/pkg/apis/cr/v1alpha1"
"github.com/kanisterio/kanister/pkg/consts"
"github.com/kanisterio/kanister/pkg/field"
"github.com/kanisterio/kanister/pkg/format"
"github.com/kanisterio/kanister/pkg/kube"
"github.com/kanisterio/kanister/pkg/log"
"github.com/kanisterio/kanister/pkg/param"
"github.com/kanisterio/kanister/pkg/progress"
"github.com/kanisterio/kanister/pkg/restic"
)

Expand Down Expand Up @@ -64,13 +68,19 @@ func init() {

var _ kanister.Func = (*backupDataFunc)(nil)

type backupDataFunc struct{}
type backupDataFunc struct {
progressPercent string
}

func (*backupDataFunc) Name() string {
return BackupDataFuncName
}

func (*backupDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) {
func (b *backupDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) {
// Set progress percent
b.progressPercent = progress.StartedPercent
defer func() { b.progressPercent = progress.CompletedPercent }()

var namespace, pod, container, includePath, backupArtifactPrefix, encryptionKey string
var err error
if err = Arg(args, BackupDataNamespaceArg, &namespace); err != nil {
Expand Down Expand Up @@ -188,3 +198,11 @@ func backupData(ctx context.Context, cli kubernetes.Interface, namespace, pod, c
phySize: phySize,
}, nil
}

func (b *backupDataFunc) ExecutionProgress() (crv1alpha1.PhaseProgress, error) {
metav1Time := metav1.NewTime(time.Now())
return crv1alpha1.PhaseProgress{
ProgressPercent: b.progressPercent,
LastTransitionTime: &metav1Time,
}, nil
}
22 changes: 20 additions & 2 deletions pkg/function/backup_data_all.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,19 @@ import (
"encoding/json"
"fmt"
"strings"
"time"

"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

kanister "github.com/kanisterio/kanister/pkg"
crv1alpha1 "github.com/kanisterio/kanister/pkg/apis/cr/v1alpha1"
"github.com/kanisterio/kanister/pkg/consts"
"github.com/kanisterio/kanister/pkg/field"
"github.com/kanisterio/kanister/pkg/kube"
"github.com/kanisterio/kanister/pkg/param"
"github.com/kanisterio/kanister/pkg/progress"
"github.com/kanisterio/kanister/pkg/restic"
)

Expand Down Expand Up @@ -62,13 +66,19 @@ func init() {

var _ kanister.Func = (*backupDataAllFunc)(nil)

type backupDataAllFunc struct{}
type backupDataAllFunc struct {
progressPercent string
}

func (*backupDataAllFunc) Name() string {
return BackupDataAllFuncName
}

func (*backupDataAllFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) {
func (b *backupDataAllFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) {
// Set progress percent
b.progressPercent = progress.StartedPercent
defer func() { b.progressPercent = progress.CompletedPercent }()

var namespace, pods, container, includePath, backupArtifactPrefix, encryptionKey string
var err error
if err = Arg(args, BackupDataAllNamespaceArg, &namespace); err != nil {
Expand Down Expand Up @@ -172,3 +182,11 @@ func backupDataAll(ctx context.Context, cli kubernetes.Interface, namespace stri
FunctionOutputVersion: kanister.DefaultVersion,
}, nil
}

func (b *backupDataAllFunc) ExecutionProgress() (crv1alpha1.PhaseProgress, error) {
metav1Time := metav1.NewTime(time.Now())
return crv1alpha1.PhaseProgress{
ProgressPercent: b.progressPercent,
LastTransitionTime: &metav1Time,
}, nil
}
21 changes: 19 additions & 2 deletions pkg/function/backup_data_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ package function
import (
"bytes"
"context"
"time"

"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

kanister "github.com/kanisterio/kanister/pkg"
Expand All @@ -27,6 +29,7 @@ import (
"github.com/kanisterio/kanister/pkg/format"
"github.com/kanisterio/kanister/pkg/kube"
"github.com/kanisterio/kanister/pkg/param"
"github.com/kanisterio/kanister/pkg/progress"
"github.com/kanisterio/kanister/pkg/restic"
)

Expand Down Expand Up @@ -56,7 +59,9 @@ func init() {

var _ kanister.Func = (*BackupDataStatsFunc)(nil)

type BackupDataStatsFunc struct{}
type BackupDataStatsFunc struct {
progressPercent string
}

func (*BackupDataStatsFunc) Name() string {
return BackupDataStatsFuncName
Expand Down Expand Up @@ -130,7 +135,11 @@ func backupDataStatsPodFunc(
}
}

func (*BackupDataStatsFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) {
func (b *BackupDataStatsFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) {
// Set progress percent
b.progressPercent = progress.StartedPercent
defer func() { b.progressPercent = progress.CompletedPercent }()

var namespace, backupArtifactPrefix, backupID, mode, encryptionKey string
var err error
if err = Arg(args, BackupDataStatsNamespaceArg, &namespace); err != nil {
Expand Down Expand Up @@ -183,3 +192,11 @@ func (*BackupDataStatsFunc) Arguments() []string {
BackupDataStatsEncryptionKeyArg,
}
}

func (b *BackupDataStatsFunc) ExecutionProgress() (crv1alpha1.PhaseProgress, error) {
metav1Time := metav1.NewTime(time.Now())
return crv1alpha1.PhaseProgress{
ProgressPercent: b.progressPercent,
LastTransitionTime: &metav1Time,
}, nil
}
Loading

0 comments on commit f9d0fdc

Please sign in to comment.