Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] StatusChanged for core and js subscriptions #1570

Merged
merged 5 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 79 additions & 8 deletions nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,14 +608,16 @@ type Subscription struct {
// For holding information about a JetStream consumer.
jsi *jsSub

delivered uint64
max uint64
conn *Conn
mcb MsgHandler
mch chan *Msg
closed bool
sc bool
connClosed bool
delivered uint64
max uint64
conn *Conn
mcb MsgHandler
mch chan *Msg
closed bool
sc bool
connClosed bool
draining bool
drainingComplete chan struct{}

// Type of Subscription
typ SubscriptionType
Expand Down Expand Up @@ -4410,6 +4412,65 @@ func (s *Subscription) Drain() error {
return conn.unsubscribe(s, 0, true)
}

type DrainStatus interface {
PendingMsgs() int
Draining() bool
Complete() <-chan struct{}
}

type drainStatus struct {
sub *Subscription
}

func (s *Subscription) DrainStatus() DrainStatus {
s.mu.Lock()
defer s.mu.Unlock()
return &drainStatus{
sub: s,
}
}

func (s *drainStatus) Draining() bool {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could be part of subscription state instead sub.IsDraining maybe? like the sub companion for nc.IsDraining

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I suppose getting rid of DrainStatus() method and hanging everything off from Subscription is a good call. At first I had a different idea around structuring it and left the API it as it was. I'll change that.

if s.sub == nil {
return false
}
s.sub.mu.Lock()
defer s.sub.mu.Unlock()
return s.sub.draining
}

func (s *drainStatus) Complete() <-chan struct{} {
var drainCh chan struct{}
if s.sub == nil {
// if the subscription is nil, return a closed channel
// to avoid blocking on the caller side.
drainCh = make(chan struct{})
close(drainCh)
return drainCh
}
s.sub.mu.Lock()
defer s.sub.mu.Unlock()
if s.sub.drainingComplete == nil {
drainCh = make(chan struct{})
s.sub.drainingComplete = drainCh
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

drainingComplete maybe too long, could be instead sub.drainCh like here or sub.dch

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

drainCh sounds good, I'll change it. dch is a bit too ambiguous for my liking.

} else {
drainCh = s.sub.drainingComplete
}
if !s.sub.draining {
// if the subscription is not draining, close the channel
// immediately to avoid blocking on the caller side.
close(drainCh)
s.sub.drainingComplete = nil
}
return drainCh
}

func (s *drainStatus) PendingMsgs() int {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can get this from sub.Pending() already for both PendingMsgs abd PendingBytes, maybe not needed?

s.sub.mu.Lock()
defer s.sub.mu.Unlock()
return s.sub.pMsgs
}

// Unsubscribe will remove interest in the given subject.
//
// For a JetStream subscription, if the library has created the JetStream
Expand Down Expand Up @@ -4448,6 +4509,15 @@ func (s *Subscription) Unsubscribe() error {
// checkDrained will watch for a subscription to be fully drained
// and then remove it.
func (nc *Conn) checkDrained(sub *Subscription) {
defer func() {
sub.mu.Lock()
defer sub.mu.Unlock()
sub.draining = false
if sub.drainingComplete != nil {
close(sub.drainingComplete)
sub.drainingComplete = nil
}
}()
if nc == nil || sub == nil {
return
}
Expand Down Expand Up @@ -4557,6 +4627,7 @@ func (nc *Conn) unsubscribe(sub *Subscription, max int, drainMode bool) error {
}

if drainMode {
s.draining = true
go nc.checkDrained(sub)
}

Expand Down
270 changes: 270 additions & 0 deletions test/drain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,3 +486,273 @@ func TestDrainConnDuringReconnect(t *testing.T) {
t.Fatalf("Timeout waiting for closed state for connection")
}
}

func TestDrainStatus(t *testing.T) {
t.Run("subscribe", func(t *testing.T) {
s := RunDefaultServer()
defer s.Shutdown()

nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
t.Fatalf("Failed to create default connection: %v", err)
}
defer nc.Close()

if nc.IsDraining() {
t.Fatalf("Expected IsDraining to be false")
}

sub, err := nc.Subscribe("foo", func(_ *nats.Msg) {
time.Sleep(10 * time.Millisecond)
})
if err != nil {
t.Fatalf("Error creating subscription; %v", err)
}
for i := 0; i < 100; i++ {
nc.Publish("foo", []byte("hello"))
}
time.Sleep(100 * time.Millisecond)
sub.Drain()
ds := sub.DrainStatus()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe later can have some helper build on top like sub.WaitDrained(ctx) error that waits for drain to complete or timeout.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's a good idea. Not sure I want to add it in this PR though, it's easy enough to implement it yourself and I want to be careful about adding public APIs.

In the future though, that might be useful.


if !ds.Draining() {
t.Fatalf("Expected to be draining")
}
if ds.PendingMsgs() == 0 {
t.Fatalf("Expected pending messages")
}

select {
case <-ds.Complete():
if ds.PendingMsgs() != 0 {
t.Fatalf("Expected no pending messages")
}
if ds.Draining() {
t.Fatalf("Expected to be drained")
}
case <-time.After(10 * time.Second):
t.Fatalf("Timeout waiting for drain to complete")
}
})

t.Run("subscribe sync", func(t *testing.T) {
s := RunDefaultServer()
defer s.Shutdown()

nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
t.Fatalf("Failed to create default connection: %v", err)
}
defer nc.Close()

if nc.IsDraining() {
t.Fatalf("Expected IsDraining to be false")
}

sub, err := nc.SubscribeSync("foo")
if err != nil {
t.Fatalf("Error creating subscription; %v", err)
}
for i := 0; i < 100; i++ {
nc.Publish("foo", []byte("hello"))
}
time.Sleep(100 * time.Millisecond)
sub.Drain()
ds := sub.DrainStatus()

if !ds.Draining() {
t.Fatalf("Expected to be draining")
}
if ds.PendingMsgs() == 0 {
t.Fatalf("Expected pending messages")
}
for i := 0; i < 100; i++ {
_, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error getting message: %v", err)
}
}

select {
case <-ds.Complete():
if ds.PendingMsgs() != 0 {
t.Fatalf("Expected no pending messages")
}
if ds.Draining() {
t.Fatalf("Expected to be drained")
}
case <-time.After(10 * time.Second):
t.Fatalf("Timeout waiting for drain to complete")
}
})

