From cfef5e6c50fe1ba7f6574f3ab9ea03c2ee58a3a9 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Mon, 9 Aug 2021 17:54:27 -0600 Subject: [PATCH] Reworked PullSubscribe implementation Use a SyncSubscription instead. The only visible change from the user is that calling Fetch() after Unsubscribe() returns ErrBadSubscription instead of timeout or context deadline exceeded, which makes more sense to me. This is the only test that I had to tweak. Signed-off-by: Ivan Kozlovic --- context.go | 22 ++- js.go | 446 +++++++++++++++--------------------------------- nats.go | 9 +- test/js_test.go | 2 +- 4 files changed, 167 insertions(+), 312 deletions(-) diff --git a/context.go b/context.go index b098a6d74..53e5ebb0e 100644 --- a/context.go +++ b/context.go @@ -110,10 +110,7 @@ func (nc *Conn) oldRequestWithContext(ctx context.Context, subj string, hdr, dat return s.NextMsgWithContext(ctx) } -// NextMsgWithContext takes a context and returns the next message -// available to a synchronous subscriber, blocking until it is delivered -// or context gets canceled. -func (s *Subscription) NextMsgWithContext(ctx context.Context) (*Msg, error) { +func (s *Subscription) nextMsgWithContext(ctx context.Context, pullSubInternal, waitIfNoMsg bool) (*Msg, error) { if ctx == nil { return nil, ErrInvalidContext } @@ -126,6 +123,11 @@ func (s *Subscription) NextMsgWithContext(ctx context.Context) (*Msg, error) { s.mu.Lock() err := s.validateNextMsgState() + // Unless this is from an internal call, reject use of this API. + // Users should use Fetch() instead. + if err == nil && !pullSubInternal && s.jsi != nil && s.jsi.pull { + err = ErrTypeSubscription + } if err != nil { s.mu.Unlock() return nil, err @@ -150,6 +152,11 @@ func (s *Subscription) NextMsgWithContext(ctx context.Context) (*Msg, error) { return msg, nil } default: + // If internal and we don't want to wait, signal that there is no + // message in the internal queue. + if pullSubInternal && !waitIfNoMsg { + return nil, errNoMessages + } } select { @@ -167,6 +174,13 @@ func (s *Subscription) NextMsgWithContext(ctx context.Context) (*Msg, error) { return msg, nil } +// NextMsgWithContext takes a context and returns the next message +// available to a synchronous subscriber, blocking until it is delivered +// or context gets canceled. +func (s *Subscription) NextMsgWithContext(ctx context.Context) (*Msg, error) { + return s.nextMsgWithContext(ctx, false, true) +} + // FlushWithContext will allow a context to control the duration // of a Flush() call. This context should be non-nil and should // have a deadline set. We will return an error if none is present. diff --git a/js.go b/js.go index 85be70219..6f843bef0 100644 --- a/js.go +++ b/js.go @@ -400,26 +400,6 @@ func (paf *pubAckFuture) Msg() *Msg { return paf.msg } -// pullSubscribe creates the wildcard subscription used per pull subscriber -// to make fetch requests. -func (js *js) pullSubscribe(subj string, jsi *jsSub) (*Subscription, error) { - // Similar to async request handler we create a wildcard subscription for making requests, - // though we do not use the token based approach since we cannot match the response to - // the requestor due to JS subject being remapped on delivery. Instead, we just use an array - // of channels similar to how ping/pong interval is handled and send the message to the first - // available requestor via a channel. - jsi.pull = true - jsi.rr = rand.New(rand.NewSource(time.Now().UnixNano())) - jsi.rpre = fmt.Sprintf("%s.", NewInbox()) - sub, err := js.nc.Subscribe(fmt.Sprintf("%s*", jsi.rpre), jsi.handleFetch) - if err != nil { - return nil, err - } - jsi.psub = sub - - return &Subscription{Subject: subj, conn: js.nc, typ: PullSubscription, jsi: jsi}, nil -} - // For quick token lookup etc. const aReplyPreLen = 14 const aReplyTokensize = 6 @@ -832,14 +812,11 @@ type nextRequest struct { // jsSub includes JetStream subscription info. type jsSub struct { + mu sync.RWMutex js *js - // To setup request mux handler for pull subscribers. - mu sync.RWMutex - psub *Subscription - rpre string - rr *rand.Rand - freqs []chan *Msg + // For pull subscribers, this is the next message subject to send requests to. + nms string consumer string stream string @@ -864,83 +841,9 @@ type jsSub struct { fcs map[uint64]string } -// newFetchReply generates a unique inbox used for a fetch request. -func (jsi *jsSub) newFetchReply() string { - jsi.mu.Lock() - rpre := jsi.rpre - rn := jsi.rr.Int63() - jsi.mu.Unlock() - var sb strings.Builder - sb.WriteString(rpre) - var b [aReplyTokensize]byte - for i, l := 0, rn; i < len(b); i++ { - b[i] = rdigits[l%base] - l /= base - } - sb.Write(b[:]) - return sb.String() -} - -// handleFetch is delivered a message requested by pull subscribers -// when calling Fetch. -func (jsi *jsSub) handleFetch(m *Msg) { - jsi.mu.Lock() - if len(jsi.freqs) == 0 { - nc := jsi.js.nc - sub := jsi.psub - nc.mu.Lock() - errCB := nc.Opts.AsyncErrorCB - err := fmt.Errorf("nats: fetch response delivered but requestor has gone away") - if errCB != nil { - nc.ach.push(func() { errCB(nc, sub, err) }) - } - nc.mu.Unlock() - jsi.mu.Unlock() - return - } - mch := jsi.freqs[0] - if len(jsi.freqs) > 1 { - jsi.freqs = append(jsi.freqs[:0], jsi.freqs[1:]...) - } else { - jsi.freqs = jsi.freqs[:0] - } - jsi.mu.Unlock() - mch <- m -} - -// fetchNoWait makes a request to get a single message using no wait. -func (jsi *jsSub) fetchNoWait(ctx context.Context, subj string, payload []byte) (*Msg, error) { - nc := jsi.js.nc - m := NewMsg(subj) - m.Reply = jsi.newFetchReply() - m.Data = payload - - mch := make(chan *Msg, 1) - jsi.mu.Lock() - jsi.freqs = append(jsi.freqs, mch) - jsi.mu.Unlock() - if err := nc.PublishMsg(m); err != nil { - return nil, err - } - - var ok bool - var msg *Msg - - select { - case msg, ok = <-mch: - if !ok { - return nil, ErrConnectionClosed - } - case <-ctx.Done(): - return nil, ctx.Err() - } - - return msg, nil -} - func (jsi *jsSub) unsubscribe(drainMode bool) error { jsi.mu.Lock() - durable, attached, psub := jsi.durable, jsi.attached, jsi.psub + durable, attached := jsi.durable, jsi.attached stream, consumer := jsi.stream, jsi.consumer js := jsi.js if jsi.hbc != nil { @@ -954,10 +857,6 @@ func (jsi *jsSub) unsubscribe(drainMode bool) error { // consumers when using drain mode. return nil } - // Clear the extra async pull subscription used for fetch requests. - if psub != nil { - psub.Drain() - } return js.DeleteConsumer(stream, consumer) } @@ -979,13 +878,13 @@ func (js *js) Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscripti if cb == nil { return nil, ErrBadSubscription } - return js.subscribe(subj, _EMPTY_, cb, nil, false, opts) + return js.subscribe(subj, _EMPTY_, cb, nil, false, false, opts) } // SubscribeSync will create a sync subscription to the appropriate stream and consumer. func (js *js) SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error) { mch := make(chan *Msg, js.nc.Opts.SubChanLen) - return js.subscribe(subj, _EMPTY_, nil, mch, true, opts) + return js.subscribe(subj, _EMPTY_, nil, mch, true, false, opts) } // QueueSubscribe will create a subscription to the appropriate stream and consumer with queue semantics. @@ -993,31 +892,32 @@ func (js *js) QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) if cb == nil { return nil, ErrBadSubscription } - return js.subscribe(subj, queue, cb, nil, false, opts) + return js.subscribe(subj, queue, cb, nil, false, false, opts) } // QueueSubscribeSync will create a sync subscription to the appropriate stream and consumer with queue semantics. func (js *js) QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscription, error) { mch := make(chan *Msg, js.nc.Opts.SubChanLen) - return js.subscribe(subj, queue, nil, mch, true, opts) + return js.subscribe(subj, queue, nil, mch, true, false, opts) } // ChanSubscribe will create a subscription to the appropriate stream and consumer using a channel. func (js *js) ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) { - return js.subscribe(subj, _EMPTY_, nil, ch, false, opts) + return js.subscribe(subj, _EMPTY_, nil, ch, false, false, opts) } // ChanQueueSubscribe will create a subscription to the appropriate stream and consumer using a channel. func (js *js) ChanQueueSubscribe(subj, queue string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) { - return js.subscribe(subj, queue, nil, ch, false, opts) + return js.subscribe(subj, queue, nil, ch, false, false, opts) } // PullSubscribe creates a pull subscriber. func (js *js) PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error) { - return js.subscribe(subj, _EMPTY_, nil, nil, false, append(opts, Durable(durable))) + mch := make(chan *Msg, js.nc.Opts.SubChanLen) + return js.subscribe(subj, _EMPTY_, nil, mch, true, true, append(opts, Durable(durable))) } -func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync bool, opts []SubOpt) (*Subscription, error) { +func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, isPullMode bool, opts []SubOpt) (*Subscription, error) { cfg := ConsumerConfig{AckPolicy: ackPolicyNotSet} o := subOpts{cfg: &cfg} if len(opts) > 0 { @@ -1028,7 +928,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync } } - isPullMode := ch == nil && cb == nil && !isSync badPullAck := o.cfg.AckPolicy == AckNonePolicy || o.cfg.AckPolicy == AckAllPolicy hasHeartbeats := o.cfg.Heartbeat > 0 hasFC := o.cfg.FlowControl @@ -1049,6 +948,8 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync consumerBound = o.bound notFoundErr bool lookupErr bool + nc = js.nc + nms string ) // Do some quick checks here for ordered consumers. We do these here instead of spread out @@ -1134,7 +1035,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync if ccfg.DeliverSubject != _EMPTY_ { deliver = ccfg.DeliverSubject } else if !isPullMode { - deliver = NewInbox() + deliver = nc.newInbox() } case (err != nil && !notFoundErr) || (notFoundErr && consumerBound): // If the consumer is being bound and we got an error on pull subscribe then allow the error. @@ -1147,7 +1048,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync if o.cfg.DeliverSubject != _EMPTY_ { deliver = o.cfg.DeliverSubject } else if !isPullMode { - deliver = NewInbox() + deliver = nc.newInbox() } } @@ -1193,6 +1094,11 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync } } + if isPullMode { + nms = fmt.Sprintf(js.apiSubj(apiRequestNextT), stream, consumer) + deliver = nc.newInbox() + } + jsi := &jsSub{ js: js, stream: stream, @@ -1206,17 +1112,15 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync ordered: o.ordered, ccreq: ccreq, dseq: 1, + pull: isPullMode, + nms: nms, } - if isPullMode { - sub, err = js.pullSubscribe(subj, jsi) - } else { - sub, err = js.nc.subscribe(deliver, queue, cb, ch, isSync, jsi) - // Since JetStream sends on different subject, make sure this reflects the user's intentions. - sub.mu.Lock() - sub.Subject = subj - sub.mu.Unlock() - } + sub, err = nc.subscribe(deliver, queue, cb, ch, isSync, jsi) + // Since JetStream sends on different subject, make sure this reflects the user's intentions. + sub.mu.Lock() + sub.Subject = subj + sub.mu.Unlock() if err != nil { return nil, err } @@ -1253,7 +1157,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync ccSubj = fmt.Sprintf(apiConsumerCreateT, stream) } - resp, err := js.nc.Request(js.apiSubj(ccSubj), j, js.opts.wait) + resp, err := nc.Request(js.apiSubj(ccSubj), j, js.opts.wait) if err != nil { cleanUpSub() if err == ErrNoResponders { @@ -1303,7 +1207,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync } jsi.deliver = info.Config.DeliverSubject // Recreate the subscription here. - sub, err = js.nc.subscribe(jsi.deliver, queue, cb, ch, isSync, jsi) + sub, err = nc.subscribe(jsi.deliver, queue, cb, ch, isSync, jsi) if err != nil { return nil, err } @@ -1433,7 +1337,7 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) { osid := sub.applyNewSID() // Grab new inbox. - newDeliver := NewInbox() + newDeliver := nc.newInbox() sub.Subject = newDeliver // Snapshot jsi under sub lock here. @@ -1920,6 +1824,45 @@ func PullMaxWaiting(n int) SubOpt { var errNoMessages = errors.New("nats: no messages") +// Returns if the given message is a user message or not, and if +// `checkSts` is true, returns appropriate error based on the +// content of the status (404, etc..) +func checkMsg(msg *Msg, checkSts bool) (usrMsg bool, err error) { + // Assume user message + usrMsg = true + + // If payload or no header, consider this a user message + if len(msg.Data) > 0 || len(msg.Header) == 0 { + return + } + // Look for status header + val := msg.Header.Get(statusHdr) + // If not present, then this is considered a user message + if val == _EMPTY_ { + return + } + // At this point, this is not a user message since there is + // no payload and a "Status" header. + usrMsg = false + + // If we don't care about status, we are done. + if !checkSts { + return + } + switch val { + case noResponders: + err = ErrNoResponders + case noMessagesSts: + // 404 indicates that there are no messages. + err = errNoMessages + case reqTimeoutSts: + err = ErrTimeout + default: + err = fmt.Errorf("nats: %s", msg.Header.Get(descrHdr)) + } + return +} + // Fetch pulls a batch of messages from a stream for a pull consumer. func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) { if sub == nil { @@ -1938,14 +1881,18 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) { sub.mu.Lock() jsi := sub.jsi - if jsi == nil || sub.typ != PullSubscription { + // Reject if this is not a pull subscription. Note that sub.typ is SyncSubscription, + // so check for jsi.pull boolean instead. + if jsi == nil || !jsi.pull { sub.mu.Unlock() return nil, ErrTypeSubscription } nc := sub.conn - stream, consumer := sub.jsi.stream, sub.jsi.consumer + nms := sub.jsi.nms + rply := sub.jsi.deliver js := sub.jsi.js + pmc := len(sub.mch) > 0 ttl := o.ttl if ttl == 0 { @@ -1979,22 +1926,6 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) { return nil, err } - // Check for empty payload message and process synchronously - // any status messages. - checkMsg := func(msg *Msg) error { - if len(msg.Data) == 0 { - switch msg.Header.Get(statusHdr) { - case noResponders: - return ErrNoResponders - case noMessages: - return errNoMessages - case "400", "408", "409": - return fmt.Errorf("nats: %s", msg.Header.Get(descrHdr)) - } - } - return nil - } - checkCtxErr := func(err error) error { if o.ctx == nil && err == context.DeadlineExceeded { return ErrTimeout @@ -2003,183 +1934,86 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) { } var ( - msgs []*Msg - gotNoMessages bool - nr = &nextRequest{Batch: batch, NoWait: true} - req, _ = json.Marshal(nr) - reqNext = js.apiSubj(fmt.Sprintf(apiRequestNextT, stream, consumer)) - expires = ttl - 10*time.Millisecond + msgs = make([]*Msg, 0, batch) + msg *Msg + start = time.Now() ) - - if batch == 1 { - // To optimize single message no wait fetch, we use a shared wildcard - // subscription per pull subscriber to wait for the response. - resp, err := jsi.fetchNoWait(ctx, reqNext, req) - if err != nil { - return nil, checkCtxErr(err) - } - - // In case of a no messages instant error, then fallback - // into longer version of pull batch request. - err = checkMsg(resp) + for pmc && len(msgs) < batch { + // Check next msg with booleans that say that this is an internal call + // for a pull subscribe (so don't reject it) and don't wait if there + // are no messages. + msg, err = sub.nextMsgWithContext(ctx, true, false) if err != nil { if err == errNoMessages { - // Use old request style for the retry of the pull request - // in order to use auto UNSUB 1 to prevent the server - // from delivering a message when there is no more interest. - nr.NoWait = false - nr.Expires = expires - req, _ = json.Marshal(nr) - resp, err = nc.oldRequestWithContext(ctx, reqNext, nil, req) - if err != nil { - return nil, checkCtxErr(err) - } - - // This next message, could also be an error - // (e.g. 408 due to request timeout). - err = checkMsg(resp) - if err != nil { - return nil, err - } - return []*Msg{resp}, nil - } else { - // Hard error - return nil, checkCtxErr(err) + err = nil } + break + } + // Check msg but just to determine if this is a user message + // or status message, however, we don't care about values of status + // messages at this point in the Fetch() call, so checkMsg can't + // return an error. + if usrMsg, _ := checkMsg(msg, false); usrMsg { + msgs = append(msgs, msg) } - return []*Msg{resp}, nil - } - - // Setup a request where we will wait for the first response - // in case of errors, then dispatch the rest of the replies - // to the channel. - inbox := NewInbox() - - mch := make(chan *Msg, batch) - msgs = make([]*Msg, 0, batch) - - s, err := nc.subscribe(inbox, _EMPTY_, nil, mch, true, nil) - if err != nil { - return nil, err - } - - // Remove interest in the subscription at the end so that the - // this inbox does not get delivered the results intended - // for another request. - defer s.Unsubscribe() - - // Make a publish request to get results of the pull. - err = nc.publish(reqNext, inbox, nil, req) - if err != nil { - s.Unsubscribe() - return nil, err } + if err == nil && len(msgs) < batch { + // For batch real size of 1, it does not make sense to set no_wait in + // the request. + batchSize := batch - len(msgs) + noWait := batchSize > 1 + nr := &nextRequest{Batch: batchSize, NoWait: noWait} + req, _ := json.Marshal(nr) - // Try to get the first message or error with NoWait. - var ( - firstMsg *Msg - ok bool - ) - select { - case firstMsg, ok = <-mch: - if !ok { - err = s.getNextMsgErr() - } else { - err = s.processNextMsgDelivered(firstMsg) - if err == nil { - err = checkMsg(firstMsg) + err = nc.PublishRequest(nms, rply, req) + for err == nil && len(msgs) < batch { + ttl -= time.Since(start) + if ttl < 0 { + ttl = 0 } - } - case <-ctx.Done(): - err = checkCtxErr(ctx.Err()) - } - // If the first error is 'no more messages', then switch into - // longer form version of the request that waits for messages. - if err == errNoMessages { - gotNoMessages = true - } else if err != nil { - // We should be getting the response from the server - // in case we got a poll error, so stop and cleanup. - s.Unsubscribe() - return nil, err - } + // Ask for next message and waits if there are no messages + msg, err = sub.nextMsgWithContext(ctx, true, true) + if err == nil { + var usrMsg bool + + usrMsg, err = checkMsg(msg, true) + if err == nil && usrMsg { + msgs = append(msgs, msg) + } else if noWait && (err == errNoMessages) && len(msgs) == 0 { + // If we have a 404 for our "no_wait" request and have + // not collected any message, then resend request to + // wait this time. + noWait = false + + ttl -= time.Since(start) + if ttl < 0 { + // At this point consider that we have timed-out + err = context.DeadlineExceeded + break + } - if gotNoMessages { - // We started with a 404 response right away, so fallback into - // second request that waits longer for messages to delivered. - nr.NoWait = false - nr.Expires = expires - req, _ = json.Marshal(nr) + // Make our request expiration a bit shorter than the + // current timeout. + expires := ttl + if ttl >= 20*time.Millisecond { + expires = ttl - 10*time.Millisecond + } - // Make another request and wait for the messages... - err = nc.publish(reqNext, inbox, nil, req) - if err != nil { - s.Unsubscribe() - return nil, err - } + nr.Batch = batch - len(msgs) + nr.Expires = expires + nr.NoWait = false + req, _ = json.Marshal(nr) - // Try to get the first result again or return the error. - select { - case firstMsg, ok = <-mch: - if !ok { - err = s.getNextMsgErr() - } else { - err = s.processNextMsgDelivered(firstMsg) - if err == nil { - err = checkMsg(firstMsg) + err = nc.PublishRequest(nms, rply, req) } } - case <-ctx.Done(): - err = checkCtxErr(ctx.Err()) - } - if err != nil { - s.Unsubscribe() - return nil, err - } - // Check again if the delivered next message is a status error. - err = checkMsg(firstMsg) - if err != nil { - s.Unsubscribe() - return nil, err } } - - msgs = append(msgs, firstMsg) - for { - var ( - msg *Msg - ok bool - ) - select { - case msg, ok = <-mch: - if !ok { - err = s.getNextMsgErr() - } else { - err = s.processNextMsgDelivered(msg) - if err == nil { - err = checkMsg(msg) - } - } - case <-ctx.Done(): - return msgs, checkCtxErr(err) - } - if err != nil { - // Discard the error which may have been a timeout - // or 408 request timeout status from the server, - // and just the return delivered messages. - break - } - if msg != nil { - msgs = append(msgs, msg) - } - - if len(msgs) == batch { - // Done! - break - } + // If there is at least a message added to msgs, then need to return OK and no error + if err != nil && len(msgs) == 0 { + return nil, checkCtxErr(err) } - return msgs, nil } diff --git a/nats.go b/nats.go index 9b761b174..e7b456150 100644 --- a/nats.go +++ b/nats.go @@ -3154,7 +3154,8 @@ const ( lastConsumerSeqHdr = "Nats-Last-Consumer" lastStreamSeqHdr = "Nats-Last-Stream" noResponders = "503" - noMessages = "404" + noMessagesSts = "404" + reqTimeoutSts = "408" controlMsg = "100" statusLen = 3 // e.g. 20x, 40x, 50x ) @@ -3825,6 +3826,12 @@ func (s *Subscription) Type() SubscriptionType { } s.mu.Lock() defer s.mu.Unlock() + // Pull subscriptions are really a SyncSubscription and we want this + // type to be set internally for all delivered messages management, etc.. + // So check when to return PullSubscription to the user. + if s.jsi != nil && s.jsi.pull { + return PullSubscription + } return s.typ } diff --git a/test/js_test.go b/test/js_test.go index 98b3827c5..9e48c74bd 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -5790,7 +5790,7 @@ func testJetStreamFetchOptions(t *testing.T, srvs ...*jsServer) { if err == nil { t.Fatal("Unexpected success") } - if err != nil && (err != nats.ErrTimeout && err != nats.ErrNoResponders) { + if err != nats.ErrBadSubscription { t.Fatalf("Unexpected error: %v", err) } })