Skip to content

Commit

Permalink
Refactor functions with pod runner (#2308)
Browse files Browse the repository at this point in the history
* Simplify `DeleteData` - validate arguments before pod creation.

* Minor refactoring of `WriteCredsToPod`

Rename to `MaybeWriteProfileCredentials`

* In `CopyVolumeData` function, get rid of redundant `namespace` argument.

It could be taken from `pod`

* Refactor `BackupDataStats` to use `PodRunner.RunEx`

* Refactor `CheckRepository` to use `PodRunner.RunEx`

* Refactor `DeleteData` to use `PodRunner.RunEx`

* Refactor `DeleteDataUsingKopiaServer` to use `PodRunner.RunEx`

* Refactor `DescribeBackups` to use `PodRunner.RunEx`

* Refactor `PrepareData` to use `PodRunner.RunEx`

* Refactor `RestoreData` to use `PodRunner.RunEx`

* Refactor `RestoreDataUsingKopiaServer` to use `PodRunner.RunEx`

* Apply suggestions from code review - improve formatting / readability

Co-authored-by: Pavan Navarathna <6504783+pavannd1@users.noreply.github.com>

* Fix bug with forgotten old way pod readiness check

---------

Co-authored-by: chaitalisg <chaitali@kasten.io>
Co-authored-by: Pavan Navarathna <6504783+pavannd1@users.noreply.github.com>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
4 people committed Sep 20, 2023
1 parent bf03057 commit bef2520
Show file tree
Hide file tree
Showing 10 changed files with 302 additions and 126 deletions.
43 changes: 31 additions & 12 deletions pkg/function/backup_data_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
package function

import (
"bytes"
"context"

"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"

kanister "github.com/kanisterio/kanister/pkg"
Expand Down Expand Up @@ -71,33 +71,52 @@ func backupDataStats(ctx context.Context, cli kubernetes.Interface, tp param.Tem
PodOverride: podOverride,
}
pr := kube.NewPodRunner(cli, options)
podFunc := backupDataStatsPodFunc(cli, tp, namespace, encryptionKey, backupArtifactPrefix, backupID, mode)
return pr.Run(ctx, podFunc)
podFunc := backupDataStatsPodFunc(tp, encryptionKey, backupArtifactPrefix, backupID, mode)
return pr.RunEx(ctx, podFunc)
}

func backupDataStatsPodFunc(cli kubernetes.Interface, tp param.TemplateParams, namespace, encryptionKey, backupArtifactPrefix, backupID, mode string) func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) {
return func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) {
func backupDataStatsPodFunc(
tp param.TemplateParams,
encryptionKey,
backupArtifactPrefix,
backupID,
mode string,
) func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) {
return func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) {
pod := pc.Pod()

// Wait for pod to reach running state
if err := kube.WaitForPodReady(ctx, cli, pod.Namespace, pod.Name); err != nil {
if err := pc.WaitForPodReady(ctx); err != nil {
return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pod.Name)
}
pw, err := GetPodWriter(cli, ctx, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name, tp.Profile)

remover, err := MaybeWriteProfileCredentials(ctx, pc, tp.Profile)
if err != nil {
return nil, err
}
defer CleanUpCredsFile(ctx, pw, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name)

// Parent context could already be dead, so removing file within new context
defer remover.Remove(context.Background()) //nolint:errcheck

cmd, err := restic.StatsCommandByID(tp.Profile, backupArtifactPrefix, backupID, mode, encryptionKey)
if err != nil {
return nil, err
}
stdout, stderr, err := kube.Exec(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, cmd, nil)
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout)
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr)

commandExecutor, err := pc.GetCommandExecutor()
if err != nil {
return nil, errors.Wrap(err, "Unable to get pod command executor")
}

var stdout, stderr bytes.Buffer
err = commandExecutor.Exec(ctx, cmd, nil, &stdout, &stderr)
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout.String())
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr.String())
if err != nil {
return nil, errors.Wrapf(err, "Failed to get backup stats")
}
// Get File Count and Size from Stats
mode, fc, size := restic.SnapshotStatsFromStatsLog(stdout)
mode, fc, size := restic.SnapshotStatsFromStatsLog(stdout.String())
if fc == "" || size == "" {
return nil, errors.New("Failed to parse snapshot stats from logs")
}
Expand Down
36 changes: 27 additions & 9 deletions pkg/function/checkRepository.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"strings"

"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"

kanister "github.com/kanisterio/kanister/pkg"
Expand Down Expand Up @@ -55,22 +54,41 @@ func CheckRepository(ctx context.Context, cli kubernetes.Interface, tp param.Tem
PodOverride: podOverride,
}
pr := kube.NewPodRunner(cli, options)
podFunc := CheckRepositoryPodFunc(cli, tp, namespace, encryptionKey, targetPaths)
return pr.Run(ctx, podFunc)
podFunc := CheckRepositoryPodFunc(cli, tp, encryptionKey, targetPaths)
return pr.RunEx(ctx, podFunc)
}

