Skip to content

Commit

Permalink
Update DeleteData func to remove aws cli command and use S3 Object St…
Browse files Browse the repository at this point in the history
…ore (#5068)

* Update DeleteData func to remove aws cli

* Update for recursive delete

* Fix CI error and update Delete funcs

* Fix CI error

* address review comments

* Address minor comments

Adds missing `>` in documentation.
Reorders functions

* Add unit test for DeleteAllWithPrefix
  • Loading branch information
SupriyaKasten authored and Ilya Kislenko committed Mar 8, 2019
1 parent e70f7ef commit 6a8b702
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 33 deletions.
26 changes: 3 additions & 23 deletions pkg/function/delete_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ package function

import (
"context"
"fmt"
"strings"

"github.com/pkg/errors"

kanister "github.com/kanisterio/kanister/pkg"
"github.com/kanisterio/kanister/pkg/location"
"github.com/kanisterio/kanister/pkg/param"
)

Expand All @@ -30,23 +29,6 @@ func (*deleteDataFunc) Name() string {
return "DeleteData"
}

func generateDeleteCommand(artifact string, profile *param.Profile) []string {
// Command to export credentials
cmd := []string{"export", fmt.Sprintf("AWS_SECRET_ACCESS_KEY=%s\n", profile.Credential.KeyPair.Secret)}
cmd = append(cmd, "export", fmt.Sprintf("AWS_ACCESS_KEY_ID=%s\n", profile.Credential.KeyPair.ID))
// Command to delete from the object store
cmd = append(cmd, "aws")
if profile.Location.Endpoint != "" {
cmd = append(cmd, "--endpoint", profile.Location.Endpoint)
}
if profile.SkipSSLVerify {
cmd = append(cmd, "--no-verify-ssl")
}
cmd = append(cmd, "s3", "rm", artifact)
command := strings.Join(cmd, " ")
return []string{"bash", "-o", "errexit", "-o", "pipefail", "-c", command}
}

func (*deleteDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) {
var artifact, namespace string
var err error
Expand All @@ -60,10 +42,8 @@ func (*deleteDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args m
if err = validateProfile(tp.Profile); err != nil {
return nil, errors.Wrapf(err, "Failed to validate Profile")
}
// Generate delete command
cmd := generateDeleteCommand(artifact, tp.Profile)
// Use KubeTask to delete the artifact
return kubeTask(ctx, namespace, "kanisterio/kanister-tools:0.16.0", cmd)

return nil, location.Delete(ctx, *tp.Profile, artifact)
}

func (*deleteDataFunc) RequiredArgs() []string {
Expand Down
2 changes: 1 addition & 1 deletion pkg/location/location.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func deleteData(ctx context.Context, pType objectstore.ProviderType, profile par
if err != nil {
return err
}
return bucket.Delete(ctx, path)
return bucket.DeleteAllWithPrefix(ctx, path)
}

func getProviderType(lType crv1alpha1.LocationType) (objectstore.ProviderType, error) {
Expand Down
22 changes: 14 additions & 8 deletions pkg/objectstore/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,26 +120,32 @@ func (d *directory) ListObjects(ctx context.Context) ([]string, error) {
}

// DeleteDirectory deletes all objects that have d.path as the prefix
// <bucket>/<d.path/<everything> including <bucket>/<d.path>/<some dir>/<objects>
// <bucket>/<d.path>/<everything> including <bucket>/<d.path>/<some dir>/<objects>
func (d *directory) DeleteDirectory(ctx context.Context) error {
if d.path == "" {
return errors.New("invalid entry")
}
return deleteWithPrefix(ctx, d.bucket.container, cloudName(d.path))
}

// Walk to find all entries that match the d.path prefix.
err := stow.Walk(d.bucket.container, cloudName(d.path), 10000,
// DeleteDirectory deletes all objects that have d.path/dir as the prefix
// <bucket>/<d.path>/dir/<everything> including <bucket>/<d.path>/dir/<some dir>/<objects>
func (d *directory) DeleteAllWithPrefix(ctx context.Context, prefix string) error {
p := cloudName(filepath.Join(d.path, prefix))
return deleteWithPrefix(ctx, d.bucket.container, p)
}

func deleteWithPrefix(ctx context.Context, c stow.Container, prefix string) error {
err := stow.Walk(c, prefix, 10000,
func(item stow.Item, err error) error {
if err != nil {
return err
}

return d.bucket.container.RemoveItem(item.Name())
return c.RemoveItem(item.Name())
})

if err != nil {
return err
return errors.Wrapf(err, "Failed to delete item %s", prefix)
}

return nil
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/objectstore/objectstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ type Directory interface {
// DeleteDirectory deletes the current directory
DeleteDirectory(context.Context) error

// DeleteAllWithPrefix deletes all directorys and objects with a provided prefix
DeleteAllWithPrefix(context.Context, string) error

// ListDirectories lists all the directories rooted in
// the current directory and their handle
ListDirectories(context.Context) (map[string]Directory, error)
Expand Down
35 changes: 34 additions & 1 deletion pkg/objectstore/objectstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,13 +246,46 @@ func (s *ObjectStoreProviderSuite) TestDirectories(c *C) {

directory, err = rootDirectory.GetDirectory(ctx, dir1)
c.Check(err, IsNil)

// Delete everything by deleting the parent directory
err = directory.DeleteDirectory(ctx)
c.Check(err, IsNil)
checkNoItemsWithPrefix(c, cont, dir1)
}

func (s *ObjectStoreProviderSuite) TestDeleteAllWithPrefix(c *C) {
ctx := context.Background()
rootDirectory, err := s.root.CreateDirectory(ctx, s.testDir)
c.Assert(err, IsNil)
c.Assert(rootDirectory, NotNil)
const (
dir1 = "directory1"
dir2 = "directory2"
dir3 = "directory3"
)

directory, err := rootDirectory.CreateDirectory(ctx, dir1)
c.Assert(err, IsNil)

// Expecting /dir1/dir2
_, err = directory.CreateDirectory(ctx, dir2)
c.Assert(err, IsNil)

// Expecting root dir to have /dir1/dir2 and /dir3
_, err = rootDirectory.CreateDirectory(ctx, dir3)
c.Assert(err, IsNil)

// Delete everything with prefix "dir1"
err = rootDirectory.DeleteAllWithPrefix(ctx, dir1)
c.Assert(err, IsNil)

// Expecting root dir to have /dir3
directories, err := rootDirectory.ListDirectories(ctx)
c.Check(err, IsNil)
c.Check(directories, HasLen, 1)
_, ok := directories[dir3]
c.Check(ok, Equals, true)
}

// TestObjects verifies object operations: GetBytes and PutBytes
func (s *ObjectStoreProviderSuite) TestObjects(c *C) {
ctx := context.Background()
Expand Down

0 comments on commit 6a8b702

Please sign in to comment.