Skip to content

Commit

Permalink
Kanister function to backup data on all pods in parallel (#6096)
Browse files Browse the repository at this point in the history
* WIP: func to backup data on all pods in parallel

* Refactor and fix errors

* Address review suggestions

* avoid errors

* Update blueprint after testing with k10

* nit:remove commented code

* Add unit test

* Add context cancellation

* Revert context cancellation & remove podChan
  • Loading branch information
SupriyaKasten authored and Ilya Kislenko committed Aug 1, 2019
1 parent 81b2730 commit 23d62f9
Show file tree
Hide file tree
Showing 3 changed files with 208 additions and 63 deletions.
36 changes: 22 additions & 14 deletions pkg/function/backup_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,30 @@ func (*backupDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args m
if err != nil {
return nil, errors.Wrapf(err, "Failed to create Kubernetes client")
}
backupID, backupTag, err := backupData(ctx, cli, namespace, pod, container, backupArtifactPrefix, includePath, encryptionKey, tp)
if err != nil {
return nil, errors.Wrapf(err, "Failed to backup data")
}
output := map[string]interface{}{
BackupDataOutputBackupID: backupID,
BackupDataOutputBackupTag: backupTag,
}
return output, nil
}

func (*backupDataFunc) RequiredArgs() []string {
return []string{BackupDataNamespaceArg, BackupDataPodArg, BackupDataContainerArg,
BackupDataIncludePathArg, BackupDataBackupArtifactPrefixArg}
}

func backupData(ctx context.Context, cli kubernetes.Interface, namespace, pod, container, backupArtifactPrefix, includePath, encryptionKey string, tp param.TemplateParams) (string, string, error) {
pw, err := getPodWriter(cli, ctx, namespace, pod, container, tp.Profile)
if err != nil {
return nil, err
return "", "", err
}
defer cleanUpCredsFile(ctx, pw, namespace, pod, container)
if err = restic.GetOrCreateRepository(cli, namespace, pod, container, backupArtifactPrefix, encryptionKey, tp.Profile); err != nil {
return nil, err
return "", "", err
}

// Create backup and dump it on the object store
Expand All @@ -116,23 +133,14 @@ func (*backupDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args m
format.Log(pod, container, stdout)
format.Log(pod, container, stderr)
if err != nil {
return nil, errors.Wrapf(err, "Failed to create and upload backup")
return "", "", errors.Wrapf(err, "Failed to create and upload backup")
}
// Get the snapshot ID from log
backupID := restic.SnapshotIDFromBackupLog(stdout)
if backupID == "" {
return nil, errors.New("Failed to parse the backup ID from logs")
}
output := map[string]interface{}{
BackupDataOutputBackupID: backupID,
BackupDataOutputBackupTag: backupTag,
return "", "", errors.New("Failed to parse the backup ID from logs")
}
return output, nil
}

func (*backupDataFunc) RequiredArgs() []string {
return []string{BackupDataNamespaceArg, BackupDataPodArg, BackupDataContainerArg,
BackupDataIncludePathArg, BackupDataBackupArtifactPrefixArg}
return backupID, backupTag, nil
}

func getPodWriter(cli kubernetes.Interface, ctx context.Context, namespace, podName, containerName string, profile *param.Profile) (*kube.PodWriter, error) {
Expand Down
133 changes: 133 additions & 0 deletions pkg/function/backup_data_all.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package function

import (
"context"
"encoding/json"
"fmt"
"strings"

"github.com/pkg/errors"
"k8s.io/client-go/kubernetes"

kanister "github.com/kanisterio/kanister/pkg"
"github.com/kanisterio/kanister/pkg/kube"
"github.com/kanisterio/kanister/pkg/param"
"github.com/kanisterio/kanister/pkg/restic"
)

const (
// BackupDataAllNamespaceArg provides the namespace
BackupDataAllNamespaceArg = "namespace"
// BackupDataAllPodsArg provides the pods connected to the data volumes
BackupDataAllPodsArg = "pods"
// BackupDataAllContainerArg provides the container on which the backup is taken
BackupDataAllContainerArg = "container"
// BackupDataAllIncludePathArg provides the path of the volume or sub-path for required backup
BackupDataAllIncludePathArg = "includePath"
// BackupDataAllBackupArtifactPrefixArg provides the path to store artifacts on the object store
BackupDataAllBackupArtifactPrefixArg = "backupArtifactPrefix"
// BackupDataAllEncryptionKeyArg provides the encryption key to be used for backups
BackupDataAllEncryptionKeyArg = "encryptionKey"
// BackupDataAllOutput is the key name of the output generated by BackupDataAll func
BackupDataAllOutput = "BackupAllInfo"
)

type BackupInfo struct {
PodName string
BackupID string
BackupTag string
}

func init() {
kanister.Register(&backupDataAllFunc{})
}

var _ kanister.Func = (*backupDataAllFunc)(nil)

type backupDataAllFunc struct{}

func (*backupDataAllFunc) Name() string {
return "BackupDataAll"
}

func (*backupDataAllFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) {
var namespace, pods, container, includePath, backupArtifactPrefix, encryptionKey string
var err error
if err = Arg(args, BackupDataAllNamespaceArg, &namespace); err != nil {
return nil, err
}
if err = Arg(args, BackupDataAllContainerArg, &container); err != nil {
return nil, err
}
if err = Arg(args, BackupDataAllIncludePathArg, &includePath); err != nil {
return nil, err
}
if err = Arg(args, BackupDataAllBackupArtifactPrefixArg, &backupArtifactPrefix); err != nil {
return nil, err
}
if err = OptArg(args, BackupDataAllPodsArg, &pods, ""); err != nil {
return nil, err
}
if err = OptArg(args, BackupDataAllEncryptionKeyArg, &encryptionKey, restic.GeneratePassword()); err != nil {
return nil, err
}
// Validate the Profile
if err = validateProfile(tp.Profile); err != nil {
return nil, errors.Wrapf(err, "Failed to validate Profile")
}
cli, err := kube.NewClient()
if err != nil {
return nil, errors.Wrapf(err, "Failed to create Kubernetes client")
}
var ps []string
if pods == "" {
switch {
case tp.Deployment != nil:
ps = tp.Deployment.Pods
case tp.StatefulSet != nil:
ps = tp.StatefulSet.Pods
default:
return nil, errors.New("Failed to get pods")
}
} else {
ps = strings.Fields(pods)
}
return backupDataAll(ctx, cli, namespace, ps, container, backupArtifactPrefix, includePath, encryptionKey, tp)
}

func (*backupDataAllFunc) RequiredArgs() []string {
return []string{BackupDataAllNamespaceArg, BackupDataAllContainerArg,
BackupDataAllIncludePathArg, BackupDataAllBackupArtifactPrefixArg}
}

func backupDataAll(ctx context.Context, cli kubernetes.Interface, namespace string, ps []string, container string, backupArtifactPrefix, includePath, encryptionKey string, tp param.TemplateParams) (map[string]interface{}, error) {
errChan := make(chan error, len(ps))
outChan := make(chan BackupInfo, len(ps))
Output := make(map[string]BackupInfo)
// Run the command
for _, pod := range ps {
go func(pod string, container string) {
backupID, backupTag, err := backupData(ctx, cli, namespace, pod, container, fmt.Sprintf("%s/%s", backupArtifactPrefix, pod), includePath, encryptionKey, tp)
errChan <- errors.Wrapf(err, "Failed to backup data for pod %s", pod)
outChan <- BackupInfo{PodName: pod, BackupID: backupID, BackupTag: backupTag}
}(pod, container)
}
errs := make([]string, 0, len(ps))
for i := 0; i < len(ps); i++ {
err := <-errChan
output := <-outChan
if err != nil {
errs = append(errs, err.Error())
} else {
Output[output.PodName] = output
}
}
if len(errs) != 0 {
return nil, errors.New(strings.Join(errs, "\n"))
}
manifestData, err := json.Marshal(Output)
if err != nil {
return nil, errors.Wrapf(err, "Failed to encode JSON data")
}
return map[string]interface{}{BackupDataAllOutput: string(manifestData)}, nil
}
102 changes: 53 additions & 49 deletions pkg/function/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package function

import (
"context"
"encoding/json"
"fmt"

. "gopkg.in/check.v1"
Expand Down Expand Up @@ -167,9 +168,31 @@ func newLocationDeleteBlueprint() *crv1alpha1.Blueprint {
}
}

func (s *DataSuite) TestBackupRestoreDeleteData(c *C) {
func newBackupDataAllBlueprint() *crv1alpha1.Blueprint {
return &crv1alpha1.Blueprint{
Actions: map[string]*crv1alpha1.BlueprintAction{
"backup": &crv1alpha1.BlueprintAction{
Kind: param.StatefulSetKind,
Phases: []crv1alpha1.BlueprintPhase{
crv1alpha1.BlueprintPhase{
Name: "testBackupDataAll",
Func: "BackupDataAll",
Args: map[string]interface{}{
BackupDataAllNamespaceArg: "{{ .StatefulSet.Namespace }}",
BackupDataAllContainerArg: "{{ index .StatefulSet.Containers 0 0 }}",
BackupDataAllIncludePathArg: "/etc",
BackupDataAllBackupArtifactPrefixArg: "{{ .Profile.Location.Bucket }}/{{ .Profile.Location.Prefix }}",
},
},
},
},
},
}
}

func (s *DataSuite) getTemplateParamsAndPVCName(c *C, replicas int32, secretName string) (*param.TemplateParams, string) {
ctx := context.Background()
ss, err := s.cli.AppsV1().StatefulSets(s.namespace).Create(testutil.NewTestStatefulSet(1))
ss, err := s.cli.AppsV1().StatefulSets(s.namespace).Create(testutil.NewTestStatefulSet(replicas))
c.Assert(err, IsNil)
err = kube.WaitOnStatefulSetReady(ctx, s.cli, ss.GetNamespace(), ss.GetName())
c.Assert(err, IsNil)
Expand All @@ -180,7 +203,7 @@ func (s *DataSuite) TestBackupRestoreDeleteData(c *C) {

secret := &v1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "secret-datatest",
Name: secretName,
Namespace: s.namespace,
},
Type: "Opaque",
Expand Down Expand Up @@ -214,6 +237,12 @@ func (s *DataSuite) TestBackupRestoreDeleteData(c *C) {
c.Assert(err, IsNil)
tp.Profile = s.profile

return tp, pvc.GetName()
}

func (s *DataSuite) TestBackupRestoreDeleteData(c *C) {
tp, pvc := s.getTemplateParamsAndPVCName(c, 1, "secret-datatest")

// Test backup
bp := *newBackupDataBlueprint()
out := runAction(c, bp, "backup", tp)
Expand All @@ -227,58 +256,15 @@ func (s *DataSuite) TestBackupRestoreDeleteData(c *C) {
tp.Options = options

// Test restore
bp = *newRestoreDataBlueprint(pvc.GetName(), RestoreDataBackupTagArg, BackupDataOutputBackupTag)
bp = *newRestoreDataBlueprint(pvc, RestoreDataBackupTagArg, BackupDataOutputBackupTag)
_ = runAction(c, bp, "restore", tp)

bp = *newLocationDeleteBlueprint()
_ = runAction(c, bp, "delete", tp)
}

func (s *DataSuite) TestBackupRestoreDataWithSnapshotID(c *C) {
ctx := context.Background()
ss, err := s.cli.AppsV1().StatefulSets(s.namespace).Create(testutil.NewTestStatefulSet(1))
c.Assert(err, IsNil)
err = kube.WaitOnStatefulSetReady(ctx, s.cli, ss.GetNamespace(), ss.GetName())
c.Assert(err, IsNil)

pvc := testutil.NewTestPVC()
pvc, err = s.cli.CoreV1().PersistentVolumeClaims(s.namespace).Create(pvc)
c.Assert(err, IsNil)

secret := &v1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "secret-datatest-id",
Namespace: s.namespace,
},
Type: "Opaque",
StringData: map[string]string{
"password": "myPassword",
},
}
secret, err = s.cli.CoreV1().Secrets(s.namespace).Create(secret)

as := crv1alpha1.ActionSpec{
Object: crv1alpha1.ObjectReference{
Kind: param.StatefulSetKind,
Name: ss.GetName(),
Namespace: s.namespace,
},
Profile: &crv1alpha1.ObjectReference{
Name: testutil.TestProfileName,
Namespace: s.namespace,
},
Secrets: map[string]crv1alpha1.ObjectReference{
"backupKey": crv1alpha1.ObjectReference{
Kind: "Secret",
Name: secret.GetName(),
Namespace: s.namespace,
},
},
}

tp, err := param.New(ctx, s.cli, s.crCli, as)
c.Assert(err, IsNil)
tp.Profile = s.profile
tp, pvc := s.getTemplateParamsAndPVCName(c, 1, "secret-datatest-id")

// Test backup
bp := *newBackupDataBlueprint()
Expand All @@ -293,10 +279,28 @@ func (s *DataSuite) TestBackupRestoreDataWithSnapshotID(c *C) {
tp.Options = options

// Test restore with ID
bp = *newRestoreDataBlueprint(pvc.GetName(), RestoreDataBackupIdentifierArg, BackupDataOutputBackupID)
bp = *newRestoreDataBlueprint(pvc, RestoreDataBackupIdentifierArg, BackupDataOutputBackupID)
_ = runAction(c, bp, "restore", tp)
}

func (s *DataSuite) TestBackupDataAll(c *C) {
var replicas int32
replicas = 2
tp, _ := s.getTemplateParamsAndPVCName(c, replicas, "secret-datatest")

// Test backup
bp := *newBackupDataAllBlueprint()
out := runAction(c, bp, "backup", tp)
c.Assert(out[BackupDataAllOutput].(string), Not(Equals), "")

output := make(map[string]BackupInfo)
c.Assert(json.Unmarshal([]byte(out[BackupDataAllOutput].(string)), &output), IsNil)
c.Assert(int32(len(output)), Equals, replicas)
for k := range output {
c.Assert(k, Equals, output[k].PodName)
}
}

func newCopyDataTestBlueprint() crv1alpha1.Blueprint {
return crv1alpha1.Blueprint{
Actions: map[string]*crv1alpha1.BlueprintAction{
Expand Down

0 comments on commit 23d62f9

Please sign in to comment.