Skip to content

Commit

Permalink
uplink,private/{metaclient,object}: propagate copy object retention i…
Browse files Browse the repository at this point in the history
…nfo to metainfo

Closes storj/edge#452

Change-Id: I65d111d9c720ae436960996642dca17d91fc5459
  • Loading branch information
wilfred-asomanii committed Aug 13, 2024
1 parent c28e851 commit 9c782c9
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 9 deletions.
4 changes: 3 additions & 1 deletion copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"context"

"github.com/zeebo/errs"

"storj.io/uplink/private/metaclient"
)

// CopyObjectOptions options for CopyObject method.
Expand All @@ -24,7 +26,7 @@ func (project *Project) CopyObject(ctx context.Context, oldBucket, oldKey, newBu
}
defer func() { err = errs.Combine(err, db.Close()) }()

obj, err := db.CopyObject(ctx, oldBucket, oldKey, nil, newBucket, newKey)
obj, err := db.CopyObject(ctx, oldBucket, oldKey, nil, newBucket, newKey, metaclient.CopyObjectOptions{})
if err != nil {
return nil, convertKnownErrors(err, oldBucket, oldKey)
}
Expand Down
10 changes: 9 additions & 1 deletion private/metaclient/client_copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type FinishCopyObjectParams struct {
NewEncryptedMetadataKeyNonce storj.Nonce
NewEncryptedMetadataKey []byte
NewSegmentKeys []EncryptedKeyAndNonce
Retention Retention
}

func (params *FinishCopyObjectParams) toRequest(header *pb.RequestHeader) *pb.ObjectFinishCopyRequest {
Expand All @@ -106,7 +107,7 @@ func (params *FinishCopyObjectParams) toRequest(header *pb.RequestHeader) *pb.Ob
EncryptedKey: keyAndNonce.EncryptedKey,
}
}
return &pb.ObjectFinishCopyRequest{
request := &pb.ObjectFinishCopyRequest{
Header: header,
StreamId: params.StreamID,
NewBucket: params.NewBucket,
Expand All @@ -115,6 +116,13 @@ func (params *FinishCopyObjectParams) toRequest(header *pb.RequestHeader) *pb.Ob
NewEncryptedMetadataKey: params.NewEncryptedMetadataKey,
NewSegmentKeys: keys,
}
if params.Retention != (Retention{}) {
request.Retention = &pb.Retention{
Mode: pb.Retention_Mode(params.Retention.Mode),
RetainUntil: params.Retention.RetainUntil,
}
}
return request
}

// BatchItem returns single item for batch request.
Expand Down
15 changes: 12 additions & 3 deletions private/metaclient/movecopy.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,13 @@ type EncryptedKeyAndNonce struct {
EncryptedKey []byte
}

// CopyObjectOptions options for CopyObject method.
type CopyObjectOptions struct {
Retention Retention
}

