Skip to content

Commit

Permalink
[IMPROVEMENT] Added client retry for async publish in legacy JetStrea…
Browse files Browse the repository at this point in the history
…m API (#1695)
  • Loading branch information
pranavmehta94 authored and piotrpio committed Dec 13, 2024
1 parent f3b6df9 commit 575c4b4
Show file tree
Hide file tree
Showing 2 changed files with 185 additions and 36 deletions.
134 changes: 98 additions & 36 deletions js.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,9 @@ type pubOpts struct {

// stallWait is the max wait of a async pub ack.
stallWait time.Duration

// internal option to re-use existing paf in case of retry.
pafRetry *pubAckFuture
}

// pubAckResponse is the ack response from the JetStream API when publishing a message.
Expand Down Expand Up @@ -633,13 +636,17 @@ type PubAckFuture interface {
}

type pubAckFuture struct {
js *js
msg *Msg
pa *PubAck
st time.Time
err error
errCh chan error
doneCh chan *PubAck
js *js
msg *Msg
pa *PubAck
st time.Time
err error
errCh chan error
doneCh chan *PubAck
retries int
maxRetries int
retryWait time.Duration
reply string
}

func (paf *pubAckFuture) Ok() <-chan *PubAck {
Expand Down Expand Up @@ -848,20 +855,30 @@ func (js *js) handleAsyncReply(m *Msg) {
js.mu.Unlock()
return
}
// Remove
delete(js.pafs, id)

// Check on anyone stalled and waiting.
if js.stc != nil && len(js.pafs) < js.opts.maxpa {
close(js.stc)
js.stc = nil
closeStc := func() {
// Check on anyone stalled and waiting.
if js.stc != nil && len(js.pafs) < js.opts.maxpa {
close(js.stc)
js.stc = nil
}
}
// Check on anyone one waiting on done status.
if js.dch != nil && len(js.pafs) == 0 {
dch := js.dch
js.dch = nil
// Defer here so error is processed and can be checked.
defer close(dch)

closeDchFn := func() func() {
var dch chan struct{}
// Check on anyone one waiting on done status.
if js.dch != nil && len(js.pafs) == 0 {
dch = js.dch
js.dch = nil
}
// Return function to close done channel which
// should be deferred so that error is processed and
// can be checked.
return func() {
if dch != nil {
close(dch)
}
}
}

doErr := func(err error) {
Expand All @@ -878,10 +895,39 @@ func (js *js) handleAsyncReply(m *Msg) {

// Process no responders etc.
if len(m.Data) == 0 && m.Header.Get(statusHdr) == noResponders {
if paf.retries < paf.maxRetries {
paf.retries++
time.AfterFunc(paf.retryWait, func() {
js.mu.Lock()
paf := js.getPAF(id)
js.mu.Unlock()
if paf == nil {
return
}
_, err := js.PublishMsgAsync(paf.msg, pubOptFn(func(po *pubOpts) error {
po.pafRetry = paf
return nil
}))
if err != nil {
js.mu.Lock()
doErr(err)
}
})
js.mu.Unlock()
return
}
delete(js.pafs, id)
closeStc()
defer closeDchFn()()
doErr(ErrNoResponders)
return
}

//remove
delete(js.pafs, id)
closeStc()
defer closeDchFn()()

var pa pubAckResponse
if err := json.Unmarshal(m.Data, &pa); err != nil {
doErr(ErrInvalidJSAck)
Expand Down Expand Up @@ -948,6 +994,10 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) {
}
}

if o.rnum < 0 {
return nil, fmt.Errorf("%w: retry attempts cannot be negative", ErrInvalidArg)
}

// Timeouts and contexts do not make sense for these.
if o.ttl != 0 || o.ctx != nil {
return nil, ErrContextAndTimeout
Expand Down Expand Up @@ -975,30 +1025,42 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) {
}

// Reply
if m.Reply != _EMPTY_ {
paf := o.pafRetry
if paf == nil && m.Reply != _EMPTY_ {
return nil, errors.New("nats: reply subject should be empty")
}
reply := m.Reply
m.Reply = js.newAsyncReply()
defer func() { m.Reply = reply }()
var id string
var reply string

if m.Reply == _EMPTY_ {
return nil, errors.New("nats: error creating async reply handler")
}
// register new paf if not retrying
if paf == nil {
reply = js.newAsyncReply()

id := m.Reply[js.replyPrefixLen:]
paf := &pubAckFuture{msg: m, st: time.Now()}
numPending, maxPending := js.registerPAF(id, paf)
if reply == _EMPTY_ {
return nil, errors.New("nats: error creating async reply handler")
}

if maxPending > 0 && numPending >= maxPending {
select {
case <-js.asyncStall():
case <-time.After(stallWait):
js.clearPAF(id)
return nil, errors.New("nats: stalled with too many outstanding async published messages")
id = reply[js.replyPrefixLen:]
paf = &pubAckFuture{msg: m, st: time.Now(), maxRetries: o.rnum, retryWait: o.rwait, reply: reply}
numPending, maxPending := js.registerPAF(id, paf)

if maxPending > 0 && numPending > maxPending {
select {
case <-js.asyncStall():
case <-time.After(stallWait):
js.clearPAF(id)
return nil, errors.New("nats: stalled with too many outstanding async published messages")
}
}
} else {
reply = paf.reply
id = reply[js.replyPrefixLen:]
}
hdr, err := m.headerBytes()
if err != nil {
return nil, err
}
if err := js.nc.PublishMsg(m); err != nil {
if err := js.nc.publish(m.Subject, reply, hdr, m.Data); err != nil {
js.clearPAF(id)
return nil, err
}
Expand Down
87 changes: 87 additions & 0 deletions test/js_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8207,6 +8207,93 @@ func TestJetStreamPublishAsyncPerf(t *testing.T) {
fmt.Printf("%.0f msgs/sec\n\n", float64(toSend)/tt.Seconds())
}

func TestPublishAsyncRetry(t *testing.T) {
tests := []struct {
name string
pubOpts []nats.PubOpt
ackError error
pubErr error
}{
{
name: "retry until stream is ready",
pubOpts: []nats.PubOpt{
nats.RetryAttempts(10),
nats.RetryWait(100 * time.Millisecond),
},
},
{
name: "fail after max retries",
pubOpts: []nats.PubOpt{
nats.RetryAttempts(2),
nats.RetryWait(50 * time.Millisecond),
},
ackError: nats.ErrNoResponders,
},
{
name: "no retries",
pubOpts: nil,
ackError: nats.ErrNoResponders,
},
{
name: "invalid retry attempts",
pubOpts: []nats.PubOpt{
nats.RetryAttempts(-1),
},
pubErr: nats.ErrInvalidArg,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// set max pending to 1 so that we can test if retries don't cause stall
js, err := nc.JetStream(nats.PublishAsyncMaxPending(1))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()

test.pubOpts = append(test.pubOpts, nats.StallWait(1*time.Nanosecond))
ack, err := js.PublishAsync("foo", []byte("hello"), test.pubOpts...)
if !errors.Is(err, test.pubErr) {
t.Fatalf("Expected error: %v; got: %v", test.pubErr, err)
}
if err != nil {
return
}
errs := make(chan error, 1)
go func() {
// create stream with delay so that publish will receive no responders
time.Sleep(300 * time.Millisecond)
if _, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Subjects: []string{"foo"}}); err != nil {
errs <- err
}
}()
select {
case <-ack.Ok():
case err := <-ack.Err():
if test.ackError != nil {
if !errors.Is(err, test.ackError) {
t.Fatalf("Expected error: %v; got: %v", test.ackError, err)
}
} else {
t.Fatalf("Unexpected ack error: %v", err)
}
case err := <-errs:
t.Fatalf("Error creating stream: %v", err)
case <-time.After(5 * time.Second):
t.Fatalf("Timeout waiting for ack")
}
})
}
}
func TestJetStreamCleanupPublisher(t *testing.T) {

t.Run("cleanup js publisher", func(t *testing.T) {
Expand Down

0 comments on commit 575c4b4

Please sign in to comment.