Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify Request(), allow for mux subject to possibly change. #523

Merged
merged 1 commit into from
Oct 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 1 addition & 23 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,29 +43,7 @@ func (nc *Conn) RequestWithContext(ctx context.Context, subj string, data []byte
return nc.oldRequestWithContext(ctx, subj, data)
}

// Do setup for the new style.
if nc.respMap == nil {
nc.initNewResp()
}
// Create literal Inbox and map to a chan msg.
mch := make(chan *Msg, RequestChanLen)
respInbox := nc.newRespInbox()
token := respToken(respInbox)
nc.respMap[token] = mch
createSub := nc.respMux == nil
ginbox := nc.respSub
nc.mu.Unlock()

if createSub {
// Make sure scoped subscription is setup only once.
var err error
nc.respSetup.Do(func() { err = nc.createRespMux(ginbox) })
if err != nil {
return nil, err
}
}

err := nc.PublishRequest(subj, respInbox, data)
mch, token, err := nc.createNewRequestAndSend(subj, data)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module github.com/nats-io/nats.go

require (
github.com/nats-io/jwt v0.2.12
github.com/nats-io/jwt v0.3.0
github.com/nats-io/nkeys v0.1.0
github.com/nats-io/nuid v1.0.1
)
11 changes: 4 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
github.com/nats-io/jwt v0.2.12 h1:Y3YLoJey+Q/yMk/1Ig3xhWxYXE7vNSefozkArIcnSlU=
github.com/nats-io/jwt v0.2.12/go.mod h1:mQxQ0uHQ9FhEVPIcTSKwx2lqZEpXWWcCgA7R6NrWvvY=
github.com/nats-io/nkeys v0.0.2/go.mod h1:dab7URMsZm6Z/jp9Z5UGa87Uutgc2mVpXLC4B7TDb/4=
github.com/nats-io/jwt v0.3.0 h1:xdnzwFETV++jNc4W1mw//qFyJGb2ABOombmZJQS4+Qo=
github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg=
github.com/nats-io/nkeys v0.1.0 h1:qMd4+pRHgdr1nAClu+2h/2a5F2TmKcCzjCDazVgRoX4=
github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 h1:HuIa8hRrWRSrqYzx1qI49NNxhdi2PrY7gxVSq1JjLDc=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e h1:D5TXcfTk7xF7hvieo4QErS3qqCB4teTffacDWr7CI+0=
golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
78 changes: 52 additions & 26 deletions nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,10 +412,11 @@ type Conn struct {

// New style response handler
respSub string // The wildcard subject
respScanf string // The scanf template to extract mux token
respMux *Subscription // A single response subscription
respMap map[string]chan *Msg // Request map for the response msg channels
respSetup sync.Once // Ensures response subscription occurs once
respRand *rand.Rand // Used for generating suffix.
respRand *rand.Rand // Used for generating suffix
}

