From 5dbd825c2cf21d81bbc92cbfe5a4cb73aae7a169 Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Thu, 13 Jun 2024 14:15:00 +0200 Subject: [PATCH] [IMPROVED] Handle error and reset ordered consumer in Messages() (#1646) Signed-off-by: Piotr Piotrowski --- jetstream/ordered.go | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/jetstream/ordered.go b/jetstream/ordered.go index 2752230d4..d4e71fafc 100644 --- a/jetstream/ordered.go +++ b/jetstream/ordered.go @@ -18,6 +18,7 @@ import ( "errors" "fmt" "strconv" + "strings" "sync" "sync/atomic" "time" @@ -291,15 +292,24 @@ func (s *orderedSubscription) Next() (Msg, error) { } continue } + meta, err := msg.Metadata() if err != nil { - s.consumer.errHandler(s.consumer.serial)(sub, err) - continue + return nil, err } serial := serialNumberFromConsumer(meta.Consumer) + if serial != s.consumer.serial { + continue + } dseq := meta.Sequence.Consumer if dseq != s.consumer.cursor.deliverSeq+1 { - s.consumer.errHandler(serial)(sub, errOrderedSequenceMismatch) + if err := s.consumer.reset(); err != nil { + return nil, err + } + _, err := s.consumer.currentConsumer.Messages(s.opts...) + if err != nil { + return nil, err + } continue } s.consumer.cursor.deliverSeq = dseq @@ -448,7 +458,11 @@ func serialNumberFromConsumer(name string) int { if len(name) == 0 { return 0 } - serial, err := strconv.Atoi(name[len(name)-1:]) + parts := strings.Split(name, "_") + if len(parts) < 2 { + return 0 + } + serial, err := strconv.Atoi(parts[len(parts)-1]) if err != nil { return 0 }