From 98919ae3ce7f63987131bb15dfeec3fa6bb3a2bd Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Mon, 4 Mar 2024 17:13:43 +0100 Subject: [PATCH] [IMPROVED] Bind Streams in Object Store Watchers (#1578) When mirroring Object Store, there is a need to setup subject transformations. However, client also needs to bind to the stream to avoid stream lookup, which is both not necessary and expensive, and also does not work with Object Store mirrors. Signed-off-by: Tomasz Pietrek --- jetstream/object.go | 3 +- jetstream/test/object_test.go | 69 +++++++++++++++++++++++++++++++++++ object.go | 3 +- test/object_test.go | 18 +++++++++ 4 files changed, 91 insertions(+), 2 deletions(-) diff --git a/jetstream/object.go b/jetstream/object.go index 8b2b7097b..a0eecff33 100644 --- a/jetstream/object.go +++ b/jetstream/object.go @@ -1308,7 +1308,8 @@ func (obs *obs) Watch(ctx context.Context, opts ...WatchOpt) (ObjectWatcher, err } // Used ordered consumer to deliver results. - subOpts := []nats.SubOpt{nats.OrderedConsumer()} + streamName := fmt.Sprintf(objNameTmpl, obs.name) + subOpts := []nats.SubOpt{nats.OrderedConsumer(), nats.BindStream(streamName)} if !o.includeHistory { subOpts = append(subOpts, nats.DeliverLastPerSubject()) } diff --git a/jetstream/test/object_test.go b/jetstream/test/object_test.go index 70e2b7096..8f421c51f 100644 --- a/jetstream/test/object_test.go +++ b/jetstream/test/object_test.go @@ -1197,3 +1197,72 @@ func TestObjectStoreCompression(t *testing.T) { t.Fatalf("Expected stream to be compressed with S2") } } + +func TestObjectStoreMirror(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + bucketName := "test-bucket" + + ctx := context.Background() + obs, err := js.CreateObjectStore(ctx, jetstream.ObjectStoreConfig{Bucket: bucketName, Description: "testing"}) + expectOk(t, err) + + mirrorBucketName := "mirror-test-bucket" + + _, err = js.CreateStream(ctx, jetstream.StreamConfig{ + Name: fmt.Sprintf("OBJ_%s", mirrorBucketName), + Mirror: &jetstream.StreamSource{ + Name: fmt.Sprintf("OBJ_%s", bucketName), + SubjectTransforms: []jetstream.SubjectTransformConfig{ + { + Source: fmt.Sprintf("$O.%s.>", bucketName), + Destination: fmt.Sprintf("$O.%s.>", mirrorBucketName), + }, + }, + }, + AllowRollup: true, // meta messages are always rollups + }) + if err != nil { + t.Fatalf("Error creating object store bucket mirror: %v", err) + } + + _, err = obs.PutString(ctx, "A", "abc") + expectOk(t, err) + + mirrorObs, err := js.ObjectStore(ctx, mirrorBucketName) + expectOk(t, err) + + // Make sure we sync. + checkFor(t, 2*time.Second, 15*time.Millisecond, func() error { + mirrorValue, err := mirrorObs.GetString(ctx, "A") + if err != nil { + return err + } + if mirrorValue != "abc" { + t.Fatalf("Expected mirrored object store value to be the same as original") + } + return nil + }) + + watcher, err := mirrorObs.Watch(ctx) + if err != nil { + t.Fatalf("Error creating watcher: %v", err) + } + defer watcher.Stop() + + // expect to get one value and nil + for { + select { + case info := <-watcher.Updates(): + if info == nil { + return + } + case <-time.After(2 * time.Second): + t.Fatalf("Expected to receive an update") + } + } +} diff --git a/object.go b/object.go index 4a965adfc..75ceaa8e9 100644 --- a/object.go +++ b/object.go @@ -1115,7 +1115,8 @@ func (obs *obs) Watch(opts ...WatchOpt) (ObjectWatcher, error) { } // Used ordered consumer to deliver results. - subOpts := []SubOpt{OrderedConsumer()} + streamName := fmt.Sprintf(objNameTmpl, obs.name) + subOpts := []SubOpt{OrderedConsumer(), BindStream(streamName)} if !o.includeHistory { subOpts = append(subOpts, DeliverLastPerSubject()) } diff --git a/test/object_test.go b/test/object_test.go index a386ec8ce..f6ecb57a2 100644 --- a/test/object_test.go +++ b/test/object_test.go @@ -1163,4 +1163,22 @@ func TestObjectStoreMirror(t *testing.T) { } return nil }) + + watcher, err := mirrorObs.Watch() + if err != nil { + t.Fatalf("Error creating watcher: %v", err) + } + defer watcher.Stop() + + // expect to get one value and nil + for { + select { + case info := <-watcher.Updates(): + if info == nil { + return + } + case <-time.After(2 * time.Second): + t.Fatalf("Expected to receive an update") + } + } }