Skip to content

Commit

Permalink
EFS restore from snapshot (#6173)
Browse files Browse the repository at this point in the history
* EFS restore snapshot

* Update with suggestions

* Add suggestions
  • Loading branch information
Hakan Memisoglu authored and Ilya Kislenko committed Jul 30, 2019
1 parent 57f5651 commit a7c7e1b
Show file tree
Hide file tree
Showing 4 changed files with 241 additions and 1 deletion.
168 changes: 167 additions & 1 deletion pkg/blockstorage/awsefs/awsefs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package awsefs
import (
"context"
"fmt"
"strings"

"github.com/aws/aws-sdk-go/aws"
awsarn "github.com/aws/aws-sdk-go/aws/arn"
Expand All @@ -12,6 +13,7 @@ import (
"github.com/aws/aws-sdk-go/service/iam"
"github.com/pkg/errors"
uuid "github.com/satori/go.uuid"
"k8s.io/apimachinery/pkg/util/rand"

"github.com/kanisterio/kanister/pkg/blockstorage"
"github.com/kanisterio/kanister/pkg/blockstorage/awsebs"
Expand All @@ -36,6 +38,7 @@ const (
provisionedThroughputMode = awsefs.ThroughputModeProvisioned
defaultThroughputMode = burstingThroughputMode

efsType = "EFS"
k10BackupVaultName = "k10vault"
dummyMarker = ""
)
Expand Down Expand Up @@ -101,7 +104,164 @@ func (e *efs) VolumeCreate(ctx context.Context, volume blockstorage.Volume) (*bl
}

func (e *efs) VolumeCreateFromSnapshot(ctx context.Context, snapshot blockstorage.Snapshot, tags map[string]string) (*blockstorage.Volume, error) {
return nil, errors.New("Not implemented")
reqM := &backup.GetRecoveryPointRestoreMetadataInput{}
reqM.SetBackupVaultName(k10BackupVaultName)
reqM.SetRecoveryPointArn(snapshot.ID)

respM, err := e.GetRecoveryPointRestoreMetadataWithContext(ctx, reqM)
if err != nil {
return nil, errors.Wrap(err, "Failed to get backup tag from recovery point directly")
}
rpTags := convertFromBackupTags(respM.RestoreMetadata)
rp2Tags, err := e.getBackupTags(ctx, snapshot.ID)
if err != nil {
return nil, errors.Wrap(err, "Failed to get backup tag from recovery point")
}
rpTags = kantags.Union(rpTags, rp2Tags)
// RestorePoint tags has some tags to describe saved mount targets.
// We need to get them and remove them from the tags
filteredTags, mountTargets, err := filterAndGetMountTargetsFromTags(rpTags)
if err != nil {
return nil, errors.Wrap(err, "Failed to get filtered tags and mount targets")
}
// Add some metadata which are necessary for EFS restore to function properly.
filteredTags = kantags.Union(filteredTags, efsRestoreTags())

req := &backup.StartRestoreJobInput{}
req.SetIamRoleArn(awsDefaultServiceBackupRole(e.accountID))
req.SetMetadata(convertToBackupTags(filteredTags))
req.SetRecoveryPointArn(snapshot.ID)
req.SetResourceType(efsType)

resp, err := e.StartRestoreJobWithContext(ctx, req)
if err != nil {
return nil, errors.Wrap(err, "Failed to start the restore job")
}
if resp.RestoreJobId == nil {
return nil, errors.New("Empty restore job ID")
}
restoreID := *resp.RestoreJobId
if err = e.waitUntilRestoreComplete(ctx, restoreID); err != nil {
return nil, errors.Wrap(err, "Restore job failed to complete")
}
respD := &backup.DescribeRestoreJobInput{}
respD.SetRestoreJobId(restoreID)
descJob, err := e.DescribeRestoreJobWithContext(ctx, respD)
if err != nil {
return nil, errors.Wrap(err, "Failed to get description for the restore job")
}
fsID, err := efsIDFromResourceARN(*descJob.CreatedResourceArn)
if err != nil {
return nil, errors.Wrap(err, "Failed to get filesystem ID")
}
if err = e.createMountTargets(ctx, fsID, mountTargets); err != nil {
return nil, errors.Wrap(err, "Failed to create mount targets")
}
return e.VolumeGet(ctx, fsID, "")
}

func efsRestoreTags() map[string]string {
return map[string]string{
"newFileSystem": "true",
"CreationToken": rand.String(16),
"Encrypted": "false",
"PerformanceMode": generalPurposePerformanceMode,
}
}

type mountTarget struct {
subnetID string
securityGroups []string
}

type mountTargets map[string]*mountTarget

func (e *efs) createMountTargets(ctx context.Context, fsID string, mts mountTargets) error {
created := make([]*awsefs.MountTargetDescription, 0)
for _, v := range mts {
req := &awsefs.CreateMountTargetInput{}
req.SetFileSystemId(fsID)
req.SetSubnetId(v.subnetID)
req.SetSecurityGroups(convertListOfStrings(v.securityGroups))

mtd, err := e.CreateMountTargetWithContext(ctx, req)
if err != nil {
return errors.Wrap(err, "Failed to create mount target")
}
created = append(created, mtd)
}

for _, desc := range created {
if err := e.waitUntilMountTargetReady(ctx, *desc.MountTargetId); err != nil {
return errors.Wrap(err, "Failed while waiting for Mount target to be ready")
}
}
return nil
}

func parseMountTargetKey(key string) (string, error) {
if !strings.HasPrefix(key, mountTargetKeyPrefix) {
return "", errors.New("Malformed string for mount target key")
}
return key[len(mountTargetKeyPrefix):], nil
}

func parseMountTargetValue(value string) (*mountTarget, error) {
// Format:
// String until the first "+" is subnetID
// After that "+" separates security groups
// Example value:
// subnet-123+securityGroup-1+securityGroup-2
tokens := strings.Split(value, securityGroupSeperator)
if len(tokens) <= 1 {
return nil, errors.New("Malformed string for mount target values")
}
subnetID := tokens[0]
sgs := make([]string, 0)
if len(tokens[1]) != 0 {
sgs = append(sgs, tokens[1:]...)
}
return &mountTarget{
subnetID: subnetID,
securityGroups: sgs,
}, nil
}

func filterAndGetMountTargetsFromTags(tags map[string]string) (map[string]string, mountTargets, error) {
filteredTags := make(map[string]string)
mts := make(mountTargets)
for k, v := range tags {
if strings.HasPrefix(k, mountTargetKeyPrefix) {
id, err := parseMountTargetKey(k)
if err != nil {
return nil, nil, err
}
mt, err := parseMountTargetValue(v)
if err != nil {
return nil, nil, err
}
mts[id] = mt
} else {
// It is not a mount target tag, so pass it
filteredTags[k] = v
}
}
return filteredTags, mts, nil
}

func (e *efs) getBackupTags(ctx context.Context, arn string) (map[string]string, error) {
result := make(map[string]string)
for resp, req := emptyResponseRequestForListTags(); resp.NextToken != nil; req.NextToken = resp.NextToken {
var err error
req.SetResourceArn(arn)
resp, err = e.ListTagsWithContext(ctx, req)
if err != nil {
return nil, err
}
tags := convertFromBackupTags(resp.Tags)
result = kantags.Union(result, tags)
}
return result, nil
}

func (e *efs) VolumeDelete(ctx context.Context, volume *blockstorage.Volume) error {
Expand Down Expand Up @@ -271,6 +431,12 @@ func emptyResponseRequestForFilesystems() (*awsefs.DescribeFileSystemsOutput, *a
return resp, req
}

func emptyResponseRequestForListTags() (*backup.ListTagsOutput, *backup.ListTagsInput) {
resp := (&backup.ListTagsOutput{}).SetNextToken(dummyMarker)
req := &backup.ListTagsInput{}
return resp, req
}

func emptyResponseRequestForMountTargets() (*awsefs.DescribeMountTargetsOutput, *awsefs.DescribeMountTargetsInput) {
resp := (&awsefs.DescribeMountTargetsOutput{}).SetNextMarker(dummyMarker)
req := &awsefs.DescribeMountTargetsInput{}
Expand Down
19 changes: 19 additions & 0 deletions pkg/blockstorage/awsefs/conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,15 @@ func snapshotFromRecoveryPoint(rp *backup.DescribeRecoveryPointOutput, volume *b
}, nil
}

// convertFromBackupTags converts an AWS Backup compliant tag structure to a flattenned map.
func convertFromBackupTags(tags map[string]*string) map[string]string {
result := make(map[string]string)
for k, v := range tags {
result[k] = *v
}
return result
}

// convertToBackupTags converts a flattened map to AWS Backup compliant tag structure.
func convertToBackupTags(tags map[string]string) map[string]*string {
backupTags := make(map[string]*string)
Expand All @@ -94,6 +103,16 @@ func convertToEFSTags(tags map[string]string) []*awsefs.Tag {
return efsTags
}

// convertListOfStrings converts a flattend list to a list where each
// element is a pointer to original elements.
func convertListOfStrings(strs []string) []*string {
result := make([]*string, 0)
for i := range strs {
result = append(result, &strs[i])
}
return result
}

// volumeFromEFSDescription converts an AWS EFS filesystem description to Kanister blockstorage Volume type
// using the information in the description.
//
Expand Down
7 changes: 7 additions & 0 deletions pkg/blockstorage/awsefs/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,10 @@ func isRecoveryPointNotFound(err error) bool {
}
return false
}

func isMountTargetNotFound(err error) bool {
if awsErr, ok := err.(awserr.Error); ok {
return awsErr.Code() == awsefs.ErrCodeMountTargetNotFound
}
return false
}
48 changes: 48 additions & 0 deletions pkg/blockstorage/awsefs/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/aws/aws-sdk-go/service/backup"
awsefs "github.com/aws/aws-sdk-go/service/efs"
"github.com/pkg/errors"

"github.com/kanisterio/kanister/pkg/poll"
)
Expand Down Expand Up @@ -74,3 +75,50 @@ func (e *efs) waitUntilRecoveryPointVisible(ctx context.Context, id string) erro
return true, nil
})
}

