From a7c7e1ba1fd4a43571700e7fcb872f5ab3401a4e Mon Sep 17 00:00:00 2001 From: Hakan Memisoglu Date: Tue, 30 Jul 2019 11:37:17 -0700 Subject: [PATCH] EFS restore from snapshot (#6173) * EFS restore snapshot * Update with suggestions * Add suggestions --- pkg/blockstorage/awsefs/awsefs.go | 168 +++++++++++++++++++++++++- pkg/blockstorage/awsefs/conversion.go | 19 +++ pkg/blockstorage/awsefs/error.go | 7 ++ pkg/blockstorage/awsefs/wait.go | 48 ++++++++ 4 files changed, 241 insertions(+), 1 deletion(-) diff --git a/pkg/blockstorage/awsefs/awsefs.go b/pkg/blockstorage/awsefs/awsefs.go index 5a11b8170e..858d9fe034 100644 --- a/pkg/blockstorage/awsefs/awsefs.go +++ b/pkg/blockstorage/awsefs/awsefs.go @@ -3,6 +3,7 @@ package awsefs import ( "context" "fmt" + "strings" "github.com/aws/aws-sdk-go/aws" awsarn "github.com/aws/aws-sdk-go/aws/arn" @@ -12,6 +13,7 @@ import ( "github.com/aws/aws-sdk-go/service/iam" "github.com/pkg/errors" uuid "github.com/satori/go.uuid" + "k8s.io/apimachinery/pkg/util/rand" "github.com/kanisterio/kanister/pkg/blockstorage" "github.com/kanisterio/kanister/pkg/blockstorage/awsebs" @@ -36,6 +38,7 @@ const ( provisionedThroughputMode = awsefs.ThroughputModeProvisioned defaultThroughputMode = burstingThroughputMode + efsType = "EFS" k10BackupVaultName = "k10vault" dummyMarker = "" ) @@ -101,7 +104,164 @@ func (e *efs) VolumeCreate(ctx context.Context, volume blockstorage.Volume) (*bl } func (e *efs) VolumeCreateFromSnapshot(ctx context.Context, snapshot blockstorage.Snapshot, tags map[string]string) (*blockstorage.Volume, error) { - return nil, errors.New("Not implemented") + reqM := &backup.GetRecoveryPointRestoreMetadataInput{} + reqM.SetBackupVaultName(k10BackupVaultName) + reqM.SetRecoveryPointArn(snapshot.ID) + + respM, err := e.GetRecoveryPointRestoreMetadataWithContext(ctx, reqM) + if err != nil { + return nil, errors.Wrap(err, "Failed to get backup tag from recovery point directly") + } + rpTags := convertFromBackupTags(respM.RestoreMetadata) + rp2Tags, err := e.getBackupTags(ctx, snapshot.ID) + if err != nil { + return nil, errors.Wrap(err, "Failed to get backup tag from recovery point") + } + rpTags = kantags.Union(rpTags, rp2Tags) + // RestorePoint tags has some tags to describe saved mount targets. + // We need to get them and remove them from the tags + filteredTags, mountTargets, err := filterAndGetMountTargetsFromTags(rpTags) + if err != nil { + return nil, errors.Wrap(err, "Failed to get filtered tags and mount targets") + } + // Add some metadata which are necessary for EFS restore to function properly. + filteredTags = kantags.Union(filteredTags, efsRestoreTags()) + + req := &backup.StartRestoreJobInput{} + req.SetIamRoleArn(awsDefaultServiceBackupRole(e.accountID)) + req.SetMetadata(convertToBackupTags(filteredTags)) + req.SetRecoveryPointArn(snapshot.ID) + req.SetResourceType(efsType) + + resp, err := e.StartRestoreJobWithContext(ctx, req) + if err != nil { + return nil, errors.Wrap(err, "Failed to start the restore job") + } + if resp.RestoreJobId == nil { + return nil, errors.New("Empty restore job ID") + } + restoreID := *resp.RestoreJobId + if err = e.waitUntilRestoreComplete(ctx, restoreID); err != nil { + return nil, errors.Wrap(err, "Restore job failed to complete") + } + respD := &backup.DescribeRestoreJobInput{} + respD.SetRestoreJobId(restoreID) + descJob, err := e.DescribeRestoreJobWithContext(ctx, respD) + if err != nil { + return nil, errors.Wrap(err, "Failed to get description for the restore job") + } + fsID, err := efsIDFromResourceARN(*descJob.CreatedResourceArn) + if err != nil { + return nil, errors.Wrap(err, "Failed to get filesystem ID") + } + if err = e.createMountTargets(ctx, fsID, mountTargets); err != nil { + return nil, errors.Wrap(err, "Failed to create mount targets") + } + return e.VolumeGet(ctx, fsID, "") +} + +func efsRestoreTags() map[string]string { + return map[string]string{ + "newFileSystem": "true", + "CreationToken": rand.String(16), + "Encrypted": "false", + "PerformanceMode": generalPurposePerformanceMode, + } +} + +type mountTarget struct { + subnetID string + securityGroups []string +} + +type mountTargets map[string]*mountTarget + +func (e *efs) createMountTargets(ctx context.Context, fsID string, mts mountTargets) error { + created := make([]*awsefs.MountTargetDescription, 0) + for _, v := range mts { + req := &awsefs.CreateMountTargetInput{} + req.SetFileSystemId(fsID) + req.SetSubnetId(v.subnetID) + req.SetSecurityGroups(convertListOfStrings(v.securityGroups)) + + mtd, err := e.CreateMountTargetWithContext(ctx, req) + if err != nil { + return errors.Wrap(err, "Failed to create mount target") + } + created = append(created, mtd) + } + + for _, desc := range created { + if err := e.waitUntilMountTargetReady(ctx, *desc.MountTargetId); err != nil { + return errors.Wrap(err, "Failed while waiting for Mount target to be ready") + } + } + return nil +} + +func parseMountTargetKey(key string) (string, error) { + if !strings.HasPrefix(key, mountTargetKeyPrefix) { + return "", errors.New("Malformed string for mount target key") + } + return key[len(mountTargetKeyPrefix):], nil +} + +func parseMountTargetValue(value string) (*mountTarget, error) { + // Format: + // String until the first "+" is subnetID + // After that "+" separates security groups + // Example value: + // subnet-123+securityGroup-1+securityGroup-2 + tokens := strings.Split(value, securityGroupSeperator) + if len(tokens) <= 1 { + return nil, errors.New("Malformed string for mount target values") + } + subnetID := tokens[0] + sgs := make([]string, 0) + if len(tokens[1]) != 0 { + sgs = append(sgs, tokens[1:]...) + } + return &mountTarget{ + subnetID: subnetID, + securityGroups: sgs, + }, nil +} + +func filterAndGetMountTargetsFromTags(tags map[string]string) (map[string]string, mountTargets, error) { + filteredTags := make(map[string]string) + mts := make(mountTargets) + for k, v := range tags { + if strings.HasPrefix(k, mountTargetKeyPrefix) { + id, err := parseMountTargetKey(k) + if err != nil { + return nil, nil, err + } + mt, err := parseMountTargetValue(v) + if err != nil { + return nil, nil, err + } + mts[id] = mt + } else { + // It is not a mount target tag, so pass it + filteredTags[k] = v + } + } + return filteredTags, mts, nil +} + +func (e *efs) getBackupTags(ctx context.Context, arn string) (map[string]string, error) { + result := make(map[string]string) + for resp, req := emptyResponseRequestForListTags(); resp.NextToken != nil; req.NextToken = resp.NextToken { + var err error + req.SetResourceArn(arn) + resp, err = e.ListTagsWithContext(ctx, req) + if err != nil { + return nil, err + } + tags := convertFromBackupTags(resp.Tags) + result = kantags.Union(result, tags) + } + return result, nil } func (e *efs) VolumeDelete(ctx context.Context, volume *blockstorage.Volume) error { @@ -271,6 +431,12 @@ func emptyResponseRequestForFilesystems() (*awsefs.DescribeFileSystemsOutput, *a return resp, req } +func emptyResponseRequestForListTags() (*backup.ListTagsOutput, *backup.ListTagsInput) { + resp := (&backup.ListTagsOutput{}).SetNextToken(dummyMarker) + req := &backup.ListTagsInput{} + return resp, req +} + func emptyResponseRequestForMountTargets() (*awsefs.DescribeMountTargetsOutput, *awsefs.DescribeMountTargetsInput) { resp := (&awsefs.DescribeMountTargetsOutput{}).SetNextMarker(dummyMarker) req := &awsefs.DescribeMountTargetsInput{} diff --git a/pkg/blockstorage/awsefs/conversion.go b/pkg/blockstorage/awsefs/conversion.go index 3001303373..a6f9ee4a65 100644 --- a/pkg/blockstorage/awsefs/conversion.go +++ b/pkg/blockstorage/awsefs/conversion.go @@ -74,6 +74,15 @@ func snapshotFromRecoveryPoint(rp *backup.DescribeRecoveryPointOutput, volume *b }, nil } +// convertFromBackupTags converts an AWS Backup compliant tag structure to a flattenned map. +func convertFromBackupTags(tags map[string]*string) map[string]string { + result := make(map[string]string) + for k, v := range tags { + result[k] = *v + } + return result +} + // convertToBackupTags converts a flattened map to AWS Backup compliant tag structure. func convertToBackupTags(tags map[string]string) map[string]*string { backupTags := make(map[string]*string) @@ -94,6 +103,16 @@ func convertToEFSTags(tags map[string]string) []*awsefs.Tag { return efsTags } +// convertListOfStrings converts a flattend list to a list where each +// element is a pointer to original elements. +func convertListOfStrings(strs []string) []*string { + result := make([]*string, 0) + for i := range strs { + result = append(result, &strs[i]) + } + return result +} + // volumeFromEFSDescription converts an AWS EFS filesystem description to Kanister blockstorage Volume type // using the information in the description. // diff --git a/pkg/blockstorage/awsefs/error.go b/pkg/blockstorage/awsefs/error.go index 10b38b3778..a9bd8a38fb 100644 --- a/pkg/blockstorage/awsefs/error.go +++ b/pkg/blockstorage/awsefs/error.go @@ -26,3 +26,10 @@ func isRecoveryPointNotFound(err error) bool { } return false } + +func isMountTargetNotFound(err error) bool { + if awsErr, ok := err.(awserr.Error); ok { + return awsErr.Code() == awsefs.ErrCodeMountTargetNotFound + } + return false +} diff --git a/pkg/blockstorage/awsefs/wait.go b/pkg/blockstorage/awsefs/wait.go index 07e40f85f6..dc0c3537af 100644 --- a/pkg/blockstorage/awsefs/wait.go +++ b/pkg/blockstorage/awsefs/wait.go @@ -5,6 +5,7 @@ import ( "github.com/aws/aws-sdk-go/service/backup" awsefs "github.com/aws/aws-sdk-go/service/efs" + "github.com/pkg/errors" "github.com/kanisterio/kanister/pkg/poll" ) @@ -74,3 +75,50 @@ func (e *efs) waitUntilRecoveryPointVisible(ctx context.Context, id string) erro return true, nil }) } + +func (e *efs) waitUntilMountTargetReady(ctx context.Context, mountTargetID string) error { + return poll.Wait(ctx, func(ctx context.Context) (bool, error) { + req := &awsefs.DescribeMountTargetsInput{} + req.SetMountTargetId(mountTargetID) + + desc, err := e.DescribeMountTargetsWithContext(ctx, req) + if isMountTargetNotFound(err) { + return false, nil + } + if err != nil { + return false, err + } + if len(desc.MountTargets) != 1 { + return false, errors.New("Returned list must have 1 entry") + } + mt := desc.MountTargets[0] + state := mt.LifeCycleState + if state == nil { + return false, nil + } + return *state == awsefs.LifeCycleStateAvailable, nil + }) +} + +func (e *efs) waitUntilRestoreComplete(ctx context.Context, restoreJobID string) error { + return poll.Wait(ctx, func(ctx context.Context) (bool, error) { + req := &backup.DescribeRestoreJobInput{} + req.SetRestoreJobId(restoreJobID) + + resp, err := e.DescribeRestoreJobWithContext(ctx, req) + if err != nil { + return false, err + } + if resp.Status == nil { + return false, errors.New("Failed to get restore job status") + } + switch *resp.Status { + case backup.RestoreJobStatusCompleted: + return true, nil + case backup.RestoreJobStatusAborted, backup.RestoreJobStatusFailed: + return false, errors.New("Restore job is not completed successfully") + default: + return false, nil + } + }) +}