Skip to content

Commit

Permalink
Merge pull request #170 from kanisterio/sync
Browse files Browse the repository at this point in the history
GCS Support for Restic
  • Loading branch information
SupriyaKasten authored May 7, 2019
2 parents 0c6ecf2 + 5e80e7a commit f970ed8
Show file tree
Hide file tree
Showing 9 changed files with 158 additions and 39 deletions.
38 changes: 36 additions & 2 deletions pkg/function/backup_data.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package function

import (
"bytes"
"context"

"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/client-go/kubernetes"

kanister "github.com/kanisterio/kanister/pkg"
crv1alpha1 "github.com/kanisterio/kanister/pkg/apis/cr/v1alpha1"
Expand Down Expand Up @@ -49,7 +52,16 @@ func validateProfile(profile *param.Profile) error {
if profile == nil {
return errors.New("Profile must be non-nil")
}
if profile.Location.Type != crv1alpha1.LocationTypeS3Compliant {
if profile.Credential.Type != param.CredentialTypeKeyPair {
return errors.New("Credential type not supported")
}
if len(profile.Credential.KeyPair.ID) == 0 {
return errors.New("Access key ID is not set")
}
if len(profile.Credential.KeyPair.Secret) == 0 {
return errors.New("Secret access key is not set")
}
if profile.Location.Type != crv1alpha1.LocationTypeS3Compliant && profile.Location.Type != crv1alpha1.LocationTypeGCS {
return errors.New("Location type not supported")
}
return nil
Expand Down Expand Up @@ -84,7 +96,11 @@ func (*backupDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args m
if err != nil {
return nil, errors.Wrapf(err, "Failed to create Kubernetes client")
}

pw, err := getPodWriter(cli, ctx, namespace, pod, container, tp.Profile)
if err != nil {
return nil, err
}
defer cleanUpCredsFile(ctx, pw, namespace, pod, container)
if err = restic.GetOrCreateRepository(cli, namespace, pod, container, backupArtifactPrefix, encryptionKey, tp.Profile); err != nil {
return nil, err
}
Expand Down Expand Up @@ -114,3 +130,21 @@ func (*backupDataFunc) RequiredArgs() []string {
return []string{BackupDataNamespaceArg, BackupDataPodArg, BackupDataContainerArg,
BackupDataIncludePathArg, BackupDataBackupArtifactPrefixArg}
}

func getPodWriter(cli kubernetes.Interface, ctx context.Context, namespace, podName, containerName string, profile *param.Profile) (*kube.PodWriter, error) {
if profile.Location.Type == crv1alpha1.LocationTypeGCS {
pw := kube.NewPodWriter(cli, restic.GoogleCloudCredsFilePath, bytes.NewBufferString(profile.Credential.KeyPair.Secret))
if err := pw.Write(ctx, namespace, podName, containerName); err != nil {
return nil, err
}
return pw, nil
}
return nil, nil
}
func cleanUpCredsFile(ctx context.Context, pw *kube.PodWriter, namespace, podName, containerName string) {
if pw != nil {
if err := pw.Remove(ctx, namespace, podName, containerName); err != nil {
log.Errorf("Could not delete the temp file")
}
}
}
5 changes: 5 additions & 0 deletions pkg/function/copy_volume_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ func copyVolumeDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, na
if err := kube.WaitForPodReady(ctx, cli, pod.Namespace, pod.Name); err != nil {
return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pod.Name)
}
pw, err := getPodWriter(cli, ctx, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name, tp.Profile)
if err != nil {
return nil, err
}
defer cleanUpCredsFile(ctx, pw, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name)
// Get restic repository
if err := restic.GetOrCreateRepository(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, targetPath, encryptionKey, tp.Profile); err != nil {
return nil, err
Expand Down
6 changes: 5 additions & 1 deletion pkg/function/create_volume_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,15 @@ func ValidateProfile(profile *param.Profile, sType blockstorage.Type) error {
switch sType {
case blockstorage.TypeEBS:
if profile.Location.Type != crv1alpha1.LocationTypeS3Compliant {
return errors.New("Location type not supported")
return errors.Errorf("Location type %s not supported", profile.Location.Type)
}
if len(profile.Location.Region) == 0 {
return errors.New("Region is not set")
}
case blockstorage.TypeGPD:
if profile.Location.Type != crv1alpha1.LocationTypeGCS {
return errors.Errorf("Location type %s not supported ", profile.Location.Type)
}
}
return nil
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/function/delete_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ func deleteDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, reclai
if (deleteIdentifier != "") == (deleteTag != "") {
return nil, errors.Errorf("Require one argument: %s or %s", DeleteDataBackupIdentifierArg, DeleteDataBackupTagArg)
}
pw, err := getPodWriter(cli, ctx, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name, tp.Profile)
if err != nil {
return nil, err
}
defer cleanUpCredsFile(ctx, pw, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name)
if deleteTag != "" {
cmd := restic.SnapshotsCommandByTag(tp.Profile, targetPath, deleteTag, encryptionKey)
stdout, stderr, err := kube.Exec(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, cmd, nil)
Expand Down
34 changes: 18 additions & 16 deletions pkg/function/e2e_volume_snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,17 @@ func (s *VolumeSnapshotTestSuite) SetUpTest(c *C) {
volToPvc := kube.StatefulSetVolumes(s.cli, ss, &pods[0])
pvc, _ := volToPvc[pods[0].Spec.Containers[0].VolumeMounts[0].Name]
c.Assert(len(pvc) > 0, Equals, true)
id, secret, err := s.getCreds(c, s.cli, s.namespace, pvc)
id, secret, locationType, err := s.getCreds(c, s.cli, s.namespace, pvc)
c.Assert(err, IsNil)
if id == "" || secret == "" {
c.Skip("Skipping the test since storage type not supported")
}

serviceKey, err := getServiceKey(c)
c.Assert(err, IsNil)
sec := NewTestProfileSecret(serviceKey, id, secret)
sec := NewTestProfileSecret(id, secret)
sec, err = s.cli.Core().Secrets(s.namespace).Create(sec)
c.Assert(err, IsNil)

p := NewTestProfile(s.namespace, sec.GetName())
p := NewTestProfile(s.namespace, sec.GetName(), locationType)
_, err = s.crCli.CrV1alpha1().Profiles(s.namespace).Create(p)
c.Assert(err, IsNil)

Expand All @@ -108,7 +106,7 @@ func (s *VolumeSnapshotTestSuite) SetUpTest(c *C) {
}

// NewTestProfileSecret function returns a pointer to a new Secret test object.
func NewTestProfileSecret(serviceKey string, id string, secret string) *v1.Secret {
func NewTestProfileSecret(id string, secret string) *v1.Secret {
return &v1.Secret{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "test-secret-",
Expand All @@ -122,15 +120,19 @@ func NewTestProfileSecret(serviceKey string, id string, secret string) *v1.Secre

// NewTestProfile function returns a pointer to a new Profile test object that
// passes validation.
func NewTestProfile(namespace string, secretName string) *crv1alpha1.Profile {
func NewTestProfile(namespace string, secretName string, locationType crv1alpha1.LocationType) *crv1alpha1.Profile {
region := ""
if locationType == crv1alpha1.LocationTypeS3Compliant {
region = os.Getenv(AWSRegion)
}
return &crv1alpha1.Profile{
ObjectMeta: metav1.ObjectMeta{
Name: testutil.TestProfileName,
Namespace: namespace,
},
Location: crv1alpha1.Location{
Type: crv1alpha1.LocationTypeS3Compliant,
Region: os.Getenv(AWSRegion),
Type: locationType,
Region: region,
},
Credential: crv1alpha1.Credential{
Type: crv1alpha1.CredentialTypeKeyPair,
Expand Down Expand Up @@ -312,29 +314,29 @@ func (s *VolumeSnapshotTestSuite) TestVolumeSnapshot(c *C) {
}
}

func (s *VolumeSnapshotTestSuite) getCreds(c *C, cli kubernetes.Interface, namespace string, pvcname string) (string, string, error) {
func (s *VolumeSnapshotTestSuite) getCreds(c *C, cli kubernetes.Interface, namespace string, pvcname string) (string, string, crv1alpha1.LocationType, error) {
pvc, err := cli.Core().PersistentVolumeClaims(namespace).Get(pvcname, metav1.GetOptions{})
if err != nil {
return "", "", err
return "", "", "", err
}
pvName := pvc.Spec.VolumeName
pv, err := cli.Core().PersistentVolumes().Get(pvName, metav1.GetOptions{})
if err != nil {
return "", "", err
return "", "", "", err
}
switch {
case pv.Spec.AWSElasticBlockStore != nil:
_ = GetEnvOrSkip(c, AWSRegion)
return GetEnvOrSkip(c, awsebs.AccessKeyID), GetEnvOrSkip(c, awsebs.SecretAccessKey), nil
return GetEnvOrSkip(c, awsebs.AccessKeyID), GetEnvOrSkip(c, awsebs.SecretAccessKey), crv1alpha1.LocationTypeS3Compliant, nil

case pv.Spec.GCEPersistentDisk != nil:
serviceKey, err := getServiceKey(c)
if err != nil {
return "", "", err
return "", "", "", err
}
return "test_project_id", serviceKey, nil
return "test_project_id", serviceKey, crv1alpha1.LocationTypeGCS, nil
}
return "", "", nil
return "", "", "", nil
}

func getServiceKey(c *C) (string, error) {
Expand Down
63 changes: 54 additions & 9 deletions pkg/function/restore_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,19 @@ import (
"context"

"github.com/pkg/errors"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

kanister "github.com/kanisterio/kanister/pkg"
"github.com/kanisterio/kanister/pkg/format"
"github.com/kanisterio/kanister/pkg/kube"
"github.com/kanisterio/kanister/pkg/param"
"github.com/kanisterio/kanister/pkg/restic"
)

const (
restoreDataJobPrefix = "restore-data-"
// RestoreDataNamespaceArg provides the namespace
RestoreDataNamespaceArg = "namespace"
// RestoreDataImageArg provides the image of the container with required tools
Expand Down Expand Up @@ -95,9 +100,56 @@ func fetchPodVolumes(pod string, tp param.TemplateParams) (map[string]string, er
}
}

func restoreData(ctx context.Context, cli kubernetes.Interface, tp param.TemplateParams, namespace, encryptionKey, backupArtifactPrefix, restorePath, backupTag, backupID string, vols map[string]string) (map[string]interface{}, error) {
// Validate volumes
for pvc := range vols {
if _, err := cli.CoreV1().PersistentVolumeClaims(namespace).Get(pvc, metav1.GetOptions{}); err != nil {
return nil, errors.Wrapf(err, "Failed to retrieve PVC. Namespace %s, Name %s", namespace, pvc)
}
}
options := &kube.PodOptions{
Namespace: namespace,
GenerateName: restoreDataJobPrefix,
Image: kanisterToolsImage,
Command: []string{"sh", "-c", "tail -f /dev/null"},
Volumes: vols,
}
pr := kube.NewPodRunner(cli, options)
podFunc := restoreDataPodFunc(cli, tp, namespace, encryptionKey, backupArtifactPrefix, restorePath, backupTag, backupID)
return pr.Run(ctx, podFunc)
}

func restoreDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, namespace, encryptionKey, backupArtifactPrefix, restorePath, backupTag, backupID string) func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) {
return func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) {
// Wait for pod to reach running state
if err := kube.WaitForPodReady(ctx, cli, pod.Namespace, pod.Name); err != nil {
return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pod.Name)
}
pw, err := getPodWriter(cli, ctx, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name, tp.Profile)
if err != nil {
return nil, err
}
defer cleanUpCredsFile(ctx, pw, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name)
var cmd []string
// Generate restore command based on the identifier passed
if backupTag != "" {
cmd = restic.RestoreCommandByTag(tp.Profile, backupArtifactPrefix, backupTag, restorePath, encryptionKey)
} else if backupID != "" {
cmd = restic.RestoreCommandByID(tp.Profile, backupArtifactPrefix, backupID, restorePath, encryptionKey)
}
stdout, stderr, err := kube.Exec(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, cmd, nil)
format.Log(pod.Name, pod.Spec.Containers[0].Name, stdout)
format.Log(pod.Name, pod.Spec.Containers[0].Name, stderr)
if err != nil {
return nil, errors.Wrapf(err, "Failed to create and upload backup")
}
out, err := parseLogAndCreateOutput(stdout)
return out, errors.Wrap(err, "Failed to parse phase output")
}
}

func (*restoreDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) {
var namespace, image, backupArtifactPrefix, backupTag, backupID string
var cmd []string
var err error
if err = Arg(args, RestoreDataNamespaceArg, &namespace); err != nil {
return nil, err
Expand All @@ -117,25 +169,18 @@ func (*restoreDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args
if err = validateProfile(tp.Profile); err != nil {
return nil, err
}
// Generate restore command based on the identifier passed
if backupTag != "" {
cmd = restic.RestoreCommandByTag(tp.Profile, backupArtifactPrefix, backupTag, restorePath, encryptionKey)
} else if backupID != "" {
cmd = restic.RestoreCommandByID(tp.Profile, backupArtifactPrefix, backupID, restorePath, encryptionKey)
}
if len(vols) == 0 {
// Fetch Volumes
vols, err = fetchPodVolumes(pod, tp)
if err != nil {
return nil, err
}
}
// Call PrepareData with generated command
cli, err := kube.NewClient()
if err != nil {
return nil, errors.Wrapf(err, "Failed to create Kubernetes client")
}
return prepareData(ctx, cli, namespace, "", image, vols, cmd...)
return restoreData(ctx, cli, tp, namespace, encryptionKey, backupArtifactPrefix, restorePath, backupTag, backupID, vols)
}

func (*restoreDataFunc) RequiredArgs() []string {
Expand Down
8 changes: 7 additions & 1 deletion pkg/kube/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
. "gopkg.in/check.v1"
batch "k8s.io/api/batch/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
k8stesting "k8s.io/client-go/testing"
Expand All @@ -20,7 +21,8 @@ type JobSuite struct{}

var _ = Suite(&JobSuite{})

const testJobName = "kanister-test-job"
var testJobName = "kanister-test-job"

const testJobNamespace = "default"
const testJobImage = "busybox"
const testJobServiceAccount = "default"
Expand All @@ -29,6 +31,10 @@ func (s *JobSuite) SetUpSuite(c *C) {
// c.Skip("Too slow")
}

func (s *JobSuite) SetUpTest(c *C) {
testJobName = testJobName + rand.String(5)
}

// Verifies that the Job object is not created if the job name is not specified.
func (s *JobSuite) TestJobsNoName(c *C) {
clientset, err := NewClient()
Expand Down
1 change: 1 addition & 0 deletions pkg/location/location.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const (
AWSAccessKeyID = "AWS_ACCESS_KEY_ID"
AWSSecretAccessKey = "AWS_SECRET_ACCESS_KEY"
GoogleCloudCreds = "GOOGLE_APPLICATION_CREDENTIALS"
GoogleProjectId = "GOOGLE_PROJECT_ID"
)

// Write pipes data from `in` into the location specified by `profile` and `suffix`.
Expand Down
37 changes: 27 additions & 10 deletions pkg/restic/restic.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,17 @@ import (
"github.com/pkg/errors"
"k8s.io/client-go/kubernetes"

crv1alpha1 "github.com/kanisterio/kanister/pkg/apis/cr/v1alpha1"
"github.com/kanisterio/kanister/pkg/format"
"github.com/kanisterio/kanister/pkg/kube"
"github.com/kanisterio/kanister/pkg/location"
"github.com/kanisterio/kanister/pkg/param"
)

const (
GoogleCloudCredsFilePath = "/tmp/creds.txt"
)

func shCommand(command string) []string {
return []string{"bash", "-o", "errexit", "-o", "pipefail", "-c", command}
}
Expand Down Expand Up @@ -107,17 +112,29 @@ const (
)

func resticArgs(profile *param.Profile, repository, encryptionKey string) []string {
s3Endpoint := awsS3Endpoint
if profile.Location.Endpoint != "" {
s3Endpoint = profile.Location.Endpoint
}
return []string{
fmt.Sprintf("export %s=%s\n", location.AWSAccessKeyID, profile.Credential.KeyPair.ID),
fmt.Sprintf("export %s=%s\n", location.AWSSecretAccessKey, profile.Credential.KeyPair.Secret),
fmt.Sprintf("export %s=%s\n", ResticPassword, encryptionKey),
fmt.Sprintf("export %s=s3:%s/%s\n", ResticRepository, s3Endpoint, repository),
ResticCommand,
switch profile.Location.Type {
case crv1alpha1.LocationTypeS3Compliant:
s3Endpoint := awsS3Endpoint
if profile.Location.Endpoint != "" {
s3Endpoint = profile.Location.Endpoint
}
return []string{
fmt.Sprintf("export %s=%s\n", location.AWSAccessKeyID, profile.Credential.KeyPair.ID),
fmt.Sprintf("export %s=%s\n", location.AWSSecretAccessKey, profile.Credential.KeyPair.Secret),
fmt.Sprintf("export %s=%s\n", ResticPassword, encryptionKey),
fmt.Sprintf("export %s=s3:%s/%s\n", ResticRepository, s3Endpoint, repository),
ResticCommand,
}
case crv1alpha1.LocationTypeGCS:
return []string{
fmt.Sprintf("export %s=%s\n", location.GoogleProjectId, profile.Credential.KeyPair.ID),
fmt.Sprintf("export %s=%s\n", location.GoogleCloudCreds, GoogleCloudCredsFilePath),
fmt.Sprintf("export %s=%s\n", ResticPassword, encryptionKey),
fmt.Sprintf("export %s=gs:%s/\n", ResticRepository, strings.Replace(repository, "/", ":/", 1)),
ResticCommand,
}
}
return nil
}

// GetOrCreateRepository will check if the repository already exists and initialize one if not
Expand Down

0 comments on commit f970ed8

Please sign in to comment.