func (e *efs) waitUntilMountTargetReady(ctx context.Context, mountTargetID string) error {
return poll.Wait(ctx, func(ctx context.Context) (bool, error) {
req := &awsefs.DescribeMountTargetsInput{}
req.SetMountTargetId(mountTargetID)

desc, err := e.DescribeMountTargetsWithContext(ctx, req)
if isMountTargetNotFound(err) {
return false, nil
}
if err != nil {
return false, err
}
if len(desc.MountTargets) != 1 {
return false, errors.New("Returned list must have 1 entry")
}
mt := desc.MountTargets[0]
state := mt.LifeCycleState
if state == nil {
return false, nil
}
return *state == awsefs.LifeCycleStateAvailable, nil
})
}

func (e *efs) waitUntilRestoreComplete(ctx context.Context, restoreJobID string) error {
return poll.Wait(ctx, func(ctx context.Context) (bool, error) {
req := &backup.DescribeRestoreJobInput{}
req.SetRestoreJobId(restoreJobID)

resp, err := e.DescribeRestoreJobWithContext(ctx, req)
if err != nil {
return false, err
}
if resp.Status == nil {
return false, errors.New("Failed to get restore job status")
}
switch *resp.Status {
case backup.RestoreJobStatusCompleted:
return true, nil
case backup.RestoreJobStatusAborted, backup.RestoreJobStatusFailed:
return false, errors.New("Restore job is not completed successfully")
default:
return false, nil
}
})
}

0 comments on commit a7c7e1b

Please sign in to comment.