Skip to content

Commit

Permalink
Add internal testing build tag and test
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
  • Loading branch information
piotrpio committed Oct 13, 2023
1 parent 1a13c93 commit 4065e27
Show file tree
Hide file tree
Showing 5 changed files with 341 additions and 235 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ before_script:
- golangci-lint run ./jetstream/...
script:
- go test -modfile=go_test.mod -v -run=TestNoRace -p=1 ./... --failfast -vet=off
- if [[ "$TRAVIS_GO_VERSION" =~ 1.21 ]]; then ./scripts/cov.sh TRAVIS; else go test -modfile=go_test.mod -race -v -p=1 ./... --failfast -vet=off; fi
- if [[ "$TRAVIS_GO_VERSION" =~ 1.21 ]]; then ./scripts/cov.sh TRAVIS; else go test -modfile=go_test.mod -race -v -p=1 ./... --failfast -vet=off -tags=internal_testing; fi
after_success:
- if [[ "$TRAVIS_GO_VERSION" =~ 1.21 ]]; then $HOME/gopath/bin/goveralls -coverprofile=acc.out -service travis-ci; fi

Expand Down
2 changes: 1 addition & 1 deletion scripts/cov.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
rm -rf ./cov
mkdir cov
go test -modfile=go_test.mod --failfast -vet=off -v -covermode=atomic -coverprofile=./cov/nats.out . -tags=skip_no_race_tests
go test -modfile=go_test.mod --failfast -vet=off -v -covermode=atomic -coverprofile=./cov/test.out -coverpkg=github.com/nats-io/nats.go ./test -tags=skip_no_race_tests
go test -modfile=go_test.mod --failfast -vet=off -v -covermode=atomic -coverprofile=./cov/test.out -coverpkg=github.com/nats-io/nats.go ./test -tags=skip_no_race_tests,internal_testing
go test -modfile=go_test.mod --failfast -vet=off -v -covermode=atomic -coverprofile=./cov/jetstream.out -coverpkg=github.com/nats-io/nats.go/jetstream ./jetstream/test -tags=skip_no_race_tests
go test -modfile=go_test.mod --failfast -vet=off -v -covermode=atomic -coverprofile=./cov/builtin.out -coverpkg=github.com/nats-io/nats.go/encoders/builtin ./test -run EncBuiltin -tags=skip_no_race_tests
go test -modfile=go_test.mod --failfast -vet=off -v -covermode=atomic -coverprofile=./cov/protobuf.out -coverpkg=github.com/nats-io/nats.go/encoders/protobuf ./test -run EncProto -tags=skip_no_race_tests
Expand Down
313 changes: 313 additions & 0 deletions test/js_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,313 @@
//go:build internal_testing
// +build internal_testing

package test

import (
"crypto/sha256"
"encoding/base64"
"fmt"
"math/rand"
"strings"
"sync/atomic"
"testing"
"time"

"github.com/nats-io/nats.go"
)

