From 366498866f93b15f0efe8bc719a887addc22a72c Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Thu, 8 Feb 2024 13:50:38 +0100 Subject: [PATCH 1/2] [ADDED] UpdateObjectStore and CreateOrUpdateObjectStore methods Signed-off-by: Piotr Piotrowski --- jetstream/object.go | 85 +++++++++++++++++++++++++++++------ jetstream/test/object_test.go | 65 ++++++++++++++++++++++++++- 2 files changed, 135 insertions(+), 15 deletions(-) diff --git a/jetstream/object.go b/jetstream/object.go index 32715d788..7cc622de9 100644 --- a/jetstream/object.go +++ b/jetstream/object.go @@ -1,4 +1,4 @@ -// Copyright 2023 The NATS Authors +// Copyright 2023-2024 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -41,6 +41,10 @@ type ( ObjectStore(ctx context.Context, bucket string) (ObjectStore, error) // CreateObjectStore will create an object store. CreateObjectStore(ctx context.Context, cfg ObjectStoreConfig) (ObjectStore, error) + // UpdateObjectStore will update an existing object store. + UpdateObjectStore(ctx context.Context, cfg ObjectStoreConfig) (ObjectStore, error) + // CreateOrUpdateObjectStore will create or update an object store. + CreateOrUpdateObjectStore(ctx context.Context, cfg ObjectStoreConfig) (ObjectStore, error) // DeleteObjectStore will delete the underlying stream for the named object. DeleteObjectStore(ctx context.Context, bucket string) error // ObjectStoreNames is used to retrieve a list of bucket names @@ -253,10 +257,73 @@ const ( objDigestTmpl = objDigestType + "%s" ) -// CreateObjectStore will create an object store. func (js *jetStream) CreateObjectStore(ctx context.Context, cfg ObjectStoreConfig) (ObjectStore, error) { + scfg, err := js.prepareObjectStoreConfig(ctx, cfg) + if err != nil { + return nil, err + } + + stream, err := js.CreateStream(ctx, scfg) + if err != nil { + if errors.Is(err, ErrStreamNameAlreadyInUse) { + // errors are joined so that backwards compatibility is retained + // and previous checks for ErrStreamNameAlreadyInUse will still work. + err = errors.Join(fmt.Errorf("%w: %s", ErrBucketExists, cfg.Bucket), err) + } + return nil, err + } + pushJS, err := js.legacyJetStream() + if err != nil { + return nil, err + } + + return mapStreamToObjectStore(js, pushJS, cfg.Bucket, stream), nil +} + +func (js *jetStream) UpdateObjectStore(ctx context.Context, cfg ObjectStoreConfig) (ObjectStore, error) { + scfg, err := js.prepareObjectStoreConfig(ctx, cfg) + if err != nil { + return nil, err + } + + // Attempt to update the stream. + // If the stream does not exist, create it. + stream, err := js.UpdateStream(ctx, scfg) + if err != nil { + if errors.Is(err, ErrStreamNotFound) { + return nil, fmt.Errorf("%w: %s", ErrBucketNotFound, cfg.Bucket) + } + return nil, err + } + pushJS, err := js.legacyJetStream() + if err != nil { + return nil, err + } + + return mapStreamToObjectStore(js, pushJS, cfg.Bucket, stream), nil +} + +func (js *jetStream) CreateOrUpdateObjectStore(ctx context.Context, cfg ObjectStoreConfig) (ObjectStore, error) { + scfg, err := js.prepareObjectStoreConfig(ctx, cfg) + if err != nil { + return nil, err + } + + stream, err := js.CreateOrUpdateStream(ctx, scfg) + if err != nil { + return nil, err + } + pushJS, err := js.legacyJetStream() + if err != nil { + return nil, err + } + + return mapStreamToObjectStore(js, pushJS, cfg.Bucket, stream), nil +} + +func (js *jetStream) prepareObjectStoreConfig(ctx context.Context, cfg ObjectStoreConfig) (StreamConfig, error) { if !validBucketRe.MatchString(cfg.Bucket) { - return nil, ErrInvalidStoreName + return StreamConfig{}, ErrInvalidStoreName } name := cfg.Bucket @@ -294,17 +361,7 @@ func (js *jetStream) CreateObjectStore(ctx context.Context, cfg ObjectStoreConfi Compression: compression, } - // Create our stream. - stream, err := js.CreateStream(ctx, scfg) - if err != nil { - return nil, err - } - pushJS, err := js.legacyJetStream() - if err != nil { - return nil, err - } - - return mapStreamToObjectStore(js, pushJS, name, stream), nil + return scfg, nil } // ObjectStore will look up and bind to an existing object store instance. diff --git a/jetstream/test/object_test.go b/jetstream/test/object_test.go index 7b9d85fc2..e315797f3 100644 --- a/jetstream/test/object_test.go +++ b/jetstream/test/object_test.go @@ -1,4 +1,4 @@ -// Copyright 2023 The NATS Authors +// Copyright 2023-2024 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -129,6 +129,69 @@ func TestObjectBasics(t *testing.T) { expectErr(t, err, jetstream.ErrBucketNotFound) } +func TestCreateObjectStore(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + ctx := context.Background() + + // invalid bucket name + _, err := js.CreateObjectStore(ctx, jetstream.ObjectStoreConfig{Bucket: "TEST.", Description: "Test store"}) + expectErr(t, err, jetstream.ErrInvalidStoreName) + + _, err = js.CreateObjectStore(ctx, jetstream.ObjectStoreConfig{Bucket: "TEST", Description: "Test store"}) + expectOk(t, err) + + // Check that we can't overwrite existing bucket. + _, err = js.CreateObjectStore(ctx, jetstream.ObjectStoreConfig{Bucket: "TEST", Description: "New store"}) + expectErr(t, err, jetstream.ErrBucketExists) + + // assert that we're backwards compatible + expectErr(t, err, jetstream.ErrStreamNameAlreadyInUse) +} + +func TestUpdateObjectStore(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + ctx := context.Background() + + // cannot update a non-existing bucket + _, err := js.UpdateObjectStore(ctx, jetstream.ObjectStoreConfig{Bucket: "TEST", Description: "Test store"}) + expectErr(t, err, jetstream.ErrBucketNotFound) + + _, err = js.CreateObjectStore(ctx, jetstream.ObjectStoreConfig{Bucket: "TEST", Description: "Test store"}) + expectOk(t, err) + + // update the bucket + _, err = js.UpdateObjectStore(ctx, jetstream.ObjectStoreConfig{Bucket: "TEST", Description: "New store"}) + expectOk(t, err) +} + +func TestCreateOrUpdateObjectStore(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + ctx := context.Background() + + // invalid bucket name + _, err := js.CreateOrUpdateObjectStore(ctx, jetstream.ObjectStoreConfig{Bucket: "TEST.", Description: "Test store"}) + expectErr(t, err, jetstream.ErrInvalidStoreName) + + _, err = js.CreateOrUpdateObjectStore(ctx, jetstream.ObjectStoreConfig{Bucket: "TEST", Description: "Test store"}) + expectOk(t, err) + + // update the bucket + _, err = js.CreateOrUpdateObjectStore(ctx, jetstream.ObjectStoreConfig{Bucket: "TEST", Description: "New store"}) + expectOk(t, err) +} + func TestGetObjectDigestMismatch(t *testing.T) { s := RunBasicJetStreamServer() defer shutdownJSServerAndRemoveStorage(t, s) From b54334236bd0655a2f890b835029a88fb873c4c0 Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Fri, 9 Feb 2024 11:26:04 +0100 Subject: [PATCH 2/2] Remove invalid comment Co-authored-by: Tomasz Pietrek --- jetstream/object.go | 1 - 1 file changed, 1 deletion(-) diff --git a/jetstream/object.go b/jetstream/object.go index 7cc622de9..b0c8a3a53 100644 --- a/jetstream/object.go +++ b/jetstream/object.go @@ -287,7 +287,6 @@ func (js *jetStream) UpdateObjectStore(ctx context.Context, cfg ObjectStoreConfi } // Attempt to update the stream. - // If the stream does not exist, create it. stream, err := js.UpdateStream(ctx, scfg) if err != nil { if errors.Is(err, ErrStreamNotFound) {