From 6e449118ab47639ac7ebee4291ae20a61e9e9890 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 24 Feb 2021 16:23:15 -0700 Subject: [PATCH] [FIXED] Subscribe timeout should send a close request If an application gets a timeout on a Subscribe() call, it does not know if the server is actually going to process the request. The request could have been just processed but the client timed-out before getting the response back, or there is a bit of a backlog of requests in the server and the request will be processed after the request times out in the client. If that happens, and if the application maintains its connection, then it is possible that the server processes the subscription and keep that subscription alive, although the application does not have an handle on this subscription (the Subscribe() call has failed). On timeout, the library will now send a request to close the subscription. However, the protocol normally requires the AckInbox which is assigned by the server and sent back as part of the subscription response protocol. The library will send the request with the subscription inbox. Newer servers (v0.21.0+) will be able to handle that, older will report that no subscription was found for the subscription close request. Signed-off-by: Ivan Kozlovic --- stan_test.go | 47 ++++++++++++++++++++++++++++++++++++++++++++++- sub.go | 18 +++++++++++++++++- 2 files changed, 63 insertions(+), 2 deletions(-) diff --git a/stan_test.go b/stan_test.go index e6c1d7d..a725a9b 100644 --- a/stan_test.go +++ b/stan_test.go @@ -1,4 +1,4 @@ -// Copyright 2016-2018 The NATS Authors +// Copyright 2016-2021 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -2665,3 +2665,48 @@ func TestPublishAsyncTimeout(t *testing.T) { t.Fatalf("Ack handler was invoked only %v out of %v", c, total) } } + +func TestSubTimeout(t *testing.T) { + ns := natsd.RunDefaultServer() + defer ns.Shutdown() + + opts := server.GetDefaultOptions() + opts.NATSServerURL = nats.DefaultURL + opts.ID = clusterName + s := runServerWithOpts(opts) + defer s.Shutdown() + + sc, err := Connect(clusterName, clientName, ConnectWait(250*time.Millisecond)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer sc.Close() + + // Setup a subscription on the close subscription subject so we can + // check that the library sends a close request on subscribe timeout. + nc := sc.NatsConn() + scc := sc.(*conn) + scc.Lock() + subj := scc.subCloseRequests + scc.Unlock() + sub, err := nc.SubscribeSync(subj) + if err != nil { + t.Fatalf("Error on unsubscribe: %v", err) + } + + // Now shutdown STAN server just before trying to create a subscription. + s.Shutdown() + if _, err := sc.Subscribe("foo", func(_ *Msg) {}); err != ErrSubReqTimeout { + t.Fatalf("Expected %v, got %v", ErrSubReqTimeout, err) + } + // Now check that we got the subscription close request + msg, err := sub.NextMsg(250 * time.Millisecond) + if err != nil { + t.Fatalf("Error getting subscription close request: %v", err) + } + req := &pb.UnsubscribeRequest{} + req.Unmarshal(msg.Data) + if req.ClientID != clientName || req.Subject != "foo" || req.Inbox == "" { + t.Fatalf("Unexpected sub close request: %+v", req) + } +} diff --git a/sub.go b/sub.go index 454a8c2..a717e1d 100644 --- a/sub.go +++ b/sub.go @@ -1,4 +1,4 @@ -// Copyright 2016-2018 The NATS Authors +// Copyright 2016-2021 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -287,6 +287,22 @@ func (sc *conn) subscribe(subject, qgroup string, cb MsgHandler, options ...Subs if err != nil { sub.inboxSub.Unsubscribe() if err == nats.ErrTimeout { + // On timeout, we don't know if the server got the request or + // not. So we will do best effort and send a "subscription close" + // request. However, since we don't have the AckInbox that is + // normally used to close a subscription, we will use the sub's + // inbox. Newer servers will fallback to lookup by inbox if they + // don't find the sub from the "AckInbox" lookup. + scr := &pb.UnsubscribeRequest{ + ClientID: sc.clientID, + Subject: subject, + Inbox: sub.inbox, + } + b, _ := scr.Marshal() + // Send to the subscription close request, not the unsubscribe subject. + sc.nc.Publish(sc.subCloseRequests, b) + + // Report this error to the user. err = ErrSubReqTimeout } return nil, err