Skip to content

Commit

Permalink
Merge pull request #528 from nats-io/fix_multiple_replies
Browse files Browse the repository at this point in the history
[FIXED] Request() may receive invalid reply if responder sends multiple replies
  • Loading branch information
kozlovic committed Oct 31, 2019
2 parents 41e7eae + a3c7525 commit 6063d67
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 8 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ When using or transitioning to Go modules support:
```bash
# Go client latest or explicit version
go get github.com/nats-io/nats.go/@latest
go get github.com/nats-io/nats.go/@v1.9.0
go get github.com/nats-io/nats.go/@v1.9.1

# For latest NATS Server, add /v2 at the end
go get github.com/nats-io/nats-server/v2
Expand Down
18 changes: 11 additions & 7 deletions nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import (

// Default Constants
const (
Version = "1.9.0"
Version = "1.9.1"
DefaultURL = "nats://127.0.0.1:4222"
DefaultPort = 4222
DefaultMaxReconnect = 60
Expand Down Expand Up @@ -2656,14 +2656,18 @@ func (nc *Conn) respHandler(m *Msg) {
return
}

var mch chan *Msg

// Grab mch
rt := nc.respToken(m.Subject)
mch := nc.respMap[rt]
// Delete the key regardless, one response only.
delete(nc.respMap, rt)
// If something went wrong and we only have one entry, use that.
// This can happen if the system rewrites the subject, e.g. js.
if mch == nil && len(nc.respMap) == 1 {
if rt != _EMPTY_ {
mch = nc.respMap[rt]
// Delete the key regardless, one response only.
delete(nc.respMap, rt)
} else if len(nc.respMap) == 1 {
// If the server has rewritten the subject, the response token (rt)
// will not match (could be the case with JetStream). If that is the
// case and there is a single entry, use that.
for k, v := range nc.respMap {
mch = v
delete(nc.respMap, k)
Expand Down
44 changes: 44 additions & 0 deletions nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2264,3 +2264,47 @@ func TestRequestLeaksMapEntries(t *testing.T) {
t.Fatalf("Expected 0 entries in response map, got %d", num)
}
}

func TestRequestMultipleReplies(t *testing.T) {
o := natsserver.DefaultTestOptions
o.Port = -1
s := RunServerWithOptions(&o)
defer s.Shutdown()

nc, err := Connect(fmt.Sprintf("nats://%s:%d", o.Host, o.Port))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc.Close()

response := []byte("I will help you")
nc.Subscribe("foo", func(m *Msg) {
m.Respond(response)
m.Respond(response)
})
nc.Flush()

nc2, err := Connect(fmt.Sprintf("nats://%s:%d", o.Host, o.Port))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc2.Close()

errCh := make(chan error, 1)
// Send a request on bar and expect nothing
go func() {
if m, err := nc2.Request("bar", nil, 500*time.Millisecond); m != nil || err == nil {
errCh <- fmt.Errorf("Expected no reply, got m=%+v err=%v", m, err)
return
}
errCh <- nil
}()

// Send a request on foo, we use only one of the 2 replies
if _, err := nc2.Request("foo", nil, time.Second); err != nil {
t.Fatalf("Received an error on Request test: %s", err)
}
if e := <-errCh; e != nil {
t.Fatal(e.Error())
}
}

0 comments on commit 6063d67

Please sign in to comment.