Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for no responders error on requests #576

Merged
merged 1 commit into from
Jun 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 27 additions & 24 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,34 +58,37 @@ func (nc *Conn) requestWithContext(ctx context.Context, subj string, hdr, data [
return nil, ctx.Err()
}

nc.mu.Lock()
// If user wants the old style.
if nc.Opts.UseOldRequestStyle {
nc.mu.Unlock()
return nc.oldRequestWithContext(ctx, subj, hdr, data)
}

mch, token, err := nc.createNewRequestAndSend(subj, hdr, data)
if err != nil {
return nil, err
}
var m *Msg
var err error

var ok bool
var msg *Msg
// If user wants the old style.
if nc.useOldRequestStyle() {
m, err = nc.oldRequestWithContext(ctx, subj, hdr, data)
} else {
mch, token, err := nc.createNewRequestAndSend(subj, hdr, data)
if err != nil {
return nil, err
}

select {
case msg, ok = <-mch:
if !ok {
return nil, ErrConnectionClosed
var ok bool

select {
case m, ok = <-mch:
if !ok {
return nil, ErrConnectionClosed
}
case <-ctx.Done():
nc.mu.Lock()
delete(nc.respMap, token)
nc.mu.Unlock()
return nil, ctx.Err()
}
case <-ctx.Done():
nc.mu.Lock()
delete(nc.respMap, token)
nc.mu.Unlock()
return nil, ctx.Err()
}

return msg, nil
// Check for no responder status.
if err == nil && len(m.Data) == 0 && m.Header.Get(statusHdr) == noResponders {
m, err = nil, ErrNoResponders
}
return m, err
}

// oldRequestWithContext utilizes inbox and subscription per request.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.14

require (
github.com/golang/protobuf v1.4.2
github.com/nats-io/nats-server/v2 v2.1.8-0.20200601203034-f8d6dd992b71
github.com/nats-io/nats-server/v2 v2.1.8-0.20200617224755-fa744fdcdaa3
github.com/nats-io/nkeys v0.2.0
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.23.0
Expand Down
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,23 @@ github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/minio/highwayhash v1.0.0 h1:iMSDhgUILCr0TNm8LWlSjF8N0ZIj2qbO8WHp6Q/J2BA=
github.com/minio/highwayhash v1.0.0/go.mod h1:xQboMTeM9nY9v/LlAOxFctujiv5+Aq2hR5dxBpaMbdc=
github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU=
github.com/nats-io/jwt v0.3.3-0.20200519195258-f2bf5ce574c7 h1:RnGotxlghqR5D2KDAu4TyuLqyjuylOsJiAFhXvMvQIc=
github.com/nats-io/jwt v0.3.3-0.20200519195258-f2bf5ce574c7/go.mod h1:n3cvmLfBfnpV4JJRN7lRYCyZnw48ksGsbThGXEk4w9M=
github.com/nats-io/jwt/v2 v2.0.0-20200602193336-473d698956ed h1:nnV8Mw23aNwNpKuQWuVBEuAqyBOEY21hLWKpVdNr6dQ=
github.com/nats-io/jwt/v2 v2.0.0-20200602193336-473d698956ed/go.mod h1:vs+ZEjP+XKy8szkBmQwCB7RjYdIlMaPsFPs4VdS4bTQ=
github.com/nats-io/nats-server/v2 v2.1.8-0.20200524125952-51ebd92a9093/go.mod h1:rQnBf2Rv4P9adtAs/Ti6LfFmVtFG6HLhl/H7cVshcJU=
github.com/nats-io/nats-server/v2 v2.1.8-0.20200601203034-f8d6dd992b71 h1:nexMtKbOeM+w3vGQMNF0BEt+2xZDmVCtYXql2Ym+RWg=
github.com/nats-io/nats-server/v2 v2.1.8-0.20200601203034-f8d6dd992b71/go.mod h1:Nan/1L5Sa1JRW+Thm4HNYcIDcVRFc5zK9OpSZeI2kk4=
github.com/nats-io/nats-server/v2 v2.1.8-0.20200617224755-fa744fdcdaa3 h1:aDJ5IrBlq4KHBgwWZtKXi1lvY2EkRYOiC2KQfdLTJL8=
github.com/nats-io/nats-server/v2 v2.1.8-0.20200617224755-fa744fdcdaa3/go.mod h1:uXGA6y1uxwW755SK+LoDZggh+UUVsbVoxh8ZG8MqbsI=
github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE=
github.com/nats-io/nats.go v1.10.1-0.20200531124210-96f2130e4d55/go.mod h1:ARiFsjW9DVxk48WJbO3OSZ2DG8fjkMi7ecLmXoY/n9I=
github.com/nats-io/nats.go v1.10.1-0.20200606002146-fc6fed82929a/go.mod h1:8eAIv96Mo9QW6Or40jUHejS7e4VwZ3VRYD6Sf0BTDp4=
github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/nkeys v0.2.0 h1:WXKF7diOaPU9cJdLD7nuzwasQy9vT1tBqzXZZf3AMJM=
Expand All @@ -35,6 +41,7 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20191022100944-742c48ecaeb7 h1:HmbHVPwrPEKPGLAcHSrMe6+hqSUlvZU0rab6x5EXfGU=
golang.org/x/sys v0.0.0-20191022100944-742c48ecaeb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
Expand Down
77 changes: 51 additions & 26 deletions nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ var (
ErrDisconnected = errors.New("nats: server is disconnected")
ErrHeadersNotSupported = errors.New("nats: headers not supported by this server")
ErrBadHeaderMsg = errors.New("nats: message could not decode headers")
ErrNoResponders = errors.New("nats: no responders available for request")
)

func init() {
Expand Down Expand Up @@ -589,21 +590,22 @@ const (
)

type connectInfo struct {
Verbose bool `json:"verbose"`
Pedantic bool `json:"pedantic"`
UserJWT string `json:"jwt,omitempty"`
Nkey string `json:"nkey,omitempty"`
Signature string `json:"sig,omitempty"`
User string `json:"user,omitempty"`
Pass string `json:"pass,omitempty"`
Token string `json:"auth_token,omitempty"`
TLS bool `json:"tls_required"`
Name string `json:"name"`
Lang string `json:"lang"`
Version string `json:"version"`
Protocol int `json:"protocol"`
Echo bool `json:"echo"`
Headers bool `json:"headers"`
Verbose bool `json:"verbose"`
Pedantic bool `json:"pedantic"`
UserJWT string `json:"jwt,omitempty"`
Nkey string `json:"nkey,omitempty"`
Signature string `json:"sig,omitempty"`
User string `json:"user,omitempty"`
Pass string `json:"pass,omitempty"`
Token string `json:"auth_token,omitempty"`
TLS bool `json:"tls_required"`
Name string `json:"name"`
Lang string `json:"lang"`
Version string `json:"version"`
Protocol int `json:"protocol"`
Echo bool `json:"echo"`
Headers bool `json:"headers"`
NoResponders bool `json:"no_responders"`
}

// MsgHandler is a callback function that processes messages delivered to
Expand Down Expand Up @@ -1711,8 +1713,10 @@ func (nc *Conn) connectProto() (string, error) {
token = nc.Opts.TokenHandler()
}

// If our server does not support headers then we can't do them or no responders.
hdrs := nc.info.Headers
cinfo := connectInfo{o.Verbose, o.Pedantic, ujwt, nkey, sig, user, pass, token,
o.Secure, o.Name, LangString, Version, clientProtoInfo, !o.NoEcho, true}
o.Secure, o.Name, LangString, Version, clientProtoInfo, !o.NoEcho, hdrs, hdrs}

b, err := json.Marshal(cinfo)
if err != nil {
Expand Down Expand Up @@ -2689,21 +2693,28 @@ func NewMsg(subject string) *Msg {
}

const (
hdrLine = "NATS/1.0\r\n"
crlf = "\r\n"
hdrPreEnd = len(hdrLine) - len(crlf)
hdrLine = "NATS/1.0\r\n"
crlf = "\r\n"
hdrPreEnd = len(hdrLine) - len(crlf)
statusHdr = "Status"
noResponders = "503"
)

// decodeHeadersMsg will decode and headers.
func decodeHeadersMsg(data []byte) (http.Header, error) {
tp := textproto.NewReader(bufio.NewReader(bytes.NewReader(data)))
if l, err := tp.ReadLine(); err != nil || l != hdrLine[:hdrPreEnd] {
l, err := tp.ReadLine()
if err != nil || len(l) < hdrPreEnd || l[:hdrPreEnd] != hdrLine[:hdrPreEnd] {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch on not previously checking for len...

return nil, ErrBadHeaderMsg
}
mh, err := tp.ReadMIMEHeader()
if err != nil {
return nil, ErrBadHeaderMsg
}
// Check if we have an inlined status.
if len(l) > hdrPreEnd {
mh.Add(statusHdr, strings.TrimLeft(l[hdrPreEnd:], " "))
}
return http.Header(mh), nil
}

Expand Down Expand Up @@ -2904,6 +2915,7 @@ func (nc *Conn) respHandler(m *Msg) {

// Helper to setup and send new request style requests. Return the chan to receive the response.
func (nc *Conn) createNewRequestAndSend(subj string, hdr, data []byte) (chan *Msg, string, error) {
nc.mu.Lock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to go back and check pre-PR, that was weird how we got the lock in the caller but released in that function...

// Do setup for the new style if needed.
if nc.respMap == nil {
nc.initNewResp()
Expand Down Expand Up @@ -2944,7 +2956,6 @@ func (nc *Conn) RequestMsg(msg *Msg, timeout time.Duration) (*Msg, error) {
if !nc.info.Headers {
return nil, ErrHeadersNotSupported
}

hdr, err = msg.headerBytes()
if err != nil {
return nil, err
Expand All @@ -2960,18 +2971,32 @@ func (nc *Conn) Request(subj string, data []byte, timeout time.Duration) (*Msg,
return nc.request(subj, nil, data, timeout)
}

func (nc *Conn) useOldRequestStyle() bool {
nc.mu.RLock()
r := nc.Opts.UseOldRequestStyle
nc.mu.RUnlock()
return r
}

func (nc *Conn) request(subj string, hdr, data []byte, timeout time.Duration) (*Msg, error) {
if nc == nil {
return nil, ErrInvalidConnection
}

nc.mu.Lock()
if nc.Opts.UseOldRequestStyle {
nc.mu.Unlock()
return nc.oldRequest(subj, hdr, data, timeout)
var m *Msg
var err error

if nc.useOldRequestStyle() {
m, err = nc.oldRequest(subj, hdr, data, timeout)
} else {
m, err = nc.newRequest(subj, hdr, data, timeout)
}

return nc.newRequest(subj, hdr, data, timeout)
// Check for no responder status.
if err == nil && len(m.Data) == 0 && m.Header.Get(statusHdr) == noResponders {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did check that even if m.Header is 'nil' (because there is no header), m.Header.Get() will work (returns ""), but I got concerned when I saw that line.

m, err = nil, ErrNoResponders
}
return m, err
}

func (nc *Conn) newRequest(subj string, hdr, data []byte, timeout time.Duration) (*Msg, error) {
Expand Down
49 changes: 48 additions & 1 deletion test/basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package test

import (
"bytes"
"context"
"fmt"
"math"
"regexp"
Expand Down Expand Up @@ -555,11 +556,54 @@ func TestRequestTimeout(t *testing.T) {
nc := NewDefaultConnection(t)
defer nc.Close()

// We now need a responder by default otherwise we will get a no responders error.
nc.SubscribeSync("foo")

if _, err := nc.Request("foo", []byte("help"), 10*time.Millisecond); err != nats.ErrTimeout {
t.Fatalf("Expected to receive a timeout error")
}
}

func TestBasicNoRespondersSupport(t *testing.T) {
s := RunServerOnPort(-1)
defer s.Shutdown()

nc, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("Error connecting to server: %v", err)
}
defer nc.Close()

// Normal new style
if m, err := nc.Request("foo", nil, time.Second); err != nats.ErrNoResponders {
t.Fatalf("Expected a no responders error and nil msg, got m:%+v and err: %v", m, err)
}
// New style with context
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
if m, err := nc.RequestWithContext(ctx, "foo", nil); err != nats.ErrNoResponders {
t.Fatalf("Expected a no responders error and nil msg, got m:%+v and err: %v", m, err)
}

// Now do old request style as well.
nc, err = nats.Connect(s.ClientURL(), nats.UseOldRequestStyle())
if err != nil {
t.Fatalf("Error connecting to server: %v", err)
}
defer nc.Close()

// Normal old request style
if m, err := nc.Request("foo", nil, time.Second); err != nats.ErrNoResponders {
t.Fatalf("Expected a no responders error and nil msg, got m:%+v and err: %v", m, err)
}
// Old request style with context
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
defer cancel()
if m, err := nc.RequestWithContext(ctx, "foo", nil); err != nats.ErrNoResponders {
t.Fatalf("Expected a no responders error and nil msg, got m:%+v and err: %v", m, err)
}
}

func TestOldRequest(t *testing.T) {
s := RunDefaultServer()
defer s.Shutdown()
Expand All @@ -586,13 +630,15 @@ func TestOldRequest(t *testing.T) {
errCh := make(chan error, 1)
start := time.Now()
go func() {
sub, _ := nc.SubscribeSync("checkClose")
defer sub.Unsubscribe()
_, err := nc.Request("checkClose", []byte("should be kicked out on close"), time.Second)
errCh <- err
}()
time.Sleep(100 * time.Millisecond)
nc.Close()
if e := <-errCh; e != nats.ErrConnectionClosed {
t.Fatalf("Unexpected error: %v", err)
t.Fatalf("Unexpected error: %v", e)
}
if dur := time.Since(start); dur >= time.Second {
t.Fatalf("Request took too long to bail out: %v", dur)
Expand Down Expand Up @@ -677,6 +723,7 @@ func TestRequestClose(t *testing.T) {
time.Sleep(100 * time.Millisecond)
nc.Close()
}()
nc.SubscribeSync("foo")
if _, err := nc.Request("foo", []byte("help"), 2*time.Second); err != nats.ErrInvalidConnection && err != nats.ErrConnectionClosed {
t.Fatalf("Expected connection error: got %v", err)
}
Expand Down
10 changes: 7 additions & 3 deletions test/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2116,11 +2116,13 @@ func TestGetClientID(t *testing.T) {
optsA := test.DefaultTestOptions
optsA.Port = -1
optsA.Cluster.Port = -1
optsA.Cluster.Name = "test"

srvA := RunServerWithOptions(optsA)
defer srvA.Shutdown()

ch := make(chan bool, 1)
nc1, err := nats.Connect(fmt.Sprintf("nats://127.0.0.1:%d", srvA.Addr().(*net.TCPAddr).Port),
nc1, err := nats.Connect(srvA.ClientURL(),
nats.DiscoveredServersHandler(func(_ *nats.Conn) {
ch <- true
}),
Expand All @@ -2144,13 +2146,15 @@ func TestGetClientID(t *testing.T) {
optsB := test.DefaultTestOptions
optsB.Port = -1
optsB.Cluster.Port = -1
optsB.Cluster.Name = "test"

optsB.Routes = server.RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", srvA.ClusterAddr().Port))
srvB := RunServerWithOptions(optsB)
defer srvB.Shutdown()

// Wait for the discovered callback to fire
if err := Wait(ch); err != nil {
t.Fatal("Did not the discovered callback")
t.Fatal("Did not fire the discovered callback")
}
// Now check CID should be valid and same as before
newCID, err := nc1.GetClientID()
Expand All @@ -2162,7 +2166,7 @@ func TestGetClientID(t *testing.T) {
}

// Create a client to server B
nc2, err := nats.Connect(fmt.Sprintf("nats://127.0.0.1:%d", srvB.Addr().(*net.TCPAddr).Port))
nc2, err := nats.Connect(srvB.ClientURL())
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
Expand Down
4 changes: 3 additions & 1 deletion test/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,13 +297,15 @@ func TestContextOldRequestClosed(t *testing.T) {
errCh := make(chan error, 1)
start := time.Now()
go func() {
sub, _ := nc.SubscribeSync("checkClose")
defer sub.Unsubscribe()
_, err = nc.RequestWithContext(ctx, "checkClose", []byte("should be kicked out on close"))
errCh <- err
}()
time.Sleep(100 * time.Millisecond)
nc.Close()
if e := <-errCh; e != nats.ErrConnectionClosed {
t.Fatalf("Unexpected error: %v", err)
t.Fatalf("Unexpected error: %v", e)
}
if dur := time.Since(start); dur >= time.Second {
t.Fatalf("Request took too long to bail out: %v", dur)
Expand Down
Loading