// A Subscription represents interest in a given subject.
Expand Down Expand Up @@ -2647,21 +2648,28 @@ func (nc *Conn) publish(subj, reply string, data []byte) error {
// the appropriate channel based on the last token and place
// the message on the channel if possible.
func (nc *Conn) respHandler(m *Msg) {
rt := respToken(m.Subject)

nc.mu.Lock()

// Just return if closed.
if nc.isClosed() {
nc.mu.Unlock()
return
}

// Grab mch
rt := nc.respToken(m.Subject)
mch := nc.respMap[rt]
// Delete the key regardless, one response only.
// FIXME(dlc) - should we track responses past 1
// just statistics wise?
delete(nc.respMap, rt)
// If something went wrong and we only have one entry, use that.
// This can happen if the system rewrites the subject, e.g. js.
if mch == nil && len(nc.respMap) == 1 {
for k, v := range nc.respMap {
mch = v
delete(nc.respMap, k)
break
}
}
nc.mu.Unlock()

// Don't block, let Request timeout instead, mch is
Expand All @@ -2685,33 +2693,22 @@ func (nc *Conn) createRespMux(respSub string) error {
return err
}
nc.mu.Lock()
nc.respScanf = strings.Replace(respSub, "*", "%s", -1)
nc.respMux = s
nc.mu.Unlock()
return nil
}

// Request will send a request payload and deliver the response message,
// or an error, including a timeout if no message was received properly.
func (nc *Conn) Request(subj string, data []byte, timeout time.Duration) (*Msg, error) {
if nc == nil {
return nil, ErrInvalidConnection
}

nc.mu.Lock()
// If user wants the old style.
if nc.Opts.UseOldRequestStyle {
nc.mu.Unlock()
return nc.oldRequest(subj, data, timeout)
}

// Do setup for the new style.
// Helper to setup and send new request style requests. Return the chan to receive the response.
func (nc *Conn) createNewRequestAndSend(subj string, data []byte) (chan *Msg, string, error) {
// Do setup for the new style if needed.
if nc.respMap == nil {
nc.initNewResp()
}
// Create literal Inbox and map to a chan msg.
// Create new literal Inbox and map to a chan msg.
mch := make(chan *Msg, RequestChanLen)
respInbox := nc.newRespInbox()
token := respToken(respInbox)
token := respInbox[respInboxPrefixLen:]
nc.respMap[token] = mch
createSub := nc.respMux == nil
ginbox := nc.respSub
Expand All @@ -2722,11 +2719,33 @@ func (nc *Conn) Request(subj string, data []byte, timeout time.Duration) (*Msg,
var err error
nc.respSetup.Do(func() { err = nc.createRespMux(ginbox) })
if err != nil {
return nil, err
return nil, token, err
}
}

if err := nc.PublishRequest(subj, respInbox, data); err != nil {
return nil, token, err
}

return mch, token, nil
}

// Request will send a request payload and deliver the response message,
// or an error, including a timeout if no message was received properly.
func (nc *Conn) Request(subj string, data []byte, timeout time.Duration) (*Msg, error) {
if nc == nil {
return nil, ErrInvalidConnection
}

nc.mu.Lock()
// If user wants the old style.
if nc.Opts.UseOldRequestStyle {
nc.mu.Unlock()
return nc.oldRequest(subj, data, timeout)
}

mch, token, err := nc.createNewRequestAndSend(subj, data)
if err != nil {
return nil, err
}

Expand Down Expand Up @@ -2829,9 +2848,16 @@ func (nc *Conn) NewRespInbox() string {
}

// respToken will return the last token of a literal response inbox
// which we use for the message channel lookup.
func respToken(respInbox string) string {
return respInbox[respInboxPrefixLen:]
// which we use for the message channel lookup. This needs to do a
// scan to protect itself against the server changing the subject.
// Lock should be held.
func (nc *Conn) respToken(respInbox string) string {
var token string
n, err := fmt.Sscanf(respInbox, nc.respScanf, &token)
if err != nil || n != 1 {
return ""
}
return token
}

// Subscribe will express interest in the given subject. The subject
Expand Down
34 changes: 34 additions & 0 deletions nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2217,3 +2217,37 @@ func TestStatsRace(t *testing.T) {
close(ch)
wg.Wait()
}

func TestRequestLeaksMapEntries(t *testing.T) {
o := natsserver.DefaultTestOptions
o.Port = -1
s := RunServerWithOptions(&o)
defer s.Shutdown()

nc, err := Connect(fmt.Sprintf("nats://%s:%d", o.Host, o.Port))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc.Close()

response := []byte("I will help you")
nc.Subscribe("foo", func(m *Msg) {
nc.Publish(m.Reply, response)
})

for i := 0; i < 100; i++ {
msg, err := nc.Request("foo", nil, 500*time.Millisecond)
if err != nil {
t.Fatalf("Received an error on Request test: %s", err)
}
if !bytes.Equal(msg.Data, response) {
t.Fatalf("Received invalid response")
}
}
nc.mu.Lock()
num := len(nc.respMap)
nc.mu.Unlock()
if num != 0 {
t.Fatalf("Expected 0 entries in response map, got %d", num)
}
}
2 changes: 1 addition & 1 deletion test/basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ func TestRequestTimeout(t *testing.T) {
nc := NewDefaultConnection(t)
defer nc.Close()

if _, err := nc.Request("foo", []byte("help"), 10*time.Millisecond); err == nil {
if _, err := nc.Request("foo", []byte("help"), 10*time.Millisecond); err != nats.ErrTimeout {
t.Fatalf("Expected to receive a timeout error")
}
}
Expand Down