From d5ed91cb926bc11b9eaca6e2a7556439e88773cc Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 8 Oct 2019 13:03:47 -0700 Subject: [PATCH] Simplify request, allow for mux subject to change Signed-off-by: Derek Collison --- context.go | 24 +------------- go.mod | 2 +- go.sum | 11 +++---- nats.go | 78 ++++++++++++++++++++++++++++++---------------- nats_test.go | 34 ++++++++++++++++++++ test/basic_test.go | 2 +- 6 files changed, 93 insertions(+), 58 deletions(-) diff --git a/context.go b/context.go index 773605f03..c921d6be7 100644 --- a/context.go +++ b/context.go @@ -43,29 +43,7 @@ func (nc *Conn) RequestWithContext(ctx context.Context, subj string, data []byte return nc.oldRequestWithContext(ctx, subj, data) } - // Do setup for the new style. - if nc.respMap == nil { - nc.initNewResp() - } - // Create literal Inbox and map to a chan msg. - mch := make(chan *Msg, RequestChanLen) - respInbox := nc.newRespInbox() - token := respToken(respInbox) - nc.respMap[token] = mch - createSub := nc.respMux == nil - ginbox := nc.respSub - nc.mu.Unlock() - - if createSub { - // Make sure scoped subscription is setup only once. - var err error - nc.respSetup.Do(func() { err = nc.createRespMux(ginbox) }) - if err != nil { - return nil, err - } - } - - err := nc.PublishRequest(subj, respInbox, data) + mch, token, err := nc.createNewRequestAndSend(subj, data) if err != nil { return nil, err } diff --git a/go.mod b/go.mod index 42b94992d..f82ceee6d 100644 --- a/go.mod +++ b/go.mod @@ -1,7 +1,7 @@ module github.com/nats-io/nats.go require ( - github.com/nats-io/jwt v0.2.12 + github.com/nats-io/jwt v0.3.0 github.com/nats-io/nkeys v0.1.0 github.com/nats-io/nuid v1.0.1 ) diff --git a/go.sum b/go.sum index 29fd0cb2c..d34647347 100644 --- a/go.sum +++ b/go.sum @@ -1,15 +1,12 @@ -github.com/nats-io/jwt v0.2.12 h1:Y3YLoJey+Q/yMk/1Ig3xhWxYXE7vNSefozkArIcnSlU= -github.com/nats-io/jwt v0.2.12/go.mod h1:mQxQ0uHQ9FhEVPIcTSKwx2lqZEpXWWcCgA7R6NrWvvY= -github.com/nats-io/nkeys v0.0.2/go.mod h1:dab7URMsZm6Z/jp9Z5UGa87Uutgc2mVpXLC4B7TDb/4= +github.com/nats-io/jwt v0.3.0 h1:xdnzwFETV++jNc4W1mw//qFyJGb2ABOombmZJQS4+Qo= +github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg= github.com/nats-io/nkeys v0.1.0 h1:qMd4+pRHgdr1nAClu+2h/2a5F2TmKcCzjCDazVgRoX4= github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= -golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= -golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 h1:HuIa8hRrWRSrqYzx1qI49NNxhdi2PrY7gxVSq1JjLDc= golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e h1:D5TXcfTk7xF7hvieo4QErS3qqCB4teTffacDWr7CI+0= +golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/nats.go b/nats.go index 685670b1f..81e9aa360 100644 --- a/nats.go +++ b/nats.go @@ -412,10 +412,11 @@ type Conn struct { // New style response handler respSub string // The wildcard subject + respScanf string // The scanf template to extract mux token respMux *Subscription // A single response subscription respMap map[string]chan *Msg // Request map for the response msg channels respSetup sync.Once // Ensures response subscription occurs once - respRand *rand.Rand // Used for generating suffix. + respRand *rand.Rand // Used for generating suffix } // A Subscription represents interest in a given subject. @@ -2647,9 +2648,8 @@ func (nc *Conn) publish(subj, reply string, data []byte) error { // the appropriate channel based on the last token and place // the message on the channel if possible. func (nc *Conn) respHandler(m *Msg) { - rt := respToken(m.Subject) - nc.mu.Lock() + // Just return if closed. if nc.isClosed() { nc.mu.Unlock() @@ -2657,11 +2657,19 @@ func (nc *Conn) respHandler(m *Msg) { } // Grab mch + rt := nc.respToken(m.Subject) mch := nc.respMap[rt] // Delete the key regardless, one response only. - // FIXME(dlc) - should we track responses past 1 - // just statistics wise? delete(nc.respMap, rt) + // If something went wrong and we only have one entry, use that. + // This can happen if the system rewrites the subject, e.g. js. + if mch == nil && len(nc.respMap) == 1 { + for k, v := range nc.respMap { + mch = v + delete(nc.respMap, k) + break + } + } nc.mu.Unlock() // Don't block, let Request timeout instead, mch is @@ -2685,33 +2693,22 @@ func (nc *Conn) createRespMux(respSub string) error { return err } nc.mu.Lock() + nc.respScanf = strings.Replace(respSub, "*", "%s", -1) nc.respMux = s nc.mu.Unlock() return nil } -// Request will send a request payload and deliver the response message, -// or an error, including a timeout if no message was received properly. -func (nc *Conn) Request(subj string, data []byte, timeout time.Duration) (*Msg, error) { - if nc == nil { - return nil, ErrInvalidConnection - } - - nc.mu.Lock() - // If user wants the old style. - if nc.Opts.UseOldRequestStyle { - nc.mu.Unlock() - return nc.oldRequest(subj, data, timeout) - } - - // Do setup for the new style. +// Helper to setup and send new request style requests. Return the chan to receive the response. +func (nc *Conn) createNewRequestAndSend(subj string, data []byte) (chan *Msg, string, error) { + // Do setup for the new style if needed. if nc.respMap == nil { nc.initNewResp() } - // Create literal Inbox and map to a chan msg. + // Create new literal Inbox and map to a chan msg. mch := make(chan *Msg, RequestChanLen) respInbox := nc.newRespInbox() - token := respToken(respInbox) + token := respInbox[respInboxPrefixLen:] nc.respMap[token] = mch createSub := nc.respMux == nil ginbox := nc.respSub @@ -2722,11 +2719,33 @@ func (nc *Conn) Request(subj string, data []byte, timeout time.Duration) (*Msg, var err error nc.respSetup.Do(func() { err = nc.createRespMux(ginbox) }) if err != nil { - return nil, err + return nil, token, err } } if err := nc.PublishRequest(subj, respInbox, data); err != nil { + return nil, token, err + } + + return mch, token, nil +} + +// Request will send a request payload and deliver the response message, +// or an error, including a timeout if no message was received properly. +func (nc *Conn) Request(subj string, data []byte, timeout time.Duration) (*Msg, error) { + if nc == nil { + return nil, ErrInvalidConnection + } + + nc.mu.Lock() + // If user wants the old style. + if nc.Opts.UseOldRequestStyle { + nc.mu.Unlock() + return nc.oldRequest(subj, data, timeout) + } + + mch, token, err := nc.createNewRequestAndSend(subj, data) + if err != nil { return nil, err } @@ -2829,9 +2848,16 @@ func (nc *Conn) NewRespInbox() string { } // respToken will return the last token of a literal response inbox -// which we use for the message channel lookup. -func respToken(respInbox string) string { - return respInbox[respInboxPrefixLen:] +// which we use for the message channel lookup. This needs to do a +// scan to protect itself against the server changing the subject. +// Lock should be held. +func (nc *Conn) respToken(respInbox string) string { + var token string + n, err := fmt.Sscanf(respInbox, nc.respScanf, &token) + if err != nil || n != 1 { + return "" + } + return token } // Subscribe will express interest in the given subject. The subject diff --git a/nats_test.go b/nats_test.go index 0eb649e30..35b4c9fca 100644 --- a/nats_test.go +++ b/nats_test.go @@ -2217,3 +2217,37 @@ func TestStatsRace(t *testing.T) { close(ch) wg.Wait() } + +func TestRequestLeaksMapEntries(t *testing.T) { + o := natsserver.DefaultTestOptions + o.Port = -1 + s := RunServerWithOptions(&o) + defer s.Shutdown() + + nc, err := Connect(fmt.Sprintf("nats://%s:%d", o.Host, o.Port)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc.Close() + + response := []byte("I will help you") + nc.Subscribe("foo", func(m *Msg) { + nc.Publish(m.Reply, response) + }) + + for i := 0; i < 100; i++ { + msg, err := nc.Request("foo", nil, 500*time.Millisecond) + if err != nil { + t.Fatalf("Received an error on Request test: %s", err) + } + if !bytes.Equal(msg.Data, response) { + t.Fatalf("Received invalid response") + } + } + nc.mu.Lock() + num := len(nc.respMap) + nc.mu.Unlock() + if num != 0 { + t.Fatalf("Expected 0 entries in response map, got %d", num) + } +} diff --git a/test/basic_test.go b/test/basic_test.go index b73f681ba..849e80039 100644 --- a/test/basic_test.go +++ b/test/basic_test.go @@ -559,7 +559,7 @@ func TestRequestTimeout(t *testing.T) { nc := NewDefaultConnection(t) defer nc.Close() - if _, err := nc.Request("foo", []byte("help"), 10*time.Millisecond); err == nil { + if _, err := nc.Request("foo", []byte("help"), 10*time.Millisecond); err != nats.ErrTimeout { t.Fatalf("Expected to receive a timeout error") } }