From 4065e270471b2c3edf639241ea226e79a616c10e Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Sat, 14 Oct 2023 00:53:56 +0200 Subject: [PATCH] Add internal testing build tag and test Signed-off-by: Piotr Piotrowski --- .travis.yml | 2 +- scripts/cov.sh | 2 +- test/js_internal_test.go | 313 +++++++++++++++++++++++++++++++++++++++ test/js_test.go | 233 ----------------------------- testing_internal.go | 26 ++++ 5 files changed, 341 insertions(+), 235 deletions(-) create mode 100644 test/js_internal_test.go create mode 100644 testing_internal.go diff --git a/.travis.yml b/.travis.yml index 368797051..1505f773d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -22,7 +22,7 @@ before_script: - golangci-lint run ./jetstream/... script: - go test -modfile=go_test.mod -v -run=TestNoRace -p=1 ./... --failfast -vet=off -- if [[ "$TRAVIS_GO_VERSION" =~ 1.21 ]]; then ./scripts/cov.sh TRAVIS; else go test -modfile=go_test.mod -race -v -p=1 ./... --failfast -vet=off; fi +- if [[ "$TRAVIS_GO_VERSION" =~ 1.21 ]]; then ./scripts/cov.sh TRAVIS; else go test -modfile=go_test.mod -race -v -p=1 ./... --failfast -vet=off -tags=internal_testing; fi after_success: - if [[ "$TRAVIS_GO_VERSION" =~ 1.21 ]]; then $HOME/gopath/bin/goveralls -coverprofile=acc.out -service travis-ci; fi diff --git a/scripts/cov.sh b/scripts/cov.sh index 6c80ab9b9..80828cb16 100755 --- a/scripts/cov.sh +++ b/scripts/cov.sh @@ -4,7 +4,7 @@ rm -rf ./cov mkdir cov go test -modfile=go_test.mod --failfast -vet=off -v -covermode=atomic -coverprofile=./cov/nats.out . -tags=skip_no_race_tests -go test -modfile=go_test.mod --failfast -vet=off -v -covermode=atomic -coverprofile=./cov/test.out -coverpkg=github.com/nats-io/nats.go ./test -tags=skip_no_race_tests +go test -modfile=go_test.mod --failfast -vet=off -v -covermode=atomic -coverprofile=./cov/test.out -coverpkg=github.com/nats-io/nats.go ./test -tags=skip_no_race_tests,internal_testing go test -modfile=go_test.mod --failfast -vet=off -v -covermode=atomic -coverprofile=./cov/jetstream.out -coverpkg=github.com/nats-io/nats.go/jetstream ./jetstream/test -tags=skip_no_race_tests go test -modfile=go_test.mod --failfast -vet=off -v -covermode=atomic -coverprofile=./cov/builtin.out -coverpkg=github.com/nats-io/nats.go/encoders/builtin ./test -run EncBuiltin -tags=skip_no_race_tests go test -modfile=go_test.mod --failfast -vet=off -v -covermode=atomic -coverprofile=./cov/protobuf.out -coverpkg=github.com/nats-io/nats.go/encoders/protobuf ./test -run EncProto -tags=skip_no_race_tests diff --git a/test/js_internal_test.go b/test/js_internal_test.go new file mode 100644 index 000000000..6ad89d994 --- /dev/null +++ b/test/js_internal_test.go @@ -0,0 +1,313 @@ +//go:build internal_testing +// +build internal_testing + +package test + +import ( + "crypto/sha256" + "encoding/base64" + "fmt" + "math/rand" + "strings" + "sync/atomic" + "testing" + "time" + + "github.com/nats-io/nats.go" +) + +// Need access to internals for loss testing. +func TestJetStreamOrderedConsumer(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + var err error + _, err = js.AddStream(&nats.StreamConfig{ + Name: "OBJECT", + Subjects: []string{"a"}, + Storage: nats.MemoryStorage, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Will be used as start time to validate proper reset to sequence on retries. + startTime := time.Now() + + // Create a sample asset. + msg := make([]byte, 1024*1024) + rand.Read(msg) + msg = []byte(base64.StdEncoding.EncodeToString(msg)) + mlen, sum := len(msg), sha256.Sum256(msg) + + // Now send into the stream as chunks. + const chunkSize = 1024 + for i := 0; i < mlen; i += chunkSize { + var chunk []byte + if mlen-i <= chunkSize { + chunk = msg[i:] + } else { + chunk = msg[i : i+chunkSize] + } + msg := nats.NewMsg("a") + msg.Data = chunk + msg.Header.Set("data", "true") + js.PublishMsgAsync(msg) + } + js.PublishAsync("a", nil) // eof + + select { + case <-js.PublishAsyncComplete(): + case <-time.After(time.Second): + t.Fatalf("Did not receive completion signal") + } + + // Do some tests on simple misconfigurations first. + // For ordered delivery a couple of things need to be set properly. + // Can't be durable or have ack policy that is not ack none or max deliver set. + _, err = js.SubscribeSync("a", nats.OrderedConsumer(), nats.Durable("dlc")) + if err == nil || !strings.Contains(err.Error(), "ordered consumer") { + t.Fatalf("Expected an error, got %v", err) + } + + _, err = js.SubscribeSync("a", nats.OrderedConsumer(), nats.AckExplicit()) + if err == nil || !strings.Contains(err.Error(), "ordered consumer") { + t.Fatalf("Expected an error, got %v", err) + } + + _, err = js.SubscribeSync("a", nats.OrderedConsumer(), nats.MaxDeliver(10)) + if err == nil || !strings.Contains(err.Error(), "ordered consumer") { + t.Fatalf("Expected an error, got %v", err) + } + + _, err = js.SubscribeSync("a", nats.OrderedConsumer(), nats.DeliverSubject("some.subject")) + if err == nil || !strings.Contains(err.Error(), "ordered consumer") { + t.Fatalf("Expected an error, got %v", err) + } + + si, err := js.StreamInfo("OBJECT") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + testConsumer := func() { + t.Helper() + var received uint32 + var rmsg []byte + done := make(chan bool, 1) + + cb := func(m *nats.Msg) { + // Check for eof + if len(m.Data) == 0 { + done <- true + return + } + atomic.AddUint32(&received, 1) + rmsg = append(rmsg, m.Data...) + } + // OrderedConsumer does not need HB, it sets it on its own, but for test we override which is ok. + sub, err := js.Subscribe("a", cb, nats.OrderedConsumer(), nats.IdleHeartbeat(250*time.Millisecond), nats.StartTime(startTime)) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer sub.Unsubscribe() + + select { + case <-done: + if rsum := sha256.Sum256(rmsg); rsum != sum { + t.Fatalf("Objects do not match") + } + case <-time.After(5 * time.Second): + t.Fatalf("Did not receive all chunks, only %d of %d total", atomic.LoadUint32(&received), si.State.Msgs-1) + } + } + + testSyncConsumer := func() { + t.Helper() + var received int + var rmsg []byte + + // OrderedConsumer does not need HB, it sets it on its own, but for test we override which is ok. + sub, err := js.SubscribeSync("a", nats.OrderedConsumer(), nats.IdleHeartbeat(250*time.Millisecond), nats.StartTime(startTime)) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer sub.Unsubscribe() + + var done bool + expires := time.Now().Add(5 * time.Second) + for time.Now().Before(expires) { + m, err := sub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if len(m.Data) == 0 { + done = true + break + } + received++ + rmsg = append(rmsg, m.Data...) + } + if !done { + t.Fatalf("Did not receive all chunks, only %d of %d total", received, si.State.Msgs-1) + } + if rsum := sha256.Sum256(rmsg); rsum != sum { + t.Fatalf("Objects do not match") + } + } + + // Now run normal test. + testConsumer() + testSyncConsumer() + + // Now introduce some loss. + singleLoss := func(m *nats.Msg) *nats.Msg { + if rand.Intn(100) <= 10 && m.Header.Get("data") != "" { + nc.RemoveMsgFilter("a") + return nil + } + return m + } + nc.AddMsgFilter("a", singleLoss) + testConsumer() + nc.AddMsgFilter("a", singleLoss) + testSyncConsumer() + + multiLoss := func(m *nats.Msg) *nats.Msg { + if rand.Intn(100) <= 10 && m.Header.Get("data") != "" { + return nil + } + return m + } + nc.AddMsgFilter("a", multiLoss) + testConsumer() + testSyncConsumer() + + firstOnly := func(m *nats.Msg) *nats.Msg { + if meta, err := m.Metadata(); err == nil { + if meta.Sequence.Consumer == 1 { + nc.RemoveMsgFilter("a") + return nil + } + } + return m + } + nc.AddMsgFilter("a", firstOnly) + testConsumer() + nc.AddMsgFilter("a", firstOnly) + testSyncConsumer() + + lastOnly := func(m *nats.Msg) *nats.Msg { + if meta, err := m.Metadata(); err == nil { + if meta.Sequence.Stream >= si.State.LastSeq-1 { + nc.RemoveMsgFilter("a") + return nil + } + } + return m + } + nc.AddMsgFilter("a", lastOnly) + testConsumer() + nc.AddMsgFilter("a", lastOnly) + testSyncConsumer() +} + +func TestJetStreamOrderedConsumerWithAutoUnsub(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + var err error + + _, err = js.AddStream(&nats.StreamConfig{ + Name: "OBJECT", + Subjects: []string{"a"}, + Storage: nats.MemoryStorage, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + count := int32(0) + sub, err := js.Subscribe("a", func(m *nats.Msg) { + atomic.AddInt32(&count, 1) + }, nats.OrderedConsumer(), nats.IdleHeartbeat(250*time.Millisecond)) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Ask to auto-unsub after 10 messages. + sub.AutoUnsubscribe(10) + + // Set a message filter that will drop 1 message + dm := 0 + singleLoss := func(m *nats.Msg) *nats.Msg { + if m.Header.Get("data") != "" { + dm++ + if dm == 5 { + nc.RemoveMsgFilter("a") + return nil + } + } + return m + } + nc.AddMsgFilter("a", singleLoss) + + // Now produce 20 messages + for i := 0; i < 20; i++ { + msg := nats.NewMsg("a") + msg.Data = []byte(fmt.Sprintf("msg_%d", i+1)) + msg.Header.Set("data", "true") + js.PublishMsgAsync(msg) + } + + select { + case <-js.PublishAsyncComplete(): + case <-time.After(time.Second): + t.Fatalf("Did not receive completion signal") + } + + // Wait for the subscription to be marked as invalid + deadline := time.Now().Add(time.Second) + ok := false + for time.Now().Before(deadline) { + if !sub.IsValid() { + ok = true + break + } + } + if !ok { + t.Fatalf("Subscription still valid") + } + + // Wait a bit to make sure we are not receiving more than expected, + // and give a chance for the server to process the auto-unsub + // protocol. + time.Sleep(500 * time.Millisecond) + + if n := atomic.LoadInt32(&count); n != 10 { + t.Fatalf("Sub should have received only 10 messages, got %v", n) + } + + // Now capture the in msgs count for the connection + inMsgs := nc.Stats().InMsgs + + // Send one more message and this count should not increase if the + // server had properly processed the auto-unsub after the + // reset of the ordered consumer. Use a different connection + // to send. + nc2, js2 := jsClient(t, s) + defer nc2.Close() + + js2.Publish("a", []byte("should not be received")) + + newInMsgs := nc.Stats().InMsgs + if inMsgs != newInMsgs { + t.Fatal("Seems that AUTO-UNSUB was not properly handled") + } +} diff --git a/test/js_test.go b/test/js_test.go index 096c98978..28076707b 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -16,8 +16,6 @@ package test import ( "context" "crypto/rand" - "crypto/sha256" - "encoding/base64" "encoding/json" "errors" "fmt" @@ -9627,237 +9625,6 @@ func TestJetStreamSubscribeConsumerName(t *testing.T) { } } -// Need access to internals for loss testing. -func TestJetStreamOrderedConsumer(t *testing.T) { - s := RunBasicJetStreamServer() - defer shutdownJSServerAndRemoveStorage(t, s) - - nc, js := jsClient(t, s) - defer nc.Close() - - var err error - _, err = js.AddStream(&nats.StreamConfig{ - Name: "OBJECT", - Subjects: []string{"a"}, - Storage: nats.MemoryStorage, - }) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - // Will be used as start time to validate proper reset to sequence on retries. - startTime := time.Now() - - // Create a sample asset. - msg := make([]byte, 1024*1024) - rand.Read(msg) - msg = []byte(base64.StdEncoding.EncodeToString(msg)) - mlen, sum := len(msg), sha256.Sum256(msg) - - // Now send into the stream as chunks. - const chunkSize = 1024 - for i := 0; i < mlen; i += chunkSize { - var chunk []byte - if mlen-i <= chunkSize { - chunk = msg[i:] - } else { - chunk = msg[i : i+chunkSize] - } - msg := nats.NewMsg("a") - msg.Data = chunk - msg.Header.Set("data", "true") - js.PublishMsgAsync(msg) - } - js.PublishAsync("a", nil) // eof - - select { - case <-js.PublishAsyncComplete(): - case <-time.After(time.Second): - t.Fatalf("Did not receive completion signal") - } - - // Do some tests on simple misconfigurations first. - // For ordered delivery a couple of things need to be set properly. - // Can't be durable or have ack policy that is not ack none or max deliver set. - _, err = js.SubscribeSync("a", nats.OrderedConsumer(), nats.Durable("dlc")) - if err == nil || !strings.Contains(err.Error(), "ordered consumer") { - t.Fatalf("Expected an error, got %v", err) - } - - _, err = js.SubscribeSync("a", nats.OrderedConsumer(), nats.AckExplicit()) - if err == nil || !strings.Contains(err.Error(), "ordered consumer") { - t.Fatalf("Expected an error, got %v", err) - } - - _, err = js.SubscribeSync("a", nats.OrderedConsumer(), nats.MaxDeliver(10)) - if err == nil || !strings.Contains(err.Error(), "ordered consumer") { - t.Fatalf("Expected an error, got %v", err) - } - - _, err = js.SubscribeSync("a", nats.OrderedConsumer(), nats.DeliverSubject("some.subject")) - if err == nil || !strings.Contains(err.Error(), "ordered consumer") { - t.Fatalf("Expected an error, got %v", err) - } - - si, err := js.StreamInfo("OBJECT") - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - testConsumer := func() { - t.Helper() - var received uint32 - var rmsg []byte - done := make(chan bool, 1) - - cb := func(m *nats.Msg) { - // Check for eof - if len(m.Data) == 0 { - done <- true - return - } - atomic.AddUint32(&received, 1) - rmsg = append(rmsg, m.Data...) - } - // OrderedConsumer does not need HB, it sets it on its own, but for test we override which is ok. - sub, err := js.Subscribe("a", cb, nats.OrderedConsumer(), nats.IdleHeartbeat(250*time.Millisecond), nats.StartTime(startTime)) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - defer sub.Unsubscribe() - - select { - case <-done: - if rsum := sha256.Sum256(rmsg); rsum != sum { - t.Fatalf("Objects do not match") - } - case <-time.After(5 * time.Second): - t.Fatalf("Did not receive all chunks, only %d of %d total", atomic.LoadUint32(&received), si.State.Msgs-1) - } - } - - testSyncConsumer := func() { - t.Helper() - var received int - var rmsg []byte - - // OrderedConsumer does not need HB, it sets it on its own, but for test we override which is ok. - sub, err := js.SubscribeSync("a", nats.OrderedConsumer(), nats.IdleHeartbeat(250*time.Millisecond), nats.StartTime(startTime)) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - defer sub.Unsubscribe() - - var done bool - expires := time.Now().Add(5 * time.Second) - for time.Now().Before(expires) { - m, err := sub.NextMsg(time.Second) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - if len(m.Data) == 0 { - done = true - break - } - received++ - rmsg = append(rmsg, m.Data...) - } - if !done { - t.Fatalf("Did not receive all chunks, only %d of %d total", received, si.State.Msgs-1) - } - if rsum := sha256.Sum256(rmsg); rsum != sum { - t.Fatalf("Objects do not match") - } - } - - // Now run normal test. - testConsumer() - testSyncConsumer() -} - -func TestJetStreamOrderedConsumerWithAutoUnsub(t *testing.T) { - s := RunBasicJetStreamServer() - defer shutdownJSServerAndRemoveStorage(t, s) - - nc, js := jsClient(t, s) - defer nc.Close() - - var err error - - _, err = js.AddStream(&nats.StreamConfig{ - Name: "OBJECT", - Subjects: []string{"a"}, - Storage: nats.MemoryStorage, - }) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - count := int32(0) - sub, err := js.Subscribe("a", func(m *nats.Msg) { - atomic.AddInt32(&count, 1) - }, nats.OrderedConsumer(), nats.IdleHeartbeat(250*time.Millisecond)) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - // Ask to auto-unsub after 10 messages. - sub.AutoUnsubscribe(10) - - // Now produce 20 messages - for i := 0; i < 20; i++ { - msg := nats.NewMsg("a") - msg.Data = []byte(fmt.Sprintf("msg_%d", i+1)) - msg.Header.Set("data", "true") - js.PublishMsgAsync(msg) - } - - select { - case <-js.PublishAsyncComplete(): - case <-time.After(time.Second): - t.Fatalf("Did not receive completion signal") - } - - // Wait for the subscription to be marked as invalid - deadline := time.Now().Add(time.Second) - ok := false - for time.Now().Before(deadline) { - if !sub.IsValid() { - ok = true - break - } - } - if !ok { - t.Fatalf("Subscription still valid") - } - - // Wait a bit to make sure we are not receiving more than expected, - // and give a chance for the server to process the auto-unsub - // protocol. - time.Sleep(500 * time.Millisecond) - - if n := atomic.LoadInt32(&count); n != 10 { - t.Fatalf("Sub should have received only 10 messages, got %v", n) - } - - // Now capture the in msgs count for the connection - inMsgs := nc.Stats().InMsgs - - // Send one more message and this count should not increase if the - // server had properly processed the auto-unsub after the - // reset of the ordered consumer. Use a different connection - // to send. - nc2, js2 := jsClient(t, s) - defer nc2.Close() - - js2.Publish("a", []byte("should not be received")) - - newInMsgs := nc.Stats().InMsgs - if inMsgs != newInMsgs { - t.Fatal("Seems that AUTO-UNSUB was not properly handled") - } -} - func TestJetStreamOrderedConsumerDeleteAssets(t *testing.T) { s := RunBasicJetStreamServer() defer shutdownJSServerAndRemoveStorage(t, s) diff --git a/testing_internal.go b/testing_internal.go new file mode 100644 index 000000000..f9ee9cfff --- /dev/null +++ b/testing_internal.go @@ -0,0 +1,26 @@ +//go:build internal_testing +// +build internal_testing + +package nats + +func (nc *Conn) AddMsgFilter(subject string, filter msgFilter) { + nc.subsMu.Lock() + defer nc.subsMu.Unlock() + + if nc.filters == nil { + nc.filters = make(map[string]msgFilter) + } + nc.filters[subject] = filter +} + +func (nc *Conn) RemoveMsgFilter(subject string) { + nc.subsMu.Lock() + defer nc.subsMu.Unlock() + + if nc.filters != nil { + delete(nc.filters, subject) + if len(nc.filters) == 0 { + nc.filters = nil + } + } +}