Skip to content

Commit

Permalink
[IMPROVED] Handle error and reset ordered consumer in Messages() (#1646)
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
  • Loading branch information
piotrpio authored Jun 13, 2024
1 parent 005a6f2 commit 5dbd825
Showing 1 changed file with 18 additions and 4 deletions.
22 changes: 18 additions & 4 deletions jetstream/ordered.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"errors"
"fmt"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -291,15 +292,24 @@ func (s *orderedSubscription) Next() (Msg, error) {
}
continue
}

meta, err := msg.Metadata()
if err != nil {
s.consumer.errHandler(s.consumer.serial)(sub, err)
continue
return nil, err
}
serial := serialNumberFromConsumer(meta.Consumer)
if serial != s.consumer.serial {
continue
}
dseq := meta.Sequence.Consumer
if dseq != s.consumer.cursor.deliverSeq+1 {
s.consumer.errHandler(serial)(sub, errOrderedSequenceMismatch)
if err := s.consumer.reset(); err != nil {
return nil, err
}
_, err := s.consumer.currentConsumer.Messages(s.opts...)
if err != nil {
return nil, err
}
continue
}
s.consumer.cursor.deliverSeq = dseq
Expand Down Expand Up @@ -448,7 +458,11 @@ func serialNumberFromConsumer(name string) int {
if len(name) == 0 {
return 0
}
serial, err := strconv.Atoi(name[len(name)-1:])
parts := strings.Split(name, "_")
if len(parts) < 2 {
return 0
}
serial, err := strconv.Atoi(parts[len(parts)-1])
if err != nil {
return 0
}
Expand Down

0 comments on commit 5dbd825

Please sign in to comment.