// CopyObject atomically copies object to a different bucket or/and key. Source object version can be specified.
func (db *DB) CopyObject(ctx context.Context, sourceBucket, sourceKey string, sourceVersion []byte, targetBucket, targetKey string) (_ *Object, err error) {
func (db *DB) CopyObject(ctx context.Context, sourceBucket, sourceKey string, sourceVersion []byte, targetBucket, targetKey string, opts CopyObjectOptions) (_ *Object, err error) {
defer mon.Task()(&ctx)(&err)

err = validateMoveCopyInput(sourceBucket, sourceKey, targetBucket, targetKey)
Expand Down Expand Up @@ -72,14 +77,18 @@ func (db *DB) CopyObject(ctx context.Context, sourceBucket, sourceKey string, so
return nil, errs.Wrap(err)
}

obj, err := db.metainfo.FinishCopyObject(ctx, FinishCopyObjectParams{
params := FinishCopyObjectParams{
StreamID: response.StreamID,
NewBucket: []byte(targetBucket),
NewEncryptedObjectKey: []byte(targetEncKey.Raw()),
NewEncryptedMetadataKeyNonce: newMetadataKeyNonce,
NewEncryptedMetadataKey: newMetadataEncryptedKey,
NewSegmentKeys: newKeys,
})
}
if opts != (CopyObjectOptions{}) {
params.Retention = opts.Retention
}
obj, err := db.metainfo.FinishCopyObject(ctx, params)
if err != nil {
return nil, errs.Wrap(err)
}
Expand Down
22 changes: 20 additions & 2 deletions private/object/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package object
import (
"context"
"errors"
"strings"
_ "unsafe" // for go:linkname

"github.com/spacemonkeygo/monkit/v3"
Expand All @@ -28,6 +29,9 @@ var packageError = errs.Class("object")
// ErrMethodNotAllowed is returned when method is not allowed against specified entity (e.g. object).
var ErrMethodNotAllowed = errors.New("method not allowed")

// ErrNoObjectLockConfiguration is returned when a locked object is copied to a bucket without object lock configuration.
var ErrNoObjectLockConfiguration = errors.New("destination bucket has no object lock configuration")

// IPSummary contains information about the object IP-s.
type IPSummary = metaclient.GetObjectIPsResponse

Expand Down Expand Up @@ -59,6 +63,11 @@ type ListObjectVersionsOptions struct {
Limit int
}

// CopyObjectOptions options for CopyObject method.
type CopyObjectOptions struct {
Retention metaclient.Retention
}

// Info returns the last information about the uploaded object.
func (upload *VersionedUpload) Info() *VersionedObject {
metaObj := upload_getMetaclientObject(upload.upload)
Expand Down Expand Up @@ -278,7 +287,7 @@ func CommitUpload(ctx context.Context, project *uplink.Project, bucket, key, upl
}

// CopyObject atomically copies object to a different bucket or/and key.
func CopyObject(ctx context.Context, project *uplink.Project, sourceBucket, sourceKey string, sourceVersion []byte, targetBucket, targetKey string, options *uplink.CopyObjectOptions) (_ *VersionedObject, err error) {
func CopyObject(ctx context.Context, project *uplink.Project, sourceBucket, sourceKey string, sourceVersion []byte, targetBucket, targetKey string, options CopyObjectOptions) (_ *VersionedObject, err error) {
defer mon.Task()(&ctx)(&err)

db, err := dialMetainfoDB(ctx, project)
Expand All @@ -287,7 +296,11 @@ func CopyObject(ctx context.Context, project *uplink.Project, sourceBucket, sour
}
defer func() { err = errs.Combine(err, db.Close()) }()

obj, err := db.CopyObject(ctx, sourceBucket, sourceKey, sourceVersion, targetBucket, targetKey)
metaOpts := metaclient.CopyObjectOptions{}
if options != (CopyObjectOptions{}) {
metaOpts.Retention = options.Retention
}
obj, err := db.CopyObject(ctx, sourceBucket, sourceKey, sourceVersion, targetBucket, targetKey, metaOpts)
if err != nil {
return nil, packageConvertKnownErrors(err, sourceBucket, sourceKey)
}
Expand Down Expand Up @@ -370,6 +383,11 @@ func packageConvertKnownErrors(err error, bucket, key string) error {
if errs2.IsRPC(err, rpcstatus.MethodNotAllowed) {
return ErrMethodNotAllowed
}
if errs2.IsRPC(err, rpcstatus.FailedPrecondition) {
if strings.HasSuffix(errs.Unwrap(err).Error(), "cannot specify Object Lock settings when uploading into a bucket without Object Lock enabled") {
return ErrNoObjectLockConfiguration
}
}
return convertKnownErrors(err, bucket, key)
}

Expand Down
84 changes: 82 additions & 2 deletions testsuite/private/object/object_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ func TestCopyObject(t *testing.T) {
_, err = planet.Uplinks[0].UploadWithOptions(ctx, planet.Satellites[0], bucketName, objectKey, testrand.Bytes(6*memory.KiB), nil)
require.NoError(t, err)

copiedObject, err := object.CopyObject(ctx, project, bucketName, objectKey, obj.Version, bucketName, objectKey+"-copy", nil)
copiedObject, err := object.CopyObject(ctx, project, bucketName, objectKey, obj.Version, bucketName, objectKey+"-copy", object.CopyObjectOptions{})
require.NoError(t, err)
require.NotEmpty(t, copiedObject.Version)

Expand All @@ -500,11 +500,91 @@ func TestCopyObject(t *testing.T) {

nonExistingVersion := slices.Clone(obj.Version)
nonExistingVersion[0]++ // change original version
_, err = object.CopyObject(ctx, project, bucketName, objectKey, nonExistingVersion, bucketName, objectKey+"-copy", nil)
_, err = object.CopyObject(ctx, project, bucketName, objectKey, nonExistingVersion, bucketName, objectKey+"-copy", object.CopyObjectOptions{})
require.ErrorIs(t, err, uplink.ErrObjectNotFound)
})
}

func TestCopyObjectWithObjectLock(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.Metainfo.UseBucketLevelObjectVersioning = true
config.Metainfo.UseBucketLevelObjectLock = true
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
sat := planet.Satellites[0]
upl := planet.Uplinks[0]
projectID := upl.Projects[0].ID
userID := upl.Projects[0].Owner.ID

userCtx, err := sat.UserContext(ctx, userID)
require.NoError(t, err)

_, key, err := sat.API.Console.Service.CreateAPIKey(userCtx, projectID, "test key", macaroon.APIKeyVersionObjectLock)
require.NoError(t, err)

access, err := uplink.RequestAccessWithPassphrase(ctx, sat.URL(), key.Serialize(), "")
require.NoError(t, err)

upl.Access[sat.ID()] = access

err = sat.API.DB.Console().Projects().UpdateDefaultVersioning(ctx, projectID, console.DefaultVersioning(buckets.VersioningEnabled))
require.NoError(t, err)

project, err := upl.OpenProject(ctx, sat)
require.NoError(t, err)
defer ctx.Check(project.Close)

bucketName := "test-bucket"
objectKey := "test-object"

_, err = bucket.CreateBucketWithObjectLock(ctx, project, bucket.CreateBucketWithObjectLockParams{
Name: bucketName,
ObjectLockEnabled: true,
})
require.NoError(t, err)

obj, err := planet.Uplinks[0].UploadWithOptions(ctx, planet.Satellites[0], bucketName, objectKey, testrand.Bytes(5*memory.KiB), nil)
require.NoError(t, err)

retention := metaclient.Retention{
Mode: storj.ComplianceMode,
RetainUntil: time.Now().Add(time.Hour),
}
err = object.SetObjectRetention(ctx, project, bucketName, objectKey, obj.Version, retention)
require.NoError(t, err)

copiedObject, err := object.CopyObject(ctx, project, bucketName, objectKey, obj.Version, bucketName, objectKey+"-copy", object.CopyObjectOptions{
Retention: retention,
})
require.NoError(t, err)
require.NotNil(t, copiedObject.Retention)
require.Equal(t, retention.Mode, copiedObject.Retention.Mode)
require.WithinDuration(t, retention.RetainUntil.UTC(), copiedObject.Retention.RetainUntil, time.Minute)

objectInfo, err := object.StatObject(ctx, project, bucketName, copiedObject.Key, copiedObject.Version)
require.NoError(t, err)
require.Equal(t, retention.Mode, objectInfo.Retention.Mode)
require.WithinDuration(t, retention.RetainUntil.UTC(), objectInfo.Retention.RetainUntil, time.Minute)

noLockBucket := "no-lock-bucket"
_, err = bucket.CreateBucketWithObjectLock(ctx, project, bucket.CreateBucketWithObjectLockParams{
Name: noLockBucket,
ObjectLockEnabled: false,
})
require.NoError(t, err)

// cannot copy a locked object to a bucket without object lock
_, err = object.CopyObject(ctx, project, bucketName, objectKey, obj.Version, noLockBucket, objectKey, object.CopyObjectOptions{
Retention: retention,
})
require.ErrorIs(t, err, object.ErrNoObjectLockConfiguration)
})
}

func TestObject_Versioning(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1,
Expand Down

0 comments on commit 9c782c9

Please sign in to comment.