Skip to content

Commit

Permalink
Merge pull request #146 from kanisterio/sync
Browse files Browse the repository at this point in the history
Improve volume region handling;  Split SnapshotCreate into Create/Wait methods
  • Loading branch information
pavannd1 committed Jan 2, 2019
2 parents 66782f3 + ba7fc7d commit f803628
Show file tree
Hide file tree
Showing 11 changed files with 366 additions and 40 deletions.
148 changes: 148 additions & 0 deletions docs/functions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,154 @@ Example:
namespace: "{{ .Deployment.Namespace }}"
artifact: s3://bucket/path/artifact
.. _createvolumesnapshot:

CreateVolumeSnapshot
--------------------

This function is used to create snapshots of one or more PVCs
associated with an application. It takes individual snapshot
of each PVC which can be then restored later. It generates an
output that contains the Snapshot info required for restoring PVCs.

.. note::
Currently we only support PVC snapshots on AWS EBS. Support for more storage
providers is coming soon!

Arguments:

.. csv-table::
:header: "Argument", "Required", "Type", "Description"
:align: left
:widths: 5,5,5,15

`namespace`, Yes, `string`, namespace in which to execute
`pvcs`, No, `[]string`, list of names of PVCs to be backed up
`skipWait`, No, `bool`, initiate but do not wait for the snapshot operation to complete

When no PVCs are specified in the `pvcs` argument above, all PVCs in use by a
Deployment or StatefulSet will be backed up.

Outputs:

.. csv-table::
:header: "Output", "Type", "Description"
:align: left
:widths: 5,5,15

`volumeSnapshotInfo`,`string`, Snapshot info required while restoring the PVCs

Example:

Consider a scenario where you wish to backup all PVCs of a deployment. The output
of this phase is saved to an Artifact named `backupInfo`, shown below:

.. code-block:: yaml
:linenos:
actions:
backup:
type: Deployment
outputArtifacts:
backupInfo:
keyValue:
manifest: "{{ .Phases.backupVolume.Output.volumeSnapshotInfo }}"
phases:
- func: CreateVolumeSnapshot
name: backupVolume
args:
namespace: "{{ .Deployment.Namespace }}"
WaitForSnapshotCompletion
-------------------------

This function is used to wait for completion of snapshot operations
initiated using the :ref:`createvolumesnapshot` function.

Arguments:

.. csv-table::
:header: "Argument", "Required", "Type", "Description"
:align: left
:widths: 5,5,5,15

`snapshots`, Yes, `string`, snapshot info generated as output in CreateVolumeSnapshot function

CreateVolumeFromSnapshot
------------------------

This function is used to restore one or more PVCs of an application from the snapshots
taken using the :ref:`createvolumesnapshot` function. It deletes old PVCs,
if present and creates new PVCs from the snapshots taken earlier.

Arguments:

.. csv-table::
:header: "Argument", "Required", "Type", "Description"
:align: left
:widths: 5,5,5,20

`namespace`, Yes, `string`, namespace in which to execute
`snapshots`, Yes, `string`, snapshot info generated as output in CreateVolumeSnapshot function

Example:

Consider a scenario where you wish to restore all PVCs of a deployment.
We will first scale down the application, restore PVCs and then scale up.
For this phase, we will make use of the backupInfo Artifact provided by
the :ref:`createvolumesnapshot` function.

.. code-block:: yaml
:linenos:
- func: ScaleWorkload
name: shutdownPod
args:
namespace: "{{ .Deployment.Namespace }}"
name: "{{ .Deployment.Name }}"
kind: Deployment
replicas: 0
- func: CreateVolumeFromSnapshot
name: restoreVolume
args:
namespace: "{{ .Deployment.Namespace }}"
snapshots: "{{ .ArtifactsIn.backupInfo.KeyValue.manifest }}"
- func: ScaleWorkload
name: bringupPod
args:
namespace: "{{ .Deployment.Namespace }}"
name: "{{ .Deployment.Name }}"
kind: Deployment
replicas: 1
DeleteVolumeSnapshot
--------------------

