Skip to content

Commit

Permalink
Kanister function to restore from multiple pods in parallel (#6156)
Browse files Browse the repository at this point in the history
* WIP:func to restore from multiple pods in parallel

* Refactor

* Unit test

* Address reviews

* nit: uncomment code

* address review suggestions

* Refactore to reduce channle length

* use channel for output

* bug fix

* minor:Refactor
  • Loading branch information
SupriyaKasten authored and Ilya Kislenko committed Aug 2, 2019
1 parent cc0cc78 commit f2159f1
Show file tree
Hide file tree
Showing 3 changed files with 235 additions and 41 deletions.
120 changes: 82 additions & 38 deletions pkg/function/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,16 +190,46 @@ func newBackupDataAllBlueprint() *crv1alpha1.Blueprint {
}
}

func (s *DataSuite) getTemplateParamsAndPVCName(c *C, replicas int32) (*param.TemplateParams, string) {
func newRestoreDataAllBlueprint() *crv1alpha1.Blueprint {
return &crv1alpha1.Blueprint{
Actions: map[string]*crv1alpha1.BlueprintAction{
"restore": &crv1alpha1.BlueprintAction{
Kind: param.StatefulSetKind,
SecretNames: []string{
"backupKey",
},
Phases: []crv1alpha1.BlueprintPhase{
crv1alpha1.BlueprintPhase{
Name: "testRestoreDataAll",
Func: "RestoreDataAll",
Args: map[string]interface{}{
RestoreDataAllNamespaceArg: "{{ .StatefulSet.Namespace }}",
RestoreDataAllImageArg: "kanisterio/kanister-tools:0.20.0",
RestoreDataAllBackupArtifactPrefixArg: "{{ .Profile.Location.Bucket }}/{{ .Profile.Location.Prefix }}",
RestoreDataAllBackupInfo: fmt.Sprintf("{{ .Options.%s }}", BackupDataAllOutput),
RestoreDataAllRestorePathArg: "/mnt/data",
},
},
},
},
},
}
}

func (s *DataSuite) getTemplateParamsAndPVCName(c *C, replicas int32) (*param.TemplateParams, []string) {
ctx := context.Background()
ss, err := s.cli.AppsV1().StatefulSets(s.namespace).Create(testutil.NewTestStatefulSet(replicas))
c.Assert(err, IsNil)
err = kube.WaitOnStatefulSetReady(ctx, s.cli, ss.GetNamespace(), ss.GetName())
c.Assert(err, IsNil)

pvc := testutil.NewTestPVC()
pvc, err = s.cli.CoreV1().PersistentVolumeClaims(s.namespace).Create(pvc)
c.Assert(err, IsNil)
pvcs := []string{}
var i int32
for i = 0; i < replicas; i++ {
pvc := testutil.NewTestPVC()
pvc, err = s.cli.CoreV1().PersistentVolumeClaims(s.namespace).Create(pvc)
c.Assert(err, IsNil)
pvcs = append(pvcs, pvc.GetName())
}

secret := &v1.Secret{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -237,56 +267,60 @@ func (s *DataSuite) getTemplateParamsAndPVCName(c *C, replicas int32) (*param.Te
c.Assert(err, IsNil)
tp.Profile = s.profile

return tp, pvc.GetName()
return tp, pvcs
}

func (s *DataSuite) TestBackupRestoreDeleteData(c *C) {
tp, pvc := s.getTemplateParamsAndPVCName(c, 1)
tp, pvcs := s.getTemplateParamsAndPVCName(c, 1)

for _, pvc := range pvcs {
// Test backup
bp := *newBackupDataBlueprint()
out := runAction(c, bp, "backup", tp)
c.Assert(out[BackupDataOutputBackupID].(string), Not(Equals), "")
c.Assert(out[BackupDataOutputBackupTag].(string), Not(Equals), "")

options := map[string]string{
BackupDataOutputBackupID: out[BackupDataOutputBackupID].(string),
BackupDataOutputBackupTag: out[BackupDataOutputBackupTag].(string),
}
tp.Options = options

// Test backup
bp := *newBackupDataBlueprint()
out := runAction(c, bp, "backup", tp)
c.Assert(out[BackupDataOutputBackupID].(string), Not(Equals), "")
c.Assert(out[BackupDataOutputBackupTag].(string), Not(Equals), "")
// Test restore
bp = *newRestoreDataBlueprint(pvc, RestoreDataBackupTagArg, BackupDataOutputBackupTag)
_ = runAction(c, bp, "restore", tp)

options := map[string]string{
BackupDataOutputBackupID: out[BackupDataOutputBackupID].(string),
BackupDataOutputBackupTag: out[BackupDataOutputBackupTag].(string),
bp = *newLocationDeleteBlueprint()
_ = runAction(c, bp, "delete", tp)
}
tp.Options = options

// Test restore
bp = *newRestoreDataBlueprint(pvc, RestoreDataBackupTagArg, BackupDataOutputBackupTag)
_ = runAction(c, bp, "restore", tp)

bp = *newLocationDeleteBlueprint()
_ = runAction(c, bp, "delete", tp)
}

func (s *DataSuite) TestBackupRestoreDataWithSnapshotID(c *C) {
tp, pvc := s.getTemplateParamsAndPVCName(c, 1)

// Test backup
bp := *newBackupDataBlueprint()
out := runAction(c, bp, "backup", tp)
c.Assert(out[BackupDataOutputBackupID].(string), Not(Equals), "")
c.Assert(out[BackupDataOutputBackupTag].(string), Not(Equals), "")
tp, pvcs := s.getTemplateParamsAndPVCName(c, 1)
for _, pvc := range pvcs {
// Test backup
bp := *newBackupDataBlueprint()
out := runAction(c, bp, "backup", tp)
c.Assert(out[BackupDataOutputBackupID].(string), Not(Equals), "")
c.Assert(out[BackupDataOutputBackupTag].(string), Not(Equals), "")

options := map[string]string{
BackupDataOutputBackupID: out[BackupDataOutputBackupID].(string),
BackupDataOutputBackupTag: out[BackupDataOutputBackupTag].(string),
}
tp.Options = options

options := map[string]string{
BackupDataOutputBackupID: out[BackupDataOutputBackupID].(string),
BackupDataOutputBackupTag: out[BackupDataOutputBackupTag].(string),
// Test restore with ID
bp = *newRestoreDataBlueprint(pvc, RestoreDataBackupIdentifierArg, BackupDataOutputBackupID)
_ = runAction(c, bp, "restore", tp)
}
tp.Options = options

// Test restore with ID
bp = *newRestoreDataBlueprint(pvc, RestoreDataBackupIdentifierArg, BackupDataOutputBackupID)
_ = runAction(c, bp, "restore", tp)
}

func (s *DataSuite) TestBackupDataAll(c *C) {
var replicas int32
replicas = 2
tp, _ := s.getTemplateParamsAndPVCName(c, replicas)
tp, pvcs := s.getTemplateParamsAndPVCName(c, replicas)

// Test backup
bp := *newBackupDataAllBlueprint()
Expand All @@ -299,6 +333,16 @@ func (s *DataSuite) TestBackupDataAll(c *C) {
for k := range output {
c.Assert(k, Equals, output[k].PodName)
}
options := map[string]string{BackupDataAllOutput: out[BackupDataAllOutput].(string)}
tp.Options = options

for i, pod := range tp.StatefulSet.Pods {
tp.StatefulSet.PersistentVolumeClaims[pod] = map[string]string{pvcs[i]: "/mnt/data"}
}
// Test restore
bp = *newRestoreDataAllBlueprint()
_ = runAction(c, bp, "restore", tp)

}

func newCopyDataTestBlueprint() crv1alpha1.Blueprint {
Expand Down
6 changes: 3 additions & 3 deletions pkg/function/restore_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func fetchPodVolumes(pod string, tp param.TemplateParams) (map[string]string, er
}
}

func restoreData(ctx context.Context, cli kubernetes.Interface, tp param.TemplateParams, namespace, encryptionKey, backupArtifactPrefix, restorePath, backupTag, backupID string, vols map[string]string) (map[string]interface{}, error) {
func restoreData(ctx context.Context, cli kubernetes.Interface, tp param.TemplateParams, namespace, encryptionKey, backupArtifactPrefix, restorePath, backupTag, backupID, jobPrefix string, vols map[string]string) (map[string]interface{}, error) {
// Validate volumes
for pvc := range vols {
if _, err := cli.CoreV1().PersistentVolumeClaims(namespace).Get(pvc, metav1.GetOptions{}); err != nil {
Expand All @@ -109,7 +109,7 @@ func restoreData(ctx context.Context, cli kubernetes.Interface, tp param.Templat
}
options := &kube.PodOptions{
Namespace: namespace,
GenerateName: restoreDataJobPrefix,
GenerateName: jobPrefix,
Image: kanisterToolsImage,
Command: []string{"sh", "-c", "tail -f /dev/null"},
Volumes: vols,
Expand Down Expand Up @@ -180,7 +180,7 @@ func (*restoreDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args
if err != nil {
return nil, errors.Wrapf(err, "Failed to create Kubernetes client")
}
return restoreData(ctx, cli, tp, namespace, encryptionKey, backupArtifactPrefix, restorePath, backupTag, backupID, vols)
return restoreData(ctx, cli, tp, namespace, encryptionKey, backupArtifactPrefix, restorePath, backupTag, backupID, restoreDataJobPrefix, vols)
}

func (*restoreDataFunc) RequiredArgs() []string {
Expand Down
150 changes: 150 additions & 0 deletions pkg/function/restore_data_all.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package function

import (
"context"
"encoding/json"
"fmt"
"strings"

"github.com/pkg/errors"

kanister "github.com/kanisterio/kanister/pkg"
"github.com/kanisterio/kanister/pkg/kube"
"github.com/kanisterio/kanister/pkg/param"
"github.com/kanisterio/kanister/pkg/restic"
)

const (
restoreDataAllJobPrefix = "restore-data-all-"
// RestoreDataAllNamespaceArg provides the namespace
RestoreDataAllNamespaceArg = "namespace"
// RestoreDataAllImageArg provides the image of the container with required tools
RestoreDataAllImageArg = "image"
// RestoreDataAllBackupArtifactPrefixArg provides the path of the backed up artifact
RestoreDataAllBackupArtifactPrefixArg = "backupArtifactPrefix"
// RestoreDataAllRestorePathArg provides the path to restore backed up data
RestoreDataAllRestorePathArg = "restorePath"
// RestoreDataAllPodsArg provides the pod connected to the data volume
RestoreDataAllPodsArg = "pods"
// RestoreDataAllEncryptionKeyArg provides the encryption key used during backup
RestoreDataAllEncryptionKeyArg = "encryptionKey"
// RestoreDataAllBackupInfo provides backup info required for restore
RestoreDataAllBackupInfo = "backupInfo"
)

func init() {
kanister.Register(&restoreDataAllFunc{})
}

var _ kanister.Func = (*restoreDataAllFunc)(nil)

type restoreDataAllFunc struct{}

func (*restoreDataAllFunc) Name() string {
return "RestoreDataAll"
}

func validateAndGetRestoreAllOptArgs(args map[string]interface{}, tp param.TemplateParams) (string, string, []string, error) {
var restorePath, encryptionKey, pods string
var ps []string
var err error

if err = OptArg(args, RestoreDataAllRestorePathArg, &restorePath, "/"); err != nil {
return restorePath, encryptionKey, ps, err
}
if err = OptArg(args, RestoreDataAllEncryptionKeyArg, &encryptionKey, restic.GeneratePassword()); err != nil {
return restorePath, encryptionKey, ps, err
}
if err = OptArg(args, RestoreDataAllPodsArg, &pods, ""); err != nil {
return restorePath, encryptionKey, ps, err
}

if pods != "" {
ps = strings.Fields(pods)
} else {
switch {
case tp.Deployment != nil:
ps = tp.Deployment.Pods
case tp.StatefulSet != nil:
ps = tp.StatefulSet.Pods
default:
return restorePath, encryptionKey, ps, errors.New("Unsupported workload type")
}
}

return restorePath, encryptionKey, ps, nil
}

func (*restoreDataAllFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) {
var namespace, image, backupArtifactPrefix, backupInfo string
var err error
if err = Arg(args, RestoreDataAllNamespaceArg, &namespace); err != nil {
return nil, err
}
if err = Arg(args, RestoreDataAllImageArg, &image); err != nil {
return nil, err
}
if err = Arg(args, RestoreDataAllBackupArtifactPrefixArg, &backupArtifactPrefix); err != nil {
return nil, err
}
if err = Arg(args, RestoreDataAllBackupInfo, &backupInfo); err != nil {
return nil, err
}
// Validate and get optional arguments
restorePath, encryptionKey, pods, err := validateAndGetRestoreAllOptArgs(args, tp)
if err != nil {
return nil, err
}
// Validate profile
if err = validateProfile(tp.Profile); err != nil {
return nil, err
}
cli, err := kube.NewClient()
if err != nil {
return nil, errors.Wrapf(err, "Failed to create Kubernetes client")
}
input := make(map[string]BackupInfo)
err = json.Unmarshal([]byte(backupInfo), &input)
if err != nil {
return nil, errors.Wrapf(err, "Could not decode JSON data")
}
var chanLen = len(pods)
errChan := make(chan error, chanLen)
outputChan := make(chan map[string]interface{}, chanLen)
output := make(map[string]interface{})
for _, pod := range pods {
go func(pod string) {
vols, err := fetchPodVolumes(pod, tp)
var out map[string]interface{}
if err != nil {
errChan <- errors.Wrapf(err, "Failed to get volumes of pod %s", pod)
outputChan <- out
return
}
out, err = restoreData(ctx, cli, tp, namespace, encryptionKey, fmt.Sprintf("%s/%s", backupArtifactPrefix, pod), restorePath, "", input[pod].BackupID, restoreDataAllJobPrefix, vols)
errChan <- errors.Wrapf(err, "Failed to restore data for pod %s", pod)
outputChan <- out
}(pod)
}
errs := make([]string, 0, chanLen)
for i := 0; i < chanLen; i++ {
err := <-errChan
out := <-outputChan
if err != nil {
errs = append(errs, err.Error())
} else {
for k, v := range out {
output[k] = v
}
}
}
if len(errs) != 0 {
return nil, errors.New(strings.Join(errs, "\n"))
}
return output, nil
}

func (*restoreDataAllFunc) RequiredArgs() []string {
return []string{RestoreDataAllNamespaceArg, RestoreDataAllImageArg,
RestoreDataAllBackupArtifactPrefixArg, RestoreDataAllBackupInfo}
}

0 comments on commit f2159f1

Please sign in to comment.