func CheckRepositoryPodFunc(cli kubernetes.Interface, tp param.TemplateParams, namespace, encryptionKey, targetPath string) func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) {
return func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) {
func CheckRepositoryPodFunc(
cli kubernetes.Interface,
tp param.TemplateParams,
encryptionKey,
targetPath string,
) func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) {
return func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) {
pod := pc.Pod()

// Wait for pod to reach running state
if err := kube.WaitForPodReady(ctx, cli, pod.Namespace, pod.Name); err != nil {
if err := pc.WaitForPodReady(ctx); err != nil {
return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pod.Name)
}
pw, err := GetPodWriter(cli, ctx, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name, tp.Profile)

remover, err := MaybeWriteProfileCredentials(ctx, pc, tp.Profile)
if err != nil {
return nil, err
}
defer CleanUpCredsFile(ctx, pw, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name)
err = restic.CheckIfRepoIsReachable(tp.Profile, targetPath, encryptionKey, cli, namespace, pod.Name, pod.Spec.Containers[0].Name)

// Parent context could already be dead, so removing file within new context
defer remover.Remove(context.Background()) //nolint:errcheck

err = restic.CheckIfRepoIsReachable(
tp.Profile,
targetPath,
encryptionKey,
cli,
pod.Namespace,
pod.Name,
pod.Spec.Containers[0].Name,
)
switch {
case err == nil:
break
Expand Down
31 changes: 20 additions & 11 deletions pkg/function/copy_volume_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
package function

import (
"bytes"
"context"
"fmt"

"github.com/pkg/errors"
"go.uber.org/zap/buffer"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -80,22 +80,24 @@ func copyVolumeData(ctx context.Context, cli kubernetes.Interface, tp param.Temp
PodOverride: podOverride,
}
pr := kube.NewPodRunner(cli, options)
podFunc := copyVolumeDataPodFunc(cli, tp, namespace, mountPoint, targetPath, encryptionKey)
podFunc := copyVolumeDataPodFunc(cli, tp, mountPoint, targetPath, encryptionKey)
return pr.RunEx(ctx, podFunc)
}

func copyVolumeDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, namespace, mountPoint, targetPath, encryptionKey string) func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) {
func copyVolumeDataPodFunc(
cli kubernetes.Interface,
tp param.TemplateParams,
mountPoint,
targetPath,
encryptionKey string,
) func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) {
return func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) {
// Wait for pod to reach running state
if err := pc.WaitForPodReady(ctx); err != nil {
return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pc.PodName())
}
pw1, err := pc.GetFileWriter()
if err != nil {
return nil, errors.Wrapf(err, "Failed to write credentials to Pod %s", pc.PodName())
}

remover, err := WriteCredsToPod(ctx, pw1, tp.Profile)
remover, err := MaybeWriteProfileCredentials(ctx, pc, tp.Profile)
if err != nil {
return nil, errors.Wrapf(err, "Failed to write credentials to Pod %s", pc.PodName())
}
Expand All @@ -105,7 +107,15 @@ func copyVolumeDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, na

pod := pc.Pod()
// Get restic repository
if err := restic.GetOrCreateRepository(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, targetPath, encryptionKey, tp.Profile); err != nil {
if err := restic.GetOrCreateRepository(
cli,
pod.Namespace,
pod.Name,
pod.Spec.Containers[0].Name,
targetPath,
encryptionKey,
tp.Profile,
); err != nil {
return nil, err
}
// Copy data to object store
Expand All @@ -118,8 +128,7 @@ func copyVolumeDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, na
if err != nil {
return nil, err
}
var stdout buffer.Buffer
var stderr buffer.Buffer
var stdout, stderr bytes.Buffer
err = ex.Exec(ctx, cmd, nil, &stdout, &stderr)
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout.String())
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr.String())
Expand Down
92 changes: 69 additions & 23 deletions pkg/function/delete_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package function

