Skip to content

Commit

Permalink
feat: add batch processing and typed message handling for JetStream
Browse files Browse the repository at this point in the history
Introduce `BatchConsumer` and `BatchHandler` for stream batch processing. Add `TypedMessage` and `TypedMessageBatch` interfaces to enable typed payload handling. Implement new test utilities for NATS server setup and additional tests for batch handling and typed messages.
  • Loading branch information
mikluko committed Jan 31, 2025
1 parent 9d7f6b3 commit 2041833
Show file tree
Hide file tree
Showing 11 changed files with 362 additions and 31 deletions.
7 changes: 5 additions & 2 deletions benchmarks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@ package peanats
import (
"context"
"fmt"

"github.com/mikluko/peanats/examples/protojson/api"
"github.com/mikluko/peanats/testutil"

"testing"

"github.com/nats-io/nats.go"
)

func BenchmarkServer(b *testing.B) {
ns := RunNats(b)
ns := testutil.NatsServer(b)

sc, err := nats.Connect(ns.Addr().String())
if err != nil {
Expand Down Expand Up @@ -103,7 +106,7 @@ func benchmarkTyped[RQ, RS any](b *testing.B, codec Codec, rq *RQ, rs *RS) {
req.On("Subject").Return("foo").Times(b.N)
req.On("Data").Return(rqp).Times(b.N)

rsp, _ := codec.Encode(rs)
rsp, err := codec.Encode(rs)
if err != nil {
b.Fatalf("response encode failed: %s", err)
}
Expand Down
86 changes: 86 additions & 0 deletions jetconsumer/consumerbatch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package jetconsumer

import (
"context"
"errors"
"time"

"github.com/nats-io/nats.go/jetstream"
)

type JetstreamFetcher interface {
Fetch(batch int, opts ...jetstream.FetchOpt) (jetstream.MessageBatch, error)
}

type BatchConsumer struct {
BaseContext context.Context
Fetcher JetstreamFetcher
Handler BatchHandler
Executor func(func())

BatchSize int
Wait time.Duration

cancel context.CancelFunc
err error
}

func (c *BatchConsumer) Start() (err error) {
if c.Fetcher == nil {
panic("jetstream consumer is not set")
}
if c.Handler == nil {
panic("handler is not set")
}
if c.BatchSize == 0 {
panic("batch size is not set")
}

if c.BaseContext == nil {
c.BaseContext = context.Background()
}
c.BaseContext, c.cancel = context.WithCancel(c.BaseContext)

if c.Executor == nil {
c.Executor = func(f func()) { f() }
}

go c.loop()

return nil
}

func (c *BatchConsumer) Stop() {
c.cancel()
}

func (c *BatchConsumer) Error() error {
return c.err
}

func (c *BatchConsumer) loop() {
opts := make([]jetstream.FetchOpt, 0)
if c.Wait > 0 {
opts = append(opts, jetstream.FetchMaxWait(c.Wait))
}
for {
select {
case <-c.BaseContext.Done():
return
default:
batch, err := c.Fetcher.Fetch(c.BatchSize, opts...)
if errors.Is(err, jetstream.ErrNoMessages) {
continue
} else if err != nil {
c.err = err
return
}
c.Executor(func() {
err = c.Handler.Serve(c.BaseContext, batch)
if err != nil {
panic(err)
}
})
}
}
}
80 changes: 80 additions & 0 deletions jetconsumer/consumerbatch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package jetconsumer

import (
"context"
"errors"
"io"
"sync"
"testing"

"github.com/nats-io/nats.go/jetstream"
"github.com/stretchr/testify/require"

"github.com/mikluko/peanats"
"github.com/mikluko/peanats/jetmessage"
"github.com/mikluko/peanats/testutil"
)

type testMessageType struct {
Seq uint `json:"seq"`
}

func TestTypedBatchConsumer(t *testing.T) {
ns := testutil.NatsServer(t)
nc := testutil.NatsConn(t, ns)

js, err := jetstream.New(nc)
require.NoError(t, err)

stream, err := js.CreateStream(context.TODO(), jetstream.StreamConfig{
Name: t.Name(),
Subjects: []string{t.Name()},
})
require.NoError(t, err)
consumer, err := stream.CreateConsumer(context.TODO(), jetstream.ConsumerConfig{
Durable: t.Name(),
AckPolicy: jetstream.AckAllPolicy,
})
require.NoError(t, err)

wg := sync.WaitGroup{}

h := TypedBatchHandlerFunc[testMessageType](func(ctx context.Context, batch jetmessage.TypedMessageBatch[testMessageType]) error {
var last jetmessage.TypedMessage[testMessageType]
for {
msg, err := batch.Next(ctx)
if errors.Is(err, io.EOF) {
break
}
if errors.Is(err, context.Canceled) {
break
}
require.NoError(t, err)
last = msg
wg.Done()
}
if last == nil {
return nil
}
return last.Ack()
})

c := BatchConsumer{
Fetcher: consumer,
Handler: HandleBatchType[testMessageType](peanats.JsonCodec{}, h),
BatchSize: 10,
}
err = c.Start()
require.NoError(t, err)

pub := peanats.NewTypedPublisher[testMessageType](peanats.JsonCodec{}, peanats.NewPublisher(nc)).WithSubject(t.Name())
for i := 0; i < 10; i++ {
wg.Add(1)
err = pub.Publish(&testMessageType{Seq: uint(i)})
require.NoError(t, err)
}

wg.Wait()
c.Stop()
require.NoError(t, c.Error())
}
10 changes: 6 additions & 4 deletions jetconsumer/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/nats-io/nats.go/jetstream"

"github.com/mikluko/peanats"
"github.com/mikluko/peanats/jetmessage"
)

type Handler interface {
Expand All @@ -18,23 +19,24 @@ type HandlerFunc func(ctx context.Context, msg jetstream.Msg) error
func (f HandlerFunc) Serve(ctx context.Context, msg jetstream.Msg) error { return f(ctx, msg) }

type TypedHandler[T any] interface {
Serve(context.Context, TypedMessage[T]) error
Serve(context.Context, jetmessage.TypedMessage[T]) error
}

type TypedHandlerFunc[T any] func(peanats.TypedRequest[T]) error

func (f TypedHandlerFunc[T]) Serve(arg peanats.TypedRequest[T]) error { return f(arg) }

func HandleType[T any](h TypedHandler[T]) Handler {
return messageHandlerImpl[T]{h}
func HandleType[T any](c peanats.Codec, h TypedHandler[T]) Handler {
return messageHandlerImpl[T]{c, h}
}

type messageHandlerImpl[T any] struct {
c peanats.Codec
h TypedHandler[T]
}

func (h messageHandlerImpl[T]) Serve(ctx context.Context, msg jetstream.Msg) (err error) {
tm, err := message[T](msg)
tm, err := jetmessage.NewMessage[T](h.c, msg)
if err != nil {
return err
}
Expand Down
44 changes: 44 additions & 0 deletions jetconsumer/handlerbatch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package jetconsumer

import (
"context"

"github.com/nats-io/nats.go/jetstream"

"github.com/mikluko/peanats"
"github.com/mikluko/peanats/jetmessage"
)

type BatchHandler interface {
Serve(ctx context.Context, batch jetstream.MessageBatch) error
}

type BatchHandlerFunc func(ctx context.Context, batch jetstream.MessageBatch) error

func (f BatchHandlerFunc) Serve(ctx context.Context, batch jetstream.MessageBatch) error {
return f(ctx, batch)
}

type TypedBatchHandler[T any] interface {
Serve(context.Context, jetmessage.TypedMessageBatch[T]) error
}

type TypedBatchHandlerFunc[T any] func(context.Context, jetmessage.TypedMessageBatch[T]) error

func (f TypedBatchHandlerFunc[T]) Serve(ctx context.Context, batch jetmessage.TypedMessageBatch[T]) error {
return f(ctx, batch)
}

func HandleBatchType[T any](c peanats.Codec, h TypedBatchHandler[T]) BatchHandler {
return batchHandlerImpl[T]{c, h}
}

type batchHandlerImpl[T any] struct {
c peanats.Codec
h TypedBatchHandler[T]
}

func (h batchHandlerImpl[T]) Serve(ctx context.Context, raw jetstream.MessageBatch) (err error) {
batch := jetmessage.NewMessageBatch[T](h.c, raw)
return h.h.Serve(ctx, batch)
}
12 changes: 6 additions & 6 deletions jetconsumer/message.go → jetmessage/message.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package jetconsumer
package jetmessage

import (
"github.com/nats-io/nats.go/jetstream"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"

"github.com/mikluko/peanats"
)

type TypedMessage[T any] interface {
Expand All @@ -20,10 +20,10 @@ type requestImpl[T any] struct {

func (r requestImpl[T]) Payload() *T { return r.obj }

// message is TypedMessage factory function
func message[T any](msg jetstream.Msg) (TypedMessage[T], error) {
// NewMessage is TypedMessage factory function
func NewMessage[T any](codec peanats.Codec, msg jetstream.Msg) (TypedMessage[T], error) {
obj := new(T)
err := protojson.Unmarshal(msg.Data(), any(obj).(proto.Message))
err := codec.Decode(msg.Data(), obj)
if err != nil {
return nil, err
}
Expand Down
36 changes: 36 additions & 0 deletions jetmessage/messagebatch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package jetmessage

import (
"context"
"io"

"github.com/nats-io/nats.go/jetstream"

"github.com/mikluko/peanats"
)

type TypedMessageBatch[T any] interface {
jetstream.MessageBatch
Next(ctx context.Context) (TypedMessage[T], error)
}

func NewMessageBatch[T any](codec peanats.Codec, batch jetstream.MessageBatch) TypedMessageBatch[T] {
return &messageBatchImpl[T]{batch, codec}
}

type messageBatchImpl[T any] struct {
jetstream.MessageBatch
codec peanats.Codec
}

func (r *messageBatchImpl[T]) Next(ctx context.Context) (TypedMessage[T], error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case msg, ok := <-r.Messages():
if !ok {
return nil, io.EOF
}
return NewMessage[T](r.codec, msg)
}
}
56 changes: 56 additions & 0 deletions jetmessage/messagebatch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package jetmessage

import (
"context"
"io"
"testing"

"github.com/nats-io/nats.go/jetstream"
"github.com/stretchr/testify/require"

"github.com/mikluko/peanats"
"github.com/mikluko/peanats/testutil"
)

type testMessageType struct {
Seq uint `json:"seq"`
}

func TestTypedMessageBatch(t *testing.T) {
ns := testutil.NatsServer(t)
nc := testutil.NatsConn(t, ns)

js, err := jetstream.New(nc)
require.NoError(t, err)

stream, err := js.CreateStream(context.TODO(), jetstream.StreamConfig{
Name: "test-stream",
Subjects: []string{"test"},
})
require.NoError(t, err)

pub := peanats.NewTypedPublisher[testMessageType](peanats.JsonCodec{}, peanats.NewPublisher(nc)).WithSubject("test")
for i := 0; i < 10; i++ {
err = pub.Publish(&testMessageType{Seq: uint(i)})
require.NoError(t, err)
}

cons, err := stream.CreateConsumer(context.TODO(), jetstream.ConsumerConfig{
Durable: "test-consumer",
})
require.NoError(t, err)

raw, err := cons.Fetch(10)
require.NoError(t, err)

batch := NewMessageBatch[testMessageType](peanats.JsonCodec{}, raw)

for i := 0; i < 10; i++ {
msg, err := batch.Next(context.Background())
require.NoError(t, err)
require.Equal(t, uint(i), msg.Payload().Seq)
}

_, err = batch.Next(context.Background())
require.ErrorIs(t, err, io.EOF)
}
Loading

0 comments on commit 2041833

Please sign in to comment.