Skip to content

Commit

Permalink
Merge pull request #194 from kanisterio/sync
Browse files Browse the repository at this point in the history
EFS SnapshotLIst; DeleteDataAll; chronicle fixes
  • Loading branch information
tdmanv authored Aug 9, 2019
2 parents ed7af31 + b3123a2 commit 8b93bc2
Show file tree
Hide file tree
Showing 14 changed files with 304 additions and 46 deletions.
6 changes: 5 additions & 1 deletion build/local_kubernetes.sh
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,11 @@ start_localkube() {
get_localkube
fi
kind create cluster --name ${LOCAL_CLUSTER_NAME} --image=kindest/node:${KUBE_VERSION}
cp $(kind get kubeconfig-path --name="kanister") ${KUBECONFIG}
if [ -e ${KUBECONFIG} ]; then
cp -fr ${KUBECONFIG} ${HOME}/.kube/config_bk
fi
KUBECONFIG=$(kind get kubeconfig-path --name="kanister")
export KUBECONFIG=${KUBECONFIG}:${HOME}/.kube/config_bk; kubectl config view --flatten > "${HOME}/.kube/config"
wait_for_nodes
wait_for_pods
}
Expand Down
49 changes: 48 additions & 1 deletion pkg/blockstorage/awsefs/awsefs.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,54 @@ func (e *efs) VolumesList(ctx context.Context, tags map[string]string, zone stri
}

func (e *efs) SnapshotsList(ctx context.Context, tags map[string]string) ([]*blockstorage.Snapshot, error) {
return nil, errors.New("Not implemented")
result := make([]*blockstorage.Snapshot, 0)
for resp, req := emptyResponseRequestForBackups(); resp.NextToken != nil; req.NextToken = resp.NextToken {
var err error
req.SetBackupVaultName(k10BackupVaultName)
resp, err = e.ListRecoveryPointsByBackupVaultWithContext(ctx, req)
if err != nil {
return nil, errors.Wrap(err, "Failed to list recovery points by vault")
}
snaps, err := e.snapshotsFromRecoveryPoints(ctx, resp.RecoveryPoints)
if err != nil {
return nil, errors.Wrap(err, "Failed to get snapshots from recovery points")
}
result = append(result, filterSnapshotsWithTags(snaps, tags)...)
}
return result, nil
}

func (e *efs) snapshotsFromRecoveryPoints(ctx context.Context, rps []*backup.RecoveryPointByBackupVault) ([]*blockstorage.Snapshot, error) {
result := make([]*blockstorage.Snapshot, 0)
for _, rp := range rps {
if rp.RecoveryPointArn == nil {
return nil, errors.New("Empty ARN in recovery point")
}
tags, err := e.getBackupTags(ctx, *rp.RecoveryPointArn)
if err != nil {
return nil, errors.Wrap(err, "Failed to get backup tags")
}
volID, err := efsIDFromResourceARN(*rp.ResourceArn)
if err != nil {
return nil, errors.Wrap(err, "Failed to get volume ID from recovery point ARN")
}
vol, err := e.VolumeGet(ctx, volID, "")
if err != nil {
return nil, errors.Wrap(err, "Failed to get EFS volume")
}
snap, err := snapshotFromRecoveryPointByVault(rp, vol, tags, e.region)
if err != nil {
return nil, errors.Wrap(err, "Failed to get snapshot from the vault")
}
result = append(result, snap)
}
return result, nil
}

func emptyResponseRequestForBackups() (*backup.ListRecoveryPointsByBackupVaultOutput, *backup.ListRecoveryPointsByBackupVaultInput) {
resp := (&backup.ListRecoveryPointsByBackupVaultOutput{}).SetNextToken(dummyMarker)
req := &backup.ListRecoveryPointsByBackupVaultInput{}
return resp, req
}