import (
"bytes"
"context"
"fmt"
"strings"
Expand Down Expand Up @@ -66,7 +67,23 @@ func (*deleteDataFunc) Name() string {
return DeleteDataFuncName
}

func deleteData(ctx context.Context, cli kubernetes.Interface, tp param.TemplateParams, reclaimSpace bool, namespace, encryptionKey string, targetPaths, deleteTags, deleteIdentifiers []string, jobPrefix string, podOverride crv1alpha1.JSONMap) (map[string]interface{}, error) {
func deleteData(
ctx context.Context,
cli kubernetes.Interface,
tp param.TemplateParams,
reclaimSpace bool,
namespace,
encryptionKey string,
targetPaths,
deleteTags,
deleteIdentifiers []string,
jobPrefix string,
podOverride crv1alpha1.JSONMap,
) (map[string]interface{}, error) {
if (len(deleteIdentifiers) == 0) == (len(deleteTags) == 0) {
return nil, errors.Errorf("Require one argument: %s or %s", DeleteDataBackupIdentifierArg, DeleteDataBackupTagArg)
}

options := &kube.PodOptions{
Namespace: namespace,
GenerateName: jobPrefix,
Expand All @@ -75,37 +92,55 @@ func deleteData(ctx context.Context, cli kubernetes.Interface, tp param.Template
PodOverride: podOverride,
}
pr := kube.NewPodRunner(cli, options)
podFunc := deleteDataPodFunc(cli, tp, reclaimSpace, namespace, encryptionKey, targetPaths, deleteTags, deleteIdentifiers)
return pr.Run(ctx, podFunc)
podFunc := deleteDataPodFunc(tp, reclaimSpace, encryptionKey, targetPaths, deleteTags, deleteIdentifiers)
return pr.RunEx(ctx, podFunc)
}

//nolint:gocognit
func deleteDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, reclaimSpace bool, namespace, encryptionKey string, targetPaths, deleteTags, deleteIdentifiers []string) func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) {
return func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) {
func deleteDataPodFunc(
tp param.TemplateParams,
reclaimSpace bool,
encryptionKey string,
targetPaths,
deleteTags,
deleteIdentifiers []string,
) func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) {
return func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) {
pod := pc.Pod()

// Wait for pod to reach running state
if err := kube.WaitForPodReady(ctx, cli, pod.Namespace, pod.Name); err != nil {
if err := pc.WaitForPodReady(ctx); err != nil {
return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pod.Name)
}
if (len(deleteIdentifiers) == 0) == (len(deleteTags) == 0) {
return nil, errors.Errorf("Require one argument: %s or %s", DeleteDataBackupIdentifierArg, DeleteDataBackupTagArg)

remover, err := MaybeWriteProfileCredentials(ctx, pc, tp.Profile)
if err != nil {
return nil, err
}
pw, err := GetPodWriter(cli, ctx, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name, tp.Profile)

// Parent context could already be dead, so removing file within new context
defer remover.Remove(context.Background()) //nolint:errcheck

// Get command executor
podCommandExecutor, err := pc.GetCommandExecutor()
if err != nil {
return nil, err
}
defer CleanUpCredsFile(ctx, pw, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name)

for i, deleteTag := range deleteTags {
cmd, err := restic.SnapshotsCommandByTag(tp.Profile, targetPaths[i], deleteTag, encryptionKey)
if err != nil {
return nil, err
}
stdout, stderr, err := kube.Exec(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, cmd, nil)
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout)
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr)

var stdout, stderr bytes.Buffer
err = podCommandExecutor.Exec(ctx, cmd, nil, &stdout, &stderr)
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout.String())
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr.String())
if err != nil {
return nil, errors.Wrapf(err, "Failed to forget data, could not get snapshotID from tag, Tag: %s", deleteTag)
}
deleteIdentifier, err := restic.SnapshotIDFromSnapshotLog(stdout)
deleteIdentifier, err := restic.SnapshotIDFromSnapshotLog(stdout.String())
if err != nil {
return nil, errors.Wrapf(err, "Failed to forget data, could not get snapshotID from tag, Tag: %s", deleteTag)
}
Expand All @@ -117,14 +152,16 @@ func deleteDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, reclai
if err != nil {
return nil, err
}
stdout, stderr, err := kube.Exec(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, cmd, nil)
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout)
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr)

var stdout, stderr bytes.Buffer
err = podCommandExecutor.Exec(ctx, cmd, nil, &stdout, &stderr)
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout.String())
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr.String())
if err != nil {
return nil, errors.Wrapf(err, "Failed to forget data")
}
if reclaimSpace {
spaceFreedStr, err := pruneData(cli, tp, pod, namespace, encryptionKey, targetPaths[i])
spaceFreedStr, err := pruneData(tp, pod, podCommandExecutor, encryptionKey, targetPaths[i])
if err != nil {
return nil, errors.Wrapf(err, "Error executing prune command")
}
Expand All @@ -138,15 +175,24 @@ func deleteDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, reclai
}
}

func pruneData(cli kubernetes.Interface, tp param.TemplateParams, pod *v1.Pod, namespace, encryptionKey, targetPath string) (string, error) {
func pruneData(
tp param.TemplateParams,
pod *v1.Pod,
podCommandExecutor kube.PodCommandExecutor,
encryptionKey,
targetPath string,
) (string, error) {
cmd, err := restic.PruneCommand(tp.Profile, targetPath, encryptionKey)
if err != nil {
return "", err
}
stdout, stderr, err := kube.Exec(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, cmd, nil)
format.Log(pod.Name, pod.Spec.Containers[0].Name, stdout)
format.Log(pod.Name, pod.Spec.Containers[0].Name, stderr)
spaceFreed := restic.SpaceFreedFromPruneLog(stdout)

var stdout, stderr bytes.Buffer
err = podCommandExecutor.Exec(context.Background(), cmd, nil, &stdout, &stderr)
format.Log(pod.Name, pod.Spec.Containers[0].Name, stdout.String())
format.Log(pod.Name, pod.Spec.Containers[0].Name, stderr.String())

spaceFreed := restic.SpaceFreedFromPruneLog(stdout.String())
return spaceFreed, errors.Wrapf(err, "Failed to prune data after forget")
}

Expand Down
Loading

0 comments on commit bef2520

Please sign in to comment.