Skip to content

Commit

Permalink
EFS SnapshotCreate (#6166)
Browse files Browse the repository at this point in the history
* EFS SnapshotCreate

* Address comments and remove duplicate
  • Loading branch information
Hakan Memisoglu authored and Ilya Kislenko committed Jul 30, 2019
1 parent c01c11c commit 5cc1f10
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 4 deletions.
106 changes: 102 additions & 4 deletions pkg/blockstorage/awsefs/awsefs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package awsefs

import (
"context"
"fmt"

"github.com/aws/aws-sdk-go/aws"
awsarn "github.com/aws/aws-sdk-go/aws/arn"
Expand All @@ -14,6 +15,7 @@ import (

"github.com/kanisterio/kanister/pkg/blockstorage"
"github.com/kanisterio/kanister/pkg/blockstorage/awsebs"
kantags "github.com/kanisterio/kanister/pkg/blockstorage/tags"
)

type efs struct {
Expand Down Expand Up @@ -126,7 +128,58 @@ func (e *efs) SnapshotCopy(ctx context.Context, from blockstorage.Snapshot, to b
}

func (e *efs) SnapshotCreate(ctx context.Context, volume blockstorage.Volume, tags map[string]string) (*blockstorage.Snapshot, error) {
return nil, errors.New("Not implemented")
err := e.createK10DefaultBackupVault()
if err != nil {
return nil, errors.Wrap(err, "Failed to setup K10 vault for AWS Backup")
}
desc, err := e.getFileSystemDescriptionWithID(ctx, volume.ID)
if err != nil {
return nil, errors.Wrap(err, "Failed to get corresponding description")
}

req := &backup.StartBackupJobInput{}
req.SetBackupVaultName(k10BackupVaultName)
req.SetIamRoleArn(awsDefaultServiceBackupRole(e.accountID))
req.SetResourceArn(resourceARNForEFS(e.region, *desc.OwnerId, *desc.FileSystemId))

// Save mount points and security groups as tags
infraTags, err := e.getMountPointAndSecurityGroupTags(ctx, volume.ID)
if err != nil {
return nil, errors.Wrap(err, "Failed to get mount points and security groups")
}
allTags := kantags.Union(tags, infraTags)
req.SetRecoveryPointTags(convertToBackupTags(allTags))
resp, err := e.StartBackupJob(req)
if err != nil {
return nil, errors.Wrap(err, "Failed to start a backup job")
}
if err = e.waitUntilRecoveryPointVisible(ctx, *resp.RecoveryPointArn); err != nil {
return nil, errors.Wrap(err, "Failed to fetch recovery point")
}
if err = e.setBackupTags(ctx, *resp.RecoveryPointArn, infraTags); err != nil {
return nil, errors.Wrap(err, "Failed to set backup tags")
}
return &blockstorage.Snapshot{
CreationTime: blockstorage.TimeStamp(*resp.CreationDate),
Encrypted: volume.Encrypted,
ID: *resp.RecoveryPointArn,
Region: e.region,
Size: volume.Size,
Tags: blockstorage.MapToKeyValue(allTags),
Volume: &volume,
Type: blockstorage.TypeEFS,
}, nil
}

func (e *efs) createK10DefaultBackupVault() error {
req := &backup.CreateBackupVaultInput{}
req.SetBackupVaultName(k10BackupVaultName)

_, err := e.CreateBackupVault(req)
if isBackupVaultAlreadyExists(err) {
return nil
}
return err
}

func (e *efs) SnapshotCreateWaitForCompletion(ctx context.Context, snapshot *blockstorage.Snapshot) error {
Expand Down Expand Up @@ -218,6 +271,20 @@ func emptyResponseRequestForFilesystems() (*awsefs.DescribeFileSystemsOutput, *a
return resp, req
}

func emptyResponseRequestForMountTargets() (*awsefs.DescribeMountTargetsOutput, *awsefs.DescribeMountTargetsInput) {
resp := (&awsefs.DescribeMountTargetsOutput{}).SetNextMarker(dummyMarker)
req := &awsefs.DescribeMountTargetsInput{}
return resp, req
}

func awsDefaultServiceBackupRole(accountID string) string {
return fmt.Sprintf("arn:aws:iam::%s:role/service-role/AWSBackupDefaultServiceRole", accountID)
}

func resourceARNForEFS(region string, accountID string, fileSystemID string) string {
return fmt.Sprintf("arn:aws:elasticfilesystem:%s:%s:file-system/%s", region, accountID, fileSystemID)
}

func (e *efs) getFileSystemDescriptionWithID(ctx context.Context, id string) (*awsefs.FileSystemDescription, error) {
req := &awsefs.DescribeFileSystemsInput{}
req.SetFileSystemId(id)
Expand All @@ -227,14 +294,45 @@ func (e *efs) getFileSystemDescriptionWithID(ctx context.Context, id string) (*a
return nil, errors.Wrap(err, "Failed to get filesystem description")
}
availables := filterAvailable(descs.FileSystems)
var desc *awsefs.FileSystemDescription
switch len(availables) {
case 0:
return nil, errors.New("Failed to find volume")
case 1:
desc = descs.FileSystems[0]
return descs.FileSystems[0], nil
default:
return nil, errors.New("Unexpected condition, multiple filesystems with same ID")
}
return desc, nil
}

func (e *efs) getMountPointAndSecurityGroupTags(ctx context.Context, id string) (map[string]string, error) {
mts := make([]*awsefs.MountTargetDescription, 0)
for resp, req := emptyResponseRequestForMountTargets(); resp.NextMarker != nil; req.Marker = resp.NextMarker {
var err error
req.SetFileSystemId(id)
resp, err = e.DescribeMountTargetsWithContext(ctx, req)
if err != nil {
return nil, errors.Wrap(err, "Failed to get mount targets")
}
mts = append(mts, resp.MountTargets...)
}
resultTags := make(map[string]string)
for _, mt := range mts {
req := &awsefs.DescribeMountTargetSecurityGroupsInput{}
req.SetMountTargetId(*mt.MountTargetId)

resp, err := e.DescribeMountTargetSecurityGroupsWithContext(ctx, req)
if err != nil {
return nil, errors.Wrap(err, "Failed to get security group")
}
if mt.SubnetId == nil {
return nil, errors.New("Empty subnet ID in mount target entry")
}
value := mountTargetValue(*mt.SubnetId, resp.SecurityGroups)
if mt.MountTargetId == nil {
return nil, errors.New("Empty ID in mount target entry")
}
key := mountTargetKey(*mt.MountTargetId)
resultTags[key] = value
}
return resultTags, nil
}
21 changes: 21 additions & 0 deletions pkg/blockstorage/awsefs/conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ import (
"github.com/kanisterio/kanister/pkg/blockstorage"
)

const (
securityGroupSeperator = "+"
mountTargetKeyPrefix = "kasten.io/aws-mount-target/"
)

// bytesInGiB calculates how many GiB is equal to the given bytes by rounding up
// the intermediate result.
func bytesInGiB(bytes int64) int64 {
Expand Down Expand Up @@ -118,3 +123,19 @@ func volumesFromEFSDescriptions(descriptions []*awsefs.FileSystemDescription, zo
}
return volumes
}

func mergeSecurityGroups(securityGroups []*string) string {
dereferenced := make([]string, 0, len(securityGroups))
for _, d := range securityGroups {
dereferenced = append(dereferenced, *d)
}
return strings.Join(dereferenced, securityGroupSeperator)
}

func mountTargetKey(mountTargetID string) string {
return mountTargetKeyPrefix + mountTargetID
}

func mountTargetValue(subnetID string, securityGroups []*string) string {
return subnetID + securityGroupSeperator + mergeSecurityGroups(securityGroups)
}
19 changes: 19 additions & 0 deletions pkg/blockstorage/awsefs/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,22 @@ func (e *efs) waitUntilRecoveryPointCompleted(ctx context.Context, id string) er
return *status == backup.RecoveryPointStatusCompleted, nil
})
}

func (e *efs) waitUntilRecoveryPointVisible(ctx context.Context, id string) error {
return poll.WaitWithRetries(ctx, maxNumErrorRetries, poll.IsAlwaysRetryable, func(ctx context.Context) (bool, error) {
req := &backup.DescribeRecoveryPointInput{}
req.SetBackupVaultName(k10BackupVaultName)
req.SetRecoveryPointArn(id)

_, err := e.DescribeRecoveryPointWithContext(ctx, req)
if isRecoveryPointNotFound(err) {
// Recovery point doesn't appear when the backup jobs finishes.
// Since this case is special, it will be counted as non-error.
return false, nil
}
if err != nil {
return false, err
}
return true, nil
})
}

0 comments on commit 5cc1f10

Please sign in to comment.