func emptyResponseRequestForFilesystems() (*awsefs.DescribeFileSystemsOutput, *awsefs.DescribeFileSystemsInput) {
Expand Down
31 changes: 31 additions & 0 deletions pkg/blockstorage/awsefs/conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ func efsIDFromResourceARN(arn string) (string, error) {
}

func snapshotFromRecoveryPoint(rp *backup.DescribeRecoveryPointOutput, volume *blockstorage.Volume, region string) (*blockstorage.Snapshot, error) {
if rp == nil {
return nil, errors.New("Empty recovery point")
}
if rp.CreationDate == nil {
return nil, errors.New("Recovery point has no CreationDate")
}
Expand All @@ -74,6 +77,34 @@ func snapshotFromRecoveryPoint(rp *backup.DescribeRecoveryPointOutput, volume *b
}, nil
}

func snapshotFromRecoveryPointByVault(rp *backup.RecoveryPointByBackupVault, volume *blockstorage.Volume, tags map[string]string, region string) (*blockstorage.Snapshot, error) {
if rp == nil {
return nil, errors.New("Empty recovery point")
}
if rp.CreationDate == nil {
return nil, errors.New("Recovery point has not CreationDate")
}
if rp.BackupSizeInBytes == nil {
return nil, errors.New("Recovery point has no BackupSizeInBytes")
}
if rp.RecoveryPointArn == nil {
return nil, errors.New("Recovery point has no RecoveryPointArn")
}
if volume == nil {
return nil, errors.New("Nil volume as argument")
}
return &blockstorage.Snapshot{
ID: *rp.RecoveryPointArn,
CreationTime: blockstorage.TimeStamp(*rp.CreationDate),
Size: bytesInGiB(*rp.BackupSizeInBytes),
Region: region,
Type: blockstorage.TypeEFS,
Volume: volume,
Encrypted: volume.Encrypted,
Tags: blockstorage.MapToKeyValue(tags),
}, nil
}

// convertFromBackupTags converts an AWS Backup compliant tag structure to a flattenned map.
func convertFromBackupTags(tags map[string]*string) map[string]string {
result := make(map[string]string)
Expand Down
11 changes: 11 additions & 0 deletions pkg/blockstorage/awsefs/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package awsefs
import (
awsefs "github.com/aws/aws-sdk-go/service/efs"

"github.com/kanisterio/kanister/pkg/blockstorage"
kantags "github.com/kanisterio/kanister/pkg/blockstorage/tags"
)

Expand All @@ -16,6 +17,16 @@ func filterAvailable(descriptions []*awsefs.FileSystemDescription) []*awsefs.Fil
return result
}

func filterSnapshotsWithTags(snapshots []*blockstorage.Snapshot, tags map[string]string) []*blockstorage.Snapshot {
result := make([]*blockstorage.Snapshot, 0)
for i, snap := range snapshots {
if kantags.IsSubset(blockstorage.KeyValueToMap(snap.Tags), tags) {
result = append(result, snapshots[i])
}
}
return result
}

func filterWithTags(descriptions []*awsefs.FileSystemDescription, tags map[string]string) []*awsefs.FileSystemDescription {
result := make([]*awsefs.FileSystemDescription, 0)
for i, desc := range descriptions {
Expand Down
19 changes: 11 additions & 8 deletions pkg/chronicle/chronicle_push.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (p PushParams) Validate() error {
}

func Push(p PushParams) error {
log.Infof("%#v", p)
log.Debugf("%#v", p)
ctx := setupSignalHandler(context.Background())
var i int
for {
Expand Down Expand Up @@ -74,28 +74,31 @@ func push(ctx context.Context, p PushParams, ord int) error {
// Read profile.
prof, ok, err := readProfile(p.ProfilePath)
if !ok || err != nil {
return errors.Wrap(err, "")
return err
}

// Get envdir values if set.
var env []string
if p.EnvDir != "" {
var err error
envdir.EnvDir(p.EnvDir)
env, err = envdir.EnvDir(p.EnvDir)
if err != nil {
return err
}
}
log.Debugf("Pushing output from Command %d: %v. Environment: %v", ord, p.Command, env)
return pushWithEnv(ctx, p.Command, p.ArtifactPath, ord, prof, env)
}

func pushWithEnv(ctx context.Context, c []string, suffix string, ord int, prof param.Profile, env []string) error {
// Chronicle command w/ piped output.
cmd := exec.CommandContext(ctx, "sh", "-c", strings.Join(p.Command, " "))
cmd := exec.CommandContext(ctx, "sh", "-c", strings.Join(c, " "))
cmd.Env = append(cmd.Env, env...)
out, err := cmd.StdoutPipe()
if err != nil {
return errors.Wrap(err, "Failed to open command pipe")
}
cmd.Stderr = os.Stderr
cur := fmt.Sprintf("%s-%d", p.ArtifactPath, ord)
cur := fmt.Sprintf("%s-%d", suffix, ord)
// Write data to object store
if err := cmd.Start(); err != nil {
return errors.Wrap(err, "Failed to start chronicle pipe command")
Expand All @@ -109,11 +112,11 @@ func push(ctx context.Context, p PushParams, ord int) error {

// Write manifest pointing to new data
man := strings.NewReader(cur)
if err := location.Write(ctx, man, prof, p.ArtifactPath); err != nil {
if err := location.Write(ctx, man, prof, suffix); err != nil {
return errors.Wrap(err, "Failed to write command output to object storage")
}
// Delete old data
prev := fmt.Sprintf("%s-%d", p.ArtifactPath, ord-1)
prev := fmt.Sprintf("%s-%d", suffix, ord-1)
location.Delete(ctx, prof, prev)
return nil
}
Expand Down
37 changes: 30 additions & 7 deletions pkg/chronicle/chronicle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,32 @@ import (

crv1alpha1 "github.com/kanisterio/kanister/pkg/apis/cr/v1alpha1"
"github.com/kanisterio/kanister/pkg/objectstore"
"github.com/kanisterio/kanister/pkg/param"
"github.com/kanisterio/kanister/pkg/testutil"
)

// Hook up gocheck into the "go test" runner.
func Test(t *testing.T) { TestingT(t) }

type ChronicleSuite struct{}
type ChronicleSuite struct {
profile param.Profile
}

var _ = Suite(&ChronicleSuite{})

func (s *ChronicleSuite) TestPushPull(c *C) {
func (s *ChronicleSuite) SetUpSuite(c *C) {
osType := objectstore.ProviderTypeS3
loc := crv1alpha1.Location{
Type: crv1alpha1.LocationTypeS3Compliant,
Region: testutil.TestS3Region,
Bucket: testutil.TestS3BucketName,
}
prof := *testutil.ObjectStoreProfileOrSkip(c, osType, loc)
s.profile = *testutil.ObjectStoreProfileOrSkip(c, osType, loc)
}

func (s *ChronicleSuite) TestPushPull(c *C) {
pp := filepath.Join(c.MkDir(), "profile.json")
err := writeProfile(pp, prof)
err := writeProfile(pp, s.profile)
c.Assert(err, IsNil)

p := PushParams{
Expand All @@ -50,12 +56,29 @@ func (s *ChronicleSuite) TestPushPull(c *C) {

// Pull and check that we still get i
buf := bytes.NewBuffer(nil)
err = Pull(ctx, buf, prof, p.ArtifactPath)
err = Pull(ctx, buf, s.profile, p.ArtifactPath)
c.Assert(err, IsNil)
s, err := ioutil.ReadAll(buf)
str, err := ioutil.ReadAll(buf)
c.Assert(err, IsNil)
// Remove additional '\n'
t := strings.TrimSuffix(string(s), "\n")
t := strings.TrimSuffix(string(str), "\n")
c.Assert(t, Equals, strconv.Itoa(i))
}
}

func (s *ChronicleSuite) TestEnv(c *C) {
ctx := context.Background()
cmd := []string{"echo", "X:", "$X"}
suffix := c.TestName() + rand.String(5)
env := []string{"X=foo"}

err := pushWithEnv(ctx, cmd, suffix, 0, s.profile, env)
c.Assert(err, IsNil)
buf := bytes.NewBuffer(nil)
err = Pull(ctx, buf, s.profile, suffix)
c.Assert(err, IsNil)
str, err := ioutil.ReadAll(buf)
c.Assert(err, IsNil)
t := strings.TrimSuffix(string(str), "\n")
c.Assert(t, Equals, "X: foo")
}
2 changes: 1 addition & 1 deletion pkg/envdir/envdir.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func EnvDir(dir string) ([]string, error) {
}
e := make([]string, 0, len(fis))
for _, fi := range fis {
if fi.IsDir() {
if fi.IsDir() || fi.Mode()&os.ModeSymlink == os.ModeSymlink {
continue
}
p := filepath.Join(dir, fi.Name())
Expand Down
31 changes: 27 additions & 4 deletions pkg/function/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,6 @@ func newRestoreDataAllBlueprint() *crv1alpha1.Blueprint {
Actions: map[string]*crv1alpha1.BlueprintAction{
"restore": &crv1alpha1.BlueprintAction{
Kind: param.StatefulSetKind,
SecretNames: []string{
"backupKey",
},
Phases: []crv1alpha1.BlueprintPhase{
crv1alpha1.BlueprintPhase{
Name: "testRestoreDataAll",
Expand All @@ -216,6 +213,28 @@ func newRestoreDataAllBlueprint() *crv1alpha1.Blueprint {
}
}

func newDeleteDataAllBlueprint() *crv1alpha1.Blueprint {
return &crv1alpha1.Blueprint{
Actions: map[string]*crv1alpha1.BlueprintAction{
"delete": &crv1alpha1.BlueprintAction{
Kind: param.StatefulSetKind,
Phases: []crv1alpha1.BlueprintPhase{
crv1alpha1.BlueprintPhase{
Name: "testDelete",
Func: "DeleteDataAll",
Args: map[string]interface{}{
DeleteDataAllNamespaceArg: "{{ .StatefulSet.Namespace }}",
DeleteDataAllBackupArtifactPrefixArg: "{{ .Profile.Location.Bucket }}/{{ .Profile.Location.Prefix }}",
DeleteDataAllBackupInfo: fmt.Sprintf("{{ .Options.%s }}", BackupDataAllOutput),
DeleteDataAllReclaimSpace: true,
},
},
},
},
},
}
}

func (s *DataSuite) getTemplateParamsAndPVCName(c *C, replicas int32) (*param.TemplateParams, []string) {
ctx := context.Background()
ss, err := s.cli.AppsV1().StatefulSets(s.namespace).Create(testutil.NewTestStatefulSet(replicas))
Expand Down Expand Up @@ -317,7 +336,7 @@ func (s *DataSuite) TestBackupRestoreDataWithSnapshotID(c *C) {
}
}

func (s *DataSuite) TestBackupDataAll(c *C) {
func (s *DataSuite) TestBackupRestoreDeleteDataAll(c *C) {
var replicas int32
replicas = 2
tp, pvcs := s.getTemplateParamsAndPVCName(c, replicas)
Expand All @@ -343,6 +362,10 @@ func (s *DataSuite) TestBackupDataAll(c *C) {
bp = *newRestoreDataAllBlueprint()
_ = runAction(c, bp, "restore", tp)

// Test delete
bp = *newDeleteDataAllBlueprint()
_ = runAction(c, bp, "delete", tp)

}

func newCopyDataTestBlueprint() crv1alpha1.Blueprint {
Expand Down
Loading

0 comments on commit 8b93bc2

Please sign in to comment.