diff --git a/nats_test.go b/nats_test.go index 695753210..a69128ca6 100644 --- a/nats_test.go +++ b/nats_test.go @@ -61,7 +61,7 @@ func WaitTime(ch chan bool, timeout time.Duration) error { return errors.New("timeout") } -func stackFatalf(t *testing.T, f string, args ...interface{}) { +func stackFatalf(t *testing.T, f string, args ...any) { lines := make([]string, 0, 32) msg := fmt.Sprintf(f, args...) lines = append(lines, msg) @@ -1576,7 +1576,7 @@ func TestNoPanicOnSrvPoolSizeChanging(t *testing.T) { func TestHeaderParser(t *testing.T) { shouldErr := func(hdr string) { t.Helper() - if _, err := decodeHeadersMsg([]byte(hdr)); err == nil { + if _, err := DecodeHeadersMsg([]byte(hdr)); err == nil { t.Fatalf("Expected an error") } } @@ -1588,7 +1588,7 @@ func TestHeaderParser(t *testing.T) { // Check that we can do inline status and descriptions checkStatus := func(hdr string, status int, description string) { t.Helper() - hdrs, err := decodeHeadersMsg([]byte(hdr + "\r\n\r\n")) + hdrs, err := DecodeHeadersMsg([]byte(hdr + "\r\n\r\n")) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1782,7 +1782,7 @@ func BenchmarkHeaderDecode(b *testing.B) { } for i := 0; i < b.N; i++ { - if _, err := decodeHeadersMsg(hdr); err != nil { + if _, err := DecodeHeadersMsg(hdr); err != nil { b.Fatalf("Unexpected error: %v", err) } } diff --git a/test/cluster_test.go b/test/cluster_test.go index cb6776451..5aa276b77 100644 --- a/test/cluster_test.go +++ b/test/cluster_test.go @@ -114,7 +114,7 @@ func TestServersOption(t *testing.T) { } func TestNewStyleServersOption(t *testing.T) { - nc, err := nats.Connect(nats.DefaultURL, nats.DontRandomize(), nats.Timeout(100*time.Millisecond)) + _, err := nats.Connect(nats.DefaultURL, nats.DontRandomize(), nats.Timeout(100*time.Millisecond)) if runtime.GOOS == "windows" { if err == nil || !strings.Contains(err.Error(), "timeout") { t.Fatalf("Expected timeout, got %v", err) @@ -122,7 +122,6 @@ func TestNewStyleServersOption(t *testing.T) { } else if err != nats.ErrNoServers { t.Fatalf("Wrong error: '%v'\n", err) } - defer nc.Close() servers := strings.Join(testServers, ",") _, err = nats.Connect(servers, nats.DontRandomize(), nats.Timeout(100*time.Millisecond)) @@ -139,7 +138,7 @@ func TestNewStyleServersOption(t *testing.T) { // Do this in case some failure occurs before explicit shutdown defer s1.Shutdown() - nc, err = nats.Connect(servers, nats.DontRandomize(), nats.Timeout(100*time.Millisecond)) + nc, err := nats.Connect(servers, nats.DontRandomize(), nats.Timeout(100*time.Millisecond)) if err != nil { t.Fatalf("Could not connect: %v\n", err) } diff --git a/test/conn_test.go b/test/conn_test.go index 70a10c066..588acc222 100644 --- a/test/conn_test.go +++ b/test/conn_test.go @@ -1058,13 +1058,12 @@ func TestConnectHandler(t *testing.T) { connHandler := func(*nats.Conn) { connected <- true } - nc, err := nats.Connect(nats.DefaultURL, + _, err := nats.Connect(nats.DefaultURL, nats.ConnectHandler(connHandler)) if err == nil { t.Fatalf("Expected error on connect, got nil") } - defer nc.Close() select { case <-connected: t.Fatalf("ConnectedCB invoked when no connection established") diff --git a/test/js_test.go b/test/js_test.go index 695e283d4..096c98978 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -479,6 +479,13 @@ func TestJetStreamSubscribe(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } + initialPending, err := sub.InitialConsumerPending() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if initialPending != 0 { + t.Fatalf("Expected no initial pending, got %d", initialPending) + } sub.Unsubscribe() // Check that Queue subscribe with HB or FC fails. @@ -566,12 +573,20 @@ func TestJetStreamSubscribe(t *testing.T) { done <- true } }) + if err != nil { t.Fatalf("Unexpected error: %v", err) } expectConsumers(t, 3) defer sub3.Unsubscribe() + initialPending, err = sub3.InitialConsumerPending() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if initialPending != 10 { + t.Fatalf("Expected initial pending of 10, got %d", initialPending) + } select { case <-done: case <-time.After(5 * time.Second): @@ -958,6 +973,409 @@ func TestJetStreamSubscribe(t *testing.T) { } } +func TestJetStreamSubscribe_SkipConsumerLookup(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Name: "cons", + DeliverSubject: "_INBOX.foo", + AckPolicy: nats.AckExplicitPolicy, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // for checking whether subscribe looks up the consumer + infoSub, err := nc.SubscribeSync("$JS.API.CONSUMER.INFO.TEST.*") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer infoSub.Unsubscribe() + + // for checking whether subscribe creates the consumer + createConsSub, err := nc.SubscribeSync("$JS.API.CONSUMER.CREATE.>") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer createConsSub.Unsubscribe() + t.Run("use Bind to skip consumer lookup and create", func(t *testing.T) { + sub, err := js.SubscribeSync("", nats.Bind("TEST", "cons"), nats.SkipConsumerLookup(), nats.DeliverSubject("_INBOX.foo")) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer sub.Unsubscribe() + // we should get timeout waiting for msg on CONSUMER.INFO + if msg, err := infoSub.NextMsg(50 * time.Millisecond); err == nil { + t.Fatalf("Expected to skip consumer lookup; got message on %q", msg.Subject) + } + + // we should get timeout waiting for msg on CONSUMER.CREATE + if msg, err := createConsSub.NextMsg(50 * time.Millisecond); err == nil { + t.Fatalf("Expected to skip consumer create; got message on %q", msg.Subject) + } + if _, err := js.Publish("foo", []byte("msg")); err != nil { + t.Fatalf("Unexpected error: %s", err) + } + if _, err := sub.NextMsg(100 * time.Millisecond); err != nil { + t.Fatalf("Expected to receive msg; got: %s", err) + } + }) + t.Run("use Durable, skip consumer lookup but overwrite the consumer", func(t *testing.T) { + sub, err := js.SubscribeSync("foo", nats.Durable("cons"), nats.SkipConsumerLookup(), nats.DeliverSubject("_INBOX.foo")) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + // we should get timeout waiting for msg on CONSUMER.INFO + if msg, err := infoSub.NextMsg(50 * time.Millisecond); err == nil { + t.Fatalf("Expected to skip consumer lookup; got message on %q", msg.Subject) + } + + // we should get msg on CONSUMER.CREATE + if _, err := createConsSub.NextMsg(50 * time.Millisecond); err != nil { + t.Fatalf("Expected consumer create; got: %s", err) + } + if _, err := js.Publish("foo", []byte("msg")); err != nil { + t.Fatalf("Unexpected error: %s", err) + } + if _, err := sub.NextMsg(100 * time.Millisecond); err != nil { + t.Fatalf("Expected to receive msg; got: %s", err) + } + }) + t.Run("create new consumer with Durable, skip lookup", func(t *testing.T) { + sub, err := js.SubscribeSync("foo", nats.Durable("pp"), nats.SkipConsumerLookup(), nats.DeliverSubject("_INBOX.foo1")) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer sub.Unsubscribe() + // we should get timeout waiting for msg on CONSUMER.INFO + if msg, err := infoSub.NextMsg(50 * time.Millisecond); err == nil { + t.Fatalf("Expected to skip consumer lookup; got message on %q", msg.Subject) + } + + // we should get msg on CONSUMER.CREATE + if _, err := createConsSub.NextMsg(50 * time.Millisecond); err != nil { + t.Fatalf("Expected consumer create; got: %s", err) + } + if _, err := js.Publish("foo", []byte("msg")); err != nil { + t.Fatalf("Unexpected error: %s", err) + } + if _, err := sub.NextMsg(100 * time.Millisecond); err != nil { + t.Fatalf("Expected to receive msg; got: %s", err) + } + }) + t.Run("create new consumer with ConsumerName, skip lookup", func(t *testing.T) { + sub, err := js.SubscribeSync("foo", nats.ConsumerName("pp"), nats.SkipConsumerLookup(), nats.DeliverSubject("_INBOX.foo1")) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer sub.Unsubscribe() + // we should get timeout waiting for msg on CONSUMER.INFO + if msg, err := infoSub.NextMsg(50 * time.Millisecond); err == nil { + t.Fatalf("Expected to skip consumer lookup; got message on %q", msg.Subject) + } + + // we should get msg on CONSUMER.CREATE + if _, err := createConsSub.NextMsg(50 * time.Millisecond); err != nil { + t.Fatalf("Expected consumer create; got: %s", err) + } + if _, err := js.Publish("foo", []byte("msg")); err != nil { + t.Fatalf("Unexpected error: %s", err) + } + if _, err := sub.NextMsg(100 * time.Millisecond); err != nil { + t.Fatalf("Expected to receive msg; got: %s", err) + } + }) + + t.Run("create ephemeral consumer, SkipConsumerLookup has no effect", func(t *testing.T) { + sub, err := js.SubscribeSync("foo", nats.SkipConsumerLookup(), nats.DeliverSubject("_INBOX.foo2")) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer sub.Unsubscribe() + // we should get timeout waiting for msg on CONSUMER.INFO + if msg, err := infoSub.NextMsg(50 * time.Millisecond); err == nil { + t.Fatalf("Expected to skip consumer lookup; got message on %q", msg.Subject) + } + + // we should get msg on CONSUMER.CREATE + if _, err := createConsSub.NextMsg(50 * time.Millisecond); err != nil { + t.Fatalf("Expected consumer create; got: %s", err) + } + if _, err := js.Publish("foo", []byte("msg")); err != nil { + t.Fatalf("Unexpected error: %s", err) + } + if _, err := sub.NextMsg(100 * time.Millisecond); err != nil { + t.Fatalf("Expected to receive msg; got: %s", err) + } + }) + t.Run("attempt to update ack policy of existing consumer", func(t *testing.T) { + _, err := js.SubscribeSync("foo", nats.Durable("cons"), nats.SkipConsumerLookup(), nats.DeliverSubject("_INBOX.foo"), nats.AckAll()) + if err == nil || !strings.Contains(err.Error(), "ack policy can not be updated") { + t.Fatalf("Expected update consumer error, got: %v", err) + } + }) +} + +func TestPullSubscribeFetchWithHeartbeat(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + sub, err := js.PullSubscribe("foo", "") + if err != nil { + t.Fatalf("Unexpected error: %s", err) + } + defer sub.Unsubscribe() + for i := 0; i < 5; i++ { + if _, err := js.Publish("foo", []byte("msg")); err != nil { + t.Fatalf("Unexpected error: %s", err) + } + } + + // fetch 5 messages, should finish immediately + msgs, err := sub.Fetch(5, nats.PullHeartbeat(100*time.Millisecond)) + if err != nil { + t.Fatalf("Unexpected error: %s", err) + } + if len(msgs) != 5 { + t.Fatalf("Expected %d messages; got: %d", 5, len(msgs)) + } + now := time.Now() + // no messages available, should time out normally + _, err = sub.Fetch(5, nats.PullHeartbeat(50*time.Millisecond), nats.MaxWait(300*time.Millisecond)) + elapsed := time.Since(now) + if elapsed < 300*time.Millisecond { + t.Fatalf("Expected timeout after 300ms; got: %v", elapsed) + } + if !errors.Is(err, nats.ErrTimeout) { + t.Fatalf("Expected timeout error; got: %v", err) + } + + // delete consumer to verify heartbeats are not sent anymore + info, err := sub.ConsumerInfo() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if err := js.DeleteConsumer("TEST", info.Name); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + _, err = sub.Fetch(5, nats.PullHeartbeat(100*time.Millisecond), nats.MaxWait(1*time.Second)) + if !errors.Is(err, nats.ErrNoHeartbeat) { + t.Fatalf("Expected no heartbeat error; got: %v", err) + } + + // heartbeat value too large + _, err = sub.Fetch(5, nats.PullHeartbeat(200*time.Millisecond), nats.MaxWait(300*time.Millisecond)) + if !errors.Is(err, nats.ErrInvalidArg) { + t.Fatalf("Expected invalid arg error; got: %v", err) + } + + // heartbeat value invalid + _, err = sub.Fetch(5, nats.PullHeartbeat(-1)) + if !errors.Is(err, nats.ErrInvalidArg) { + t.Fatalf("Expected invalid arg error; got: %v", err) + } + + // set short timeout on JetStream context + js, err = nc.JetStream(nats.MaxWait(100 * time.Millisecond)) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + sub1, err := js.PullSubscribe("foo", "") + if err != nil { + t.Fatalf("Unexpected error: %s", err) + } + defer sub.Unsubscribe() + + // should produce invalid arg error based on default timeout from JetStream context + _, err = sub1.Fetch(5, nats.PullHeartbeat(100*time.Millisecond)) + if !errors.Is(err, nats.ErrInvalidArg) { + t.Fatalf("Expected invalid arg error; got: %v", err) + } + + // overwrite default timeout with context timeout, fetch available messages + ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond) + defer cancel() + msgs, err = sub1.Fetch(10, nats.PullHeartbeat(100*time.Millisecond), nats.Context(ctx)) + if err != nil { + t.Fatalf("Unexpected error: %s", err) + } + if len(msgs) != 5 { + t.Fatalf("Expected %d messages; got: %d", 5, len(msgs)) + } + for _, msg := range msgs { + msg.Ack() + } + + // overwrite default timeout with max wait, should time out because no messages are available + _, err = sub1.Fetch(5, nats.PullHeartbeat(100*time.Millisecond), nats.MaxWait(300*time.Millisecond)) + if !errors.Is(err, nats.ErrTimeout) { + t.Fatalf("Expected timeout error; got: %v", err) + } +} + +func TestPullSubscribeFetchBatchWithHeartbeat(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + sub, err := js.PullSubscribe("foo", "") + if err != nil { + t.Fatalf("Unexpected error: %s", err) + } + defer sub.Unsubscribe() + for i := 0; i < 5; i++ { + if _, err := js.Publish("foo", []byte("msg")); err != nil { + t.Fatalf("Unexpected error: %s", err) + } + } + + // fetch 5 messages, should finish immediately + msgs, err := sub.FetchBatch(5, nats.PullHeartbeat(100*time.Millisecond)) + if err != nil { + t.Fatalf("Unexpected error: %s", err) + } + var i int + for msg := range msgs.Messages() { + i++ + msg.Ack() + } + if i != 5 { + t.Fatalf("Expected %d messages; got: %d", 5, i) + } + if msgs.Error() != nil { + t.Fatalf("Unexpected error: %s", msgs.Error()) + } + now := time.Now() + // no messages available, should time out normally + msgs, err = sub.FetchBatch(5, nats.PullHeartbeat(50*time.Millisecond), nats.MaxWait(300*time.Millisecond)) + if err != nil { + t.Fatalf("Unexpected error: %s", err) + } + i = 0 + for msg := range msgs.Messages() { + i++ + msg.Ack() + } + elapsed := time.Since(now) + if i != 0 { + t.Fatalf("Expected %d messages; got: %d", 0, i) + } + if msgs.Error() != nil { + t.Fatalf("Unexpected error: %s", msgs.Error()) + } + if elapsed < 290*time.Millisecond { + t.Fatalf("Expected timeout after 300ms; got: %v", elapsed) + } + + // delete consumer to verify heartbeats are not sent anymore + info, err := sub.ConsumerInfo() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if err := js.DeleteConsumer("TEST", info.Name); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + now = time.Now() + msgs, err = sub.FetchBatch(5, nats.PullHeartbeat(100*time.Millisecond), nats.MaxWait(1*time.Second)) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + for msg := range msgs.Messages() { + msg.Ack() + } + elapsed = time.Since(now) + if elapsed < 200*time.Millisecond || elapsed > 300*time.Millisecond { + t.Fatalf("Expected timeout after 200ms and before 300ms; got: %v", elapsed) + } + if !errors.Is(msgs.Error(), nats.ErrNoHeartbeat) { + t.Fatalf("Expected no heartbeat error; got: %v", err) + } + + // heartbeat value too large + _, err = sub.FetchBatch(5, nats.PullHeartbeat(200*time.Millisecond), nats.MaxWait(300*time.Millisecond)) + if !errors.Is(err, nats.ErrInvalidArg) { + t.Fatalf("Expected no heartbeat error; got: %v", err) + } + + // heartbeat value invalid + _, err = sub.FetchBatch(5, nats.PullHeartbeat(-1)) + if !errors.Is(err, nats.ErrInvalidArg) { + t.Fatalf("Expected no heartbeat error; got: %v", err) + } + + // set short timeout on JetStream context + js, err = nc.JetStream(nats.MaxWait(100 * time.Millisecond)) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + sub1, err := js.PullSubscribe("foo", "") + if err != nil { + t.Fatalf("Unexpected error: %s", err) + } + defer sub.Unsubscribe() + + // should produce invalid arg error based on default timeout from JetStream context + _, err = sub1.Fetch(5, nats.PullHeartbeat(100*time.Millisecond)) + if !errors.Is(err, nats.ErrInvalidArg) { + t.Fatalf("Expected invalid arg error; got: %v", err) + } + + // overwrite default timeout with context timeout, fetch available messages + ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond) + defer cancel() + msgs, err = sub1.FetchBatch(10, nats.PullHeartbeat(100*time.Millisecond), nats.Context(ctx)) + if err != nil { + t.Fatalf("Unexpected error: %s", err) + } + for msg := range msgs.Messages() { + msg.Ack() + } + + // overwrite default timeout with max wait, should time out because no messages are available + msgs, err = sub1.FetchBatch(5, nats.PullHeartbeat(100*time.Millisecond), nats.MaxWait(300*time.Millisecond)) + if err != nil { + t.Fatalf("Unexpected error: %s", err) + } + <-msgs.Done() + if msgs.Error() != nil { + t.Fatalf("Unexpected error: %s", msgs.Error()) + } +} + func TestPullSubscribeFetchBatch(t *testing.T) { s := RunBasicJetStreamServer() defer shutdownJSServerAndRemoveStorage(t, s) @@ -1258,7 +1676,7 @@ func TestPullSubscribeFetchBatch(t *testing.T) { t.Fatalf("Unexpected error: %s", err) } go func() { - time.Sleep(10 * time.Millisecond) + time.Sleep(200 * time.Millisecond) cancel() }() msgs := make([]*nats.Msg, 0) @@ -1733,14 +2151,46 @@ func TestJetStreamManagement(t *testing.T) { // Create the stream using our client API. var si *nats.StreamInfo + t.Run("create stream", func(t *testing.T) { - si, err := js.AddStream(&nats.StreamConfig{Name: "foo", Subjects: []string{"foo", "bar"}}) + consLimits := nats.StreamConsumerLimits{ + MaxAckPending: 100, + InactiveThreshold: 10 * time.Second, + } + cfg := &nats.StreamConfig{ + Name: "foo", + Subjects: []string{"foo", "bar", "baz"}, + Compression: nats.S2Compression, + ConsumerLimits: nats.StreamConsumerLimits{ + MaxAckPending: 100, + InactiveThreshold: 10 * time.Second, + }, + FirstSeq: 22, + Metadata: map[string]string{ + "foo": "bar", + "baz": "quux", + }, + } + + si, err := js.AddStream(cfg) if err != nil { t.Fatalf("Unexpected error: %v", err) } if si == nil || si.Config.Name != "foo" { t.Fatalf("StreamInfo is not correct %+v", si) } + if !reflect.DeepEqual(si.Config.Metadata, map[string]string{"foo": "bar", "baz": "quux"}) { + t.Fatalf("Metadata is not correct %+v", si.Config.Metadata) + } + if si.Config.Compression != nats.S2Compression { + t.Fatalf("Compression is not correct %+v", si.Config.Compression) + } + if si.Config.FirstSeq != 22 { + t.Fatalf("FirstSeq is not correct %+v", si.Config.FirstSeq) + } + if si.Config.ConsumerLimits != consLimits { + t.Fatalf("ConsumerLimits is not correct %+v", si.Config.ConsumerLimits) + } }) t.Run("stream with given name already exists", func(t *testing.T) { @@ -1825,7 +2275,14 @@ func TestJetStreamManagement(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } defer sub.Unsubscribe() - ci, err := js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "dlc", AckPolicy: nats.AckExplicitPolicy}) + ci, err := js.AddConsumer("foo", &nats.ConsumerConfig{ + Durable: "dlc", + AckPolicy: nats.AckExplicitPolicy, + Metadata: map[string]string{ + "foo": "bar", + "baz": "quux", + }, + }) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1839,6 +2296,9 @@ func TestJetStreamManagement(t *testing.T) { if ci == nil || ci.Name != "dlc" || ci.Stream != "foo" { t.Fatalf("ConsumerInfo is not correct %+v", ci) } + if !reflect.DeepEqual(ci.Config.Metadata, map[string]string{"foo": "bar", "baz": "quux"}) { + t.Fatalf("Metadata is not correct %+v", ci.Config.Metadata) + } }) t.Run("with name set", func(t *testing.T) { sub, err := nc.SubscribeSync("$JS.API.CONSUMER.CREATE.foo.dlc-1") @@ -1958,14 +2418,90 @@ func TestJetStreamManagement(t *testing.T) { } }) - t.Run("with invalid consumer name", func(t *testing.T) { - if _, err = js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "test.durable"}); err != nats.ErrInvalidConsumerName { - t.Fatalf("Expected: %v; got: %v", nats.ErrInvalidConsumerName, err) - } - if _, err = js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "test durable"}); err != nats.ErrInvalidConsumerName { - t.Fatalf("Expected: %v; got: %v", nats.ErrInvalidConsumerName, err) + t.Run("durable consumer with multiple filter subjects", func(t *testing.T) { + sub, err := nc.SubscribeSync("$JS.API.CONSUMER.CREATE.foo.dlc-5") + if err != nil { + t.Fatalf("Unexpected error: %v", err) } - }) + defer sub.Unsubscribe() + ci, err := js.AddConsumer("foo", &nats.ConsumerConfig{ + Durable: "dlc-5", + AckPolicy: nats.AckExplicitPolicy, + FilterSubjects: []string{"foo", "bar"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + msg, err := sub.NextMsg(1 * time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if !strings.Contains(string(msg.Data), `"durable_name":"dlc-5"`) { + t.Fatalf("create consumer message is not correct: %q", string(msg.Data)) + } + if ci == nil || ci.Config.Durable != "dlc-5" || !reflect.DeepEqual(ci.Config.FilterSubjects, []string{"foo", "bar"}) { + t.Fatalf("ConsumerInfo is not correct %+v", ci) + } + }) + + t.Run("ephemeral consumer with multiple filter subjects", func(t *testing.T) { + sub, err := nc.SubscribeSync("$JS.API.CONSUMER.CREATE.foo") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer sub.Unsubscribe() + ci, err := js.AddConsumer("foo", &nats.ConsumerConfig{ + AckPolicy: nats.AckExplicitPolicy, + FilterSubjects: []string{"foo", "bar"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + _, err = sub.NextMsg(1 * time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if ci == nil || !reflect.DeepEqual(ci.Config.FilterSubjects, []string{"foo", "bar"}) { + t.Fatalf("ConsumerInfo is not correct %+v", ci) + } + }) + + t.Run("multiple filter subjects errors", func(t *testing.T) { + // both filter subject and filter subjects provided + _, err := js.AddConsumer("foo", &nats.ConsumerConfig{ + AckPolicy: nats.AckExplicitPolicy, + FilterSubjects: []string{"foo", "bar"}, + FilterSubject: "baz", + }) + if !errors.Is(err, nats.ErrDuplicateFilterSubjects) { + t.Fatalf("Expected: %v; got: %v", nats.ErrDuplicateFilterSubjects, err) + } + // overlapping filter subjects + _, err = js.AddConsumer("foo", &nats.ConsumerConfig{ + AckPolicy: nats.AckExplicitPolicy, + FilterSubjects: []string{"foo.*", "foo.A"}, + }) + if !errors.Is(err, nats.ErrOverlappingFilterSubjects) { + t.Fatalf("Expected: %v; got: %v", nats.ErrOverlappingFilterSubjects, err) + } + // empty filter subject in filter subjects + _, err = js.AddConsumer("foo", &nats.ConsumerConfig{ + AckPolicy: nats.AckExplicitPolicy, + FilterSubjects: []string{"foo", ""}, + }) + if !errors.Is(err, nats.ErrEmptyFilter) { + t.Fatalf("Expected: %v; got: %v", nats.ErrEmptyFilter, err) + } + }) + + t.Run("with invalid consumer name", func(t *testing.T) { + if _, err = js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "test.durable"}); err != nats.ErrInvalidConsumerName { + t.Fatalf("Expected: %v; got: %v", nats.ErrInvalidConsumerName, err) + } + if _, err = js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "test durable"}); err != nats.ErrInvalidConsumerName { + t.Fatalf("Expected: %v; got: %v", nats.ErrInvalidConsumerName, err) + } + }) t.Run("consumer with given name already exists, configs do not match", func(t *testing.T) { // configs do not match @@ -2064,7 +2600,7 @@ func TestJetStreamManagement(t *testing.T) { for info := range js.Consumers("foo") { infos = append(infos, info) } - if len(infos) != 6 || infos[0].Stream != "foo" { + if len(infos) != 8 || infos[0].Stream != "foo" { t.Fatalf("ConsumerInfo is not correct %+v", infos) } }) @@ -2076,7 +2612,7 @@ func TestJetStreamManagement(t *testing.T) { for name := range js.ConsumerNames("foo", nats.Context(ctx)) { names = append(names, name) } - if got, want := len(names), 6; got != want { + if got, want := len(names), 8; got != want { t.Fatalf("Unexpected names, got=%d, want=%d", got, want) } }) @@ -2260,6 +2796,120 @@ func TestJetStreamManagement(t *testing.T) { }) } +func TestStreamConfigMatches(t *testing.T) { + srv := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, srv) + nc, js := jsClient(t, srv) + defer nc.Close() + + cfg := nats.StreamConfig{ + Name: "stream", + Description: "desc", + Subjects: []string{"foo.*"}, + Retention: nats.WorkQueuePolicy, + MaxConsumers: 10, + MaxMsgs: 100, + MaxBytes: 1000, + Discard: nats.DiscardNew, + DiscardNewPerSubject: true, + MaxAge: 100 * time.Second, + MaxMsgsPerSubject: 1000, + MaxMsgSize: 10000, + Storage: nats.MemoryStorage, + Replicas: 1, + NoAck: true, + Duplicates: 10 * time.Second, + Sealed: false, + DenyDelete: true, + DenyPurge: false, + AllowRollup: true, + Compression: nats.S2Compression, + FirstSeq: 5, + SubjectTransform: &nats.SubjectTransformConfig{Source: ">", Destination: "transformed.>"}, + RePublish: &nats.RePublish{ + Source: ">", + Destination: "RP.>", + HeadersOnly: true, + }, + AllowDirect: true, + ConsumerLimits: nats.StreamConsumerLimits{ + InactiveThreshold: 10 * time.Second, + MaxAckPending: 500, + }, + Metadata: map[string]string{ + "foo": "bar", + }, + } + + s, err := js.AddStream(&cfg) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if !reflect.DeepEqual(s.Config, cfg) { + t.Fatalf("StreamConfig doesn't match: %#v", s.Config) + } + + cfgMirror := nats.StreamConfig{ + Name: "mirror", + MaxConsumers: 10, + MaxMsgs: 100, + MaxBytes: 1000, + MaxAge: 100 * time.Second, + MaxMsgsPerSubject: 1000, + MaxMsgSize: 10000, + Replicas: 1, + Duplicates: 10 * time.Second, + Mirror: &nats.StreamSource{ + Name: "stream", + OptStartSeq: 10, + SubjectTransforms: []nats.SubjectTransformConfig{ + {Source: ">", Destination: "transformed.>"}, + }, + }, + MirrorDirect: true, + SubjectTransform: &nats.SubjectTransformConfig{Source: ">", Destination: "transformed.>"}, + } + + s, err = js.AddStream(&cfgMirror) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if !reflect.DeepEqual(s.Config, cfgMirror) { + t.Fatalf("StreamConfig doesn't match: %#v", s.Config) + } + + cfgSourcing := nats.StreamConfig{ + Name: "sourcing", + Subjects: []string{"BAR"}, + MaxConsumers: 10, + MaxMsgs: 100, + MaxBytes: 1000, + MaxAge: 100 * time.Second, + MaxMsgsPerSubject: 1000, + MaxMsgSize: 10000, + Replicas: 1, + Duplicates: 10 * time.Second, + Sources: []*nats.StreamSource{ + { + Name: "stream", + OptStartSeq: 10, + SubjectTransforms: []nats.SubjectTransformConfig{ + {Source: ">", Destination: "transformed.>"}, + }, + }, + }, + SubjectTransform: &nats.SubjectTransformConfig{Source: ">", Destination: "transformed.>"}, + } + + s, err = js.AddStream(&cfgSourcing) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if !reflect.DeepEqual(s.Config, cfgSourcing) { + t.Fatalf("StreamConfig doesn't match: %#v", s.Config) + } +} + func TestStreamLister(t *testing.T) { tests := []struct { name string @@ -2561,7 +3211,7 @@ func TestAccountInfo(t *testing.T) { } if !reflect.DeepEqual(test.expected, info) { - t.Fatalf("Accoount info does not match; expected: %v; got: %v", test.expected, info) + t.Fatalf("Account info does not match; expected: %v; got: %v", test.expected, info) } _, err = js.AddStream(&nats.StreamConfig{Name: "FOO", MaxBytes: 1024}) if err != nil { @@ -2583,7 +3233,7 @@ func TestAccountInfo(t *testing.T) { } if !reflect.DeepEqual(test.expected, info) { - t.Fatalf("Accoount info does not match; expected: %v; got: %v", test.expected, info) + t.Fatalf("Account info does not match; expected: %v; got: %v", test.expected, info) } }) } @@ -3943,7 +4593,7 @@ func TestJetStreamSubscribe_DeliverPolicy(t *testing.T) { t.Fatalf("Error on next msg: %v", err) } if string(msg.Data) != "bar msg 2" { - t.Fatalf("Unexepcted last message: %q", msg.Data) + t.Fatalf("Unexpected last message: %q", msg.Data) } } @@ -4004,12 +4654,11 @@ func TestJetStreamSubscribe_AckPolicy(t *testing.T) { } <-ctx.Done() - sub.Drain() - if got != totalMsgs { t.Fatalf("Expected %d, got %d", totalMsgs, got) } + // check if consumer is configured properly ci, err := js.ConsumerInfo("TEST", test.name) if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -4017,6 +4666,9 @@ func TestJetStreamSubscribe_AckPolicy(t *testing.T) { if ci.Config.AckPolicy != test.expected { t.Fatalf("Expected %v, got %v", test.expected, ci.Config.AckPolicy) } + + // drain the subscription. This will remove the consumer + sub.Drain() }) } @@ -5233,6 +5885,70 @@ func TestJetStreamSubscribe_RateLimit(t *testing.T) { } } +func TestJetStreamSubscribe_FilterSubjects(t *testing.T) { + tests := []struct { + name string + durable string + }{ + { + name: "ephemeral consumer", + }, + { + name: "durable consumer", + durable: "cons", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + var err error + + _, err = js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo", "bar", "baz"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + for i := 0; i < 5; i++ { + js.Publish("foo", []byte("msg")) + } + for i := 0; i < 5; i++ { + js.Publish("bar", []byte("msg")) + } + for i := 0; i < 5; i++ { + js.Publish("baz", []byte("msg")) + } + + opts := []nats.SubOpt{nats.BindStream("TEST"), nats.ConsumerFilterSubjects("foo", "baz")} + if test.durable != "" { + opts = append(opts, nats.Durable(test.durable)) + } + sub, err := js.SubscribeSync("", opts...) + if err != nil { + t.Fatalf("Unexpected error: %s", err) + } + + for i := 0; i < 10; i++ { + msg, err := sub.NextMsg(500 * time.Millisecond) + if err != nil { + t.Fatalf("Unexpected error: %s", err) + } + if msg.Subject != "foo" && msg.Subject != "baz" { + t.Fatalf("Unexpected message subject: %s", msg.Subject) + } + } + }) + } + +} + func TestJetStreamSubscribe_ConfigCantChange(t *testing.T) { s := RunBasicJetStreamServer() defer shutdownJSServerAndRemoveStorage(t, s) @@ -5421,18 +6137,21 @@ func setupJSClusterWithSize(t *testing.T, clusterName string, size int) []*jsSer nodes := make([]*jsServer, size) opts := make([]*server.Options, 0) - getAddr := func() (string, string, int) { + var activeListeners []net.Listener + getAddr := func(t *testing.T) (string, string, int) { l, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { - panic(err) + t.Fatalf("Unexpected error: %v", err) } - defer l.Close() addr := l.Addr() host := addr.(*net.TCPAddr).IP.String() port := addr.(*net.TCPAddr).Port - l.Close() time.Sleep(100 * time.Millisecond) + + // we cannot close the listener immediately to avoid duplicate port binding + // the returned net.Listener has to be closed after all ports are drawn + activeListeners = append(activeListeners, l) return addr.String(), host, port } @@ -5449,11 +6168,11 @@ func setupJSClusterWithSize(t *testing.T, clusterName string, size int) []*jsSer if size > 1 { o.Cluster.Name = clusterName - _, host1, port1 := getAddr() + _, host1, port1 := getAddr(t) o.Host = host1 o.Port = port1 - addr2, host2, port2 := getAddr() + addr2, host2, port2 := getAddr(t) o.Cluster.Host = host2 o.Cluster.Port = port2 o.Tags = []string{o.ServerName} @@ -5461,6 +6180,10 @@ func setupJSClusterWithSize(t *testing.T, clusterName string, size int) []*jsSer } opts = append(opts, &o) } + // close all connections used to randomize ports + for _, l := range activeListeners { + l.Close() + } if size > 1 { routesStr := server.RoutesFromStr(strings.Join(routes, ",")) @@ -5755,17 +6478,6 @@ func testJetStreamMirror_Source(t *testing.T, nodes ...*jsServer) { } }) - t.Run("bind to stream with wrong subject fails", func(t *testing.T) { - _, err := js.SubscribeSync("secret", nats.BindStream("origin")) - if err == nil { - t.Fatal("Unexpected success") - } - apiErr := &nats.APIError{} - if !errors.As(err, &apiErr) || apiErr.ErrorCode != 10093 { - t.Fatalf("Expected API error 10093; got: %v", err) - } - }) - t.Run("bind to origin stream", func(t *testing.T) { // This would only avoid the stream names lookup. sub, err := js.SubscribeSync("origin", nats.BindStream("origin")) @@ -5919,103 +6631,78 @@ func testJetStreamMirror_Source(t *testing.T, nodes ...*jsServer) { } }) - // Commenting out this test until we figure out what was the intent. - // Since v2.8.0, this test would fail with a "detected cycle" error, - // I guess because "m1" already sources "origin", so creating a - // stream with both as a source is bad. - /* - t.Run("create sourced stream from origin", func(t *testing.T) { - sources := make([]*nats.StreamSource, 0) - sources = append(sources, &nats.StreamSource{Name: "origin"}) - sources = append(sources, &nats.StreamSource{Name: "m1"}) - streamName := "s2" - _, err = js.AddStream(&nats.StreamConfig{ - Name: streamName, - Sources: sources, - Storage: nats.FileStorage, - Replicas: 1, - }) - if err != nil { - t.Fatalf("Unexpected error creating stream: %v", err) - } - - msgs := make([]*nats.RawStreamMsg, 0) - - // Stored message sequences start at 1 - startSequence := 1 - expectedTotal := totalMsgs * 2 + t.Run("bind to stream with subject not in stream", func(t *testing.T) { + // The Stream does not have a subject called 'nothing' but client is still able + // to bind to the origin stream even though it cannot consume messages. + // After updating the stream with the subject this consumer will be able to + // match and receive messages. + sub, err := js.SubscribeSync("nothing", nats.BindStream("origin")) + if err != nil { + t.Fatal(err) + } + _, err = sub.NextMsg(1 * time.Second) + if !errors.Is(err, nats.ErrTimeout) { + t.Fatal("Expected timeout error") + } - GetNextMsg: - for i := startSequence; i < expectedTotal+1; i++ { - var ( - err error - seq = uint64(i) - msg *nats.RawStreamMsg - timeout = time.Now().Add(5 * time.Second) - ) + info, err := sub.ConsumerInfo() + if err != nil { + t.Fatal(err) + } + got := info.Stream + expected := "origin" + if got != expected { + t.Fatalf("Expected %v, got %v", expected, got) + } - Retry: - for time.Now().Before(timeout) { - msg, err = js.GetMsg(streamName, seq) - if err != nil { - time.Sleep(100 * time.Millisecond) - continue Retry - } - msgs = append(msgs, msg) - continue GetNextMsg - } - if err != nil { - t.Fatalf("Unexpected error fetching seq=%v: %v", seq, err) - } - } + got = info.Config.FilterSubject + expected = "nothing" + if got != expected { + t.Fatalf("Expected %v, got %v", expected, got) + } - got := len(msgs) - if got < expectedTotal { - t.Errorf("Expected %v, got: %v", expectedTotal, got) - } + t.Run("can consume after stream update", func(t *testing.T) { + _, err = js.UpdateStream(&nats.StreamConfig{ + Name: "origin", + Placement: &nats.Placement{ + Tags: []string{"NODE_0"}, + }, + Storage: nats.MemoryStorage, + Replicas: 1, + Subjects: []string{"origin", "nothing"}, + }) + js.Publish("nothing", []byte("hello world")) - si, err := js.StreamInfo(streamName) + msg, err := sub.NextMsg(1 * time.Second) if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - got = int(si.State.Msgs) - if got != expectedTotal { - t.Errorf("Expected %v, got: %v", expectedTotal, got) + t.Error(err) } - - got = len(si.Sources) - expected := 2 + got = msg.Subject + expected = "nothing" if got != expected { - t.Errorf("Expected %v, got: %v", expected, got) + t.Fatalf("Expected %v, got %v", expected, got) } - t.Run("consume from sourced stream", func(t *testing.T) { - sub, err := js.SubscribeSync("origin", nats.BindStream(streamName)) - if err != nil { - t.Fatal(err) - } + }) + }) - mmsgs := make([]*nats.Msg, 0) - for i := 0; i < totalMsgs; i++ { - msg, err := sub.NextMsg(2 * time.Second) - if err != nil { - t.Error(err) - } - meta, err := msg.Metadata() - if err != nil { - t.Error(err) - } - if meta.Stream != streamName { - t.Errorf("Expected m1, got: %v", meta.Stream) - } - mmsgs = append(mmsgs, msg) - } - if len(mmsgs) != totalMsgs { - t.Errorf("Expected to consume %v msgs, got: %v", totalMsgs, len(mmsgs)) - } - }) + t.Run("create sourced stream with a cycle", func(t *testing.T) { + // Since v2.8.0, this test would fail with a "detected cycle" error. + sources := make([]*nats.StreamSource, 0) + sources = append(sources, &nats.StreamSource{Name: "origin"}) + sources = append(sources, &nats.StreamSource{Name: "m1"}) + streamName := "s2" + _, err = js.AddStream(&nats.StreamConfig{ + Name: streamName, + Sources: sources, + Storage: nats.FileStorage, + Replicas: 1, }) - */ + var aerr *nats.APIError + if ok := errors.As(err, &aerr); !ok || aerr.ErrorCode != nats.JSStreamInvalidConfig { + t.Fatalf("Expected nats.APIError, got %v", err) + } + }) } func TestJetStream_ClusterMultipleSubscribe(t *testing.T) { @@ -6332,7 +7019,7 @@ func testJetStream_ClusterMultipleFetchPullSubscribe(t *testing.T, subject strin gotNoMessages bool count = 0 ) - queues.Range(func(k, v interface{}) bool { + queues.Range(func(k, v any) bool { msgs := v.([]*nats.Msg) count += len(msgs) @@ -7218,6 +7905,61 @@ func TestJetStreamPublishAsync(t *testing.T) { } } +func TestPublishAsyncResetPendingOnReconnect(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + // Now create a stream and expect a PubAck from <-OK(). + if _, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Subjects: []string{"FOO"}}); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + errs := make(chan error, 1) + done := make(chan struct{}, 1) + acks := make(chan nats.PubAckFuture, 100) + go func() { + for i := 0; i < 100; i++ { + if ack, err := js.PublishAsync("FOO", []byte("hello")); err != nil { + errs <- err + return + } else { + acks <- ack + } + } + close(acks) + done <- struct{}{} + }() + select { + case <-done: + case err := <-errs: + t.Fatalf("Unexpected error during publish: %v", err) + case <-time.After(5 * time.Second): + t.Fatalf("Did not receive completion signal") + } + s.Shutdown() + time.Sleep(100 * time.Millisecond) + if pending := js.PublishAsyncPending(); pending != 0 { + t.Fatalf("Expected no pending messages after server shutdown; got: %d", pending) + } + s = RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + for ack := range acks { + select { + case <-ack.Ok(): + case err := <-ack.Err(): + if !errors.Is(err, nats.ErrDisconnected) && !errors.Is(err, nats.ErrNoResponders) { + t.Fatalf("Expected error: %v or %v; got: %v", nats.ErrDisconnected, nats.ErrNoResponders, err) + } + case <-time.After(5 * time.Second): + t.Fatalf("Did not receive completion signal") + } + } +} + func TestJetStreamPublishAsyncPerf(t *testing.T) { // Comment out below to run this benchmark. t.SkipNow() @@ -7639,7 +8381,7 @@ func TestJetStreamDomain(t *testing.T) { } } -// Test that we properly enfore per subject msg limits. +// Test that we properly enforce per subject msg limits. func TestJetStreamMaxMsgsPerSubject(t *testing.T) { const subjectMax = 5 msc := nats.StreamConfig{ @@ -8625,7 +9367,7 @@ func TestJetStreamOrderedConsumerRecreateAfterReconnect(t *testing.T) { s := RunBasicJetStreamServer() // monitor for ErrConsumerNotActive error and suppress logging - hbMissed := make(chan struct{}) + hbMissed := make(chan struct{}, 10) errHandler := func(c *nats.Conn, s *nats.Subscription, err error) { if !errors.Is(err, nats.ErrConsumerNotActive) { t.Fatalf("Unexpected error: %v", err) @@ -8793,7 +9535,7 @@ func TestJetStreamStreamInfoAlternates(t *testing.T) { }) } -func TestJetStreamAckTokens(t *testing.T) { +func TestJetStreamSubscribeConsumerName(t *testing.T) { s := RunBasicJetStreamServer() defer shutdownJSServerAndRemoveStorage(t, s) @@ -8805,201 +9547,83 @@ func TestJetStreamAckTokens(t *testing.T) { // Create the stream using our client API. _, err = js.AddStream(&nats.StreamConfig{ Name: "TEST", - Subjects: []string{"foo"}, + Subjects: []string{"foo", "bar", "baz", "foo.*"}, }) if err != nil { t.Fatalf("Unexpected error: %v", err) } + _, err = js.Publish("foo", []byte("first")) + if err != nil { + t.Fatal(err) + } - sub, err := js.SubscribeSync("foo") + // Lookup the stream for testing. + _, err = js.StreamInfo("TEST") if err != nil { - t.Fatalf("Error on subscribe: %v", err) + t.Fatalf("stream lookup failed: %v", err) } - now := time.Now() - for _, test := range []struct { - name string - expected *nats.MsgMetadata - str string - end string - err bool - }{ - { - "valid token size but not js ack", - nil, - "1.2.3.4.5.6.7.8.9", - "", - true, - }, - { - "valid token size but not js ack", - nil, - "1.2.3.4.5.6.7.8.9.10.11.12", - "", - true, - }, - { - "invalid token size", - nil, - "$JS.ACK.3.4.5.6.7.8", - "", - true, - }, - { - "invalid token size", - nil, - "$JS.ACK.3.4.5.6.7.8.9.10", - "", - true, - }, - { - "v1 style", - &nats.MsgMetadata{ - Stream: "TEST", - Consumer: "cons", - NumDelivered: 1, - Sequence: nats.SequencePair{ - Stream: 2, - Consumer: 3, - }, - Timestamp: now, - NumPending: 4, - }, - "", - "", - false, - }, - { - "v2 style no domain with hash", - &nats.MsgMetadata{ - Stream: "TEST", - Consumer: "cons", - NumDelivered: 1, - Sequence: nats.SequencePair{ - Stream: 2, - Consumer: 3, - }, - Timestamp: now, - NumPending: 4, - }, - "_.ACCHASH.", - ".abcde", - false, - }, - { - "v2 style with domain and hash", - &nats.MsgMetadata{ - Domain: "HUB", - Stream: "TEST", - Consumer: "cons", - NumDelivered: 1, - Sequence: nats.SequencePair{ - Stream: 2, - Consumer: 3, - }, - Timestamp: now, - NumPending: 4, - }, - "HUB.ACCHASH.", - ".abcde", - false, - }, - { - "more than 12 tokens", - &nats.MsgMetadata{ - Domain: "HUB", - Stream: "TEST", - Consumer: "cons", - NumDelivered: 1, - Sequence: nats.SequencePair{ - Stream: 2, - Consumer: 3, - }, - Timestamp: now, - NumPending: 4, - }, - "HUB.ACCHASH.", - ".abcde.ghijk.lmnop", - false, - }, - } { - t.Run(test.name, func(t *testing.T) { - msg := nats.NewMsg("foo") - msg.Sub = sub - if test.err { - msg.Reply = test.str - } else { - msg.Reply = fmt.Sprintf("$JS.ACK.%sTEST.cons.1.2.3.%v.4%s", test.str, now.UnixNano(), test.end) - } - - meta, err := msg.Metadata() - if test.err { - if err == nil || meta != nil { - t.Fatalf("Expected error for content: %q, got meta=%+v err=%v", test.str, meta, err) - } - // Expected error, we are done - return - } - if err != nil { - t.Fatalf("Expected: %+v with reply: %q, got error %v", test.expected, msg.Reply, err) - } - if meta.Timestamp.UnixNano() != now.UnixNano() { - t.Fatalf("Timestamp is bad: %v vs %v", now.UnixNano(), meta.Timestamp.UnixNano()) - } - meta.Timestamp = time.Time{} - test.expected.Timestamp = time.Time{} - if !reflect.DeepEqual(test.expected, meta) { - t.Fatalf("Expected %+v, got %+v", test.expected, meta) - } - }) + sub, err := js.SubscribeSync("foo", nats.ConsumerName("my-ephemeral")) + if err != nil { + t.Fatal(err) } -} - -func TestJetStreamExpiredPullRequests(t *testing.T) { - s := RunBasicJetStreamServer() - defer shutdownJSServerAndRemoveStorage(t, s) - - nc, js := jsClient(t, s) - defer nc.Close() - - var err error - - _, err = js.AddStream(&nats.StreamConfig{ - Name: "TEST", - Subjects: []string{"foo"}, - }) + cinfo, err := sub.ConsumerInfo() if err != nil { - t.Fatalf("Unexpected error: %v", err) + t.Fatal(err) + } + got := cinfo.Config.Name + expected := "my-ephemeral" + if got != expected { + t.Fatalf("Expected: %v, got: %v", expected, got) + } + // Confirm that this is a durable. + got = cinfo.Config.Durable + expected = "" + if got != expected { + t.Fatalf("Expected: %v, got: %v", expected, got) + } + _, err = sub.NextMsg(1 * time.Second) + if err != nil { + t.Fatal(err) } - sub, err := js.PullSubscribe("foo", "bar", nats.PullMaxWaiting(2)) + // ConsumerName will be ignored in case a durable name has been set. + sub, err = js.SubscribeSync("foo", nats.Durable("durable"), nats.ConsumerName("custom-name")) if err != nil { - t.Fatalf("Error on subscribe: %v", err) + t.Fatal(err) } - // Make sure that we reject batch < 1 - if _, err := sub.Fetch(0); err == nil { - t.Fatal("Expected error, did not get one") + cinfo, err = sub.ConsumerInfo() + if err != nil { + t.Fatal(err) } - if _, err := sub.Fetch(-1); err == nil { - t.Fatal("Expected error, did not get one") + got = cinfo.Config.Name + expected = "durable" + if got != expected { + t.Fatalf("Expected: %v, got: %v", expected, got) + } + got = cinfo.Config.Durable + expected = "durable" + if got != expected { + t.Fatalf("Expected: %v, got: %v", expected, got) + } + _, err = sub.NextMsg(1 * time.Second) + if err != nil { + t.Fatal(err) } - // Send 2 fetch requests - for i := 0; i < 2; i++ { - if _, err = sub.Fetch(1, nats.MaxWait(15*time.Millisecond)); err == nil { - t.Fatalf("Expected error, got none") - } + // Default Ephemeral name should be short like in the server. + sub, err = js.SubscribeSync("foo", nats.ConsumerName("")) + if err != nil { + t.Fatal(err) } - // Wait before the above expire - time.Sleep(50 * time.Millisecond) - batches := []int{1, 10} - for _, bsz := range batches { - start := time.Now() - _, err = sub.Fetch(bsz, nats.MaxWait(250*time.Millisecond)) - dur := time.Since(start) - if err == nil || dur < 50*time.Millisecond { - t.Fatalf("Expected error and wait for 250ms, got err=%v and dur=%v", err, dur) - } + cinfo, err = sub.ConsumerInfo() + if err != nil { + t.Fatal(err) + } + expectedSize := 8 + result := len(cinfo.Config.Name) + if result != expectedSize { + t.Fatalf("Expected: %v, got: %v", expectedSize, result) } } @@ -9433,6 +10057,168 @@ func TestJetStreamConcurrentQueueDurablePushConsumers(t *testing.T) { t.Fatalf("Expected %v messages, got only %v", total, got) } +func TestJetStreamAckTokens(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + var err error + + // Create the stream using our client API. + _, err = js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + sub, err := js.SubscribeSync("foo") + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + + now := time.Now() + for _, test := range []struct { + name string + expected *nats.MsgMetadata + str string + end string + err bool + }{ + { + "valid token size but not js ack", + nil, + "1.2.3.4.5.6.7.8.9", + "", + true, + }, + { + "valid token size but not js ack", + nil, + "1.2.3.4.5.6.7.8.9.10.11.12", + "", + true, + }, + { + "invalid token size", + nil, + "$JS.ACK.3.4.5.6.7.8", + "", + true, + }, + { + "invalid token size", + nil, + "$JS.ACK.3.4.5.6.7.8.9.10", + "", + true, + }, + { + "v1 style", + &nats.MsgMetadata{ + Stream: "TEST", + Consumer: "cons", + NumDelivered: 1, + Sequence: nats.SequencePair{ + Stream: 2, + Consumer: 3, + }, + Timestamp: now, + NumPending: 4, + }, + "", + "", + false, + }, + { + "v2 style no domain with hash", + &nats.MsgMetadata{ + Stream: "TEST", + Consumer: "cons", + NumDelivered: 1, + Sequence: nats.SequencePair{ + Stream: 2, + Consumer: 3, + }, + Timestamp: now, + NumPending: 4, + }, + "_.ACCHASH.", + ".abcde", + false, + }, + { + "v2 style with domain and hash", + &nats.MsgMetadata{ + Domain: "HUB", + Stream: "TEST", + Consumer: "cons", + NumDelivered: 1, + Sequence: nats.SequencePair{ + Stream: 2, + Consumer: 3, + }, + Timestamp: now, + NumPending: 4, + }, + "HUB.ACCHASH.", + ".abcde", + false, + }, + { + "more than 12 tokens", + &nats.MsgMetadata{ + Domain: "HUB", + Stream: "TEST", + Consumer: "cons", + NumDelivered: 1, + Sequence: nats.SequencePair{ + Stream: 2, + Consumer: 3, + }, + Timestamp: now, + NumPending: 4, + }, + "HUB.ACCHASH.", + ".abcde.ghijk.lmnop", + false, + }, + } { + t.Run(test.name, func(t *testing.T) { + msg := nats.NewMsg("foo") + msg.Sub = sub + if test.err { + msg.Reply = test.str + } else { + msg.Reply = fmt.Sprintf("$JS.ACK.%sTEST.cons.1.2.3.%v.4%s", test.str, now.UnixNano(), test.end) + } + + meta, err := msg.Metadata() + if test.err { + if err == nil || meta != nil { + t.Fatalf("Expected error for content: %q, got meta=%+v err=%v", test.str, meta, err) + } + // Expected error, we are done + return + } + if err != nil { + t.Fatalf("Expected: %+v with reply: %q, got error %v", test.expected, msg.Reply, err) + } + if meta.Timestamp.UnixNano() != now.UnixNano() { + t.Fatalf("Timestamp is bad: %v vs %v", now.UnixNano(), meta.Timestamp.UnixNano()) + } + meta.Timestamp = time.Time{} + test.expected.Timestamp = time.Time{} + if !reflect.DeepEqual(test.expected, meta) { + t.Fatalf("Expected %+v, got %+v", test.expected, meta) + } + }) + } +} + func TestJetStreamTracing(t *testing.T) { s := RunBasicJetStreamServer() defer shutdownJSServerAndRemoveStorage(t, s) @@ -9471,6 +10257,54 @@ func TestJetStreamTracing(t *testing.T) { } } +func TestJetStreamExpiredPullRequests(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + var err error + + _, err = js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + sub, err := js.PullSubscribe("foo", "bar", nats.PullMaxWaiting(2)) + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + // Make sure that we reject batch < 1 + if _, err := sub.Fetch(0); err == nil { + t.Fatal("Expected error, did not get one") + } + if _, err := sub.Fetch(-1); err == nil { + t.Fatal("Expected error, did not get one") + } + + // Send 2 fetch requests + for i := 0; i < 2; i++ { + if _, err = sub.Fetch(1, nats.MaxWait(15*time.Millisecond)); err == nil { + t.Fatalf("Expected error, got none") + } + } + // Wait before the above expire + time.Sleep(50 * time.Millisecond) + batches := []int{1, 10} + for _, bsz := range batches { + start := time.Now() + _, err = sub.Fetch(bsz, nats.MaxWait(250*time.Millisecond)) + dur := time.Since(start) + if err == nil || dur < 50*time.Millisecond { + t.Fatalf("Expected error and wait for 250ms, got err=%v and dur=%v", err, dur) + } + } +} + func TestJetStreamSyncSubscribeWithMaxAckPending(t *testing.T) { opts := natsserver.DefaultTestOptions opts.Port = -1 diff --git a/ws_test.go b/ws_test.go index 0e7677de0..2227c82af 100644 --- a/ws_test.go +++ b/ws_test.go @@ -1,4 +1,4 @@ -// Copyright 2021-2022 The NATS Authors +// Copyright 2021-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -15,7 +15,6 @@ package nats import ( "bytes" - "compress/flate" "context" "fmt" "io"