diff --git a/README.md b/README.md index 1e944316d..83cbbd27d 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,7 @@ When using or transitioning to Go modules support: ```bash # Go client latest or explicit version go get github.com/nats-io/nats.go/@latest -go get github.com/nats-io/nats.go/@v1.9.0 +go get github.com/nats-io/nats.go/@v1.9.1 # For latest NATS Server, add /v2 at the end go get github.com/nats-io/nats-server/v2 diff --git a/nats.go b/nats.go index 756baa7cb..911df590e 100644 --- a/nats.go +++ b/nats.go @@ -45,7 +45,7 @@ import ( // Default Constants const ( - Version = "1.9.0" + Version = "1.9.1" DefaultURL = "nats://127.0.0.1:4222" DefaultPort = 4222 DefaultMaxReconnect = 60 @@ -2656,14 +2656,18 @@ func (nc *Conn) respHandler(m *Msg) { return } + var mch chan *Msg + // Grab mch rt := nc.respToken(m.Subject) - mch := nc.respMap[rt] - // Delete the key regardless, one response only. - delete(nc.respMap, rt) - // If something went wrong and we only have one entry, use that. - // This can happen if the system rewrites the subject, e.g. js. - if mch == nil && len(nc.respMap) == 1 { + if rt != _EMPTY_ { + mch = nc.respMap[rt] + // Delete the key regardless, one response only. + delete(nc.respMap, rt) + } else if len(nc.respMap) == 1 { + // If the server has rewritten the subject, the response token (rt) + // will not match (could be the case with JetStream). If that is the + // case and there is a single entry, use that. for k, v := range nc.respMap { mch = v delete(nc.respMap, k) diff --git a/nats_test.go b/nats_test.go index f5edec8a1..f8ba7a0ff 100644 --- a/nats_test.go +++ b/nats_test.go @@ -2264,3 +2264,47 @@ func TestRequestLeaksMapEntries(t *testing.T) { t.Fatalf("Expected 0 entries in response map, got %d", num) } } + +func TestRequestMultipleReplies(t *testing.T) { + o := natsserver.DefaultTestOptions + o.Port = -1 + s := RunServerWithOptions(&o) + defer s.Shutdown() + + nc, err := Connect(fmt.Sprintf("nats://%s:%d", o.Host, o.Port)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc.Close() + + response := []byte("I will help you") + nc.Subscribe("foo", func(m *Msg) { + m.Respond(response) + m.Respond(response) + }) + nc.Flush() + + nc2, err := Connect(fmt.Sprintf("nats://%s:%d", o.Host, o.Port)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc2.Close() + + errCh := make(chan error, 1) + // Send a request on bar and expect nothing + go func() { + if m, err := nc2.Request("bar", nil, 500*time.Millisecond); m != nil || err == nil { + errCh <- fmt.Errorf("Expected no reply, got m=%+v err=%v", m, err) + return + } + errCh <- nil + }() + + // Send a request on foo, we use only one of the 2 replies + if _, err := nc2.Request("foo", nil, time.Second); err != nil { + t.Fatalf("Received an error on Request test: %s", err) + } + if e := <-errCh; e != nil { + t.Fatal(e.Error()) + } +}