Skip to content

Commit

Permalink
DeleteDataAll kanister func to delete all snapshots at once using res…
Browse files Browse the repository at this point in the history
…tic (#6225)

* func to delete all snapshots at once

* Unit tests and error fix

* Remove unused params

* minor: set jobprefix

* Address review suggestion
  • Loading branch information
SupriyaKasten authored and Ilya Kislenko committed Aug 7, 2019
1 parent ec16da9 commit 8684b3e
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 23 deletions.
31 changes: 27 additions & 4 deletions pkg/function/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,6 @@ func newRestoreDataAllBlueprint() *crv1alpha1.Blueprint {
Actions: map[string]*crv1alpha1.BlueprintAction{
"restore": &crv1alpha1.BlueprintAction{
Kind: param.StatefulSetKind,
SecretNames: []string{
"backupKey",
},
Phases: []crv1alpha1.BlueprintPhase{
crv1alpha1.BlueprintPhase{
Name: "testRestoreDataAll",
Expand All @@ -216,6 +213,28 @@ func newRestoreDataAllBlueprint() *crv1alpha1.Blueprint {
}
}

func newDeleteDataAllBlueprint() *crv1alpha1.Blueprint {
return &crv1alpha1.Blueprint{
Actions: map[string]*crv1alpha1.BlueprintAction{
"delete": &crv1alpha1.BlueprintAction{
Kind: param.StatefulSetKind,
Phases: []crv1alpha1.BlueprintPhase{
crv1alpha1.BlueprintPhase{
Name: "testDelete",
Func: "DeleteDataAll",
Args: map[string]interface{}{
DeleteDataAllNamespaceArg: "{{ .StatefulSet.Namespace }}",
DeleteDataAllBackupArtifactPrefixArg: "{{ .Profile.Location.Bucket }}/{{ .Profile.Location.Prefix }}",
DeleteDataAllBackupInfo: fmt.Sprintf("{{ .Options.%s }}", BackupDataAllOutput),
DeleteDataAllReclaimSpace: true,
},
},
},
},
},
}
}

func (s *DataSuite) getTemplateParamsAndPVCName(c *C, replicas int32) (*param.TemplateParams, []string) {
ctx := context.Background()
ss, err := s.cli.AppsV1().StatefulSets(s.namespace).Create(testutil.NewTestStatefulSet(replicas))
Expand Down Expand Up @@ -317,7 +336,7 @@ func (s *DataSuite) TestBackupRestoreDataWithSnapshotID(c *C) {
}
}

func (s *DataSuite) TestBackupDataAll(c *C) {
func (s *DataSuite) TestBackupRestoreDeleteDataAll(c *C) {
var replicas int32
replicas = 2
tp, pvcs := s.getTemplateParamsAndPVCName(c, replicas)
Expand All @@ -343,6 +362,10 @@ func (s *DataSuite) TestBackupDataAll(c *C) {
bp = *newRestoreDataAllBlueprint()
_ = runAction(c, bp, "restore", tp)

// Test delete
bp = *newDeleteDataAllBlueprint()
_ = runAction(c, bp, "delete", tp)

}

func newCopyDataTestBlueprint() crv1alpha1.Blueprint {
Expand Down
50 changes: 31 additions & 19 deletions pkg/function/delete_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package function

import (
"context"
"strings"

"github.com/pkg/errors"
"k8s.io/api/core/v1"
Expand Down Expand Up @@ -42,67 +43,74 @@ func (*deleteDataFunc) Name() string {
return "DeleteData"
}

func deleteData(ctx context.Context, cli kubernetes.Interface, tp param.TemplateParams, reclaimSpace bool, namespace, targetPath, deleteTag, deleteIdentifier, encryptionKey string) (map[string]interface{}, error) {
func deleteData(ctx context.Context, cli kubernetes.Interface, tp param.TemplateParams, reclaimSpace bool, namespace, encryptionKey string, targetPaths, deleteTags, deleteIdentifiers []string, jobPrefix string) (map[string]interface{}, error) {
options := &kube.PodOptions{
Namespace: namespace,
GenerateName: deleteDataJobPrefix,
GenerateName: jobPrefix,
Image: kanisterToolsImage,
Command: []string{"sh", "-c", "tail -f /dev/null"},
}
pr := kube.NewPodRunner(cli, options)
podFunc := deleteDataPodFunc(cli, tp, reclaimSpace, namespace, targetPath, deleteTag, deleteIdentifier, encryptionKey)
podFunc := deleteDataPodFunc(cli, tp, reclaimSpace, namespace, encryptionKey, targetPaths, deleteTags, deleteIdentifiers)
return pr.Run(ctx, podFunc)
}

func deleteDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, reclaimSpace bool, namespace, targetPath, deleteTag, deleteIdentifier, encryptionKey string) func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) {
func deleteDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, reclaimSpace bool, namespace, encryptionKey string, targetPaths, deleteTags, deleteIdentifiers []string) func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) {
return func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) {
// Wait for pod to reach running state
if err := kube.WaitForPodReady(ctx, cli, pod.Namespace, pod.Name); err != nil {
return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pod.Name)
}
if (deleteIdentifier != "") == (deleteTag != "") {
if (len(deleteIdentifiers) == 0) == (len(deleteTags) == 0) {
return nil, errors.Errorf("Require one argument: %s or %s", DeleteDataBackupIdentifierArg, DeleteDataBackupTagArg)
}
pw, err := getPodWriter(cli, ctx, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name, tp.Profile)
if err != nil {
return nil, err
}
defer cleanUpCredsFile(ctx, pw, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name)
if deleteTag != "" {
cmd := restic.SnapshotsCommandByTag(tp.Profile, targetPath, deleteTag, encryptionKey)
for i, deleteTag := range deleteTags {
cmd := restic.SnapshotsCommandByTag(tp.Profile, targetPaths[i], deleteTag, encryptionKey)
stdout, stderr, err := kube.Exec(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, cmd, nil)
format.Log(pod.Name, pod.Spec.Containers[0].Name, stdout)
format.Log(pod.Name, pod.Spec.Containers[0].Name, stderr)
if err != nil {
return nil, errors.Wrapf(err, "Failed to forget data, could not get snapshotID from tag, Tag: %s", deleteTag)
}
deleteIdentifier, err = restic.SnapshotIDFromSnapshotLog(stdout)
deleteIdentifier, err := restic.SnapshotIDFromSnapshotLog(stdout)
if err != nil {
return nil, errors.Wrapf(err, "Failed to forget data, could not get snapshotID from tag, Tag: %s", deleteTag)
}
deleteIdentifiers = append(deleteIdentifiers, deleteIdentifier)
}
if deleteIdentifier != "" {
cmd := restic.ForgetCommandByID(tp.Profile, targetPath, deleteIdentifier, encryptionKey)
for i, deleteIdentifier := range deleteIdentifiers {
cmd := restic.ForgetCommandByID(tp.Profile, targetPaths[i], deleteIdentifier, encryptionKey)
stdout, stderr, err := kube.Exec(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, cmd, nil)
format.Log(pod.Name, pod.Spec.Containers[0].Name, stdout)
format.Log(pod.Name, pod.Spec.Containers[0].Name, stderr)
if err != nil {
return nil, errors.Wrapf(err, "Failed to forget data")
}
}
if reclaimSpace {
cmd := restic.PruneCommand(tp.Profile, targetPath, encryptionKey)
stdout, stderr, err := kube.Exec(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, cmd, nil)
format.Log(pod.Name, pod.Spec.Containers[0].Name, stdout)
format.Log(pod.Name, pod.Spec.Containers[0].Name, stderr)
if err != nil {
return nil, errors.Wrapf(err, "Failed to prune data after forget")
if reclaimSpace {
err := pruneData(cli, tp, pod, namespace, encryptionKey, targetPaths[i])
if err != nil {
return nil, errors.Wrapf(err, "Error executing prune command")
}
}
}

return nil, nil
}
}

func pruneData(cli kubernetes.Interface, tp param.TemplateParams, pod *v1.Pod, namespace, encryptionKey, targetPath string) error {
cmd := restic.PruneCommand(tp.Profile, targetPath, encryptionKey)
stdout, stderr, err := kube.Exec(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, cmd, nil)
format.Log(pod.Name, pod.Spec.Containers[0].Name, stdout)
format.Log(pod.Name, pod.Spec.Containers[0].Name, stderr)
return errors.Wrapf(err, "Failed to prune data after forget")
}

func (*deleteDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) {
var namespace, deleteArtifactPrefix, deleteIdentifier, deleteTag, encryptionKey string
var reclaimSpace bool
Expand All @@ -125,11 +133,15 @@ func (*deleteDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args m
if err = OptArg(args, DeleteDataReclaimSpace, &reclaimSpace, false); err != nil {
return nil, err
}
// Validate profile
if err = validateProfile(tp.Profile); err != nil {
return nil, err
}
cli, err := kube.NewClient()
if err != nil {
return nil, errors.Wrapf(err, "Failed to create Kubernetes client")
}
return deleteData(ctx, cli, tp, reclaimSpace, namespace, deleteArtifactPrefix, deleteTag, deleteIdentifier, encryptionKey)
return deleteData(ctx, cli, tp, reclaimSpace, namespace, encryptionKey, strings.Fields(deleteArtifactPrefix), strings.Fields(deleteTag), strings.Fields(deleteIdentifier), deleteDataJobPrefix)
}

func (*deleteDataFunc) RequiredArgs() []string {
Expand Down
86 changes: 86 additions & 0 deletions pkg/function/delete_data_all.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package function

import (
"context"
"encoding/json"
"fmt"

"github.com/pkg/errors"

kanister "github.com/kanisterio/kanister/pkg"
"github.com/kanisterio/kanister/pkg/kube"
"github.com/kanisterio/kanister/pkg/param"
"github.com/kanisterio/kanister/pkg/restic"
)

const (
// DeleteDataAllNamespaceArg provides the namespace
DeleteDataAllNamespaceArg = "namespace"
// DeleteDataAllBackupArtifactPrefixArg provides the path to restore backed up data
DeleteDataAllBackupArtifactPrefixArg = "backupArtifactPrefix"
// DeleteDataAllEncryptionKeyArg provides the encryption key to be used for deletes
DeleteDataAllEncryptionKeyArg = "encryptionKey"
// DeleteDataAllReclaimSpace provides a way to specify if space should be reclaimed
DeleteDataAllReclaimSpace = "reclaimSpace"
// DeleteDataAllBackupInfo provides backup info required for delete
DeleteDataAllBackupInfo = "backupInfo"
deleteDataAllJobPrefix = "delete-data-all-"
)

func init() {
kanister.Register(&deleteDataAllFunc{})
}

var _ kanister.Func = (*deleteDataAllFunc)(nil)

type deleteDataAllFunc struct{}

func (*deleteDataAllFunc) Name() string {
return "DeleteDataAll"
}

func (*deleteDataAllFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) {
var namespace, deleteArtifactPrefix, backupInfo, encryptionKey string
var reclaimSpace bool
var err error
if err = Arg(args, DeleteDataAllNamespaceArg, &namespace); err != nil {
return nil, err
}
if err = Arg(args, DeleteDataAllBackupArtifactPrefixArg, &deleteArtifactPrefix); err != nil {
return nil, err
}
if err = Arg(args, DeleteDataAllBackupInfo, &backupInfo); err != nil {
return nil, err
}
if err = OptArg(args, DeleteDataAllEncryptionKeyArg, &encryptionKey, restic.GeneratePassword()); err != nil {
return nil, err
}
if err = OptArg(args, DeleteDataAllReclaimSpace, &reclaimSpace, false); err != nil {
return nil, err
}
// Validate profile
if err = validateProfile(tp.Profile); err != nil {
return nil, err
}
cli, err := kube.NewClient()
if err != nil {
return nil, errors.Wrapf(err, "Failed to create Kubernetes client")
}
input := make(map[string]BackupInfo)
err = json.Unmarshal([]byte(backupInfo), &input)
if err != nil {
return nil, errors.Wrapf(err, "Could not decode JSON data")
}
var targetPaths []string
var deleteIdentifiers []string
for pod, info := range input {
targetPaths = append(targetPaths, fmt.Sprintf("%s/%s", deleteArtifactPrefix, pod))
deleteIdentifiers = append(deleteIdentifiers, info.BackupID)
}

return deleteData(ctx, cli, tp, reclaimSpace, namespace, encryptionKey, targetPaths, nil, deleteIdentifiers, deleteDataAllJobPrefix)
}

func (*deleteDataAllFunc) RequiredArgs() []string {
return []string{DeleteDataAllNamespaceArg, DeleteDataAllBackupArtifactPrefixArg, DeleteDataAllBackupInfo}
}

0 comments on commit 8684b3e

Please sign in to comment.