diff --git a/stan_test.go b/stan_test.go index e6c1d7d..a725a9b 100644 --- a/stan_test.go +++ b/stan_test.go @@ -1,4 +1,4 @@ -// Copyright 2016-2018 The NATS Authors +// Copyright 2016-2021 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -2665,3 +2665,48 @@ func TestPublishAsyncTimeout(t *testing.T) { t.Fatalf("Ack handler was invoked only %v out of %v", c, total) } } + +func TestSubTimeout(t *testing.T) { + ns := natsd.RunDefaultServer() + defer ns.Shutdown() + + opts := server.GetDefaultOptions() + opts.NATSServerURL = nats.DefaultURL + opts.ID = clusterName + s := runServerWithOpts(opts) + defer s.Shutdown() + + sc, err := Connect(clusterName, clientName, ConnectWait(250*time.Millisecond)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer sc.Close() + + // Setup a subscription on the close subscription subject so we can + // check that the library sends a close request on subscribe timeout. + nc := sc.NatsConn() + scc := sc.(*conn) + scc.Lock() + subj := scc.subCloseRequests + scc.Unlock() + sub, err := nc.SubscribeSync(subj) + if err != nil { + t.Fatalf("Error on unsubscribe: %v", err) + } + + // Now shutdown STAN server just before trying to create a subscription. + s.Shutdown() + if _, err := sc.Subscribe("foo", func(_ *Msg) {}); err != ErrSubReqTimeout { + t.Fatalf("Expected %v, got %v", ErrSubReqTimeout, err) + } + // Now check that we got the subscription close request + msg, err := sub.NextMsg(250 * time.Millisecond) + if err != nil { + t.Fatalf("Error getting subscription close request: %v", err) + } + req := &pb.UnsubscribeRequest{} + req.Unmarshal(msg.Data) + if req.ClientID != clientName || req.Subject != "foo" || req.Inbox == "" { + t.Fatalf("Unexpected sub close request: %+v", req) + } +} diff --git a/sub.go b/sub.go index 454a8c2..a717e1d 100644 --- a/sub.go +++ b/sub.go @@ -1,4 +1,4 @@ -// Copyright 2016-2018 The NATS Authors +// Copyright 2016-2021 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -287,6 +287,22 @@ func (sc *conn) subscribe(subject, qgroup string, cb MsgHandler, options ...Subs if err != nil { sub.inboxSub.Unsubscribe() if err == nats.ErrTimeout { + // On timeout, we don't know if the server got the request or + // not. So we will do best effort and send a "subscription close" + // request. However, since we don't have the AckInbox that is + // normally used to close a subscription, we will use the sub's + // inbox. Newer servers will fallback to lookup by inbox if they + // don't find the sub from the "AckInbox" lookup. + scr := &pb.UnsubscribeRequest{ + ClientID: sc.clientID, + Subject: subject, + Inbox: sub.inbox, + } + b, _ := scr.Marshal() + // Send to the subscription close request, not the unsubscribe subject. + sc.nc.Publish(sc.subCloseRequests, b) + + // Report this error to the user. err = ErrSubReqTimeout } return nil, err