Skip to content

Commit

Permalink
Split SnapshotCreate into Create/Wait methods (#4569)
Browse files Browse the repository at this point in the history
* Split SnapshotCreate into Create/Wait methods

This allows the caller to initiate snapshot creation and wait on
completion separately

* Cleanup

* Add `skipWait` optional parameter

The `CreateVolumeSnapshot` method will not wait for snapshot
completion if this is set

* Add a Snapshot Wait function (#4570)

Allows the caller to wait for completion of a previously
initiated snapshot operation.

Also updated the e2e test blueprint
  • Loading branch information
Vaibhav Kamra authored and Ilya Kislenko committed Dec 17, 2018
1 parent 977a152 commit 6ba94b9
Show file tree
Hide file tree
Showing 9 changed files with 203 additions and 37 deletions.
16 changes: 16 additions & 0 deletions docs/functions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,7 @@ Arguments:

`namespace`, Yes, `string`, namespace in which to execute
`pvcs`, No, `[]string`, list of names of PVCs to be backed up
`skipWait`, No, `bool`, initiate but do not wait for the snapshot operation to complete

When no PVCs are specified in the `pvcs` argument above, all PVCs in use by a
Deployment or StatefulSet will be backed up.
Expand Down Expand Up @@ -490,6 +491,21 @@ of this phase is saved to an Artifact named `backupInfo`, shown below:
args:
namespace: "{{ .Deployment.Namespace }}"
WaitForSnapshotCompletion
-------------------------

This function is used to wait for completion of snapshot operations
initiated using the :ref:`createvolumesnapshot` function.

Arguments:

.. csv-table::
:header: "Argument", "Required", "Type", "Description"
:align: left
:widths: 5,5,5,15

`snapshots`, Yes, `string`, snapshot info generated as output in CreateVolumeSnapshot function

CreateVolumeFromSnapshot
------------------------

Expand Down
44 changes: 20 additions & 24 deletions pkg/blockstorage/awsebs/awsebs.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,47 +273,43 @@ func (s *ebsStorage) SnapshotCopy(ctx context.Context, from, to blockstorage.Sna
func (s *ebsStorage) SnapshotCreate(ctx context.Context, volume blockstorage.Volume, tags map[string]string) (*blockstorage.Snapshot, error) {
// Snapshot the EBS volume
csi := (&ec2.CreateSnapshotInput{}).SetVolumeId(volume.ID)
var snapID string
alltags := ktags.GetTags(tags)
csi.SetTagSpecifications([]*ec2.TagSpecification{
&ec2.TagSpecification{
ResourceType: aws.String(ec2.ResourceTypeSnapshot),
Tags: mapToEC2Tags(ktags.GetTags(tags)),
},
})
log.Infof("Snapshotting EBS volume: %s", *csi.VolumeId)
csi.SetDryRun(s.ec2Cli.DryRun)
snap, err := s.ec2Cli.CreateSnapshotWithContext(ctx, csi)
if isDryRunErr(err) {
snapID = ""
} else {
if err != nil {
return nil, errors.Wrapf(err, "Failed to create snapshot, volume_id: %s", *csi.VolumeId)
}
if err = setResourceTags(ctx, s.ec2Cli, aws.StringValue(snap.SnapshotId), alltags); err != nil {
return nil, err
}
err = waitOnSnapshot(ctx, s.ec2Cli, snap)
if err != nil {
return nil, errors.Wrapf(err, "Waiting on snapshot %v", snap)
}
snapID = aws.StringValue(snap.SnapshotId)
}

snaps, err := getSnapshots(ctx, s.ec2Cli, []*string{&snapID})
if err != nil {
return nil, err
if err != nil && !isDryRunErr(err) {
return nil, errors.Wrapf(err, "Failed to create snapshot, volume_id: %s", *csi.VolumeId)
}

ebssnap := snaps[0]
region, err := availabilityZoneToRegion(ctx, s.ec2Cli, volume.Az)
if err != nil {
return nil, err
}

ms := s.snapshotParse(ctx, ebssnap)
ms := s.snapshotParse(ctx, snap)
ms.Region = region
for _, tag := range ebssnap.Tags {
for _, tag := range snap.Tags {
ms.Tags = append(ms.Tags, &blockstorage.KeyValue{Key: aws.StringValue(tag.Key), Value: aws.StringValue(tag.Value)})
}
ms.Volume = &volume
return ms, nil
}

func (s *ebsStorage) SnapshotCreateWaitForCompletion(ctx context.Context, snap *blockstorage.Snapshot) error {
if s.ec2Cli.DryRun {
return nil
}
if err := waitOnSnapshotID(ctx, s.ec2Cli, snap.ID); err != nil {
return errors.Wrapf(err, "Waiting on snapshot %v", snap)
}
return nil
}

func (s *ebsStorage) SnapshotDelete(ctx context.Context, snapshot *blockstorage.Snapshot) error {
log.Infof("EBS Snapshot ID %s", snapshot.ID)
rmsi := &ec2.DeleteSnapshotInput{}
Expand Down
1 change: 1 addition & 0 deletions pkg/blockstorage/blockstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type Provider interface {
// Snapshot operations
SnapshotCopy(ctx context.Context, from Snapshot, to Snapshot) (*Snapshot, error)
SnapshotCreate(ctx context.Context, volume Volume, tags map[string]string) (*Snapshot, error)
SnapshotCreateWaitForCompletion(context.Context, *Snapshot) error
SnapshotDelete(context.Context, *Snapshot) error
SnapshotGet(ctx context.Context, id string) (*Snapshot, error)
// Others
Expand Down
1 change: 1 addition & 0 deletions pkg/blockstorage/blockstorage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ func (s *BlockStorageProviderSuite) createSnapshot(c *C) *blockstorage.Snapshot
c.Assert(err, IsNil)
s.snapshots = append(s.snapshots, ret)
s.checkTagsExist(c, blockstorage.KeyValueToMap(ret.Tags), tags)
c.Assert(s.provider.SnapshotCreateWaitForCompletion(context.Background(), ret), IsNil)
return ret
}

Expand Down
18 changes: 14 additions & 4 deletions pkg/function/create_volume_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ var (
const (
CreateVolumeSnapshotNamespaceArg = "namespace"
CreateVolumeSnapshotPVCsArg = "pvcs"
CreateVolumeSnapshotSkipWaitArg = "skipWait"
)

type createVolumeSnapshotFunc struct{}
Expand Down Expand Up @@ -83,7 +84,7 @@ func ValidateProfile(profile *param.Profile) error {
return nil
}

func createVolumeSnapshot(ctx context.Context, tp param.TemplateParams, cli kubernetes.Interface, namespace string, pvcs []string, getter getter.Getter) (map[string]interface{}, error) {
func createVolumeSnapshot(ctx context.Context, tp param.TemplateParams, cli kubernetes.Interface, namespace string, pvcs []string, getter getter.Getter, skipWait bool) (map[string]interface{}, error) {
vols := make([]volumeInfo, 0, len(pvcs))
for _, pvc := range pvcs {
volInfo, err := getPVCInfo(ctx, cli, namespace, pvc, tp, getter)
Expand All @@ -100,7 +101,7 @@ func createVolumeSnapshot(ctx context.Context, tp param.TemplateParams, cli kube
wg.Add(1)
go func(volInfo volumeInfo) {
defer wg.Done()
volSnapInfo, err := snapshotVolume(ctx, volInfo, namespace)
volSnapInfo, err := snapshotVolume(ctx, volInfo, namespace, skipWait)
if err != nil {
errstrings = append(errstrings, err.Error())
} else {
Expand All @@ -124,7 +125,7 @@ func createVolumeSnapshot(ctx context.Context, tp param.TemplateParams, cli kube
return map[string]interface{}{"volumeSnapshotInfo": string(manifestData)}, nil
}

func snapshotVolume(ctx context.Context, volume volumeInfo, namespace string) (*VolumeSnapshotInfo, error) {
func snapshotVolume(ctx context.Context, volume volumeInfo, namespace string, skipWait bool) (*VolumeSnapshotInfo, error) {
provider := volume.provider
vol, err := provider.VolumeGet(ctx, volume.volumeID, volume.volZone)
if err != nil {
Expand All @@ -145,6 +146,11 @@ func snapshotVolume(ctx context.Context, volume volumeInfo, namespace string) (*
if err != nil {
return nil, err
}
if !skipWait {
if err := provider.SnapshotCreateWaitForCompletion(ctx, snap); err != nil {
return nil, errors.Wrap(err, "Snapshot creation did not complete")
}
}
return &VolumeSnapshotInfo{SnapshotID: snap.ID, Type: volume.sType, Region: volume.region, PVCName: volume.pvc, Az: snap.Volume.Az, Tags: snap.Volume.Tags, VolumeType: snap.Volume.VolumeType}, nil
}

Expand Down Expand Up @@ -229,20 +235,24 @@ func (kef *createVolumeSnapshotFunc) Exec(ctx context.Context, tp param.Template
}
var namespace string
var pvcs []string
var skipWait bool
if err = Arg(args, CreateVolumeSnapshotNamespaceArg, &namespace); err != nil {
return nil, err
}
if err = OptArg(args, CreateVolumeSnapshotPVCsArg, &pvcs, nil); err != nil {
return nil, err
}
if err = OptArg(args, CreateVolumeSnapshotSkipWaitArg, &skipWait, nil); err != nil {
return nil, err
}
if len(pvcs) == 0 {
// Fetch Volumes
pvcs, err = getPVCList(tp)
if err != nil {
return nil, err
}
}
return createVolumeSnapshot(ctx, tp, cli, namespace, pvcs, getter.New())
return createVolumeSnapshot(ctx, tp, cli, namespace, pvcs, getter.New(), skipWait)
}

func (*createVolumeSnapshotFunc) RequiredArgs() []string {
Expand Down
22 changes: 13 additions & 9 deletions pkg/function/e2e_volume_snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,14 @@ func newVolumeSnapshotBlueprint() *crv1alpha1.Blueprint {
Func: "CreateVolumeSnapshot",
Args: map[string]interface{}{
CreateVolumeSnapshotNamespaceArg: "{{ .StatefulSet.Namespace }}",
CreateVolumeSnapshotSkipWaitArg: true,
},
},
{
Name: "waitOnSnapshots",
Func: "WaitForSnapshotCompletion",
Args: map[string]interface{}{
WaitForSnapshotCompletionSnapshotsArg: "{{ .Phases.testBackupVolume.Output.volumeSnapshotInfo }}",
},
},
},
Expand Down Expand Up @@ -288,21 +296,17 @@ func (s *VolumeSnapshotTestSuite) TestVolumeSnapshot(c *C) {
phases, err := kanister.GetPhases(*bp, action, *s.tp)
c.Assert(err, IsNil)
for _, p := range phases {
c.Assert(param.InitPhaseParams(ctx, s.cli, s.tp, p.Name(), p.Objects()), IsNil)
output, err := p.Exec(ctx, *bp, action, *s.tp)
if err != nil && strings.Contains(err.Error(), skipTestErrorMsg) {
c.Skip("Skipping the test since storage type not supported")
}
c.Assert(err, IsNil)
param.UpdatePhaseParams(ctx, s.tp, p.Name(), output)
if action == "backup" {
keyval := make(map[string]string)
c.Assert(output, NotNil)
c.Assert(output[volumeSnapshotInfoKey], NotNil)
keyval[manifestKey] = output[volumeSnapshotInfoKey].(string)
artifact := crv1alpha1.Artifact{
KeyValue: keyval,
}
s.tp.ArtifactsIn = make(map[string]crv1alpha1.Artifact)
s.tp.ArtifactsIn[backupInfoKey] = artifact
arts, err := param.RenderArtifacts(bp.Actions[action].OutputArtifacts, *s.tp)
c.Assert(err, IsNil)
s.tp.ArtifactsIn = arts
}
}
}
Expand Down
78 changes: 78 additions & 0 deletions pkg/function/wait_for_snapshot_completion.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package function

import (
"context"
"encoding/json"

"github.com/pkg/errors"

kanister "github.com/kanisterio/kanister/pkg"
"github.com/kanisterio/kanister/pkg/blockstorage"
"github.com/kanisterio/kanister/pkg/blockstorage/awsebs"
"github.com/kanisterio/kanister/pkg/blockstorage/getter"
"github.com/kanisterio/kanister/pkg/param"
)

func init() {
kanister.Register(&waitForSnapshotCompletionFunc{})
}

var (
_ kanister.Func = (*waitForSnapshotCompletionFunc)(nil)
)

const (
WaitForSnapshotCompletionSnapshotsArg = "snapshots"
)

type waitForSnapshotCompletionFunc struct{}

func (*waitForSnapshotCompletionFunc) Name() string {
return "WaitForSnapshotCompletion"
}

func (*waitForSnapshotCompletionFunc) RequiredArgs() []string {
return []string{WaitForSnapshotCompletionSnapshotsArg}
}

func (kef *waitForSnapshotCompletionFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) {
var snapshotinfo string
if err := Arg(args, WaitForSnapshotCompletionSnapshotsArg, &snapshotinfo); err != nil {
return nil, err
}
return nil, waitForSnapshotsCompletion(ctx, snapshotinfo, tp.Profile, getter.New())
}

func waitForSnapshotsCompletion(ctx context.Context, snapshotinfo string, profile *param.Profile, getter getter.Getter) error {
PVCData := []VolumeSnapshotInfo{}
err := json.Unmarshal([]byte(snapshotinfo), &PVCData)
if err != nil {
return errors.Wrapf(err, "Could not decode JSON data")
}
for _, pvcInfo := range PVCData {
config := make(map[string]string)
switch pvcInfo.Type {
case blockstorage.TypeEBS:
if err = ValidateProfile(profile); err != nil {
return errors.Wrap(err, "Profile validation failed")
}
config[awsebs.ConfigRegion] = pvcInfo.Region
config[awsebs.AccessKeyID] = profile.Credential.KeyPair.ID
config[awsebs.SecretAccessKey] = profile.Credential.KeyPair.Secret
default:
return errors.New("Storage provider not supported " + string(pvcInfo.Type))
}
provider, err := getter.Get(pvcInfo.Type, config)
if err != nil {
return errors.Wrapf(err, "Could not get storage provider %v", pvcInfo.Type)
}
snapshot, err := provider.SnapshotGet(ctx, pvcInfo.SnapshotID)
if err != nil {
return errors.Wrapf(err, "Failed to get Snapshot from Provider")
}
if err = provider.SnapshotCreateWaitForCompletion(ctx, snapshot); err != nil {
return errors.Wrap(err, "Snapshot creation did not complete "+snapshot.ID)
}
}
return nil
}
55 changes: 55 additions & 0 deletions pkg/function/wait_for_snapshot_completion_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package function

import (
"context"
"encoding/json"

. "gopkg.in/check.v1"

crv1alpha1 "github.com/kanisterio/kanister/pkg/apis/cr/v1alpha1"
"github.com/kanisterio/kanister/pkg/blockstorage"
"github.com/kanisterio/kanister/pkg/param"
"github.com/kanisterio/kanister/pkg/testutil/mockblockstorage"
)

type WaitForSnapshotCompletionTestSuite struct{}

var _ = Suite(&WaitForSnapshotCompletionTestSuite{})

func (s *WaitForSnapshotCompletionTestSuite) TestWait(c *C) {
ctx := context.Background()
mockGetter := mockblockstorage.NewGetter()
profile := &param.Profile{
Location: crv1alpha1.Location{
Type: crv1alpha1.LocationTypeS3Compliant,
S3Compliant: &crv1alpha1.S3CompliantLocation{
Region: "us-west-2"},
},
Credential: param.Credential{
Type: param.CredentialTypeKeyPair,
KeyPair: &param.KeyPair{
ID: "foo",
Secret: "bar",
},
},
}
pvcData1 := []VolumeSnapshotInfo{
VolumeSnapshotInfo{SnapshotID: "snap-1", Type: blockstorage.TypeEBS, Region: "us-west-2", PVCName: "pvc-1", Az: "us-west-2a", VolumeType: "ssd"},
VolumeSnapshotInfo{SnapshotID: "snap-2", Type: blockstorage.TypeEBS, Region: "us-west-2", PVCName: "pvc-2", Az: "us-west-2a", VolumeType: "ssd"},
}
info, err := json.Marshal(pvcData1)
c.Assert(err, IsNil)
snapinfo := string(info)
for _, tc := range []struct {
snapshotinfo string
check Checker
}{
{
snapshotinfo: snapinfo,
check: IsNil,
},
} {
err := waitForSnapshotsCompletion(ctx, tc.snapshotinfo, profile, mockGetter)
c.Assert(err, tc.check)
}
}
5 changes: 5 additions & 0 deletions pkg/testutil/mockblockstorage/mockblockstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ func (p *Provider) SnapshotCreate(ctx context.Context, volume blockstorage.Volum
return p.MockSnapshot(), nil
}

// SnapshotCreateWaitForCompletion mock
func (p *Provider) SnapshotCreateWaitForCompletion(context.Context, *blockstorage.Snapshot) error {
return nil
}

// SnapshotDelete mock
func (p *Provider) SnapshotDelete(ctx context.Context, snapshot *blockstorage.Snapshot) error {
p.AddDeletedSnapID(snapshot.ID)
Expand Down

0 comments on commit 6ba94b9

Please sign in to comment.