t.Run("chan subscribe", func(t *testing.T) {
s := RunDefaultServer()
defer s.Shutdown()

nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
t.Fatalf("Failed to create default connection: %v", err)
}
defer nc.Close()

if nc.IsDraining() {
t.Fatalf("Expected IsDraining to be false")
}

ch := make(chan *nats.Msg, 100)
sub, err := nc.ChanSubscribe("foo", ch)
if err != nil {
t.Fatalf("Error creating subscription; %v", err)
}
for i := 0; i < 100; i++ {
nc.Publish("foo", []byte("hello"))
}
time.Sleep(100 * time.Millisecond)
sub.Drain()
ds := sub.DrainStatus()

select {
case <-ds.Complete():
if ds.PendingMsgs() != 0 {
t.Fatalf("Expected no pending messages")
}
if ds.Draining() {
t.Fatalf("Expected to be drained")
}
case <-time.After(10 * time.Second):
t.Fatalf("Timeout waiting for drain to complete")
}

// msgs should be available on the channel
for i := 0; i < 100; i++ {
select {
case <-ch:
case <-time.After(2 * time.Second):
t.Fatalf("Timeout waiting for message")
}
}
})

t.Run("subscription not draining", func(t *testing.T) {
s := RunDefaultServer()
defer s.Shutdown()

nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
t.Fatalf("Failed to create default connection: %v", err)
}
defer nc.Close()

sub, err := nc.Subscribe("foo", func(_ *nats.Msg) {
time.Sleep(10 * time.Millisecond)
})
if err != nil {
t.Fatalf("Error creating subscription; %v", err)
}
ds := sub.DrainStatus()

if ds.Draining() {
t.Fatalf("Expected not to be draining")
}
if ds.PendingMsgs() != 0 {
t.Fatalf("Expected no pending messages")
}
select {
case <-ds.Complete():
default:
t.Fatalf("Expected to be complete")
}
})

t.Run("subscription already unsubscribed", func(t *testing.T) {
s := RunDefaultServer()
defer s.Shutdown()

nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
t.Fatalf("Failed to create default connection: %v", err)
}
defer nc.Close()

sub, err := nc.Subscribe("foo", func(_ *nats.Msg) {})
if err != nil {
t.Fatalf("Error creating subscription; %v", err)
}
sub.Unsubscribe()
ds := sub.DrainStatus()

if ds.Draining() {
t.Fatalf("Expected not to be draining")
}
if ds.PendingMsgs() != 0 {
t.Fatalf("Expected no pending messages")
}

select {
case <-ds.Complete():
default:
t.Fatalf("Expected to be complete")
}
})
}

func TestDrainStatus_ConcurrentComplete(t *testing.T) {
s := RunDefaultServer()
defer s.Shutdown()

nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
t.Fatalf("Failed to create default connection: %v", err)
}
defer nc.Close()

sub, err := nc.Subscribe("foo", func(_ *nats.Msg) {
time.Sleep(20 * time.Millisecond)
})
if err != nil {
t.Fatalf("Error creating subscription; %v", err)
}
for i := 0; i < 100; i++ {
nc.Publish("foo", []byte("hello"))
}
time.Sleep(100 * time.Millisecond)
sub.Drain()
wg := sync.WaitGroup{}
wg.Add(10)
done := make(chan struct{})
errs := make(chan error)
for i := 0; i < 10; i++ {
go func() {
ds := sub.DrainStatus()
<-ds.Complete()
if ds.PendingMsgs() != 0 {
errs <- fmt.Errorf("Expected no pending messages")
}
if ds.Draining() {
errs <- fmt.Errorf("Expected to be drained")
}
wg.Done()
}()
}
go func() {
wg.Wait()
close(done)
}()

// if !ds.Draining() {
// t.Fatalf("Expected to be draining")
// }
// if ds.PendingMsgs() == 0 {
// t.Fatalf("Expected pending messages")
// }

// Now test concurrent calls to DrainStatus
select {
case <-done:
case err := <-errs:
t.Fatal(err)
case <-time.After(10 * time.Second):
t.Fatalf("Timeout waiting for drain to complete")
}
}
Loading