From 4a199db1b057164f1d46d4817457bae20e8c82c9 Mon Sep 17 00:00:00 2001 From: stephen-totty-hpe Date: Mon, 29 Jul 2024 17:34:22 -0400 Subject: [PATCH] implement nats_jetstream using new jetstream package Signed-off-by: stephen-totty-hpe --- protocol/nats_jetstream/v2/go.mod | 9 +- protocol/nats_jetstream/v2/go.sum | 18 +- protocol/nats_jetstream/v2/options.go | 52 ++++++ protocol/nats_jetstream/v2/protocol.go | 49 ++++++ protocol/nats_jetstream/v2/receiver.go | 166 ++++++++++++++++++- protocol/nats_jetstream/v2/sender.go | 79 ++++++++- samples/nats_jetstream/go.mod | 9 +- samples/nats_jetstream/go.sum | 18 +- test/integration/go.mod | 5 +- test/integration/go.sum | 12 +- test/integration/nats_jetstream/nats_test.go | 86 +++++++++- 11 files changed, 460 insertions(+), 43 deletions(-) diff --git a/protocol/nats_jetstream/v2/go.mod b/protocol/nats_jetstream/v2/go.mod index cfaa26b21..856b0ce36 100644 --- a/protocol/nats_jetstream/v2/go.mod +++ b/protocol/nats_jetstream/v2/go.mod @@ -6,7 +6,7 @@ replace github.com/cloudevents/sdk-go/v2 => ../../../v2 require ( github.com/cloudevents/sdk-go/v2 v2.14.0 - github.com/nats-io/nats.go v1.31.0 + github.com/nats-io/nats.go v1.36.0 ) require ( @@ -16,11 +16,12 @@ require ( github.com/klauspost/compress v1.17.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect - github.com/nats-io/nkeys v0.4.6 // indirect + github.com/nats-io/nkeys v0.4.7 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/stretchr/testify v1.8.4 // indirect - golang.org/x/crypto v0.17.0 // indirect - golang.org/x/sys v0.15.0 // indirect + golang.org/x/crypto v0.18.0 // indirect + golang.org/x/sys v0.16.0 // indirect + golang.org/x/text v0.14.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/protocol/nats_jetstream/v2/go.sum b/protocol/nats_jetstream/v2/go.sum index f5bf1bf7b..88ee19264 100644 --- a/protocol/nats_jetstream/v2/go.sum +++ b/protocol/nats_jetstream/v2/go.sum @@ -14,10 +14,10 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= -github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E= -github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8= -github.com/nats-io/nkeys v0.4.6 h1:IzVe95ru2CT6ta874rt9saQRkWfe2nFj1NtvYSLqMzY= -github.com/nats-io/nkeys v0.4.6/go.mod h1:4DxZNzenSVd1cYQoAa8948QY3QDjrHfcfVADymtkpts= +github.com/nats-io/nats.go v1.36.0 h1:suEUPuWzTSse/XhESwqLxXGuj8vGRuPRoG7MoRN/qyU= +github.com/nats-io/nats.go v1.36.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= +github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= @@ -30,10 +30,12 @@ github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXl go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM= -golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= -golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= -golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= -golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc= +golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= +golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= +golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/protocol/nats_jetstream/v2/options.go b/protocol/nats_jetstream/v2/options.go index f86642334..2dd854e34 100644 --- a/protocol/nats_jetstream/v2/options.go +++ b/protocol/nats_jetstream/v2/options.go @@ -9,9 +9,23 @@ import ( "errors" "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" ) var ErrInvalidQueueName = errors.New("invalid queue name for QueueSubscriber") +var ErrNoConsumerConfig = errors.New("no consumer config was given") +var ErrNoJetstream = errors.New("no jetstream implementation provided") +var ErrMoreThanOneStream = errors.New("more than one stream for given filter subjects") +var ErrMoreThanOneConsumerConfig = errors.New("more than one consumer config given") + +// ConsumerType - consumer types that have configurations defined in jetstream package +type ConsumerType int + +const ( + ConsumerType_Unknown ConsumerType = iota + ConsumerType_Ordinary + ConsumerType_Ordered +) // NatsOptions is a helper function to group a variadic nats.ProtocolOption into // []nats.Option that can be used by either Sender, Consumer or Protocol @@ -38,6 +52,14 @@ func WithSenderOptions(opts ...SenderOption) ProtocolOption { type SenderOption func(*Sender) error +// WithPublishOptions configures the Sender +func WithPublishOptions(publishOpts []jetstream.PublishOpt) SenderOption { + return func(s *Sender) error { + s.PublishOpts = publishOpts + return nil + } +} + type ConsumerOption func(*Consumer) error // WithQueueSubscriber configures the Consumer to join a queue group when subscribing @@ -50,3 +72,33 @@ func WithQueueSubscriber(queue string) ConsumerOption { return nil } } + +// WithConsumerConfig configures the Consumer with the given config +func WithConsumerConfig(consumerConfig *jetstream.ConsumerConfig) ConsumerOption { + return func(c *Consumer) error { + if c.OrderedConsumerConfig != nil { + return ErrMoreThanOneConsumerConfig + } + c.ConsumerConfig = consumerConfig + return nil + } +} + +// WithOrderedConsumerConfig configures the Consumer with the given config +func WithOrderedConsumerConfig(orderedConsumerConfig *jetstream.OrderedConsumerConfig) ConsumerOption { + return func(c *Consumer) error { + if c.ConsumerConfig != nil { + return ErrMoreThanOneConsumerConfig + } + c.OrderedConsumerConfig = orderedConsumerConfig + return nil + } +} + +// WithPullConsumeOptions configures the Consumer with the given pullConsumeOpts +func WithPullConsumeOptions(pullConsumeOpt []jetstream.PullConsumeOpt) ConsumerOption { + return func(c *Consumer) error { + c.PullConsumeOpt = pullConsumeOpt + return nil + } +} diff --git a/protocol/nats_jetstream/v2/protocol.go b/protocol/nats_jetstream/v2/protocol.go index fc8be507e..8add23d48 100644 --- a/protocol/nats_jetstream/v2/protocol.go +++ b/protocol/nats_jetstream/v2/protocol.go @@ -12,6 +12,7 @@ import ( "github.com/cloudevents/sdk-go/v2/protocol" "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" ) // Protocol is a reference implementation for using the CloudEvents binding @@ -46,6 +47,24 @@ func NewProtocol(url, stream, sendSubject, receiveSubject string, natsOpts []nat return p, nil } +// NewProtocolV2 creates a new NATS protocol. +func NewProtocolV2(ctx context.Context, url, stream, sendSubject string, natsOpts []nats.Option, jsOpts []jetstream.JetStreamOpt, opts ...ProtocolOption) (*Protocol, error) { + conn, err := nats.Connect(url, natsOpts...) + if err != nil { + return nil, err + } + + p, err := NewProtocolFromConnV2(ctx, conn, stream, sendSubject, jsOpts, opts...) + if err != nil { + conn.Close() + return nil, err + } + + p.connOwned = true + + return p, nil +} + func NewProtocolFromConn(conn *nats.Conn, stream, sendSubject, receiveSubject string, jsOpts []nats.JSOpt, subOpts []nats.SubOpt, opts ...ProtocolOption) (*Protocol, error) { var err error p := &Protocol{ @@ -67,6 +86,36 @@ func NewProtocolFromConn(conn *nats.Conn, stream, sendSubject, receiveSubject st return p, nil } +func NewProtocolFromConnV2(ctx context.Context, conn *nats.Conn, stream, sendSubject string, jsOpts []jetstream.JetStreamOpt, opts ...ProtocolOption) (*Protocol, error) { + var err error + var js jetstream.JetStream + p := &Protocol{ + Conn: conn, + } + + if err := p.applyOptions(opts...); err != nil { + return nil, err + } + + if js, err = jetstream.New(conn, jsOpts...); err != nil { + return nil, err + } + streamConfig := jetstream.StreamConfig{Name: stream, Subjects: []string{sendSubject}} + if _, err := js.CreateOrUpdateStream(ctx, streamConfig); err != nil { + return nil, err + } + + if p.Consumer, err = NewConsumerFromConnV2(ctx, conn, jsOpts, p.consumerOptions...); err != nil { + return nil, err + } + + if p.Sender, err = NewSenderFromConnV2(ctx, conn, sendSubject, jsOpts, p.senderOptions...); err != nil { + return nil, err + } + + return p, nil +} + // Send implements Sender.Send func (p *Protocol) Send(ctx context.Context, in binding.Message, transformers ...binding.Transformer) error { return p.Sender.Send(ctx, in, transformers...) diff --git a/protocol/nats_jetstream/v2/receiver.go b/protocol/nats_jetstream/v2/receiver.go index 75122488f..44715b36e 100644 --- a/protocol/nats_jetstream/v2/receiver.go +++ b/protocol/nats_jetstream/v2/receiver.go @@ -12,6 +12,7 @@ import ( "sync" "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" "github.com/cloudevents/sdk-go/v2/binding" "github.com/cloudevents/sdk-go/v2/protocol" @@ -38,6 +39,20 @@ func (r *Receiver) MsgHandler(msg *nats.Msg) { r.incoming <- msgErr{msg: NewMessage(msg)} } +// MsgHandlerV2 implements jetstream.MessageHandler and publishes messages onto our internal incoming channel to be delivered +// via r.Receive(ctx) +func (r *Receiver) MsgHandlerV2(msg jetstream.Msg) { + // *nats.Msg does not implement jetstream.Msg. + // jetstream.MessageHandler.Ack() returns error where nats.Msg.Ack() returns nothing + natsMessage := &nats.Msg{ + Subject: msg.Subject(), + Reply: msg.Reply(), + Header: msg.Headers(), + Data: msg.Data(), + } + r.incoming <- msgErr{msg: NewMessage(natsMessage)} +} + // Receive implements Receiver.Receive. func (r *Receiver) Receive(ctx context.Context) (binding.Message, error) { select { @@ -54,12 +69,20 @@ func (r *Receiver) Receive(ctx context.Context) (binding.Message, error) { type Consumer struct { Receiver - Conn *nats.Conn + // v1 options Jsm nats.JetStreamContext Subject string Subscriber Subscriber SubOpt []nats.SubOpt + // v2 options + JetStream jetstream.JetStream + ConsumerConfig *jetstream.ConsumerConfig + OrderedConsumerConfig *jetstream.OrderedConsumerConfig + PullConsumeOpt []jetstream.PullConsumeOpt + JetstreamConsumer jetstream.Consumer + + Conn *nats.Conn subMtx sync.Mutex internalClose chan struct{} connOwned bool @@ -82,6 +105,26 @@ func NewConsumer(url, stream, subject string, natsOpts []nats.Option, jsmOpts [] return c, err } +// NewConsumerV2 consumes from filterSubjects given in ConsumerConfig or OrderedConsumerConfig +// embedded in ConsumerOption. +// See: WithConsumerConfig(...) and WithOrderedConsumerConfig(...) +func NewConsumerV2(ctx context.Context, url string, natsOpts []nats.Option, jsOpts []jetstream.JetStreamOpt, opts ...ConsumerOption) (*Consumer, error) { + conn, err := nats.Connect(url, natsOpts...) + if err != nil { + return nil, err + } + + c, err := NewConsumerFromConnV2(ctx, conn, jsOpts, opts...) + if err != nil { + conn.Close() + return nil, err + } + + c.connOwned = true + + return c, err +} + func NewConsumerFromConn(conn *nats.Conn, stream, subject string, jsmOpts []nats.JSOpt, subOpts []nats.SubOpt, opts ...ConsumerOption) (*Consumer, error) { jsm, err := conn.JetStream(jsmOpts...) if err != nil { @@ -129,14 +172,51 @@ func NewConsumerFromConn(conn *nats.Conn, stream, subject string, jsmOpts []nats return c, nil } +// NewConsumerFromConnV2 consumes from filterSubjects given in ConsumerConfig or OrderedConsumerConfig +// embedded in ConsumerOption. See: WithConsumerConfig(...) and WithOrderedConsumerConfig(...) +func NewConsumerFromConnV2(ctx context.Context, conn *nats.Conn, jsOpts []jetstream.JetStreamOpt, opts ...ConsumerOption) (*Consumer, error) { + var js jetstream.JetStream + var err error + if js, err = jetstream.New(conn, jsOpts...); err != nil { + return nil, err + } + + c := &Consumer{ + Receiver: *NewReceiver(), + Conn: conn, + JetStream: js, + internalClose: make(chan struct{}, 1), + } + + if err = c.applyOptions(opts...); err != nil { + return nil, err + } + + return c, nil +} + func (c *Consumer) OpenInbound(ctx context.Context) error { c.subMtx.Lock() defer c.subMtx.Unlock() - // Subscribe - sub, err := c.Subscriber.Subscribe(c.Jsm, c.Subject, c.MsgHandler, c.SubOpt...) - if err != nil { - return err + var sub *nats.Subscription + var consumeContext jetstream.ConsumeContext + var err error + version := c.getVersion() + switch version { + case 0: + return ErrNoJetstream + case 1: + if sub, err = c.Subscriber.Subscribe(c.Jsm, c.Subject, c.MsgHandler, c.SubOpt...); err != nil { + return err + } + case 2: + if err = c.createConsumer(ctx); err != nil { + return err + } + if consumeContext, err = c.JetstreamConsumer.Consume(c.MsgHandlerV2, c.PullConsumeOpt...); err != nil { + return err + } } // Wait until external or internal context done @@ -146,6 +226,10 @@ func (c *Consumer) OpenInbound(ctx context.Context) error { } // Finish to consume messages in the queue and close the subscription + if consumeContext != nil { + consumeContext.Drain() + return nil + } return sub.Drain() } @@ -175,6 +259,78 @@ func (c *Consumer) applyOptions(opts ...ConsumerOption) error { return nil } +// createConsumer creates a consumer based on the configured consumer config +func (c *Consumer) createConsumer(ctx context.Context) error { + var err error + var stream string + if stream, err = c.getStreamFromSubjects(ctx); err != nil { + return err + } + consumerType := c.getConsumerType() + switch consumerType { + case ConsumerType_Unknown: + return ErrNoConsumerConfig + case ConsumerType_Ordinary: + c.JetstreamConsumer, err = c.JetStream.CreateOrUpdateConsumer(ctx, stream, *c.ConsumerConfig) + case ConsumerType_Ordered: + c.JetstreamConsumer, err = c.JetStream.OrderedConsumer(ctx, stream, *c.OrderedConsumerConfig) + } + return err +} + +// getStreamFromSubjects finds the unique stream for the set of filter subjects +// If more than one stream is found, returns ErrMoreThanOneStream +func (c *Consumer) getStreamFromSubjects(ctx context.Context) (string, error) { + var subjects []string + if c.ConsumerConfig != nil && c.ConsumerConfig.FilterSubject != "" { + subjects = []string{c.ConsumerConfig.FilterSubject} + } + if c.ConsumerConfig != nil && len(c.ConsumerConfig.FilterSubjects) > 0 { + subjects = c.ConsumerConfig.FilterSubjects + } + if c.OrderedConsumerConfig != nil && len(c.OrderedConsumerConfig.FilterSubjects) > 0 { + subjects = c.OrderedConsumerConfig.FilterSubjects + } + var finalStream string + for i, subject := range subjects { + currentStream, err := c.JetStream.StreamNameBySubject(ctx, subject) + if err != nil { + return "", err + } + if i == 0 { + finalStream = currentStream + continue + } + if finalStream != currentStream { + return "", ErrMoreThanOneStream + } + } + return finalStream, nil +} + +// getVersion returns whether the consumer uses the older jsm or newer jetstream package +// 0 - None, 1 - jsm, 2 - jetstream +func (c *Consumer) getVersion() int { + if c.Jsm != nil { + return 1 + } + if c.JetStream != nil { + return 2 + } + return 0 +} + +// getConsumerType returns consumer type based on what has been configured +func (c *Consumer) getConsumerType() ConsumerType { + if c.ConsumerConfig != nil { + return ConsumerType_Ordinary + } + if c.OrderedConsumerConfig != nil { + return ConsumerType_Ordered + } + return ConsumerType_Unknown +} + var _ protocol.Opener = (*Consumer)(nil) var _ protocol.Receiver = (*Consumer)(nil) var _ protocol.Closer = (*Consumer)(nil) diff --git a/protocol/nats_jetstream/v2/sender.go b/protocol/nats_jetstream/v2/sender.go index 2ed7fa2d2..f2bd295b8 100644 --- a/protocol/nats_jetstream/v2/sender.go +++ b/protocol/nats_jetstream/v2/sender.go @@ -11,13 +11,20 @@ import ( "fmt" "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" "github.com/cloudevents/sdk-go/v2/binding" "github.com/cloudevents/sdk-go/v2/protocol" ) type Sender struct { - Jsm nats.JetStreamContext + // v1 implementation + Jsm nats.JetStreamContext + + // v2 implementation + JetStream jetstream.JetStream + PublishOpts []jetstream.PublishOpt + Conn *nats.Conn Subject string Stream string @@ -42,6 +49,24 @@ func NewSender(url, stream, subject string, natsOpts []nats.Option, jsmOpts []na return s, nil } +// NewSenderV2 creates a new protocol.Sender responsible for opening and closing the NATS connection +func NewSenderV2(ctx context.Context, url, subject string, natsOpts []nats.Option, jsOpts []jetstream.JetStreamOpt, opts ...SenderOption) (*Sender, error) { + conn, err := nats.Connect(url, natsOpts...) + if err != nil { + return nil, err + } + + s, err := NewSenderFromConnV2(ctx, conn, subject, jsOpts, opts...) + if err != nil { + conn.Close() + return nil, err + } + + s.connOwned = true + + return s, nil +} + // NewSenderFromConn creates a new protocol.Sender which leaves responsibility for opening and closing the NATS // connection to the caller func NewSenderFromConn(conn *nats.Conn, stream, subject string, jsmOpts []nats.JSOpt, opts ...SenderOption) (*Sender, error) { @@ -77,6 +102,35 @@ func NewSenderFromConn(conn *nats.Conn, stream, subject string, jsmOpts []nats.J return s, nil } +// NewSenderFromConnV2 creates a new protocol.Sender which leaves responsibility for opening and closing the NATS +// connection to the caller +func NewSenderFromConnV2(ctx context.Context, conn *nats.Conn, subject string, jsOpts []jetstream.JetStreamOpt, opts ...SenderOption) (*Sender, error) { + var js jetstream.JetStream + var err error + var stream string + if js, err = jetstream.New(conn, jsOpts...); err != nil { + return nil, err + } + + if stream, err = js.StreamNameBySubject(ctx, subject); err != nil { + return nil, err + } + + s := &Sender{ + JetStream: js, + Conn: conn, + Stream: stream, + Subject: subject, + } + + err = s.applyOptions(opts...) + if err != nil { + return nil, err + } + + return s, nil +} + // Close implements Sender.Sender // Sender sends messages. func (s *Sender) Send(ctx context.Context, in binding.Message, transformers ...binding.Transformer) (err error) { @@ -102,8 +156,15 @@ func (s *Sender) Send(ctx context.Context, in binding.Message, transformers ...b Header: header, } - _, err = s.Jsm.PublishMsg(natsMsg) - + version := s.getVersion() + switch version { + case 0: + return ErrNoJetstream + case 1: + _, err = s.Jsm.PublishMsg(natsMsg) + case 2: + _, err = s.JetStream.PublishMsg(ctx, natsMsg, s.PublishOpts...) + } return err } @@ -126,5 +187,17 @@ func (s *Sender) applyOptions(opts ...SenderOption) error { return nil } +// getVersion returns whether the consumer uses the older jsm or newer jetstream package +// 0 - None, 1 - jsm, 2 - jetstream +func (s *Sender) getVersion() int { + if s.Jsm != nil { + return 1 + } + if s.JetStream != nil { + return 2 + } + return 0 +} + var _ protocol.Sender = (*Sender)(nil) var _ protocol.Closer = (*Protocol)(nil) diff --git a/samples/nats_jetstream/go.mod b/samples/nats_jetstream/go.mod index 348486c3d..0d22e2c3e 100644 --- a/samples/nats_jetstream/go.mod +++ b/samples/nats_jetstream/go.mod @@ -14,14 +14,15 @@ require ( github.com/klauspost/compress v1.17.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect - github.com/nats-io/nats.go v1.31.0 // indirect - github.com/nats-io/nkeys v0.4.6 // indirect + github.com/nats-io/nats.go v1.36.0 // indirect + github.com/nats-io/nkeys v0.4.7 // indirect github.com/nats-io/nuid v1.0.1 // indirect go.uber.org/atomic v1.4.0 // indirect go.uber.org/multierr v1.1.0 // indirect go.uber.org/zap v1.10.0 // indirect - golang.org/x/crypto v0.17.0 // indirect - golang.org/x/sys v0.15.0 // indirect + golang.org/x/crypto v0.18.0 // indirect + golang.org/x/sys v0.16.0 // indirect + golang.org/x/text v0.14.0 // indirect golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 // indirect ) diff --git a/samples/nats_jetstream/go.sum b/samples/nats_jetstream/go.sum index bdce92605..3457e694d 100644 --- a/samples/nats_jetstream/go.sum +++ b/samples/nats_jetstream/go.sum @@ -16,10 +16,10 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= -github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E= -github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8= -github.com/nats-io/nkeys v0.4.6 h1:IzVe95ru2CT6ta874rt9saQRkWfe2nFj1NtvYSLqMzY= -github.com/nats-io/nkeys v0.4.6/go.mod h1:4DxZNzenSVd1cYQoAa8948QY3QDjrHfcfVADymtkpts= +github.com/nats-io/nats.go v1.36.0 h1:suEUPuWzTSse/XhESwqLxXGuj8vGRuPRoG7MoRN/qyU= +github.com/nats-io/nats.go v1.36.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= +github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -35,10 +35,12 @@ go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= -golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= -golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= -golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= -golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc= +golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= +golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= +golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 h1:GZokNIeuVkl3aZHJchRrr13WCsols02MLUcz1U9is6M= golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/test/integration/go.mod b/test/integration/go.mod index b80046249..7e44015e3 100644 --- a/test/integration/go.mod +++ b/test/integration/go.mod @@ -35,7 +35,7 @@ require ( github.com/eclipse/paho.golang v0.12.0 github.com/google/go-cmp v0.6.0 github.com/google/uuid v1.3.0 - github.com/nats-io/nats.go v1.31.0 + github.com/nats-io/nats.go v1.36.0 github.com/nats-io/stan.go v0.10.4 github.com/stretchr/testify v1.8.4 go.uber.org/atomic v1.4.0 @@ -68,7 +68,7 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 // indirect - github.com/nats-io/nkeys v0.4.6 // indirect + github.com/nats-io/nkeys v0.4.7 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/pierrec/lz4/v4 v4.1.17 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect @@ -81,6 +81,7 @@ require ( golang.org/x/net v0.23.0 // indirect golang.org/x/sync v0.4.0 // indirect golang.org/x/sys v0.18.0 // indirect + golang.org/x/text v0.14.0 // indirect golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/test/integration/go.sum b/test/integration/go.sum index 422913dd6..1eb53d5c1 100644 --- a/test/integration/go.sum +++ b/test/integration/go.sum @@ -117,13 +117,13 @@ github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 h1:vU9tpM3apjYlLLeY23zRWJ9Zktr5jp+mloR942LEOpY= github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= github.com/nats-io/nats-server/v2 v2.9.23 h1:6Wj6H6QpP9FMlpCyWUaNu2yeZ/qGj+mdRkZ1wbikExU= -github.com/nats-io/nats-streaming-server v0.24.3 h1:uZez8jBkXscua++jaDsK7DhpSAkizdetar6yWbPMRco= +github.com/nats-io/nats-streaming-server v0.24.6 h1:iIZXuPSznnYkiy0P3L0AP9zEN9Etp+tITbbX1KKeq4Q= github.com/nats-io/nats.go v1.22.1/go.mod h1:tLqubohF7t4z3du1QDPYJIQQyhb4wl6DhjxEajSI7UA= -github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E= -github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8= +github.com/nats-io/nats.go v1.36.0 h1:suEUPuWzTSse/XhESwqLxXGuj8vGRuPRoG7MoRN/qyU= +github.com/nats-io/nats.go v1.36.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= -github.com/nats-io/nkeys v0.4.6 h1:IzVe95ru2CT6ta874rt9saQRkWfe2nFj1NtvYSLqMzY= -github.com/nats-io/nkeys v0.4.6/go.mod h1:4DxZNzenSVd1cYQoAa8948QY3QDjrHfcfVADymtkpts= +github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= +github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/nats-io/stan.go v0.10.4 h1:19GS/eD1SeQJaVkeM9EkvEYattnvnWrZ3wkSWSw4uXw= @@ -229,6 +229,8 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 h1:GZokNIeuVkl3aZHJchRrr13WCsols02MLUcz1U9is6M= golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/test/integration/nats_jetstream/nats_test.go b/test/integration/nats_jetstream/nats_test.go index 0c9d5b554..53222b2f5 100644 --- a/test/integration/nats_jetstream/nats_test.go +++ b/test/integration/nats_jetstream/nats_test.go @@ -11,6 +11,7 @@ import ( "testing" "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" ce_nats "github.com/cloudevents/sdk-go/protocol/nats_jetstream/v2" "github.com/cloudevents/sdk-go/v2/binding" @@ -32,6 +33,7 @@ func TestSendReceiveStructuredAndBinary(t *testing.T) { type args struct { opts []ce_nats.ProtocolOption bindingEncoding binding.Encoding + version int } tests := []struct { name string @@ -41,6 +43,7 @@ func TestSendReceiveStructuredAndBinary(t *testing.T) { name: "regular subscriber - structured", args: args{ bindingEncoding: binding.EncodingStructured, + version: 1, }, }, { @@ -52,12 +55,40 @@ func TestSendReceiveStructuredAndBinary(t *testing.T) { ), }, bindingEncoding: binding.EncodingStructured, + version: 1, + }, + }, + { + name: "pull consumer config - structured", + args: args{ + opts: []ce_nats.ProtocolOption{ + ce_nats.WithConsumerOptions( + ce_nats.WithConsumerConfig(&jetstream.ConsumerConfig{ + Name: uuid.New().String(), + }), + ), + }, + bindingEncoding: binding.EncodingStructured, + version: 2, + }, + }, + { + name: "ordered consumer config - structured", + args: args{ + opts: []ce_nats.ProtocolOption{ + ce_nats.WithConsumerOptions( + ce_nats.WithOrderedConsumerConfig(&jetstream.OrderedConsumerConfig{}), + ), + }, + bindingEncoding: binding.EncodingStructured, + version: 2, }, }, { name: "regular subscriber - binary", args: args{ bindingEncoding: binding.EncodingBinary, + version: 1, }, }, { name: "queue subscriber - binary", @@ -68,12 +99,38 @@ func TestSendReceiveStructuredAndBinary(t *testing.T) { ), }, bindingEncoding: binding.EncodingBinary, + version: 1, + }, + }, { + name: "pull consumer config - binary", + args: args{ + opts: []ce_nats.ProtocolOption{ + ce_nats.WithConsumerOptions( + ce_nats.WithConsumerConfig(&jetstream.ConsumerConfig{ + Name: uuid.New().String(), + }), + ), + }, + bindingEncoding: binding.EncodingBinary, + version: 2, + }, + }, + { + name: "ordered consumer config - binary", + args: args{ + opts: []ce_nats.ProtocolOption{ + ce_nats.WithConsumerOptions( + ce_nats.WithOrderedConsumerConfig(&jetstream.OrderedConsumerConfig{}), + ), + }, + bindingEncoding: binding.EncodingBinary, + version: 2, }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - cleanup, s, r := testProtocol(t, conn, tt.args.opts...) + cleanup, s, r := testProtocol(t, conn, tt.args.version, tt.args.opts...) defer cleanup() EachEvent(t, Events(), func(t *testing.T, eventIn event.Event) { eventIn = ConvertEventExtensionsToString(t, eventIn) @@ -112,7 +169,7 @@ func testConn(t testing.TB) *nats.Conn { return conn } -func testProtocol(t testing.TB, natsConn *nats.Conn, opts ...ce_nats.ProtocolOption) (func(), bindings.Sender, +func testProtocol(t testing.TB, natsConn *nats.Conn, version int, opts ...ce_nats.ProtocolOption) (func(), bindings.Sender, bindings.Receiver) { // STAN connections actually connect to NATS, so the env var is named appropriately s := os.Getenv("TEST_NATS_SERVER") @@ -124,7 +181,20 @@ func testProtocol(t testing.TB, natsConn *nats.Conn, opts ...ce_nats.ProtocolOpt subject := stream + ".test" // use NewProtocol rather than individual Consumer and Sender since this gives us more coverage - p, err := ce_nats.NewProtocol(s, stream, subject, subject, ce_nats.NatsOptions(), []nats.JSOpt{}, []nats.SubOpt{}, opts...) + var p *ce_nats.Protocol + var err error + if version == 1 { + p, err = ce_nats.NewProtocol(s, stream, subject, subject, ce_nats.NatsOptions(), []nats.JSOpt{}, []nats.SubOpt{}, opts...) + } else { + ctx := context.Background() + p, err = ce_nats.NewProtocolV2(ctx, s, stream, subject, ce_nats.NatsOptions(), []jetstream.JetStreamOpt{}, opts...) + require.NoError(t, err) + if p.Consumer.ConsumerConfig != nil { + p.Consumer.ConsumerConfig.FilterSubjects = []string{subject} + } else if p.Consumer.OrderedConsumerConfig != nil { + p.Consumer.OrderedConsumerConfig.FilterSubjects = []string{subject} + } + } require.NoError(t, err) go func() { @@ -140,7 +210,15 @@ func testProtocol(t testing.TB, natsConn *nats.Conn, opts ...ce_nats.ProtocolOpt func BenchmarkSendReceive(b *testing.B) { conn := testConn(b) defer conn.Close() - c, s, r := testProtocol(b, conn) + c, s, r := testProtocol(b, conn, 1) + defer c() // Cleanup + test.BenchmarkSendReceive(b, s, r) +} + +func BenchmarkSendReceiveV2(b *testing.B) { + conn := testConn(b) + defer conn.Close() + c, s, r := testProtocol(b, conn, 2) defer c() // Cleanup test.BenchmarkSendReceive(b, s, r) }