From f70c2349e8a3b9347489e7cb341204d6e17223b5 Mon Sep 17 00:00:00 2001 From: stephen-totty-hpe Date: Fri, 18 Oct 2024 11:21:42 -0400 Subject: [PATCH] Add NATS Ack/Nak to nats jetstream V3 Finish Signed-off-by: stephen-totty-hpe --- protocol/nats_jetstream/v3/message.go | 24 ++++++ protocol/nats_jetstream/v3/message_test.go | 93 +++++++++++++++++++++- protocol/nats_jetstream/v3/options.go | 2 +- protocol/nats_jetstream/v3/options_test.go | 2 +- protocol/nats_jetstream/v3/protocol.go | 6 +- 5 files changed, 121 insertions(+), 6 deletions(-) diff --git a/protocol/nats_jetstream/v3/message.go b/protocol/nats_jetstream/v3/message.go index ff29c7e5..8b97597f 100644 --- a/protocol/nats_jetstream/v3/message.go +++ b/protocol/nats_jetstream/v3/message.go @@ -17,6 +17,7 @@ import ( "github.com/cloudevents/sdk-go/v2/binding" "github.com/cloudevents/sdk-go/v2/binding/format" "github.com/cloudevents/sdk-go/v2/binding/spec" + "github.com/cloudevents/sdk-go/v2/protocol" ) const ( @@ -105,6 +106,29 @@ func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter) // Finish *must* be called when message from a Receiver can be forgotten by the receiver. func (m *Message) Finish(err error) error { + // Ack and Nak first checks to see if the message has been acknowleged + // and if Ack/Nak was done, it immediately returns an error without applying any logic to the message on the server. + // Nak will only be sent if the error given is explictly a NACK error(protocol.ResultNACK). + // AckPolicy effects if an explict Ack/Nak is needed. + // AckExplicit: The default policy. Each individual message must be acknowledged. + // Recommended for most reliability and functionality. + // AckNone: No acknowledgment needed; the server assumes acknowledgment on delivery. + // AckAll: Acknowledge only the last message received in a series; all previous messages are automatically acknowledged. + // Will acknowledge all pending messages for all subscribers for Pull Consumer. + // see: github.com/nats-io/nats.go/jetstream/ConsumerConfig.AckPolicy + if m.Msg == nil { + return nil + } + if protocol.IsNACK(err) { + if err = m.Msg.Nak(); err != jetstream.ErrMsgAlreadyAckd { + return err + } + } + if protocol.IsACK(err) { + if err = m.Msg.Ack(); err != jetstream.ErrMsgAlreadyAckd { + return err + } + } return nil } diff --git a/protocol/nats_jetstream/v3/message_test.go b/protocol/nats_jetstream/v3/message_test.go index d532075e..ba4cdef3 100644 --- a/protocol/nats_jetstream/v3/message_test.go +++ b/protocol/nats_jetstream/v3/message_test.go @@ -9,11 +9,13 @@ import ( "bytes" "context" "encoding/json" + "errors" "reflect" "testing" "github.com/cloudevents/sdk-go/v2/binding/spec" bindingtest "github.com/cloudevents/sdk-go/v2/binding/test" + "github.com/cloudevents/sdk-go/v2/protocol" "github.com/cloudevents/sdk-go/v2/binding" "github.com/cloudevents/sdk-go/v2/test" @@ -23,11 +25,15 @@ import ( type jetStreamMsg struct { jetstream.Msg - msg *nats.Msg + msg *nats.Msg + ackCalled bool + nackCalled bool } func (j *jetStreamMsg) Data() []byte { return j.msg.Data } func (j *jetStreamMsg) Headers() nats.Header { return j.msg.Header } +func (j *jetStreamMsg) Ack() error { j.ackCalled = true; return nil } +func (j *jetStreamMsg) Nak() error { j.nackCalled = true; return nil } var ( outBinaryMessage = bindingtest.MockBinaryMessage{ @@ -190,3 +196,88 @@ func TestGetExtension(t *testing.T) { }) } } + +func TestFinish(t *testing.T) { + type args struct { + err error + } + type wants struct { + err error + ackCalled bool + nackCalled bool + } + tests := []struct { + name string + args args + wants wants + }{ + { + name: "nil error given", + args: args{ + err: nil, + }, + wants: wants{ + err: nil, + ackCalled: true, + nackCalled: false, + }, + }, + { + name: "ACK error given", + args: args{ + err: protocol.ResultACK, + }, + wants: wants{ + err: nil, + ackCalled: true, + nackCalled: false, + }, + }, + { + name: "NACK error given", + args: args{ + err: protocol.ResultNACK, + }, + wants: wants{ + err: nil, + ackCalled: false, + nackCalled: true, + }, + }, + { + name: "unknown error given", + args: args{ + err: errors.New("unknown"), + }, + wants: wants{ + err: nil, + ackCalled: false, + nackCalled: false, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + binaryReceiverMessage.ackCalled = false + binaryReceiverMessage.nackCalled = false + message := NewMessage(binaryReceiverMessage) + if message == nil { + t.Errorf("Error in NewMessage!") + } + gotErr := message.Finish(tt.args.err) + if gotErr != tt.wants.err { + t.Errorf("ExpectedErr %s, while got %s", tt.wants.err, gotErr) + } + var mockMessage *jetStreamMsg + if message != nil { + mockMessage = message.Msg.(*jetStreamMsg) + } + if mockMessage.ackCalled != tt.wants.ackCalled { + t.Errorf("ExpectedAck %t, while got %t", tt.wants.ackCalled, mockMessage.ackCalled) + } + if mockMessage.nackCalled != tt.wants.nackCalled { + t.Errorf("ExpectedNack %t, while got %t", tt.wants.nackCalled, mockMessage.nackCalled) + } + }) + } +} diff --git a/protocol/nats_jetstream/v3/options.go b/protocol/nats_jetstream/v3/options.go index dadbd69c..8ebcba59 100644 --- a/protocol/nats_jetstream/v3/options.go +++ b/protocol/nats_jetstream/v3/options.go @@ -42,7 +42,7 @@ func WithConnection(conn *nats.Conn) ProtocolOption { // WithJetStreamOptions sets jetstream options used in the protocol sender and receiver func WithJetStreamOptions(jetStreamOpts []jetstream.JetStreamOpt) ProtocolOption { return func(p *Protocol) error { - p.jetSteamOpts = jetStreamOpts + p.jetStreamOpts = jetStreamOpts return nil } } diff --git a/protocol/nats_jetstream/v3/options_test.go b/protocol/nats_jetstream/v3/options_test.go index d974f41b..1103d6a2 100644 --- a/protocol/nats_jetstream/v3/options_test.go +++ b/protocol/nats_jetstream/v3/options_test.go @@ -474,7 +474,7 @@ func TestWithJetStreamOptions(t *testing.T) { wants: wants{ err: nil, protocol: &Protocol{ - jetSteamOpts: jetStreamOpts, + jetStreamOpts: jetStreamOpts, }, }, }, diff --git a/protocol/nats_jetstream/v3/protocol.go b/protocol/nats_jetstream/v3/protocol.go index 2babfc42..e78b5000 100644 --- a/protocol/nats_jetstream/v3/protocol.go +++ b/protocol/nats_jetstream/v3/protocol.go @@ -29,8 +29,8 @@ type Protocol struct { natsOpts []nats.Option // jetstream options - jetSteamOpts []jetstream.JetStreamOpt - jetStream jetstream.JetStream + jetStreamOpts []jetstream.JetStreamOpt + jetStream jetstream.JetStream // receiver incoming chan msgErr @@ -76,7 +76,7 @@ func New(ctx context.Context, opts ...ProtocolOption) (*Protocol, error) { } } - if p.jetStream, errConnection = jetstream.New(p.conn, p.jetSteamOpts...); errConnection != nil { + if p.jetStream, errConnection = jetstream.New(p.conn, p.jetStreamOpts...); errConnection != nil { return nil, errConnection }