Skip to content

Commit

Permalink
Refactor helpers (#374)
Browse files Browse the repository at this point in the history
  • Loading branch information
pavannd1 authored and mergify[bot] committed Oct 29, 2019
1 parent 0f922ac commit 673f595
Show file tree
Hide file tree
Showing 18 changed files with 231 additions and 228 deletions.
44 changes: 3 additions & 41 deletions pkg/function/backup_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,13 @@
package function

import (
"bytes"
"context"

"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/client-go/kubernetes"

kanister "github.com/kanisterio/kanister/pkg"
crv1alpha1 "github.com/kanisterio/kanister/pkg/apis/cr/v1alpha1"
"github.com/kanisterio/kanister/pkg/consts"
"github.com/kanisterio/kanister/pkg/field"
"github.com/kanisterio/kanister/pkg/format"
Expand Down Expand Up @@ -68,23 +66,6 @@ func (*backupDataFunc) Name() string {
return "BackupData"
}

func validateProfile(profile *param.Profile) error {
if profile == nil {
return errors.New("Profile must be non-nil")
}
if err := ValidateCredentials(&profile.Credential); err != nil {
return err
}
switch profile.Location.Type {
case crv1alpha1.LocationTypeS3Compliant:
case crv1alpha1.LocationTypeGCS:
case crv1alpha1.LocationTypeAzure:
default:
return errors.New("Location type not supported")
}
return nil
}

func (*backupDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) {
var namespace, pod, container, includePath, backupArtifactPrefix, encryptionKey string
var err error
Expand All @@ -108,8 +89,7 @@ func (*backupDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args m
}
ctx = field.Context(ctx, consts.PodNameKey, pod)
ctx = field.Context(ctx, consts.ContainerNameKey, container)
// Validate the Profile
if err = validateProfile(tp.Profile); err != nil {
if err = ValidateProfile(tp.Profile); err != nil {
return nil, errors.Wrapf(err, "Failed to validate Profile")
}
cli, err := kube.NewClient()
Expand Down Expand Up @@ -142,11 +122,11 @@ type backupDataParsedOutput struct {
}

func backupData(ctx context.Context, cli kubernetes.Interface, namespace, pod, container, backupArtifactPrefix, includePath, encryptionKey string, tp param.TemplateParams) (backupDataParsedOutput, error) {
pw, err := getPodWriter(cli, ctx, namespace, pod, container, tp.Profile)
pw, err := GetPodWriter(cli, ctx, namespace, pod, container, tp.Profile)
if err != nil {
return backupDataParsedOutput{}, err
}
defer cleanUpCredsFile(ctx, pw, namespace, pod, container)
defer CleanUpCredsFile(ctx, pw, namespace, pod, container)
if err = restic.GetOrCreateRepository(cli, namespace, pod, container, backupArtifactPrefix, encryptionKey, tp.Profile); err != nil {
return backupDataParsedOutput{}, err
}
Expand Down Expand Up @@ -179,21 +159,3 @@ func backupData(ctx context.Context, cli kubernetes.Interface, namespace, pod, c
fileCount: fileCount,
backupSize: backupSize}, nil
}

func getPodWriter(cli kubernetes.Interface, ctx context.Context, namespace, podName, containerName string, profile *param.Profile) (*kube.PodWriter, error) {
if profile.Location.Type == crv1alpha1.LocationTypeGCS {
pw := kube.NewPodWriter(cli, restic.GoogleCloudCredsFilePath, bytes.NewBufferString(profile.Credential.KeyPair.Secret))
if err := pw.Write(ctx, namespace, podName, containerName); err != nil {
return nil, err
}
return pw, nil
}
return nil, nil
}
func cleanUpCredsFile(ctx context.Context, pw *kube.PodWriter, namespace, podName, containerName string) {
if pw != nil {
if err := pw.Remove(ctx, namespace, podName, containerName); err != nil {
log.Error().WithContext(ctx).Print("Could not delete the temp file")
}
}
}
3 changes: 1 addition & 2 deletions pkg/function/backup_data_all.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ func (*backupDataAllFunc) Exec(ctx context.Context, tp param.TemplateParams, arg
return nil, err
}
ctx = field.Context(ctx, consts.ContainerNameKey, container)
// Validate the Profile
if err = validateProfile(tp.Profile); err != nil {
if err = ValidateProfile(tp.Profile); err != nil {
return nil, errors.Wrapf(err, "Failed to validate Profile")
}
cli, err := kube.NewClient()
Expand Down
9 changes: 4 additions & 5 deletions pkg/function/backup_data_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"

"github.com/kanisterio/kanister/pkg"
kanister "github.com/kanisterio/kanister/pkg"
"github.com/kanisterio/kanister/pkg/format"
"github.com/kanisterio/kanister/pkg/kube"
"github.com/kanisterio/kanister/pkg/param"
Expand Down Expand Up @@ -76,11 +76,11 @@ func backupDataStatsPodFunc(cli kubernetes.Interface, tp param.TemplateParams, n
if err := kube.WaitForPodReady(ctx, cli, pod.Namespace, pod.Name); err != nil {
return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pod.Name)
}
pw, err := getPodWriter(cli, ctx, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name, tp.Profile)
pw, err := GetPodWriter(cli, ctx, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name, tp.Profile)
if err != nil {
return nil, err
}
defer cleanUpCredsFile(ctx, pw, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name)
defer CleanUpCredsFile(ctx, pw, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name)
cmd, err := restic.StatsCommandByID(tp.Profile, backupArtifactPrefix, backupID, mode, encryptionKey)
if err != nil {
return nil, err
Expand Down Expand Up @@ -123,8 +123,7 @@ func (*BackupDataStatsFunc) Exec(ctx context.Context, tp param.TemplateParams, a
if err = OptArg(args, BackupDataStatsEncryptionKeyArg, &encryptionKey, restic.GeneratePassword()); err != nil {
return nil, err
}
// Validate the Profile
if err = validateProfile(tp.Profile); err != nil {
if err = ValidateProfile(tp.Profile); err != nil {
return nil, errors.Wrapf(err, "Failed to validate Profile")
}
cli, err := kube.NewClient()
Expand Down
4 changes: 2 additions & 2 deletions pkg/function/copy_volume_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,11 @@ func copyVolumeDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, na
if err := kube.WaitForPodReady(ctx, cli, pod.Namespace, pod.Name); err != nil {
return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pod.Name)
}
pw, err := getPodWriter(cli, ctx, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name, tp.Profile)
pw, err := GetPodWriter(cli, ctx, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name, tp.Profile)
if err != nil {
return nil, err
}
defer cleanUpCredsFile(ctx, pw, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name)
defer CleanUpCredsFile(ctx, pw, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name)
// Get restic repository
if err := restic.GetOrCreateRepository(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, targetPath, encryptionKey, tp.Profile); err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/function/create_volume_from_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func createVolumeFromSnapshot(ctx context.Context, cli kubernetes.Interface, nam
if len(pvcNames) > 0 {
pvcName = pvcNames[i]
}
if err = ValidateProfile(profile, pvcInfo.Type); err != nil {
if err = ValidateLocationForBlockstorage(profile, pvcInfo.Type); err != nil {
return nil, errors.Wrap(err, "Profile validation failed")
}
config := getConfig(profile, pvcInfo.Type)
Expand Down
10 changes: 5 additions & 5 deletions pkg/function/create_volume_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"sync"

"github.com/pkg/errors"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

Expand Down Expand Up @@ -79,8 +79,8 @@ type volumeInfo struct {
region string
}

func ValidateProfile(profile *param.Profile, sType blockstorage.Type) error {
if err := validateProfile(profile); err != nil {
func ValidateLocationForBlockstorage(profile *param.Profile, sType blockstorage.Type) error {
if err := ValidateProfile(profile); err != nil {
return errors.Wrapf(err, "Profile Validation failed")
}
switch sType {
Expand Down Expand Up @@ -195,7 +195,7 @@ func getPVCInfo(ctx context.Context, kubeCli kubernetes.Interface, namespace str
switch {
case pv.Spec.AWSElasticBlockStore != nil:
ebs := pv.Spec.AWSElasticBlockStore
if err = ValidateProfile(tp.Profile, blockstorage.TypeEBS); err != nil {
if err = ValidateLocationForBlockstorage(tp.Profile, blockstorage.TypeEBS); err != nil {
return nil, errors.Wrap(err, "Profile validation failed")
}
// Get Region from PV label or EC2 metadata
Expand All @@ -221,7 +221,7 @@ func getPVCInfo(ctx context.Context, kubeCli kubernetes.Interface, namespace str
case pv.Spec.GCEPersistentDisk != nil:
gpd := pv.Spec.GCEPersistentDisk
region = ""
if err = ValidateProfile(tp.Profile, blockstorage.TypeGPD); err != nil {
if err = ValidateLocationForBlockstorage(tp.Profile, blockstorage.TypeGPD); err != nil {
return nil, errors.Wrap(err, "Profile validation failed")
}
if pvZone, ok := pvLabels[kubevolume.PVZoneLabelName]; ok {
Expand Down
7 changes: 3 additions & 4 deletions pkg/function/delete_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,11 @@ func deleteDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, reclai
if (len(deleteIdentifiers) == 0) == (len(deleteTags) == 0) {
return nil, errors.Errorf("Require one argument: %s or %s", DeleteDataBackupIdentifierArg, DeleteDataBackupTagArg)
}
pw, err := getPodWriter(cli, ctx, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name, tp.Profile)
pw, err := GetPodWriter(cli, ctx, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name, tp.Profile)
if err != nil {
return nil, err
}
defer cleanUpCredsFile(ctx, pw, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name)
defer CleanUpCredsFile(ctx, pw, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name)
for i, deleteTag := range deleteTags {
cmd, err := restic.SnapshotsCommandByTag(tp.Profile, targetPaths[i], deleteTag, encryptionKey)
if err != nil {
Expand Down Expand Up @@ -165,8 +165,7 @@ func (*deleteDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args m
return nil, err
}

// Validate profile
if err = validateProfile(tp.Profile); err != nil {
if err = ValidateProfile(tp.Profile); err != nil {
return nil, err
}
cli, err := kube.NewClient()
Expand Down
3 changes: 1 addition & 2 deletions pkg/function/delete_data_all.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,7 @@ func (*deleteDataAllFunc) Exec(ctx context.Context, tp param.TemplateParams, arg
return nil, err
}

// Validate profile
if err = validateProfile(tp.Profile); err != nil {
if err = ValidateProfile(tp.Profile); err != nil {
return nil, err
}
cli, err := kube.NewClient()
Expand Down
2 changes: 1 addition & 1 deletion pkg/function/delete_volume_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func deleteVolumeSnapshot(ctx context.Context, cli kubernetes.Interface, namespa
// providerList required for unit testing
providerList := make(map[string]blockstorage.Provider)
for _, pvcInfo := range PVCData {
if err = ValidateProfile(profile, pvcInfo.Type); err != nil {
if err = ValidateLocationForBlockstorage(profile, pvcInfo.Type); err != nil {
return nil, errors.Wrap(err, "Profile validation failed")
}
config := getConfig(profile, pvcInfo.Type)
Expand Down
7 changes: 3 additions & 4 deletions pkg/function/describe_backups.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,11 @@ func describeBackupsPodFunc(cli kubernetes.Interface, tp param.TemplateParams, n
if err := kube.WaitForPodReady(ctx, cli, pod.Namespace, pod.Name); err != nil {
return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pod.Name)
}
pw, err := getPodWriter(cli, ctx, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name, tp.Profile)
pw, err := GetPodWriter(cli, ctx, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name, tp.Profile)
if err != nil {
return nil, err
}
defer cleanUpCredsFile(ctx, pw, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name)
defer CleanUpCredsFile(ctx, pw, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name)
snapshotIDs, err := restic.GetSnapshotIDs(tp.Profile, cli, targetPath, encryptionKey, namespace, pod.Name, pod.Spec.Containers[0].Name)
switch {
case err == nil:
Expand Down Expand Up @@ -153,8 +153,7 @@ func (*DescribeBackupsFunc) Exec(ctx context.Context, tp param.TemplateParams, a
return nil, err
}

// Validate profile
if err = validateProfile(tp.Profile); err != nil {
if err = ValidateProfile(tp.Profile); err != nil {
return nil, err
}
cli, err := kube.NewClient()
Expand Down
31 changes: 0 additions & 31 deletions pkg/function/helpers.go

This file was deleted.

3 changes: 1 addition & 2 deletions pkg/function/location_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ func (*locationDeleteFunc) Exec(ctx context.Context, tp param.TemplateParams, ar
if err = Arg(args, LocationDeleteArtifactArg, &artifact); err != nil {
return nil, err
}
// Validate the Profile
if err = validateProfile(tp.Profile); err != nil {
if err = ValidateProfile(tp.Profile); err != nil {
return nil, errors.Wrapf(err, "Failed to validate Profile")
}
return nil, location.Delete(ctx, *tp.Profile, strings.TrimPrefix(artifact, tp.Profile.Location.Bucket))
Expand Down
26 changes: 4 additions & 22 deletions pkg/function/restore_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,23 +106,6 @@ func validateAndGetOptArgs(args map[string]interface{}, tp param.TemplateParams)
return restorePath, encryptionKey, pod, vols, tag, id, podOverride, nil
}

func fetchPodVolumes(pod string, tp param.TemplateParams) (map[string]string, error) {
switch {
case tp.Deployment != nil:
if pvcToMountPath, ok := tp.Deployment.PersistentVolumeClaims[pod]; ok {
return pvcToMountPath, nil
}
return nil, errors.New("Failed to find volumes for the Pod: " + pod)
case tp.StatefulSet != nil:
if pvcToMountPath, ok := tp.StatefulSet.PersistentVolumeClaims[pod]; ok {
return pvcToMountPath, nil
}
return nil, errors.New("Failed to find volumes for the Pod: " + pod)
default:
return nil, errors.New("Invalid Template Params")
}
}

func restoreData(ctx context.Context, cli kubernetes.Interface, tp param.TemplateParams, namespace, encryptionKey, backupArtifactPrefix, restorePath, backupTag, backupID, jobPrefix, image string,
vols map[string]string, podOverride sp.JSONMap) (map[string]interface{}, error) {
// Validate volumes
Expand Down Expand Up @@ -150,11 +133,11 @@ func restoreDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, names
if err := kube.WaitForPodReady(ctx, cli, pod.Namespace, pod.Name); err != nil {
return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pod.Name)
}
pw, err := getPodWriter(cli, ctx, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name, tp.Profile)
pw, err := GetPodWriter(cli, ctx, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name, tp.Profile)
if err != nil {
return nil, err
}
defer cleanUpCredsFile(ctx, pw, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name)
defer CleanUpCredsFile(ctx, pw, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name)
var cmd []string
// Generate restore command based on the identifier passed
if backupTag != "" {
Expand Down Expand Up @@ -208,13 +191,12 @@ func (*restoreDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args
}
}

// Validate profile
if err = validateProfile(tp.Profile); err != nil {
if err = ValidateProfile(tp.Profile); err != nil {
return nil, err
}
if len(vols) == 0 {
// Fetch Volumes
vols, err = fetchPodVolumes(pod, tp)
vols, err = FetchPodVolumes(pod, tp)
if err != nil {
return nil, err
}
Expand Down
5 changes: 2 additions & 3 deletions pkg/function/restore_data_all.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,7 @@ func (*restoreDataAllFunc) Exec(ctx context.Context, tp param.TemplateParams, ar
return nil, err
}

// Validate profile
if err = validateProfile(tp.Profile); err != nil {
if err = ValidateProfile(tp.Profile); err != nil {
return nil, err
}
cli, err := kube.NewClient()
Expand All @@ -139,7 +138,7 @@ func (*restoreDataAllFunc) Exec(ctx context.Context, tp param.TemplateParams, ar
output := make(map[string]interface{})
for _, pod := range pods {
go func(pod string) {
vols, err := fetchPodVolumes(pod, tp)
vols, err := FetchPodVolumes(pod, tp)
var out map[string]interface{}
if err != nil {
errChan <- errors.Wrapf(err, "Failed to get volumes of pod %s", pod)
Expand Down
Loading

0 comments on commit 673f595

Please sign in to comment.