This function is used to delete snapshots of PVCs taken using the
:ref:`createvolumesnapshot` function.

Arguments:

.. csv-table::
:header: "Argument", "Required", "Type", "Description"
:align: left
:widths: 5,5,5,20

`namespace`, Yes, `string`, namespace in which to execute
`snapshots`, Yes, `string`, snapshot info generated as output in CreateVolumeSnapshot function

Example:

.. code-block:: yaml
:linenos:
- func: DeleteVolumeSnapshot
name: deleteVolumeSnapshot
args:
namespace: "{{ .Deployment.Namespace }}"
snapshots: "{{ .ArtifactsIn.backupInfo.KeyValue.manifest }}"
Registering Functions
---------------------

Expand Down
1 change: 1 addition & 0 deletions docs/spelling_wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,4 @@ PersistentVolumeClaim
actionset
objectstore
Elasticsearch
backupInfo
62 changes: 38 additions & 24 deletions pkg/blockstorage/awsebs/awsebs.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/ec2metadata"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/jpillora/backoff"
Expand Down Expand Up @@ -272,47 +273,43 @@ func (s *ebsStorage) SnapshotCopy(ctx context.Context, from, to blockstorage.Sna
func (s *ebsStorage) SnapshotCreate(ctx context.Context, volume blockstorage.Volume, tags map[string]string) (*blockstorage.Snapshot, error) {
// Snapshot the EBS volume
csi := (&ec2.CreateSnapshotInput{}).SetVolumeId(volume.ID)
var snapID string
alltags := ktags.GetTags(tags)
csi.SetTagSpecifications([]*ec2.TagSpecification{
&ec2.TagSpecification{
ResourceType: aws.String(ec2.ResourceTypeSnapshot),
Tags: mapToEC2Tags(ktags.GetTags(tags)),
},
})
log.Infof("Snapshotting EBS volume: %s", *csi.VolumeId)
csi.SetDryRun(s.ec2Cli.DryRun)
snap, err := s.ec2Cli.CreateSnapshotWithContext(ctx, csi)
if isDryRunErr(err) {
snapID = ""
} else {
if err != nil {
return nil, errors.Wrapf(err, "Failed to create snapshot, volume_id: %s", *csi.VolumeId)
}
if err = setResourceTags(ctx, s.ec2Cli, aws.StringValue(snap.SnapshotId), alltags); err != nil {
return nil, err
}
err = waitOnSnapshot(ctx, s.ec2Cli, snap)
if err != nil {
return nil, errors.Wrapf(err, "Waiting on snapshot %v", snap)
}
snapID = aws.StringValue(snap.SnapshotId)
}

snaps, err := getSnapshots(ctx, s.ec2Cli, []*string{&snapID})
if err != nil {
return nil, err
if err != nil && !isDryRunErr(err) {
return nil, errors.Wrapf(err, "Failed to create snapshot, volume_id: %s", *csi.VolumeId)
}

ebssnap := snaps[0]
region, err := availabilityZoneToRegion(ctx, s.ec2Cli, volume.Az)
if err != nil {
return nil, err
}

ms := s.snapshotParse(ctx, ebssnap)
ms := s.snapshotParse(ctx, snap)
ms.Region = region
for _, tag := range ebssnap.Tags {
for _, tag := range snap.Tags {
ms.Tags = append(ms.Tags, &blockstorage.KeyValue{Key: aws.StringValue(tag.Key), Value: aws.StringValue(tag.Value)})
}
ms.Volume = &volume
return ms, nil
}

func (s *ebsStorage) SnapshotCreateWaitForCompletion(ctx context.Context, snap *blockstorage.Snapshot) error {
if s.ec2Cli.DryRun {
return nil
}
if err := waitOnSnapshotID(ctx, s.ec2Cli, snap.ID); err != nil {
return errors.Wrapf(err, "Waiting on snapshot %v", snap)
}
return nil
}

func (s *ebsStorage) SnapshotDelete(ctx context.Context, snapshot *blockstorage.Snapshot) error {
log.Infof("EBS Snapshot ID %s", snapshot.ID)
rmsi := &ec2.DeleteSnapshotInput{}
Expand Down Expand Up @@ -543,3 +540,20 @@ func waitOnSnapshotID(ctx context.Context, ec2Cli *EC2, snapID string) error {
return false, nil
})
}

// GetRegionFromEC2Metadata retrieves the region from the EC2 metadata service.
// Only works when the call is performed from inside AWS
func GetRegionFromEC2Metadata() (string, error) {
log.Debug("Retrieving region from metadata")
conf := aws.Config{
HTTPClient: &http.Client{
Transport: http.DefaultTransport,
Timeout: 2 * time.Second,
},
MaxRetries: aws.Int(1),
}
ec2MetaData := ec2metadata.New(session.Must(session.NewSession()), &conf)

awsRegion, err := ec2MetaData.Region()
return awsRegion, errors.Wrap(err, "Failed to get AWS Region")
}
1 change: 1 addition & 0 deletions pkg/blockstorage/blockstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type Provider interface {
// Snapshot operations
SnapshotCopy(ctx context.Context, from Snapshot, to Snapshot) (*Snapshot, error)
SnapshotCreate(ctx context.Context, volume Volume, tags map[string]string) (*Snapshot, error)
SnapshotCreateWaitForCompletion(context.Context, *Snapshot) error
SnapshotDelete(context.Context, *Snapshot) error
SnapshotGet(ctx context.Context, id string) (*Snapshot, error)
// Others
Expand Down
1 change: 1 addition & 0 deletions pkg/blockstorage/blockstorage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ func (s *BlockStorageProviderSuite) createSnapshot(c *C) *blockstorage.Snapshot
c.Assert(err, IsNil)
s.snapshots = append(s.snapshots, ret)
s.checkTagsExist(c, blockstorage.KeyValueToMap(ret.Tags), tags)
c.Assert(s.provider.SnapshotCreateWaitForCompletion(context.Background(), ret), IsNil)
return ret
}

Expand Down
30 changes: 24 additions & 6 deletions pkg/function/create_volume_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ var (
const (
CreateVolumeSnapshotNamespaceArg = "namespace"
CreateVolumeSnapshotPVCsArg = "pvcs"
CreateVolumeSnapshotSkipWaitArg = "skipWait"
)

type createVolumeSnapshotFunc struct{}
Expand Down Expand Up @@ -83,7 +84,7 @@ func ValidateProfile(profile *param.Profile) error {
return nil
}

func createVolumeSnapshot(ctx context.Context, tp param.TemplateParams, cli kubernetes.Interface, namespace string, pvcs []string, getter getter.Getter) (map[string]interface{}, error) {
func createVolumeSnapshot(ctx context.Context, tp param.TemplateParams, cli kubernetes.Interface, namespace string, pvcs []string, getter getter.Getter, skipWait bool) (map[string]interface{}, error) {
vols := make([]volumeInfo, 0, len(pvcs))
for _, pvc := range pvcs {
volInfo, err := getPVCInfo(ctx, cli, namespace, pvc, tp, getter)
Expand All @@ -100,7 +101,7 @@ func createVolumeSnapshot(ctx context.Context, tp param.TemplateParams, cli kube
wg.Add(1)
go func(volInfo volumeInfo) {
defer wg.Done()
volSnapInfo, err := snapshotVolume(ctx, volInfo, namespace)
volSnapInfo, err := snapshotVolume(ctx, volInfo, namespace, skipWait)
if err != nil {
errstrings = append(errstrings, err.Error())
} else {
Expand All @@ -124,7 +125,7 @@ func createVolumeSnapshot(ctx context.Context, tp param.TemplateParams, cli kube
return map[string]interface{}{"volumeSnapshotInfo": string(manifestData)}, nil
}

func snapshotVolume(ctx context.Context, volume volumeInfo, namespace string) (*VolumeSnapshotInfo, error) {
func snapshotVolume(ctx context.Context, volume volumeInfo, namespace string, skipWait bool) (*VolumeSnapshotInfo, error) {
provider := volume.provider
vol, err := provider.VolumeGet(ctx, volume.volumeID, volume.volZone)
if err != nil {
Expand All @@ -145,6 +146,11 @@ func snapshotVolume(ctx context.Context, volume volumeInfo, namespace string) (*
if err != nil {
return nil, err
}
if !skipWait {
if err := provider.SnapshotCreateWaitForCompletion(ctx, snap); err != nil {
return nil, errors.Wrap(err, "Snapshot creation did not complete")
}
}
return &VolumeSnapshotInfo{SnapshotID: snap.ID, Type: volume.sType, Region: volume.region, PVCName: volume.pvc, Az: snap.Volume.Az, Tags: snap.Volume.Tags, VolumeType: snap.Volume.VolumeType}, nil
}

Expand Down Expand Up @@ -176,7 +182,15 @@ func getPVCInfo(ctx context.Context, kubeCli kubernetes.Interface, namespace str
if err = ValidateProfile(tp.Profile); err != nil {
return nil, errors.Wrap(err, "Profile validation failed")
}
region = tp.Profile.Location.S3Compliant.Region
// Get Region from PV label or EC2 metadata
if pvRegion, ok := pvLabels[kube.PVRegionLabelName]; ok {
region = pvRegion
} else {
region, err = awsebs.GetRegionFromEC2Metadata()
if err != nil {
return nil, err
}
}
if pvZone, ok := pvLabels[kube.PVZoneLabelName]; ok {
config[awsebs.ConfigRegion] = region
config[awsebs.AccessKeyID] = tp.Profile.Credential.KeyPair.ID
Expand All @@ -189,7 +203,7 @@ func getPVCInfo(ctx context.Context, kubeCli kubernetes.Interface, namespace str
}
return nil, errors.Errorf("PV zone label is empty, pvName: %s, namespace: %s", pvName, namespace)
}
return nil, errors.New("Storage type not supported!")
return nil, errors.New("Storage type not supported")
}

func getPVCList(tp param.TemplateParams) ([]string, error) {
Expand Down Expand Up @@ -221,20 +235,24 @@ func (kef *createVolumeSnapshotFunc) Exec(ctx context.Context, tp param.Template
}
var namespace string
var pvcs []string
var skipWait bool
if err = Arg(args, CreateVolumeSnapshotNamespaceArg, &namespace); err != nil {
return nil, err
}
if err = OptArg(args, CreateVolumeSnapshotPVCsArg, &pvcs, nil); err != nil {
return nil, err
}
if err = OptArg(args, CreateVolumeSnapshotSkipWaitArg, &skipWait, nil); err != nil {
return nil, err
}
if len(pvcs) == 0 {
// Fetch Volumes
pvcs, err = getPVCList(tp)
if err != nil {
return nil, err
}
}
return createVolumeSnapshot(ctx, tp, cli, namespace, pvcs, getter.New())
return createVolumeSnapshot(ctx, tp, cli, namespace, pvcs, getter.New(), skipWait)
}

func (*createVolumeSnapshotFunc) RequiredArgs() []string {
Expand Down
3 changes: 2 additions & 1 deletion pkg/function/create_volume_snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ func (s *CreateVolumeSnapshotTestSuite) TestGetPVCInfo(c *C) {
ObjectMeta: metav1.ObjectMeta{
Name: "pv-test-1",
Labels: map[string]string{
kube.PVZoneLabelName: "us-west-2a",
kube.PVZoneLabelName: "us-west-2a",
kube.PVRegionLabelName: "us-west-2",
},
},
Spec: v1.PersistentVolumeSpec{
Expand Down
Loading

0 comments on commit f803628

Please sign in to comment.