Skip to content

Commit

Permalink
[IMPROVED] Bind Streams in Object Store Watchers (#1578)
Browse files Browse the repository at this point in the history
When mirroring Object Store, there is a need to setup subject
transformations. However, client also needs to bind to the stream
to avoid stream lookup, which is both not necessary and expensive,
and also does not work with Object Store mirrors.

Signed-off-by: Tomasz Pietrek <tomasz@nats.io>
  • Loading branch information
Jarema authored and piotrpio committed Dec 13, 2024
1 parent f0712b8 commit 0d49e3f
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 2 deletions.
3 changes: 2 additions & 1 deletion jetstream/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -1308,7 +1308,8 @@ func (obs *obs) Watch(ctx context.Context, opts ...WatchOpt) (ObjectWatcher, err
}

// Used ordered consumer to deliver results.
subOpts := []nats.SubOpt{nats.OrderedConsumer()}
streamName := fmt.Sprintf(objNameTmpl, obs.name)
subOpts := []nats.SubOpt{nats.OrderedConsumer(), nats.BindStream(streamName)}
if !o.includeHistory {
subOpts = append(subOpts, nats.DeliverLastPerSubject())
}
Expand Down
69 changes: 69 additions & 0 deletions jetstream/test/object_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1197,3 +1197,72 @@ func TestObjectStoreCompression(t *testing.T) {
t.Fatalf("Expected stream to be compressed with S2")
}
}

func TestObjectStoreMirror(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

bucketName := "test-bucket"

ctx := context.Background()
obs, err := js.CreateObjectStore(ctx, jetstream.ObjectStoreConfig{Bucket: bucketName, Description: "testing"})
expectOk(t, err)

mirrorBucketName := "mirror-test-bucket"

_, err = js.CreateStream(ctx, jetstream.StreamConfig{
Name: fmt.Sprintf("OBJ_%s", mirrorBucketName),
Mirror: &jetstream.StreamSource{
Name: fmt.Sprintf("OBJ_%s", bucketName),
SubjectTransforms: []jetstream.SubjectTransformConfig{
{
Source: fmt.Sprintf("$O.%s.>", bucketName),
Destination: fmt.Sprintf("$O.%s.>", mirrorBucketName),
},
},
},
AllowRollup: true, // meta messages are always rollups
})
if err != nil {
t.Fatalf("Error creating object store bucket mirror: %v", err)
}

_, err = obs.PutString(ctx, "A", "abc")
expectOk(t, err)

mirrorObs, err := js.ObjectStore(ctx, mirrorBucketName)
expectOk(t, err)

// Make sure we sync.
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
mirrorValue, err := mirrorObs.GetString(ctx, "A")
if err != nil {
return err
}
if mirrorValue != "abc" {
t.Fatalf("Expected mirrored object store value to be the same as original")
}
return nil
})

watcher, err := mirrorObs.Watch(ctx)
if err != nil {
t.Fatalf("Error creating watcher: %v", err)
}
defer watcher.Stop()

// expect to get one value and nil
for {
select {
case info := <-watcher.Updates():
if info == nil {
return
}
case <-time.After(2 * time.Second):
t.Fatalf("Expected to receive an update")
}
}
}
3 changes: 2 additions & 1 deletion object.go
Original file line number Diff line number Diff line change
Expand Up @@ -1115,7 +1115,8 @@ func (obs *obs) Watch(opts ...WatchOpt) (ObjectWatcher, error) {
}

// Used ordered consumer to deliver results.
subOpts := []SubOpt{OrderedConsumer()}
streamName := fmt.Sprintf(objNameTmpl, obs.name)
subOpts := []SubOpt{OrderedConsumer(), BindStream(streamName)}
if !o.includeHistory {
subOpts = append(subOpts, DeliverLastPerSubject())
}
Expand Down
18 changes: 18 additions & 0 deletions test/object_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1163,4 +1163,22 @@ func TestObjectStoreMirror(t *testing.T) {
}
return nil
})

watcher, err := mirrorObs.Watch()
if err != nil {
t.Fatalf("Error creating watcher: %v", err)
}
defer watcher.Stop()

// expect to get one value and nil
for {
select {
case info := <-watcher.Updates():
if info == nil {
return
}
case <-time.After(2 * time.Second):
t.Fatalf("Expected to receive an update")
}
}
}

0 comments on commit 0d49e3f

Please sign in to comment.