Skip to content

Commit

Permalink
EFS VolumeCreate (#6082)
Browse files Browse the repository at this point in the history
Change Overview
Implements VolumeCreate for EFS

* Defines constants for throughput and performance modes for EFS
* Waiting method for the created EFS volume to be available
  • Loading branch information
Hakan Memisoglu committed Jul 29, 2019
1 parent 1356242 commit 906d4cb
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 3 deletions.
35 changes: 32 additions & 3 deletions pkg/blockstorage/awsefs/awsefs.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
awsefs "github.com/aws/aws-sdk-go/service/efs"
"github.com/aws/aws-sdk-go/service/iam"
"github.com/pkg/errors"
uuid "github.com/satori/go.uuid"

"github.com/kanisterio/kanister/pkg/blockstorage"
"github.com/kanisterio/kanister/pkg/blockstorage/awsebs"
Expand All @@ -25,8 +26,16 @@ type efs struct {
var _ blockstorage.Provider = (*efs)(nil)

const (
generalPurposePerformanceMode = awsefs.PerformanceModeGeneralPurpose
maximumIOPerformanceMode = awsefs.PerformanceModeMaxIo
defaultPerformanceMode = generalPurposePerformanceMode

burstingThroughputMode = awsefs.ThroughputModeBursting
provisionedThroughputMode = awsefs.ThroughputModeProvisioned
defaultThroughputMode = burstingThroughputMode

k10BackupVaultName = "k10vault"

dummyMarker = ""
)

Expand Down Expand Up @@ -66,8 +75,28 @@ func (e *efs) Type() blockstorage.Type {
return blockstorage.TypeEFS
}

func (e *efs) VolumeCreate(context.Context, blockstorage.Volume) (*blockstorage.Volume, error) {
return nil, errors.New("Not implemented")
// VolumeCreate implements interface method for EFS. It sends EFS volume create request
// to AWS EFS and waits until the file system is available. Eventually, it returns the
// volume info that is sent back from the AWS EFS.
func (e *efs) VolumeCreate(ctx context.Context, volume blockstorage.Volume) (*blockstorage.Volume, error) {
req := &awsefs.CreateFileSystemInput{}
req.SetCreationToken(uuid.NewV4().String())
req.SetPerformanceMode(defaultPerformanceMode)
req.SetThroughputMode(defaultThroughputMode)
req.SetTags(convertToEFSTags(blockstorage.KeyValueToMap(volume.Tags)))

fd, err := e.CreateFileSystemWithContext(ctx, req)
if err != nil {
return nil, errors.Wrap(err, "Failed to create EFS instance")
}
if err = e.waitUntilFileSystemAvailable(ctx, *fd.FileSystemId); err != nil {
return nil, errors.Wrap(err, "EFS instance is not available")
}
vol, err := e.VolumeGet(ctx, volume.ID, volume.Az)
if err != nil {
return nil, errors.Wrap(err, "Failed to get recently create EFS instance")
}
return vol, nil
}

func (e *efs) VolumeCreateFromSnapshot(ctx context.Context, snapshot blockstorage.Snapshot, tags map[string]string) (*blockstorage.Volume, error) {
Expand Down
11 changes: 11 additions & 0 deletions pkg/blockstorage/awsefs/conversion.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package awsefs

import (
"github.com/aws/aws-sdk-go/aws"
awsefs "github.com/aws/aws-sdk-go/service/efs"

"github.com/kanisterio/kanister/pkg/blockstorage"
)

Expand All @@ -22,6 +24,15 @@ func convertFromEFSTags(efsTags []*awsefs.Tag) map[string]string {
return tags
}

// convertToEFSTags converts a map to AWS EFS tags.
func convertToEFSTags(tags map[string]string) []*awsefs.Tag {
efsTags := make([]*awsefs.Tag, 0, len(tags))
for k, v := range tags {
efsTags = append(efsTags, &awsefs.Tag{Key: aws.String(k), Value: aws.String(v)})
}
return efsTags
}

// volumeFromEFSDescription converts an AWS EFS filesystem description to Kanister blockstorage Volume type
// using the information in the description.
//
Expand Down
33 changes: 33 additions & 0 deletions pkg/blockstorage/awsefs/wait.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package awsefs

import (
"context"

awsefs "github.com/aws/aws-sdk-go/service/efs"

"github.com/kanisterio/kanister/pkg/poll"
)

const (
maxNumErrorRetries = 3
)

func (e *efs) waitUntilFileSystemAvailable(ctx context.Context, id string) error {
return poll.WaitWithRetries(ctx, maxNumErrorRetries, poll.IsAlwaysRetryable, func(ctx context.Context) (bool, error) {
req := &awsefs.DescribeFileSystemsInput{}
req.SetFileSystemId(id)

desc, err := e.DescribeFileSystemsWithContext(ctx, req)
if err != nil {
return false, err
}
if len(desc.FileSystems) == 0 {
return false, nil
}
state := desc.FileSystems[0].LifeCycleState
if state == nil {
return false, nil
}
return *state == awsefs.LifeCycleStateAvailable, nil
})
}
56 changes: 56 additions & 0 deletions pkg/poll/poll.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,20 @@ import (
// should be aborted.
type Func func(context.Context) (bool, error)

// IsRetryableFunc is the signature for functions that return true if we should
// retry an error
type IsRetryableFunc func(error) bool

// IsAlwaysRetryable instructs WaitWithRetries to retry until time expires.
func IsAlwaysRetryable(error) bool {
return true
}

// IsNeverRetryable instructs WaitWithRetries not to retry.
func IsNeverRetryable(error) bool {
return false
}

// Wait calls WaitWithBackoff with default backoff parameters. The defaults are
// handled by the "github.com/jpillora/backoff" and are:
// min = 100 * time.Millisecond
Expand Down Expand Up @@ -43,6 +57,48 @@ func WaitWithBackoff(ctx context.Context, b backoff.Backoff, f Func) error {
}
}

// WaitWithRetries will invoke a function `f` until it returns true or the
// context `ctx` is done. If `f` returns an error, WaitWithRetries will tolerate
// up to `numRetries` errors.
func WaitWithRetries(ctx context.Context, numRetries int, r IsRetryableFunc, f Func) error {
return WaitWithBackoffWithRetries(ctx, backoff.Backoff{}, numRetries, r, f)
}

// WaitWithBackoffWithRetries will invoke a function `f` until it returns true or the
// context `ctx` is done. If `f` returns an error, WaitWithBackoffWith retries will tolerate
// up to `numRetries` errors. If returned error is not retriable according to `r`, then
// it will bait out immediately. The wait time between retries will be decided by backoff
// parameters `b`.
func WaitWithBackoffWithRetries(ctx context.Context, b backoff.Backoff, numRetries int, r IsRetryableFunc, f Func) error {
if numRetries < 0 {
return errors.New("numRetries must be non-negative")
}

retries := 0
for {
ok, err := f(ctx)
if err != nil {
if !r(err) || retries >= numRetries {
return err
}
retries++
} else if ok {
return nil
}
select {
case <-ctx.Done():
return errors.Wrap(ctx.Err(), "Context done while polling")
default:
}
sleep := b.Duration()
if deadline, ok := ctx.Deadline(); ok {
ctxSleep := deadline.Sub(time.Now())
sleep = minDuration(sleep, ctxSleep)
}
time.Sleep(sleep)
}
}

func minDuration(a, b time.Duration) time.Duration {
if a < b {
return a
Expand Down

0 comments on commit 906d4cb

Please sign in to comment.