// Need access to internals for loss testing.
func TestJetStreamOrderedConsumer(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

var err error
_, err = js.AddStream(&nats.StreamConfig{
Name: "OBJECT",
Subjects: []string{"a"},
Storage: nats.MemoryStorage,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// Will be used as start time to validate proper reset to sequence on retries.
startTime := time.Now()

// Create a sample asset.
msg := make([]byte, 1024*1024)
rand.Read(msg)
msg = []byte(base64.StdEncoding.EncodeToString(msg))
mlen, sum := len(msg), sha256.Sum256(msg)

// Now send into the stream as chunks.
const chunkSize = 1024
for i := 0; i < mlen; i += chunkSize {
var chunk []byte
if mlen-i <= chunkSize {
chunk = msg[i:]
} else {
chunk = msg[i : i+chunkSize]
}
msg := nats.NewMsg("a")
msg.Data = chunk
msg.Header.Set("data", "true")
js.PublishMsgAsync(msg)
}
js.PublishAsync("a", nil) // eof

select {
case <-js.PublishAsyncComplete():
case <-time.After(time.Second):
t.Fatalf("Did not receive completion signal")
}

// Do some tests on simple misconfigurations first.
// For ordered delivery a couple of things need to be set properly.
// Can't be durable or have ack policy that is not ack none or max deliver set.
_, err = js.SubscribeSync("a", nats.OrderedConsumer(), nats.Durable("dlc"))
if err == nil || !strings.Contains(err.Error(), "ordered consumer") {
t.Fatalf("Expected an error, got %v", err)
}

_, err = js.SubscribeSync("a", nats.OrderedConsumer(), nats.AckExplicit())
if err == nil || !strings.Contains(err.Error(), "ordered consumer") {
t.Fatalf("Expected an error, got %v", err)
}

_, err = js.SubscribeSync("a", nats.OrderedConsumer(), nats.MaxDeliver(10))
if err == nil || !strings.Contains(err.Error(), "ordered consumer") {
t.Fatalf("Expected an error, got %v", err)
}

_, err = js.SubscribeSync("a", nats.OrderedConsumer(), nats.DeliverSubject("some.subject"))
if err == nil || !strings.Contains(err.Error(), "ordered consumer") {
t.Fatalf("Expected an error, got %v", err)
}

si, err := js.StreamInfo("OBJECT")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

testConsumer := func() {
t.Helper()
var received uint32
var rmsg []byte
done := make(chan bool, 1)

cb := func(m *nats.Msg) {
// Check for eof
if len(m.Data) == 0 {
done <- true
return
}
atomic.AddUint32(&received, 1)
rmsg = append(rmsg, m.Data...)
}
// OrderedConsumer does not need HB, it sets it on its own, but for test we override which is ok.
sub, err := js.Subscribe("a", cb, nats.OrderedConsumer(), nats.IdleHeartbeat(250*time.Millisecond), nats.StartTime(startTime))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer sub.Unsubscribe()

select {
case <-done:
if rsum := sha256.Sum256(rmsg); rsum != sum {
t.Fatalf("Objects do not match")
}
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive all chunks, only %d of %d total", atomic.LoadUint32(&received), si.State.Msgs-1)
}
}

testSyncConsumer := func() {
t.Helper()
var received int
var rmsg []byte

// OrderedConsumer does not need HB, it sets it on its own, but for test we override which is ok.
sub, err := js.SubscribeSync("a", nats.OrderedConsumer(), nats.IdleHeartbeat(250*time.Millisecond), nats.StartTime(startTime))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer sub.Unsubscribe()

var done bool
expires := time.Now().Add(5 * time.Second)
for time.Now().Before(expires) {
m, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if len(m.Data) == 0 {
done = true
break
}
received++
rmsg = append(rmsg, m.Data...)
}
if !done {
t.Fatalf("Did not receive all chunks, only %d of %d total", received, si.State.Msgs-1)
}
if rsum := sha256.Sum256(rmsg); rsum != sum {
t.Fatalf("Objects do not match")
}
}

// Now run normal test.
testConsumer()
testSyncConsumer()

// Now introduce some loss.
singleLoss := func(m *nats.Msg) *nats.Msg {
if rand.Intn(100) <= 10 && m.Header.Get("data") != "" {
nc.RemoveMsgFilter("a")
return nil
}
return m
}
nc.AddMsgFilter("a", singleLoss)
testConsumer()
nc.AddMsgFilter("a", singleLoss)
testSyncConsumer()

multiLoss := func(m *nats.Msg) *nats.Msg {
if rand.Intn(100) <= 10 && m.Header.Get("data") != "" {
return nil
}
return m
}
nc.AddMsgFilter("a", multiLoss)
testConsumer()
testSyncConsumer()

firstOnly := func(m *nats.Msg) *nats.Msg {
if meta, err := m.Metadata(); err == nil {
if meta.Sequence.Consumer == 1 {
nc.RemoveMsgFilter("a")
return nil
}
}
return m
}
nc.AddMsgFilter("a", firstOnly)
testConsumer()
nc.AddMsgFilter("a", firstOnly)
testSyncConsumer()

lastOnly := func(m *nats.Msg) *nats.Msg {
if meta, err := m.Metadata(); err == nil {
if meta.Sequence.Stream >= si.State.LastSeq-1 {
nc.RemoveMsgFilter("a")
return nil
}
}
return m
}
nc.AddMsgFilter("a", lastOnly)
testConsumer()
nc.AddMsgFilter("a", lastOnly)
testSyncConsumer()
}

func TestJetStreamOrderedConsumerWithAutoUnsub(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

var err error

_, err = js.AddStream(&nats.StreamConfig{
Name: "OBJECT",
Subjects: []string{"a"},
Storage: nats.MemoryStorage,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

count := int32(0)
sub, err := js.Subscribe("a", func(m *nats.Msg) {
atomic.AddInt32(&count, 1)
}, nats.OrderedConsumer(), nats.IdleHeartbeat(250*time.Millisecond))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// Ask to auto-unsub after 10 messages.
sub.AutoUnsubscribe(10)

// Set a message filter that will drop 1 message
dm := 0
singleLoss := func(m *nats.Msg) *nats.Msg {
if m.Header.Get("data") != "" {
dm++
if dm == 5 {
nc.RemoveMsgFilter("a")
return nil
}
}
return m
}
nc.AddMsgFilter("a", singleLoss)

// Now produce 20 messages
for i := 0; i < 20; i++ {
msg := nats.NewMsg("a")
msg.Data = []byte(fmt.Sprintf("msg_%d", i+1))
msg.Header.Set("data", "true")
js.PublishMsgAsync(msg)
}

select {
case <-js.PublishAsyncComplete():
case <-time.After(time.Second):
t.Fatalf("Did not receive completion signal")
}

// Wait for the subscription to be marked as invalid
deadline := time.Now().Add(time.Second)
ok := false
for time.Now().Before(deadline) {
if !sub.IsValid() {
ok = true
break
}
}
if !ok {
t.Fatalf("Subscription still valid")
}

// Wait a bit to make sure we are not receiving more than expected,
// and give a chance for the server to process the auto-unsub
// protocol.
time.Sleep(500 * time.Millisecond)

if n := atomic.LoadInt32(&count); n != 10 {
t.Fatalf("Sub should have received only 10 messages, got %v", n)
}

// Now capture the in msgs count for the connection
inMsgs := nc.Stats().InMsgs

// Send one more message and this count should not increase if the
// server had properly processed the auto-unsub after the
// reset of the ordered consumer. Use a different connection
// to send.
nc2, js2 := jsClient(t, s)
defer nc2.Close()

js2.Publish("a", []byte("should not be received"))

newInMsgs := nc.Stats().InMsgs
if inMsgs != newInMsgs {
t.Fatal("Seems that AUTO-UNSUB was not properly handled")
}
}
Loading

0 comments on commit 4065e27

Please sign in to comment.