Skip to content

Commit

Permalink
feat(plugin): implement DeleteObjects functionality along with associ…
Browse files Browse the repository at this point in the history
…ated automated tests (#14)
  • Loading branch information
kheina authored Apr 4, 2024
1 parent 4ffe4d2 commit 09cece6
Show file tree
Hide file tree
Showing 2 changed files with 367 additions and 2 deletions.
80 changes: 79 additions & 1 deletion plugin/service/storage/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"io"
"os"
"path"
"strings"

"github.com/google/uuid"
"github.com/hashicorp/boundary/sdk/pbs/controller/api/resources/storagebuckets"
Expand Down Expand Up @@ -410,7 +411,84 @@ func (sp *StoragePlugin) PutObject(ctx context.Context, req *pb.PutObjectRequest
// DeleteObjects deletes one or many files in an external object store via a
// provided key prefix.
func (sp *StoragePlugin) DeleteObjects(ctx context.Context, req *pb.DeleteObjectsRequest) (*pb.DeleteObjectsResponse, error) {
return nil, fmt.Errorf("unimplemented")
bucket := req.GetBucket()
if bucket == nil {
return nil, status.Error(codes.InvalidArgument, "no storage bucket information found")
}
if bucket.GetBucketName() == "" {
return nil, status.Error(codes.InvalidArgument, "storage bucket name is required")
}

if req.GetKeyPrefix() == "" {
return nil, status.Error(codes.InvalidArgument, "key prefix is required")
}

sa, err := getStorageAttributes(bucket.GetAttributes())
if err != nil {
return nil, err
}

sec, err := getStorageSecrets(bucket.GetSecrets())
if err != nil {
return nil, err
}

cl, err := minio.New(sa.EndpointUrl, &minio.Options{
Creds: credentials.NewStaticV4(sec.AccessKeyId, sec.SecretAccessKey, ""),
Secure: sa.UseSSL,
Region: sa.Region,
})
if err != nil {
return nil, status.Errorf(codes.Unknown, "failed to create minio sdk client: %v", err)
}

prefix := path.Join(bucket.GetBucketPrefix(), req.GetKeyPrefix())
if strings.HasSuffix(req.GetKeyPrefix(), "/") {
// path.Join ends by "cleaning" the path, including removing a trailing slash, if
// it exists. given that a slash is used to denote a folder, it is required here.
prefix += "/"
}

if !req.Recursive {
if err := cl.RemoveObject(ctx, bucket.GetBucketName(), prefix, minio.RemoveObjectOptions{}); err != nil {
return nil, status.Errorf(codes.Unknown, "error deleting minio object: %v", err)
}
return &pb.DeleteObjectsResponse{
ObjectsDeleted: uint32(1),
}, nil
}

objects := []minio.ObjectInfo{}
for obj := range cl.ListObjects(ctx, bucket.GetBucketName(), minio.ListObjectsOptions{
Prefix: prefix,
Recursive: true,
}) {
if obj.Err != nil {
return nil, status.Errorf(codes.Unknown, "error iterating minio bucket contents: %v", err)
}
objects = append(objects, obj)
}

emitter := make(chan minio.ObjectInfo)
go func() {
defer close(emitter)
// we could loop the listobjects respose here, but we want to check all the errs ahead of time
for _, obj := range objects {
emitter <- obj
}
}()

removed := 0
for res := range cl.RemoveObjectsWithResult(ctx, bucket.GetBucketName(), emitter, minio.RemoveObjectsOptions{}) {
if res.Err != nil {
return nil, status.Errorf(codes.Unknown, "error deleting minio object(s): %v", err)
}
removed++
}

return &pb.DeleteObjectsResponse{
ObjectsDeleted: uint32(removed),
}, nil
}

func dryRun(ctx context.Context, cl *minio.Client, bucket *storagebuckets.StorageBucket) error {
Expand Down
289 changes: 288 additions & 1 deletion plugin/service/storage/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1720,7 +1720,294 @@ func TestPutObject(t *testing.T) {
}
}

func TestDeleteObjects(t *testing.T) {}
func TestDeleteObjects(t *testing.T) {
ctx := context.Background()
server := internaltest.NewMinioServer(t)
bucketName := "test-bucket"
require.NoError(t, server.Client.MakeBucket(ctx, bucketName, minio.MakeBucketOptions{}))

cases := []struct {
name string
req *plugin.DeleteObjectsRequest
setup func(*minio.Client, *plugin.DeleteObjectsRequest) error
expected uint32
err string
errCode codes.Code
}{
{
name: "nilRequest",
err: "no storage bucket information found",
errCode: codes.InvalidArgument,
},
{
name: "nilBucket",
req: &plugin.DeleteObjectsRequest{Bucket: nil},
err: "no storage bucket information found",
errCode: codes.InvalidArgument,
},
{
name: "emptyBucket",
req: &plugin.DeleteObjectsRequest{
Bucket: &storagebuckets.StorageBucket{},
},
err: "storage bucket name is required",
errCode: codes.InvalidArgument,
},
{
name: "emptyObjectKey",
req: &plugin.DeleteObjectsRequest{
Bucket: &storagebuckets.StorageBucket{
BucketName: bucketName,
},
KeyPrefix: "",
},
err: "key prefix is required",
errCode: codes.InvalidArgument,
},
{
name: "badStorageAttributes",
req: &plugin.DeleteObjectsRequest{
Bucket: &storagebuckets.StorageBucket{
BucketName: bucketName,
Attributes: &structpb.Struct{
Fields: map[string]*structpb.Value{
"endpoint_url": structpb.NewStringValue("foo"),
},
},
},
KeyPrefix: "obj/path",
},
err: "attributes.endpoint_url.format: unknown protocol, should be http:// or https://",
errCode: codes.InvalidArgument,
},
{
name: "badStorageSecrets",
req: &plugin.DeleteObjectsRequest{
Bucket: &storagebuckets.StorageBucket{
BucketName: bucketName,
Attributes: &structpb.Struct{
Fields: map[string]*structpb.Value{
"endpoint_url": structpb.NewStringValue("http://foo"),
},
},
Secrets: &structpb.Struct{
Fields: map[string]*structpb.Value{
"access_key_id": structpb.NewStringValue("foo"),
},
},
},
KeyPrefix: "obj/path",
},
err: "secrets.secret_access_key: missing required value \"secret_access_key\"",
errCode: codes.InvalidArgument,
},
{
name: "success",
req: &plugin.DeleteObjectsRequest{
Bucket: &storagebuckets.StorageBucket{
BucketName: bucketName,
Attributes: &structpb.Struct{
Fields: map[string]*structpb.Value{
ConstEndpointUrl: structpb.NewStringValue("http://" + server.ApiAddr),
},
},
Secrets: &structpb.Struct{
Fields: map[string]*structpb.Value{
ConstAccessKeyId: structpb.NewStringValue(server.ServiceAccountAccessKeyId),
ConstSecretAccessKey: structpb.NewStringValue(server.ServiceAccountSecretAccessKey),
},
},
},
KeyPrefix: "obj/path",
},
setup: func(cl *minio.Client, req *plugin.DeleteObjectsRequest) error {
contents := "test file contents"
file := bytes.NewBufferString(contents)
if _, err := cl.PutObject(ctx, bucketName, req.KeyPrefix, file, int64(len(contents)), minio.PutObjectOptions{}); err != nil {
return err
}
return nil
},
expected: 1,
},
{
name: "successNothingToDelete",
req: &plugin.DeleteObjectsRequest{
Bucket: &storagebuckets.StorageBucket{
BucketName: bucketName,
Attributes: &structpb.Struct{
Fields: map[string]*structpb.Value{
ConstEndpointUrl: structpb.NewStringValue("http://" + server.ApiAddr),
},
},
Secrets: &structpb.Struct{
Fields: map[string]*structpb.Value{
ConstAccessKeyId: structpb.NewStringValue(server.ServiceAccountAccessKeyId),
ConstSecretAccessKey: structpb.NewStringValue(server.ServiceAccountSecretAccessKey),
},
},
},
KeyPrefix: "obj/path",
},
// notice this is still 1, non-recursive always returns 1 on success, even if nothing was *actually* deleted
expected: 1,
},
{
name: "successRecursiveOneObjNoSlash",
req: &plugin.DeleteObjectsRequest{
Bucket: &storagebuckets.StorageBucket{
BucketName: bucketName,
Attributes: &structpb.Struct{
Fields: map[string]*structpb.Value{
ConstEndpointUrl: structpb.NewStringValue("http://" + server.ApiAddr),
},
},
Secrets: &structpb.Struct{
Fields: map[string]*structpb.Value{
ConstAccessKeyId: structpb.NewStringValue(server.ServiceAccountAccessKeyId),
ConstSecretAccessKey: structpb.NewStringValue(server.ServiceAccountSecretAccessKey),
},
},
},
KeyPrefix: "obj/path",
Recursive: true,
},
setup: func(cl *minio.Client, req *plugin.DeleteObjectsRequest) error {
contents := "test file contents"
file := bytes.NewBufferString(contents)
if _, err := cl.PutObject(ctx, bucketName, req.KeyPrefix, file, int64(len(contents)), minio.PutObjectOptions{}); err != nil {
return err
}
return nil
},
expected: 1,
},
{
name: "successRecursiveNothingToDelete",
req: &plugin.DeleteObjectsRequest{
Bucket: &storagebuckets.StorageBucket{
BucketName: bucketName,
Attributes: &structpb.Struct{
Fields: map[string]*structpb.Value{
ConstEndpointUrl: structpb.NewStringValue("http://" + server.ApiAddr),
},
},
Secrets: &structpb.Struct{
Fields: map[string]*structpb.Value{
ConstAccessKeyId: structpb.NewStringValue(server.ServiceAccountAccessKeyId),
ConstSecretAccessKey: structpb.NewStringValue(server.ServiceAccountSecretAccessKey),
},
},
},
KeyPrefix: "obj/path",
Recursive: true,
},
expected: 0,
},
{
name: "successRecursiveOneObj",
req: &plugin.DeleteObjectsRequest{
Bucket: &storagebuckets.StorageBucket{
BucketName: bucketName,
Attributes: &structpb.Struct{
Fields: map[string]*structpb.Value{
ConstEndpointUrl: structpb.NewStringValue("http://" + server.ApiAddr),
},
},
Secrets: &structpb.Struct{
Fields: map[string]*structpb.Value{
ConstAccessKeyId: structpb.NewStringValue(server.ServiceAccountAccessKeyId),
ConstSecretAccessKey: structpb.NewStringValue(server.ServiceAccountSecretAccessKey),
},
},
},
KeyPrefix: "obj/path/",
Recursive: true,
},
setup: func(cl *minio.Client, req *plugin.DeleteObjectsRequest) error {
contents := "test file contents"
file := bytes.NewBufferString(contents)
if _, err := cl.PutObject(ctx, bucketName, req.KeyPrefix+"resr", file, int64(len(contents)), minio.PutObjectOptions{}); err != nil {
return err
}
return nil
},
expected: 1,
},
{
name: "successRecursiveManyObj",
req: &plugin.DeleteObjectsRequest{
Bucket: &storagebuckets.StorageBucket{
BucketName: bucketName,
Attributes: &structpb.Struct{
Fields: map[string]*structpb.Value{
ConstEndpointUrl: structpb.NewStringValue("http://" + server.ApiAddr),
},
},
Secrets: &structpb.Struct{
Fields: map[string]*structpb.Value{
ConstAccessKeyId: structpb.NewStringValue(server.ServiceAccountAccessKeyId),
ConstSecretAccessKey: structpb.NewStringValue(server.ServiceAccountSecretAccessKey),
},
},
},
KeyPrefix: "obj/path/",
Recursive: true,
},
setup: func(cl *minio.Client, req *plugin.DeleteObjectsRequest) error {
contents := "test file contents"
for i := 0; i < 10; i++ {
file := bytes.NewBufferString(contents)
if _, err := cl.PutObject(ctx, bucketName, req.KeyPrefix+fmt.Sprint(i), file, int64(len(contents)), minio.PutObjectOptions{}); err != nil {
return err
}
}
return nil
},
expected: 10,
},
}

for _, tt := range cases {
tt := tt
t.Run(tt.name, func(t *testing.T) {
assert, require := assert.New(t), require.New(t)
cl, err := minio.New(server.ApiAddr, &minio.Options{
Creds: credentials.NewStaticV4(server.RootUsername, server.RootPassword, ""),
Secure: false,
})
require.NoError(err)

if tt.setup != nil {
require.NoError(tt.setup(cl, tt.req))
}

sp := new(StoragePlugin)
res, err := sp.DeleteObjects(ctx, tt.req)

if tt.err != "" {
assert.NotNil(status.Convert(err))
assert.ErrorContains(err, tt.err)
assert.Equal(tt.errCode.String(), status.Code(err).String())
return
}

require.NotNil(res)
assert.Equal(tt.expected, res.ObjectsDeleted)

// make sure they were actually deleted on the server
obj := 0
for range cl.ListObjects(ctx, bucketName, minio.ListObjectsOptions{
// we don't add the slash back here cause we want to make sure everything is gone
Prefix: path.Join(tt.req.Bucket.GetBucketPrefix(), tt.req.GetKeyPrefix()),
Recursive: true,
}) {
obj++
}
assert.Equal(0, obj)
})
}
}

func TestDryRun(t *testing.T) {
ctx := context.Background()
Expand Down

0 comments on commit 09cece6

Please sign in to comment.