Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] StatusChanged for core and js subscriptions #1570

Merged
merged 5 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 140 additions & 10 deletions nats.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2012-2023 The NATS Authors
// Copyright 2012-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -608,14 +608,17 @@ type Subscription struct {
// For holding information about a JetStream consumer.
jsi *jsSub

delivered uint64
max uint64
conn *Conn
mcb MsgHandler
mch chan *Msg
closed bool
sc bool
connClosed bool
delivered uint64
max uint64
conn *Conn
mcb MsgHandler
mch chan *Msg
closed bool
sc bool
connClosed bool
draining bool
status SubStatus
statListeners map[chan SubStatus][]SubStatus

// Type of Subscription
typ SubscriptionType
Expand All @@ -636,6 +639,30 @@ type Subscription struct {
dropped int
}

// Status represents the state of the connection.
type SubStatus int

const (
SubscriptionActive = SubStatus(iota)
SubscriptionDraining
SubscriptionClosed
SubscriptionSlowConsumer
)

func (s SubStatus) String() string {
switch s {
case SubscriptionActive:
return "Active"
case SubscriptionDraining:
return "Draining"
case SubscriptionClosed:
return "Closed"
case SubscriptionSlowConsumer:
return "SlowConsumer"
}
return "unknown status"
}

// Msg represents a message delivered by NATS. This structure is used
// by Subscribers and PublishMsg().
//
Expand Down Expand Up @@ -3292,6 +3319,9 @@ func (nc *Conn) processMsg(data []byte) {
}

// Clear any SlowConsumer status.
if sub.sc {
sub.changeSubStatus(SubscriptionActive)
}
sub.sc = false
sub.mu.Unlock()

Expand All @@ -3315,8 +3345,9 @@ slowConsumer:
sub.pMsgs--
sub.pBytes -= len(m.Data)
}
sub.mu.Unlock()
if sc {
sub.changeSubStatus(SubscriptionSlowConsumer)
sub.mu.Unlock()
// Now we need connection's lock and we may end-up in the situation
// that we were trying to avoid, except that in this case, the client
// is already experiencing client-side slow consumer situation.
Expand All @@ -3326,6 +3357,8 @@ slowConsumer:
nc.ach.push(func() { nc.Opts.AsyncErrorCB(nc, sub, ErrSlowConsumer) })
}
nc.mu.Unlock()
} else {
sub.mu.Unlock()
}
}

Expand Down Expand Up @@ -4298,6 +4331,7 @@ func (nc *Conn) subscribeLocked(subj, queue string, cb MsgHandler, ch chan *Msg,
nc.kickFlusher()
}

sub.changeSubStatus(SubscriptionActive)
return sub, nil
}

Expand Down Expand Up @@ -4341,6 +4375,7 @@ func (nc *Conn) removeSub(s *Subscription) {
}
// Mark as invalid
s.closed = true
s.changeSubStatus(SubscriptionClosed)
Jarema marked this conversation as resolved.
Show resolved Hide resolved
if s.pCond != nil {
s.pCond.Broadcast()
}
Expand Down Expand Up @@ -4410,6 +4445,91 @@ func (s *Subscription) Drain() error {
return conn.unsubscribe(s, 0, true)
}

// IsDraining returns a boolean indicating whether the subscription
// is being drained.
// This will return false if the subscription has already been closed.
func (s *Subscription) IsDraining() bool {
if s == nil {
return false
}
s.mu.Lock()
defer s.mu.Unlock()
return s.draining
}

// StatusChanged returns a channel on which given list of subscription status
// changes will be sent. If no status is provided, all status changes will be sent.
// Available statuses are SubscriptionActive, SubscriptionDraining, SubscriptionClosed,
// and SubscriptionSlowConsumer.
// The returned channel will be closed when the subscription is closed.
func (s *Subscription) StatusChanged(statuses ...SubStatus) <-chan SubStatus {
if len(statuses) == 0 {
statuses = []SubStatus{SubscriptionActive, SubscriptionDraining, SubscriptionClosed, SubscriptionSlowConsumer}
}
ch := make(chan SubStatus, 10)
for _, status := range statuses {
s.registerStatusChangeListener(status, ch)
// initial status
if status == s.status {
ch <- status
}
}
return ch
}

// registerStatusChangeListener registers a channel waiting for a specific status change event.
// Status change events are non-blocking - if no receiver is waiting for the status change,
// it will not be sent on the channel. Closed channels are ignored.
func (s *Subscription) registerStatusChangeListener(status SubStatus, ch chan SubStatus) {
s.mu.Lock()
defer s.mu.Unlock()
if s.statListeners == nil {
s.statListeners = make(map[chan SubStatus][]SubStatus)
}
if _, ok := s.statListeners[ch]; !ok {
s.statListeners[ch] = make([]SubStatus, 0)
}
s.statListeners[ch] = append(s.statListeners[ch], status)
}

// sendStatusEvent sends subscription status event to all channels.
// If there is no listener, sendStatusEvent
// will not block. Lock should be held entering.
func (s *Subscription) sendStatusEvent(status SubStatus) {
for ch, statuses := range s.statListeners {
if !containsStatus(statuses, status) {
continue
}
// only send event if someone's listening
select {
case ch <- status:
default:
}
if status == SubscriptionClosed {
close(ch)
}
}
}

func containsStatus(statuses []SubStatus, status SubStatus) bool {
for _, s := range statuses {
if s == status {
return true
}
}
return false
}

// changeSubStatus changes subscription status and sends events
// to all listeners. Lock should be held entering.
func (s *Subscription) changeSubStatus(status SubStatus) {
if s == nil {
return
}
s.sendStatusEvent(status)
s.status = status
}

// Unsubscribe will remove interest in the given subject.
//
// For a JetStream subscription, if the library has created the JetStream
Expand Down Expand Up @@ -4448,6 +4568,11 @@ func (s *Subscription) Unsubscribe() error {
// checkDrained will watch for a subscription to be fully drained
// and then remove it.
func (nc *Conn) checkDrained(sub *Subscription) {
defer func() {
sub.mu.Lock()
defer sub.mu.Unlock()
sub.draining = false
}()
if nc == nil || sub == nil {
return
}
Expand Down Expand Up @@ -4557,6 +4682,10 @@ func (nc *Conn) unsubscribe(sub *Subscription, max int, drainMode bool) error {
}

if drainMode {
s.mu.Lock()
s.draining = true
sub.changeSubStatus(SubscriptionDraining)
s.mu.Unlock()
go nc.checkDrained(sub)
}

Expand Down Expand Up @@ -4659,6 +4788,7 @@ func (s *Subscription) validateNextMsgState(pullSubInternal bool) error {
return ErrSyncSubRequired
}
if s.sc {
s.changeSubStatus(SubscriptionActive)
s.sc = false
return ErrSlowConsumer
}
Expand Down
9 changes: 8 additions & 1 deletion test/drain_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2018-2023 The NATS Authors
// Copyright 2018-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -55,6 +55,9 @@ func TestDrain(t *testing.T) {

// Drain it and make sure we receive all messages.
sub.Drain()
if !sub.IsDraining() {
t.Fatalf("Expected to be draining")
}
select {
case <-done:
break
Expand All @@ -64,6 +67,10 @@ func TestDrain(t *testing.T) {
t.Fatalf("Did not receive all messages: %d of %d", r, expected)
}
}
time.Sleep(100 * time.Millisecond)
if sub.IsDraining() {
t.Fatalf("Expected to be done draining")
}
}

func TestDrainQueueSub(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion test/js_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2020-2023 The NATS Authors
// Copyright 2020-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down
Loading