diff --git a/context.go b/context.go index b098a6d74..53e5ebb0e 100644 --- a/context.go +++ b/context.go @@ -110,10 +110,7 @@ func (nc *Conn) oldRequestWithContext(ctx context.Context, subj string, hdr, dat return s.NextMsgWithContext(ctx) } -// NextMsgWithContext takes a context and returns the next message -// available to a synchronous subscriber, blocking until it is delivered -// or context gets canceled. -func (s *Subscription) NextMsgWithContext(ctx context.Context) (*Msg, error) { +func (s *Subscription) nextMsgWithContext(ctx context.Context, pullSubInternal, waitIfNoMsg bool) (*Msg, error) { if ctx == nil { return nil, ErrInvalidContext } @@ -126,6 +123,11 @@ func (s *Subscription) NextMsgWithContext(ctx context.Context) (*Msg, error) { s.mu.Lock() err := s.validateNextMsgState() + // Unless this is from an internal call, reject use of this API. + // Users should use Fetch() instead. + if err == nil && !pullSubInternal && s.jsi != nil && s.jsi.pull { + err = ErrTypeSubscription + } if err != nil { s.mu.Unlock() return nil, err @@ -150,6 +152,11 @@ func (s *Subscription) NextMsgWithContext(ctx context.Context) (*Msg, error) { return msg, nil } default: + // If internal and we don't want to wait, signal that there is no + // message in the internal queue. + if pullSubInternal && !waitIfNoMsg { + return nil, errNoMessages + } } select { @@ -167,6 +174,13 @@ func (s *Subscription) NextMsgWithContext(ctx context.Context) (*Msg, error) { return msg, nil } +// NextMsgWithContext takes a context and returns the next message +// available to a synchronous subscriber, blocking until it is delivered +// or context gets canceled. +func (s *Subscription) NextMsgWithContext(ctx context.Context) (*Msg, error) { + return s.nextMsgWithContext(ctx, false, true) +} + // FlushWithContext will allow a context to control the duration // of a Flush() call. This context should be non-nil and should // have a deadline set. We will return an error if none is present. diff --git a/js.go b/js.go index 85be70219..6f843bef0 100644 --- a/js.go +++ b/js.go @@ -400,26 +400,6 @@ func (paf *pubAckFuture) Msg() *Msg { return paf.msg } -// pullSubscribe creates the wildcard subscription used per pull subscriber -// to make fetch requests. -func (js *js) pullSubscribe(subj string, jsi *jsSub) (*Subscription, error) { - // Similar to async request handler we create a wildcard subscription for making requests, - // though we do not use the token based approach since we cannot match the response to - // the requestor due to JS subject being remapped on delivery. Instead, we just use an array - // of channels similar to how ping/pong interval is handled and send the message to the first - // available requestor via a channel. - jsi.pull = true - jsi.rr = rand.New(rand.NewSource(time.Now().UnixNano())) - jsi.rpre = fmt.Sprintf("%s.", NewInbox()) - sub, err := js.nc.Subscribe(fmt.Sprintf("%s*", jsi.rpre), jsi.handleFetch) - if err != nil { - return nil, err - } - jsi.psub = sub - - return &Subscription{Subject: subj, conn: js.nc, typ: PullSubscription, jsi: jsi}, nil -} - // For quick token lookup etc. const aReplyPreLen = 14 const aReplyTokensize = 6 @@ -832,14 +812,11 @@ type nextRequest struct { // jsSub includes JetStream subscription info. type jsSub struct { + mu sync.RWMutex js *js - // To setup request mux handler for pull subscribers. - mu sync.RWMutex - psub *Subscription - rpre string - rr *rand.Rand - freqs []chan *Msg + // For pull subscribers, this is the next message subject to send requests to. + nms string consumer string stream string @@ -864,83 +841,9 @@ type jsSub struct { fcs map[uint64]string } -// newFetchReply generates a unique inbox used for a fetch request. -func (jsi *jsSub) newFetchReply() string { - jsi.mu.Lock() - rpre := jsi.rpre - rn := jsi.rr.Int63() - jsi.mu.Unlock() - var sb strings.Builder - sb.WriteString(rpre) - var b [aReplyTokensize]byte - for i, l := 0, rn; i < len(b); i++ { - b[i] = rdigits[l%base] - l /= base - } - sb.Write(b[:]) - return sb.String() -} - -// handleFetch is delivered a message requested by pull subscribers -// when calling Fetch. -func (jsi *jsSub) handleFetch(m *Msg) { - jsi.mu.Lock() - if len(jsi.freqs) == 0 { - nc := jsi.js.nc - sub := jsi.psub - nc.mu.Lock() - errCB := nc.Opts.AsyncErrorCB - err := fmt.Errorf("nats: fetch response delivered but requestor has gone away") - if errCB != nil { - nc.ach.push(func() { errCB(nc, sub, err) }) - } - nc.mu.Unlock() - jsi.mu.Unlock() - return - } - mch := jsi.freqs[0] - if len(jsi.freqs) > 1 { - jsi.freqs = append(jsi.freqs[:0], jsi.freqs[1:]...) - } else { - jsi.freqs = jsi.freqs[:0] - } - jsi.mu.Unlock() - mch <- m -} - -// fetchNoWait makes a request to get a single message using no wait. -func (jsi *jsSub) fetchNoWait(ctx context.Context, subj string, payload []byte) (*Msg, error) { - nc := jsi.js.nc - m := NewMsg(subj) - m.Reply = jsi.newFetchReply() - m.Data = payload - - mch := make(chan *Msg, 1) - jsi.mu.Lock() - jsi.freqs = append(jsi.freqs, mch) - jsi.mu.Unlock() - if err := nc.PublishMsg(m); err != nil { - return nil, err - } - - var ok bool - var msg *Msg - - select { - case msg, ok = <-mch: - if !ok { - return nil, ErrConnectionClosed - } - case <-ctx.Done(): - return nil, ctx.Err() - } - - return msg, nil -} - func (jsi *jsSub) unsubscribe(drainMode bool) error { jsi.mu.Lock() - durable, attached, psub := jsi.durable, jsi.attached, jsi.psub + durable, attached := jsi.durable, jsi.attached stream, consumer := jsi.stream, jsi.consumer js := jsi.js if jsi.hbc != nil { @@ -954,10 +857,6 @@ func (jsi *jsSub) unsubscribe(drainMode bool) error { // consumers when using drain mode. return nil } - // Clear the extra async pull subscription used for fetch requests. - if psub != nil { - psub.Drain() - } return js.DeleteConsumer(stream, consumer) } @@ -979,13 +878,13 @@ func (js *js) Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscripti if cb == nil { return nil, ErrBadSubscription } - return js.subscribe(subj, _EMPTY_, cb, nil, false, opts) + return js.subscribe(subj, _EMPTY_, cb, nil, false, false, opts) } // SubscribeSync will create a sync subscription to the appropriate stream and consumer. func (js *js) SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error) { mch := make(chan *Msg, js.nc.Opts.SubChanLen) - return js.subscribe(subj, _EMPTY_, nil, mch, true, opts) + return js.subscribe(subj, _EMPTY_, nil, mch, true, false, opts) } // QueueSubscribe will create a subscription to the appropriate stream and consumer with queue semantics. @@ -993,31 +892,32 @@ func (js *js) QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) if cb == nil { return nil, ErrBadSubscription } - return js.subscribe(subj, queue, cb, nil, false, opts) + return js.subscribe(subj, queue, cb, nil, false, false, opts) } // QueueSubscribeSync will create a sync subscription to the appropriate stream and consumer with queue semantics. func (js *js) QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscription, error) { mch := make(chan *Msg, js.nc.Opts.SubChanLen) - return js.subscribe(subj, queue, nil, mch, true, opts) + return js.subscribe(subj, queue, nil, mch, true, false, opts) } // ChanSubscribe will create a subscription to the appropriate stream and consumer using a channel. func (js *js) ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) { - return js.subscribe(subj, _EMPTY_, nil, ch, false, opts) + return js.subscribe(subj, _EMPTY_, nil, ch, false, false, opts) } // ChanQueueSubscribe will create a subscription to the appropriate stream and consumer using a channel. func (js *js) ChanQueueSubscribe(subj, queue string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) { - return js.subscribe(subj, queue, nil, ch, false, opts) + return js.subscribe(subj, queue, nil, ch, false, false, opts) } // PullSubscribe creates a pull subscriber. func (js *js) PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error) { - return js.subscribe(subj, _EMPTY_, nil, nil, false, append(opts, Durable(durable))) + mch := make(chan *Msg, js.nc.Opts.SubChanLen) + return js.subscribe(subj, _EMPTY_, nil, mch, true, true, append(opts, Durable(durable))) } -func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync bool, opts []SubOpt) (*Subscription, error) { +func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, isPullMode bool, opts []SubOpt) (*Subscription, error) { cfg := ConsumerConfig{AckPolicy: ackPolicyNotSet} o := subOpts{cfg: &cfg} if len(opts) > 0 { @@ -1028,7 +928,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync } } - isPullMode := ch == nil && cb == nil && !isSync badPullAck := o.cfg.AckPolicy == AckNonePolicy || o.cfg.AckPolicy == AckAllPolicy hasHeartbeats := o.cfg.Heartbeat > 0 hasFC := o.cfg.FlowControl @@ -1049,6 +948,8 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync consumerBound = o.bound notFoundErr bool lookupErr bool + nc = js.nc + nms string ) // Do some quick checks here for ordered consumers. We do these here instead of spread out @@ -1134,7 +1035,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync if ccfg.DeliverSubject != _EMPTY_ { deliver = ccfg.DeliverSubject } else if !isPullMode { - deliver = NewInbox() + deliver = nc.newInbox() } case (err != nil && !notFoundErr) || (notFoundErr && consumerBound): // If the consumer is being bound and we got an error on pull subscribe then allow the error. @@ -1147,7 +1048,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync if o.cfg.DeliverSubject != _EMPTY_ { deliver = o.cfg.DeliverSubject } else if !isPullMode { - deliver = NewInbox() + deliver = nc.newInbox() } } @@ -1193,6 +1094,11 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync } } + if isPullMode { + nms = fmt.Sprintf(js.apiSubj(apiRequestNextT), stream, consumer) + deliver = nc.newInbox() + } + jsi := &jsSub{ js: js, stream: stream, @@ -1206,17 +1112,15 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync ordered: o.ordered, ccreq: ccreq, dseq: 1, + pull: isPullMode, + nms: nms, } - if isPullMode { - sub, err = js.pullSubscribe(subj, jsi) - } else { - sub, err = js.nc.subscribe(deliver, queue, cb, ch, isSync, jsi) - // Since JetStream sends on different subject, make sure this reflects the user's intentions. - sub.mu.Lock() - sub.Subject = subj - sub.mu.Unlock() - } + sub, err = nc.subscribe(deliver, queue, cb, ch, isSync, jsi) + // Since JetStream sends on different subject, make sure this reflects the user's intentions. + sub.mu.Lock() + sub.Subject = subj + sub.mu.Unlock() if err != nil { return nil, err } @@ -1253,7 +1157,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync ccSubj = fmt.Sprintf(apiConsumerCreateT, stream) } - resp, err := js.nc.Request(js.apiSubj(ccSubj), j, js.opts.wait) + resp, err := nc.Request(js.apiSubj(ccSubj), j, js.opts.wait) if err != nil { cleanUpSub() if err == ErrNoResponders { @@ -1303,7 +1207,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync } jsi.deliver = info.Config.DeliverSubject // Recreate the subscription here. - sub, err = js.nc.subscribe(jsi.deliver, queue, cb, ch, isSync, jsi) + sub, err = nc.subscribe(jsi.deliver, queue, cb, ch, isSync, jsi) if err != nil { return nil, err } @@ -1433,7 +1337,7 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) { osid := sub.applyNewSID() // Grab new inbox. - newDeliver := NewInbox() + newDeliver := nc.newInbox() sub.Subject = newDeliver // Snapshot jsi under sub lock here. @@ -1920,6 +1824,45 @@ func PullMaxWaiting(n int) SubOpt { var errNoMessages = errors.New("nats: no messages") +// Returns if the given message is a user message or not, and if +// `checkSts` is true, returns appropriate error based on the +// content of the status (404, etc..) +func checkMsg(msg *Msg, checkSts bool) (usrMsg bool, err error) { + // Assume user message + usrMsg = true + + // If payload or no header, consider this a user message + if len(msg.Data) > 0 || len(msg.Header) == 0 { + return + } + // Look for status header + val := msg.Header.Get(statusHdr) + // If not present, then this is considered a user message + if val == _EMPTY_ { + return + } + // At this point, this is not a user message since there is + // no payload and a "Status" header. + usrMsg = false + + // If we don't care about status, we are done. + if !checkSts { + return + } + switch val { + case noResponders: + err = ErrNoResponders + case noMessagesSts: + // 404 indicates that there are no messages. + err = errNoMessages + case reqTimeoutSts: + err = ErrTimeout + default: + err = fmt.Errorf("nats: %s", msg.Header.Get(descrHdr)) + } + return +} + // Fetch pulls a batch of messages from a stream for a pull consumer. func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) { if sub == nil { @@ -1938,14 +1881,18 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) { sub.mu.Lock() jsi := sub.jsi - if jsi == nil || sub.typ != PullSubscription { + // Reject if this is not a pull subscription. Note that sub.typ is SyncSubscription, + // so check for jsi.pull boolean instead. + if jsi == nil || !jsi.pull { sub.mu.Unlock() return nil, ErrTypeSubscription } nc := sub.conn - stream, consumer := sub.jsi.stream, sub.jsi.consumer + nms := sub.jsi.nms + rply := sub.jsi.deliver js := sub.jsi.js + pmc := len(sub.mch) > 0 ttl := o.ttl if ttl == 0 { @@ -1979,22 +1926,6 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) { return nil, err } - // Check for empty payload message and process synchronously - // any status messages. - checkMsg := func(msg *Msg) error { - if len(msg.Data) == 0 { - switch msg.Header.Get(statusHdr) { - case noResponders: - return ErrNoResponders - case noMessages: - return errNoMessages - case "400", "408", "409": - return fmt.Errorf("nats: %s", msg.Header.Get(descrHdr)) - } - } - return nil - } - checkCtxErr := func(err error) error { if o.ctx == nil && err == context.DeadlineExceeded { return ErrTimeout @@ -2003,183 +1934,86 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) { } var ( - msgs []*Msg - gotNoMessages bool - nr = &nextRequest{Batch: batch, NoWait: true} - req, _ = json.Marshal(nr) - reqNext = js.apiSubj(fmt.Sprintf(apiRequestNextT, stream, consumer)) - expires = ttl - 10*time.Millisecond + msgs = make([]*Msg, 0, batch) + msg *Msg + start = time.Now() ) - - if batch == 1 { - // To optimize single message no wait fetch, we use a shared wildcard - // subscription per pull subscriber to wait for the response. - resp, err := jsi.fetchNoWait(ctx, reqNext, req) - if err != nil { - return nil, checkCtxErr(err) - } - - // In case of a no messages instant error, then fallback - // into longer version of pull batch request. - err = checkMsg(resp) + for pmc && len(msgs) < batch { + // Check next msg with booleans that say that this is an internal call + // for a pull subscribe (so don't reject it) and don't wait if there + // are no messages. + msg, err = sub.nextMsgWithContext(ctx, true, false) if err != nil { if err == errNoMessages { - // Use old request style for the retry of the pull request - // in order to use auto UNSUB 1 to prevent the server - // from delivering a message when there is no more interest. - nr.NoWait = false - nr.Expires = expires - req, _ = json.Marshal(nr) - resp, err = nc.oldRequestWithContext(ctx, reqNext, nil, req) - if err != nil { - return nil, checkCtxErr(err) - } - - // This next message, could also be an error - // (e.g. 408 due to request timeout). - err = checkMsg(resp) - if err != nil { - return nil, err - } - return []*Msg{resp}, nil - } else { - // Hard error - return nil, checkCtxErr(err) + err = nil } + break + } + // Check msg but just to determine if this is a user message + // or status message, however, we don't care about values of status + // messages at this point in the Fetch() call, so checkMsg can't + // return an error. + if usrMsg, _ := checkMsg(msg, false); usrMsg { + msgs = append(msgs, msg) } - return []*Msg{resp}, nil - } - - // Setup a request where we will wait for the first response - // in case of errors, then dispatch the rest of the replies - // to the channel. - inbox := NewInbox() - - mch := make(chan *Msg, batch) - msgs = make([]*Msg, 0, batch) - - s, err := nc.subscribe(inbox, _EMPTY_, nil, mch, true, nil) - if err != nil { - return nil, err - } - - // Remove interest in the subscription at the end so that the - // this inbox does not get delivered the results intended - // for another request. - defer s.Unsubscribe() - - // Make a publish request to get results of the pull. - err = nc.publish(reqNext, inbox, nil, req) - if err != nil { - s.Unsubscribe() - return nil, err } + if err == nil && len(msgs) < batch { + // For batch real size of 1, it does not make sense to set no_wait in + // the request. + batchSize := batch - len(msgs) + noWait := batchSize > 1 + nr := &nextRequest{Batch: batchSize, NoWait: noWait} + req, _ := json.Marshal(nr) - // Try to get the first message or error with NoWait. - var ( - firstMsg *Msg - ok bool - ) - select { - case firstMsg, ok = <-mch: - if !ok { - err = s.getNextMsgErr() - } else { - err = s.processNextMsgDelivered(firstMsg) - if err == nil { - err = checkMsg(firstMsg) + err = nc.PublishRequest(nms, rply, req) + for err == nil && len(msgs) < batch { + ttl -= time.Since(start) + if ttl < 0 { + ttl = 0 } - } - case <-ctx.Done(): - err = checkCtxErr(ctx.Err()) - } - // If the first error is 'no more messages', then switch into - // longer form version of the request that waits for messages. - if err == errNoMessages { - gotNoMessages = true - } else if err != nil { - // We should be getting the response from the server - // in case we got a poll error, so stop and cleanup. - s.Unsubscribe() - return nil, err - } + // Ask for next message and waits if there are no messages + msg, err = sub.nextMsgWithContext(ctx, true, true) + if err == nil { + var usrMsg bool + + usrMsg, err = checkMsg(msg, true) + if err == nil && usrMsg { + msgs = append(msgs, msg) + } else if noWait && (err == errNoMessages) && len(msgs) == 0 { + // If we have a 404 for our "no_wait" request and have + // not collected any message, then resend request to + // wait this time. + noWait = false + + ttl -= time.Since(start) + if ttl < 0 { + // At this point consider that we have timed-out + err = context.DeadlineExceeded + break + } - if gotNoMessages { - // We started with a 404 response right away, so fallback into - // second request that waits longer for messages to delivered. - nr.NoWait = false - nr.Expires = expires - req, _ = json.Marshal(nr) + // Make our request expiration a bit shorter than the + // current timeout. + expires := ttl + if ttl >= 20*time.Millisecond { + expires = ttl - 10*time.Millisecond + } - // Make another request and wait for the messages... - err = nc.publish(reqNext, inbox, nil, req) - if err != nil { - s.Unsubscribe() - return nil, err - } + nr.Batch = batch - len(msgs) + nr.Expires = expires + nr.NoWait = false + req, _ = json.Marshal(nr) - // Try to get the first result again or return the error. - select { - case firstMsg, ok = <-mch: - if !ok { - err = s.getNextMsgErr() - } else { - err = s.processNextMsgDelivered(firstMsg) - if err == nil { - err = checkMsg(firstMsg) + err = nc.PublishRequest(nms, rply, req) } } - case <-ctx.Done(): - err = checkCtxErr(ctx.Err()) - } - if err != nil { - s.Unsubscribe() - return nil, err - } - // Check again if the delivered next message is a status error. - err = checkMsg(firstMsg) - if err != nil { - s.Unsubscribe() - return nil, err } } - - msgs = append(msgs, firstMsg) - for { - var ( - msg *Msg - ok bool - ) - select { - case msg, ok = <-mch: - if !ok { - err = s.getNextMsgErr() - } else { - err = s.processNextMsgDelivered(msg) - if err == nil { - err = checkMsg(msg) - } - } - case <-ctx.Done(): - return msgs, checkCtxErr(err) - } - if err != nil { - // Discard the error which may have been a timeout - // or 408 request timeout status from the server, - // and just the return delivered messages. - break - } - if msg != nil { - msgs = append(msgs, msg) - } - - if len(msgs) == batch { - // Done! - break - } + // If there is at least a message added to msgs, then need to return OK and no error + if err != nil && len(msgs) == 0 { + return nil, checkCtxErr(err) } - return msgs, nil } diff --git a/nats.go b/nats.go index 9b761b174..e7b456150 100644 --- a/nats.go +++ b/nats.go @@ -3154,7 +3154,8 @@ const ( lastConsumerSeqHdr = "Nats-Last-Consumer" lastStreamSeqHdr = "Nats-Last-Stream" noResponders = "503" - noMessages = "404" + noMessagesSts = "404" + reqTimeoutSts = "408" controlMsg = "100" statusLen = 3 // e.g. 20x, 40x, 50x ) @@ -3825,6 +3826,12 @@ func (s *Subscription) Type() SubscriptionType { } s.mu.Lock() defer s.mu.Unlock() + // Pull subscriptions are really a SyncSubscription and we want this + // type to be set internally for all delivered messages management, etc.. + // So check when to return PullSubscription to the user. + if s.jsi != nil && s.jsi.pull { + return PullSubscription + } return s.typ } diff --git a/test/js_test.go b/test/js_test.go index 98b3827c5..9e48c74bd 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -5790,7 +5790,7 @@ func testJetStreamFetchOptions(t *testing.T, srvs ...*jsServer) { if err == nil { t.Fatal("Unexpected success") } - if err != nil && (err != nats.ErrTimeout && err != nats.ErrNoResponders) { + if err != nats.ErrBadSubscription { t.Fatalf("Unexpected error: %v", err) } })