Skip to content

Commit

Permalink
[FIXED] Panic in ordered consumer (#1686)
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
Co-authored-by: Daniel Mack <daniel@zonque.org>
  • Loading branch information
piotrpio and zonque committed Aug 15, 2024
1 parent 7e8acc6 commit 6041d7e
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 7 deletions.
35 changes: 28 additions & 7 deletions jetstream/ordered.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ const (
consumerTypeFetch
)

var errOrderedSequenceMismatch = errors.New("sequence mismatch")
var (
errOrderedSequenceMismatch = errors.New("sequence mismatch")
errOrderedConsumerClosed = errors.New("ordered consumer closed")
)

// Consume can be used to continuously receive messages and handle them
// with the provided callback function. Consume cannot be used concurrently
Expand Down Expand Up @@ -142,6 +145,9 @@ func (c *orderedConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt
select {
case <-c.doReset:
if err := c.reset(); err != nil {
if errors.Is(err, errOrderedConsumerClosed) {
continue
}
c.errHandler(c.serial)(c.currentSub, err)
}
if c.withStopAfter {
Expand Down Expand Up @@ -173,6 +179,12 @@ func (c *orderedConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt
c.Unlock()
}
case <-sub.done:
s := sub.consumer.currentSub
if s != nil {
sub.consumer.Lock()
s.Stop()
sub.consumer.Unlock()
}
return
case msgsLeft, ok := <-c.stopAfterMsgsLeft:
if !ok {
Expand Down Expand Up @@ -276,6 +288,9 @@ func (s *orderedSubscription) Next() (Msg, error) {
s.opts[len(s.opts)-1] = StopAfter(s.consumer.stopAfter)
}
if err := s.consumer.reset(); err != nil {
if errors.Is(err, errOrderedConsumerClosed) {
return nil, ErrMsgIteratorClosed
}
return nil, err
}
cc, err := s.consumer.currentConsumer.Messages(s.opts...)
Expand All @@ -297,6 +312,9 @@ func (s *orderedSubscription) Next() (Msg, error) {
dseq := meta.Sequence.Consumer
if dseq != s.consumer.cursor.deliverSeq+1 {
if err := s.consumer.reset(); err != nil {
if errors.Is(err, errOrderedConsumerClosed) {
return nil, ErrMsgIteratorClosed
}
return nil, err
}
cc, err := s.consumer.currentConsumer.Messages(s.opts...)
Expand All @@ -318,17 +336,21 @@ func (s *orderedSubscription) Stop() {
}
s.consumer.Lock()
defer s.consumer.Unlock()
s.consumer.currentSub.Stop()
if s.consumer.currentSub != nil {
s.consumer.currentSub.Stop()
}
close(s.done)
}

func (s *orderedSubscription) Drain() {
if !atomic.CompareAndSwapUint32(&s.closed, 0, 1) {
return
}
s.consumer.currentConsumer.Lock()
defer s.consumer.currentConsumer.Unlock()
s.consumer.currentSub.Drain()
if s.consumer.currentSub != nil {
s.consumer.currentConsumer.Lock()
s.consumer.currentSub.Drain()
s.consumer.currentConsumer.Unlock()
}
close(s.done)
}

Expand Down Expand Up @@ -504,15 +526,14 @@ func (c *orderedConsumer) reset() error {
err = retryWithBackoff(func(attempt int) (bool, error) {
isClosed := atomic.LoadUint32(&c.subscription.closed) == 1
if isClosed {
return false, nil
return false, errOrderedConsumerClosed
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
cons, err = c.jetStream.CreateOrUpdateConsumer(ctx, c.stream, *consumerConfig)
if err != nil {
return true, err
}
c.currentConsumer = cons.(*pullConsumer)
return false, nil
}, backoffOpts)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions jetstream/test/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -967,6 +967,7 @@ func TestKeyValueListKeys(t *testing.T) {

func TestKeyValueCrossAccounts(t *testing.T) {
conf := createConfFile(t, []byte(`
listen: 127.0.0.1:-1
jetstream: enabled
accounts: {
A: {
Expand Down
39 changes: 39 additions & 0 deletions jetstream/test/ordered_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,45 @@ func TestOrderedConsumerConsume(t *testing.T) {
cc.Drain()
wg.Wait()
})

t.Run("stop consume during reset", func(t *testing.T) {
srv := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, srv)
nc, err := nats.Connect(srv.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

js, err := jetstream.New(nc)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
for i := 0; i < 10; i++ {
c, err := s.OrderedConsumer(context.Background(), jetstream.OrderedConsumerConfig{})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
cc, err := c.Consume(func(msg jetstream.Msg) {
msg.Ack()
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if err := s.DeleteConsumer(context.Background(), c.CachedInfo().Name); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
cc.Stop()
time.Sleep(50 * time.Millisecond)
}
})
}

func TestOrderedConsumerMessages(t *testing.T) {
Expand Down

0 comments on commit 6041d7e

Please sign in to comment.