Skip to content

Commit

Permalink
Add a CopyVolumeData function (#4074)
Browse files Browse the repository at this point in the history
* Add a CopyVolumeData function

This Kanister function can be used to copy data from a PVC
to an objectstore location. It leverages the restic pkg and the
artifacts should be compatible with the `RestoreData` function

* Address review comments
  • Loading branch information
Vaibhav Kamra authored and Ilya Kislenko committed Oct 10, 2018
1 parent cfa7d63 commit a3c277a
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 0 deletions.
97 changes: 97 additions & 0 deletions pkg/function/copy_volume_data.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package function

import (
"context"
"fmt"

"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/client-go/kubernetes"

kanister "github.com/kanisterio/kanister/pkg"
"github.com/kanisterio/kanister/pkg/format"
"github.com/kanisterio/kanister/pkg/kube"
"github.com/kanisterio/kanister/pkg/param"
"github.com/kanisterio/kanister/pkg/restic"
)

const (
kanisterToolsImage = "kanisterio/kanister-tools:0.12.0"
copyVolumeDataMountPoint = "/mnt/vol_data/%s"
copyVolumeDataJobPrefix = "copy-vol-data-"
CopyVolumeDataNamespaceArg = "namespace"
CopyVolumeDataVolumeArg = "volume"
CopyVolumeDataArtifactPrefixArg = "dataArtifactPrefix"
)

func init() {
kanister.Register(&copyVolumeDataFunc{})
}

var _ kanister.Func = (*copyVolumeDataFunc)(nil)

type copyVolumeDataFunc struct{}

func (*copyVolumeDataFunc) Name() string {
return "CopyVolumeData"
}

func copyVolumeData(ctx context.Context, cli kubernetes.Interface, tp param.TemplateParams, namespace, pvc, targetPath string) (map[string]interface{}, error) {
// Validate PVC exists
if _, err := cli.CoreV1().PersistentVolumeClaims(namespace).Get(pvc, metav1.GetOptions{}); err != nil {
return nil, errors.Wrapf(err, "Failed to retrieve PVC. Namespace %s, Name %s", namespace, pvc)
}
// Create a pod with PVCs attached
mountPoint := fmt.Sprintf(copyVolumeDataMountPoint, pvc)
pod, err := kube.CreatePod(ctx, cli, &kube.PodOptions{
Namespace: namespace,
GenerateName: copyVolumeDataJobPrefix,
Image: kanisterToolsImage,
Command: []string{"sh", "-c", "tail -f /dev/null"},
Volumes: map[string]string{pvc: mountPoint},
})
if err != nil {
return nil, errors.Wrapf(err, "Failed to create pod to copy volume data")
}
defer kube.DeletePod(context.Background(), cli, pod)

// Get restic repository
if err = restic.GetOrCreateRepository(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, targetPath, tp.Profile); err != nil {
return nil, err
}

// Copy data to object store
backupIdentifier := rand.String(10)
cmd := restic.BackupCommand(tp.Profile, targetPath, backupIdentifier, mountPoint)
stdout, stderr, err := kube.Exec(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, cmd)
format.Log(pod.Name, pod.Spec.Containers[0].Name, stdout)
format.Log(pod.Name, pod.Spec.Containers[0].Name, stderr)
if err != nil {
return nil, errors.Wrapf(err, "Failed to create and upload backup")
}
return map[string]interface{}{"backupID": backupIdentifier}, nil
}

func (*copyVolumeDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) {
var namespace, vol, targetPath string
var err error
if err = Arg(args, CopyVolumeDataNamespaceArg, &namespace); err != nil {
return nil, err
}
if err = Arg(args, CopyVolumeDataVolumeArg, &vol); err != nil {
return nil, err
}
if err = Arg(args, CopyVolumeDataArtifactPrefixArg, &targetPath); err != nil {
return nil, err
}
cli, err := kube.NewClient()
if err != nil {
return nil, errors.Wrapf(err, "Failed to create Kubernetes client")
}
return copyVolumeData(ctx, cli, tp, namespace, vol, targetPath)
}

func (*copyVolumeDataFunc) RequiredArgs() []string {
return []string{CopyVolumeDataNamespaceArg, CopyVolumeDataVolumeArg, CopyVolumeDataArtifactPrefixArg}
}
52 changes: 52 additions & 0 deletions pkg/function/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func (s *DataSuite) SetUpSuite(c *C) {
s.crCli = crCli

ns := testutil.NewTestNamespace()
ns.GenerateName = "kanister-datatest-"

cns, err := s.cli.Core().Namespaces().Create(ns)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -152,3 +153,54 @@ func (s *DataSuite) TestBackupRestoreData(c *C) {
}
}
}

func newCopyDataBlueprint() crv1alpha1.Blueprint {
return crv1alpha1.Blueprint{
Actions: map[string]*crv1alpha1.BlueprintAction{
"copy": &crv1alpha1.BlueprintAction{
Phases: []crv1alpha1.BlueprintPhase{
crv1alpha1.BlueprintPhase{
Name: "testCopy",
Func: "CopyVolumeData",
Args: map[string]interface{}{
CopyVolumeDataNamespaceArg: "{{ .PVC.Namespace }}",
CopyVolumeDataVolumeArg: "{{ .PVC.Name }}",
CopyVolumeDataArtifactPrefixArg: "{{ .Profile.Location.S3Compliant.Bucket }}/{{ .Profile.Location.S3Compliant.Prefix }}/{{ .PVC.Namespace }}/{{ .PVC.Name }}",
},
},
},
},
},
}
}
func (s *DataSuite) TestCopyData(c *C) {
ctx := context.Background()
pvc := testutil.NewTestPVC()
pvc, err := s.cli.CoreV1().PersistentVolumeClaims(s.namespace).Create(pvc)
c.Assert(err, IsNil)

as := crv1alpha1.ActionSpec{
Object: crv1alpha1.ObjectReference{
Kind: param.PVCKind,
Name: pvc.Name,
Namespace: pvc.Namespace,
},
Profile: &crv1alpha1.ObjectReference{
Name: testutil.TestProfileName,
Namespace: s.namespace,
},
}

tp, err := param.New(ctx, s.cli, s.crCli, as)
c.Assert(err, IsNil)

tp.Profile = testutil.ObjectStoreProfileOrSkip(c)

bp := newCopyDataBlueprint()
phases, err := kanister.GetPhases(bp, "copy", *tp)
c.Assert(err, IsNil)
for _, p := range phases {
_, err = p.Exec(context.Background(), bp, "copy", *tp)
c.Assert(err, IsNil)
}
}

0 comments on commit a3c277a

Please sign in to comment.