Skip to content

Commit

Permalink
Add internal testing build tag and test
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
  • Loading branch information
piotrpio committed Oct 13, 2023
1 parent 1a13c93 commit a6ccbf4
Show file tree
Hide file tree
Showing 3 changed files with 241 additions and 150 deletions.
215 changes: 215 additions & 0 deletions test/js_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
//go:build internal_testing
// +build internal_testing

package test

import (
"crypto/sha256"
"encoding/base64"
"math/rand"
"strings"
"sync/atomic"
"testing"
"time"

"github.com/nats-io/nats.go"
)

// Need access to internals for loss testing.
func TestJetStreamOrderedConsumer(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

var err error
_, err = js.AddStream(&nats.StreamConfig{
Name: "OBJECT",
Subjects: []string{"a"},
Storage: nats.MemoryStorage,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// Will be used as start time to validate proper reset to sequence on retries.
startTime := time.Now()

// Create a sample asset.
msg := make([]byte, 1024*1024)
rand.Read(msg)
msg = []byte(base64.StdEncoding.EncodeToString(msg))
mlen, sum := len(msg), sha256.Sum256(msg)

// Now send into the stream as chunks.
const chunkSize = 1024
for i := 0; i < mlen; i += chunkSize {
var chunk []byte
if mlen-i <= chunkSize {
chunk = msg[i:]
} else {
chunk = msg[i : i+chunkSize]
}
msg := nats.NewMsg("a")
msg.Data = chunk
msg.Header.Set("data", "true")
js.PublishMsgAsync(msg)
}
js.PublishAsync("a", nil) // eof

select {
case <-js.PublishAsyncComplete():
case <-time.After(time.Second):
t.Fatalf("Did not receive completion signal")
}

// Do some tests on simple misconfigurations first.
// For ordered delivery a couple of things need to be set properly.
// Can't be durable or have ack policy that is not ack none or max deliver set.
_, err = js.SubscribeSync("a", nats.OrderedConsumer(), nats.Durable("dlc"))
if err == nil || !strings.Contains(err.Error(), "ordered consumer") {
t.Fatalf("Expected an error, got %v", err)
}

_, err = js.SubscribeSync("a", nats.OrderedConsumer(), nats.AckExplicit())
if err == nil || !strings.Contains(err.Error(), "ordered consumer") {
t.Fatalf("Expected an error, got %v", err)
}

_, err = js.SubscribeSync("a", nats.OrderedConsumer(), nats.MaxDeliver(10))
if err == nil || !strings.Contains(err.Error(), "ordered consumer") {
t.Fatalf("Expected an error, got %v", err)
}

_, err = js.SubscribeSync("a", nats.OrderedConsumer(), nats.DeliverSubject("some.subject"))
if err == nil || !strings.Contains(err.Error(), "ordered consumer") {
t.Fatalf("Expected an error, got %v", err)
}

si, err := js.StreamInfo("OBJECT")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

testConsumer := func() {
t.Helper()
var received uint32
var rmsg []byte
done := make(chan bool, 1)

cb := func(m *nats.Msg) {
// Check for eof
if len(m.Data) == 0 {
done <- true
return
}
atomic.AddUint32(&received, 1)
rmsg = append(rmsg, m.Data...)
}
// OrderedConsumer does not need HB, it sets it on its own, but for test we override which is ok.
sub, err := js.Subscribe("a", cb, nats.OrderedConsumer(), nats.IdleHeartbeat(250*time.Millisecond), nats.StartTime(startTime))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer sub.Unsubscribe()

select {
case <-done:
if rsum := sha256.Sum256(rmsg); rsum != sum {
t.Fatalf("Objects do not match")
}
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive all chunks, only %d of %d total", atomic.LoadUint32(&received), si.State.Msgs-1)
}
}

testSyncConsumer := func() {
t.Helper()
var received int
var rmsg []byte

// OrderedConsumer does not need HB, it sets it on its own, but for test we override which is ok.
sub, err := js.SubscribeSync("a", nats.OrderedConsumer(), nats.IdleHeartbeat(250*time.Millisecond), nats.StartTime(startTime))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer sub.Unsubscribe()

var done bool
expires := time.Now().Add(5 * time.Second)
for time.Now().Before(expires) {
m, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if len(m.Data) == 0 {
done = true
break
}
received++
rmsg = append(rmsg, m.Data...)
}
if !done {
t.Fatalf("Did not receive all chunks, only %d of %d total", received, si.State.Msgs-1)
}
if rsum := sha256.Sum256(rmsg); rsum != sum {
t.Fatalf("Objects do not match")
}
}

// Now run normal test.
testConsumer()
testSyncConsumer()

// Now introduce some loss.
singleLoss := func(m *nats.Msg) *nats.Msg {
if rand.Intn(100) <= 10 && m.Header.Get("data") != "" {
nc.RemoveMsgFilter("a")
return nil
}
return m
}
nc.AddMsgFilter("a", singleLoss)
testConsumer()
nc.AddMsgFilter("a", singleLoss)
testSyncConsumer()

multiLoss := func(m *nats.Msg) *nats.Msg {
if rand.Intn(100) <= 10 && m.Header.Get("data") != "" {
return nil
}
return m
}
nc.AddMsgFilter("a", multiLoss)
testConsumer()
testSyncConsumer()

firstOnly := func(m *nats.Msg) *nats.Msg {
if meta, err := m.Metadata(); err == nil {
if meta.Sequence.Consumer == 1 {
nc.RemoveMsgFilter("a")
return nil
}
}
return m
}
nc.AddMsgFilter("a", firstOnly)
testConsumer()
nc.AddMsgFilter("a", firstOnly)
testSyncConsumer()

lastOnly := func(m *nats.Msg) *nats.Msg {
if meta, err := m.Metadata(); err == nil {
if meta.Sequence.Stream >= si.State.LastSeq-1 {
nc.RemoveMsgFilter("a")
return nil
}
}
return m
}
nc.AddMsgFilter("a", lastOnly)
testConsumer()
nc.AddMsgFilter("a", lastOnly)
testSyncConsumer()
}
150 changes: 0 additions & 150 deletions test/js_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ package test
import (
"context"
"crypto/rand"
"crypto/sha256"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -9627,154 +9625,6 @@ func TestJetStreamSubscribeConsumerName(t *testing.T) {
}
}

// Need access to internals for loss testing.
func TestJetStreamOrderedConsumer(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

var err error
_, err = js.AddStream(&nats.StreamConfig{
Name: "OBJECT",
Subjects: []string{"a"},
Storage: nats.MemoryStorage,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// Will be used as start time to validate proper reset to sequence on retries.
startTime := time.Now()

// Create a sample asset.
msg := make([]byte, 1024*1024)
rand.Read(msg)
msg = []byte(base64.StdEncoding.EncodeToString(msg))
mlen, sum := len(msg), sha256.Sum256(msg)

// Now send into the stream as chunks.
const chunkSize = 1024
for i := 0; i < mlen; i += chunkSize {
var chunk []byte
if mlen-i <= chunkSize {
chunk = msg[i:]
} else {
chunk = msg[i : i+chunkSize]
}
msg := nats.NewMsg("a")
msg.Data = chunk
msg.Header.Set("data", "true")
js.PublishMsgAsync(msg)
}
js.PublishAsync("a", nil) // eof

select {
case <-js.PublishAsyncComplete():
case <-time.After(time.Second):
t.Fatalf("Did not receive completion signal")
}

// Do some tests on simple misconfigurations first.
// For ordered delivery a couple of things need to be set properly.
// Can't be durable or have ack policy that is not ack none or max deliver set.
_, err = js.SubscribeSync("a", nats.OrderedConsumer(), nats.Durable("dlc"))
if err == nil || !strings.Contains(err.Error(), "ordered consumer") {
t.Fatalf("Expected an error, got %v", err)
}

_, err = js.SubscribeSync("a", nats.OrderedConsumer(), nats.AckExplicit())
if err == nil || !strings.Contains(err.Error(), "ordered consumer") {
t.Fatalf("Expected an error, got %v", err)
}

_, err = js.SubscribeSync("a", nats.OrderedConsumer(), nats.MaxDeliver(10))
if err == nil || !strings.Contains(err.Error(), "ordered consumer") {
t.Fatalf("Expected an error, got %v", err)
}

_, err = js.SubscribeSync("a", nats.OrderedConsumer(), nats.DeliverSubject("some.subject"))
if err == nil || !strings.Contains(err.Error(), "ordered consumer") {
t.Fatalf("Expected an error, got %v", err)
}

si, err := js.StreamInfo("OBJECT")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

testConsumer := func() {
t.Helper()
var received uint32
var rmsg []byte
done := make(chan bool, 1)

cb := func(m *nats.Msg) {
// Check for eof
if len(m.Data) == 0 {
done <- true
return
}
atomic.AddUint32(&received, 1)
rmsg = append(rmsg, m.Data...)
}
// OrderedConsumer does not need HB, it sets it on its own, but for test we override which is ok.
sub, err := js.Subscribe("a", cb, nats.OrderedConsumer(), nats.IdleHeartbeat(250*time.Millisecond), nats.StartTime(startTime))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer sub.Unsubscribe()

select {
case <-done:
if rsum := sha256.Sum256(rmsg); rsum != sum {
t.Fatalf("Objects do not match")
}
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive all chunks, only %d of %d total", atomic.LoadUint32(&received), si.State.Msgs-1)
}
}

testSyncConsumer := func() {
t.Helper()
var received int
var rmsg []byte

// OrderedConsumer does not need HB, it sets it on its own, but for test we override which is ok.
sub, err := js.SubscribeSync("a", nats.OrderedConsumer(), nats.IdleHeartbeat(250*time.Millisecond), nats.StartTime(startTime))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer sub.Unsubscribe()

var done bool
expires := time.Now().Add(5 * time.Second)
for time.Now().Before(expires) {
m, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if len(m.Data) == 0 {
done = true
break
}
received++
rmsg = append(rmsg, m.Data...)
}
if !done {
t.Fatalf("Did not receive all chunks, only %d of %d total", received, si.State.Msgs-1)
}
if rsum := sha256.Sum256(rmsg); rsum != sum {
t.Fatalf("Objects do not match")
}
}

// Now run normal test.
testConsumer()
testSyncConsumer()
}

func TestJetStreamOrderedConsumerWithAutoUnsub(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)
Expand Down
Loading

0 comments on commit a6ccbf4

Please sign in to comment.