Skip to content

Commit

Permalink
js: mirror test updates
Browse files Browse the repository at this point in the history
Signed-off-by: Waldemar Quevedo <wally@nats.io>
  • Loading branch information
wallyqs committed Feb 6, 2023
1 parent c860828 commit 424de47
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 87 deletions.
3 changes: 2 additions & 1 deletion jserrors.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ const (

JSErrCodeMessageNotFound ErrorCode = 10037

JSErrCodeBadRequest ErrorCode = 10003
JSErrCodeBadRequest ErrorCode = 10003
JSStreamInvalidConfig ErrorCode = 10052

JSErrCodeStreamWrongLastSequence ErrorCode = 10071
)
Expand Down
147 changes: 61 additions & 86 deletions test/js_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5515,103 +5515,78 @@ func testJetStreamMirror_Source(t *testing.T, nodes ...*jsServer) {
}
})

// Commenting out this test until we figure out what was the intent.
// Since v2.8.0, this test would fail with a "detected cycle" error,
// I guess because "m1" already sources "origin", so creating a
// stream with both as a source is bad.
/*
t.Run("create sourced stream from origin", func(t *testing.T) {
sources := make([]*nats.StreamSource, 0)
sources = append(sources, &nats.StreamSource{Name: "origin"})
sources = append(sources, &nats.StreamSource{Name: "m1"})
streamName := "s2"
_, err = js.AddStream(&nats.StreamConfig{
Name: streamName,
Sources: sources,
Storage: nats.FileStorage,
Replicas: 1,
})
if err != nil {
t.Fatalf("Unexpected error creating stream: %v", err)
}
msgs := make([]*nats.RawStreamMsg, 0)
// Stored message sequences start at 1
startSequence := 1
expectedTotal := totalMsgs * 2
t.Run("bind to stream with subject not in stream", func(t *testing.T) {
// The Stream does not have a subject called 'nothing' but client is still able
// to bind to the origin stream even though it cannot consume messages.
// After updating the stream with the subject this consumer will be able to
// match and receive messages.
sub, err := js.SubscribeSync("nothing", nats.BindStream("origin"))
if err != nil {
t.Fatal(err)
}
_, err = sub.NextMsg(1 * time.Second)
if !errors.Is(err, nats.ErrTimeout) {
t.Fatal("Expected timeout error")
}

GetNextMsg:
for i := startSequence; i < expectedTotal+1; i++ {
var (
err error
seq = uint64(i)
msg *nats.RawStreamMsg
timeout = time.Now().Add(5 * time.Second)
)
info, err := sub.ConsumerInfo()
if err != nil {
t.Fatal(err)
}
got := info.Stream
expected := "origin"
if got != expected {
t.Fatalf("Expected %v, got %v", expected, got)
}

Retry:
for time.Now().Before(timeout) {
msg, err = js.GetMsg(streamName, seq)
if err != nil {
time.Sleep(100 * time.Millisecond)
continue Retry
}
msgs = append(msgs, msg)
continue GetNextMsg
}
if err != nil {
t.Fatalf("Unexpected error fetching seq=%v: %v", seq, err)
}
}
got = info.Config.FilterSubject
expected = "nothing"
if got != expected {
t.Fatalf("Expected %v, got %v", expected, got)
}

got := len(msgs)
if got < expectedTotal {
t.Errorf("Expected %v, got: %v", expectedTotal, got)
}
t.Run("can consume after stream update", func(t *testing.T) {
_, err = js.UpdateStream(&nats.StreamConfig{
Name: "origin",
Placement: &nats.Placement{
Tags: []string{"NODE_0"},
},
Storage: nats.MemoryStorage,
Replicas: 1,
Subjects: []string{"origin", "nothing"},
})
js.Publish("nothing", []byte("hello world"))

si, err := js.StreamInfo(streamName)
msg, err := sub.NextMsg(1 * time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
got = int(si.State.Msgs)
if got != expectedTotal {
t.Errorf("Expected %v, got: %v", expectedTotal, got)
t.Error(err)
}
got = len(si.Sources)
expected := 2
got = msg.Subject
expected = "nothing"
if got != expected {
t.Errorf("Expected %v, got: %v", expected, got)
t.Fatalf("Expected %v, got %v", expected, got)
}

t.Run("consume from sourced stream", func(t *testing.T) {
sub, err := js.SubscribeSync("origin", nats.BindStream(streamName))
if err != nil {
t.Fatal(err)
}
})
})

mmsgs := make([]*nats.Msg, 0)
for i := 0; i < totalMsgs; i++ {
msg, err := sub.NextMsg(2 * time.Second)
if err != nil {
t.Error(err)
}
meta, err := msg.Metadata()
if err != nil {
t.Error(err)
}
if meta.Stream != streamName {
t.Errorf("Expected m1, got: %v", meta.Stream)
}
mmsgs = append(mmsgs, msg)
}
if len(mmsgs) != totalMsgs {
t.Errorf("Expected to consume %v msgs, got: %v", totalMsgs, len(mmsgs))
}
})
t.Run("create sourced stream with a cycle", func(t *testing.T) {
// Since v2.8.0, this test would fail with a "detected cycle" error.
sources := make([]*nats.StreamSource, 0)
sources = append(sources, &nats.StreamSource{Name: "origin"})
sources = append(sources, &nats.StreamSource{Name: "m1"})
streamName := "s2"
_, err = js.AddStream(&nats.StreamConfig{
Name: streamName,
Sources: sources,
Storage: nats.FileStorage,
Replicas: 1,
})
*/
var aerr *nats.APIError
if ok := errors.As(err, &aerr); !ok || aerr.ErrorCode != nats.JSStreamInvalidConfig {
t.Fatalf("Expected nats.APIError, got %v", err)
}
})
}

func TestJetStream_ClusterMultipleSubscribe(t *testing.T) {
Expand Down

0 comments on commit 424de47

Please sign in to comment.