From 00b033f31eea055bd78aaa388660ee559fb36532 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Tue, 10 Aug 2021 16:29:12 -0600 Subject: [PATCH 1/2] [FIXED] Possible lock inversion and incorrect delete of JS consumer - Some refactoring to prevent lock inversion (no connection publish should be done under the subscription lock). - Remove use of jsSub mutex since so far everything can be done under the protection of the subscription's lock. - Attempt to delete JS consumer on Unsubscribe *only* if the library called AddConsumer and got a success. Resolves #775 Resolves #776 Signed-off-by: Ivan Kozlovic --- js.go | 157 ++++++++++++++++++++++-------------------------- nats.go | 45 ++++++++------ test/js_test.go | 33 ++++++---- 3 files changed, 121 insertions(+), 114 deletions(-) diff --git a/js.go b/js.go index 6f843bef0..4c1b860e7 100644 --- a/js.go +++ b/js.go @@ -812,7 +812,6 @@ type nextRequest struct { // jsSub includes JetStream subscription info. type jsSub struct { - mu sync.RWMutex js *js // For pull subscribers, this is the next message subject to send requests to. @@ -823,7 +822,7 @@ type jsSub struct { deliver string pull bool durable bool - attached bool + delCons bool // Ordered consumers ordered bool @@ -838,27 +837,34 @@ type jsSub struct { active bool fc bool cmeta string - fcs map[uint64]string + fcr string + fcd uint64 } -func (jsi *jsSub) unsubscribe(drainMode bool) error { - jsi.mu.Lock() - durable, attached := jsi.durable, jsi.attached +func (sub *Subscription) jsiUnsubscribe(jsi *jsSub, drainMode bool) error { + sub.mu.Lock() + durable, delCons := jsi.durable, jsi.delCons stream, consumer := jsi.stream, jsi.consumer js := jsi.js if jsi.hbc != nil { jsi.hbc.Stop() jsi.hbc = nil } - jsi.mu.Unlock() + sub.mu.Unlock() - if drainMode && (durable || attached) { - // Skip deleting consumer for durables/attached - // consumers when using drain mode. 
+ // Delete the JS consumer only if the library created the JS consumer, + // in which case delCons==true, but even then, in drain mode if this + // is a durable, do not delete. + if !delCons || (drainMode && durable) { return nil } - - return js.DeleteConsumer(stream, consumer) + // We don't want to possibly fail a drain because we were not able to + // delete the consumer. + err := js.DeleteConsumer(stream, consumer) + if drainMode { + return nil + } + return err } // SubOpt configures options for subscribing to JetStream consumers. @@ -941,7 +947,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, ccfg *ConsumerConfig info *ConsumerInfo deliver string - attached bool stream = o.stream consumer = o.consumer isDurable = o.cfg.Durable != _EMPTY_ @@ -1019,7 +1024,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, case info != nil: // Attach using the found consumer config. ccfg = &info.Config - attached = true // Make sure this new subject matches or is a subset. if ccfg.FilterSubject != _EMPTY_ && subj != ccfg.FilterSubject { @@ -1104,7 +1108,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, stream: stream, consumer: consumer, durable: isDurable, - attached: attached, deliver: deliver, hbs: hasHeartbeats, hbi: o.cfg.Heartbeat, @@ -1196,9 +1199,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, return nil, ErrSubjectMismatch } - // Update attached status. - jsi.attached = true - // Use the deliver subject from latest consumer config to attach. if info.Config.DeliverSubject != _EMPTY_ { // We can't reuse the channel, so if one was passed, we need to create a new one. 
@@ -1222,10 +1222,15 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, } return nil, fmt.Errorf("nats: %s", cinfo.Error.Description) } - } else if consumer == _EMPTY_ { - // Update our consumer name here which is filled in when we create the consumer. + } else { + // Since we created the JS consumer internally, mark that we should + // delete it on Unsubscribe(). sub.mu.Lock() - sub.jsi.consumer = info.Name + sub.jsi.delCons = true + // Update our consumer name here which is filled in when we create the consumer. + if consumer == _EMPTY_ { + sub.jsi.consumer = info.Name + } sub.mu.Unlock() } } @@ -1267,10 +1272,11 @@ func isControlMessage(msg *Msg) bool { return len(msg.Data) == 0 && msg.Header.Get(statusHdr) == controlMsg } -func (jsi *jsSub) trackSequences(reply string) { - jsi.mu.Lock() - jsi.cmeta = reply - jsi.mu.Unlock() +// Keeps track of the incoming message's reply subject so that the consumer's +// state (deliver sequence, etc..) can be checked against heartbeats. +// Runs under the subscription lock +func (sub *Subscription) trackSequences(reply string) { + sub.jsi.cmeta = reply } // Check to make sure messages are arriving in order. @@ -1290,16 +1296,12 @@ func (sub *Subscription) checkOrderedMsgs(m *Msg) bool { sseq, dseq := uint64(parseNum(tokens[5])), uint64(parseNum(tokens[6])) jsi := sub.jsi - jsi.mu.Lock() if dseq != jsi.dseq { - rseq := jsi.sseq + 1 - jsi.mu.Unlock() - sub.resetOrderedConsumer(rseq) + sub.resetOrderedConsumer(jsi.sseq + 1) return true } // Update our tracking here. jsi.dseq, jsi.sseq = dseq+1, sseq - jsi.mu.Unlock() return false } @@ -1328,8 +1330,7 @@ func (sub *Subscription) applyNewSID() (osid int64) { // Lock should be held. 
func (sub *Subscription) resetOrderedConsumer(sseq uint64) { nc := sub.conn - closed := sub.closed - if sub.jsi == nil || nc == nil || closed { + if sub.jsi == nil || nc == nil || sub.closed { return } @@ -1340,8 +1341,8 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) { newDeliver := nc.newInbox() sub.Subject = newDeliver - // Snapshot jsi under sub lock here. - jsi := sub.jsi + // Snapshot the new sid under sub lock. + nsid := sub.sid // We are still in the low level readloop for the connection so we need // to spin a go routine to try to create the new consumer. @@ -1351,7 +1352,7 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) { // This is done here in this go routine to prevent lock inversion. nc.mu.Lock() nc.bw.appendString(fmt.Sprintf(unsubProto, osid, _EMPTY_)) - nc.bw.appendString(fmt.Sprintf(subProto, newDeliver, _EMPTY_, sub.sid)) + nc.bw.appendString(fmt.Sprintf(subProto, newDeliver, _EMPTY_, nsid)) nc.kickFlusher() nc.mu.Unlock() @@ -1360,11 +1361,12 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) { nc.unsubscribe(sub, 0, true) } - jsi.mu.Lock() + sub.mu.Lock() + jsi := sub.jsi // Reset some items in jsi. jsi.dseq = 1 jsi.cmeta = _EMPTY_ - jsi.fcs = nil + jsi.fcr, jsi.fcd = _EMPTY_, 0 jsi.deliver = newDeliver // Reset consumer request for starting policy. cfg := jsi.ccreq.Config @@ -1375,7 +1377,7 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) { ccSubj := fmt.Sprintf(apiConsumerCreateT, jsi.stream) j, err := json.Marshal(jsi.ccreq) js := jsi.js - jsi.mu.Unlock() + sub.mu.Unlock() if err != nil { pushErr(err) @@ -1403,86 +1405,71 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) { return } - jsi.mu.Lock() + sub.mu.Lock() jsi.consumer = cinfo.Name - jsi.mu.Unlock() + sub.mu.Unlock() }() } // checkForFlowControlResponse will check to see if we should send a flow control response -// based on the delivered index. -// Lock should be held. 
-func (sub *Subscription) checkForFlowControlResponse(delivered uint64) { - jsi, nc := sub.jsi, sub.conn - if jsi == nil { - return - } - - jsi.mu.Lock() - defer jsi.mu.Unlock() - - if len(jsi.fcs) == 0 { - return - } - - if reply := jsi.fcs[delivered]; reply != _EMPTY_ { - delete(jsi.fcs, delivered) - nc.Publish(reply, nil) +// based on the subscription current delivered index and the target. +// Runs under subscription lock +func (sub *Subscription) checkForFlowControlResponse() string { + // Caller has verified that there is a sub.jsi and fc + jsi := sub.jsi + if jsi.fcd == sub.delivered { + fcr := jsi.fcr + jsi.fcr, jsi.fcd = _EMPTY_, 0 + return fcr } + return _EMPTY_ } // Record an inbound flow control message. -func (jsi *jsSub) scheduleFlowControlResponse(dfuture uint64, reply string) { - jsi.mu.Lock() - if jsi.fcs == nil { - jsi.fcs = make(map[uint64]string) - } - jsi.fcs[dfuture] = reply - jsi.mu.Unlock() +// Runs under subscription lock +func (sub *Subscription) scheduleFlowControlResponse(dfuture uint64, reply string) { + jsi := sub.jsi + jsi.fcr, jsi.fcd = reply, dfuture } // Checks for activity from our consumer. // If we do not think we are active send an async error. 
func (sub *Subscription) activityCheck() { + sub.mu.Lock() jsi := sub.jsi if jsi == nil { + sub.mu.Unlock() return } - jsi.mu.Lock() active := jsi.active jsi.hbc.Reset(jsi.hbi) jsi.active = false - jsi.mu.Unlock() - - if !active { - sub.mu.Lock() - nc := sub.conn - closed := sub.closed - sub.mu.Unlock() + nc := sub.conn + closed := sub.closed + sub.mu.Unlock() - if !closed { - nc.mu.Lock() - errCB := nc.Opts.AsyncErrorCB - if errCB != nil { - nc.ach.push(func() { errCB(nc, sub, ErrConsumerNotActive) }) - } - nc.mu.Unlock() + if !active && !closed { + nc.mu.Lock() + errCB := nc.Opts.AsyncErrorCB + if errCB != nil { + nc.ach.push(func() { errCB(nc, sub, ErrConsumerNotActive) }) } + nc.mu.Unlock() } } // scheduleHeartbeatCheck sets up the timer check to make sure we are active // or receiving idle heartbeats.. func (sub *Subscription) scheduleHeartbeatCheck() { + sub.mu.Lock() + defer sub.mu.Unlock() + jsi := sub.jsi if jsi == nil { return } - jsi.mu.Lock() - defer jsi.mu.Unlock() - if jsi.hbc == nil { jsi.hbc = time.AfterFunc(jsi.hbi*hbcThresh, sub.activityCheck) } else { @@ -1503,10 +1490,10 @@ func (nc *Conn) handleConsumerSequenceMismatch(sub *Subscription, err error) { // checkForSequenceMismatch will make sure we have not missed any messages since last seen. func (nc *Conn) checkForSequenceMismatch(msg *Msg, s *Subscription, jsi *jsSub) { // Process heartbeat received, get latest control metadata if present. 
- jsi.mu.Lock() + s.mu.Lock() ctrl, ordered := jsi.cmeta, jsi.ordered jsi.active = true - jsi.mu.Unlock() + s.mu.Unlock() if ctrl == _EMPTY_ { return diff --git a/nats.go b/nats.go index e7b456150..413468fdb 100644 --- a/nats.go +++ b/nats.go @@ -2566,21 +2566,24 @@ func (nc *Conn) waitForMsgs(s *Subscription) { mcb := s.mcb max = s.max closed = s.closed + var fcReply string if !s.closed { s.delivered++ delivered = s.delivered if s.jsi != nil { - s.jsi.mu.Lock() - needCheck := s.jsi.fc && len(s.jsi.fcs) > 0 - s.jsi.active = true - s.jsi.mu.Unlock() - if needCheck { - s.checkForFlowControlResponse(delivered) + if s.jsi.fc { + fcReply = s.checkForFlowControlResponse() } + s.jsi.active = true } } s.mu.Unlock() + // Respond to flow control if applicable + if fcReply != _EMPTY_ { + nc.Publish(fcReply, nil) + } + if closed { break } @@ -2682,6 +2685,7 @@ func (nc *Conn) processMsg(data []byte) { var ctrlMsg bool var hasFC bool var hasHBs bool + var fcReply string if nc.ps.ma.hdr > 0 { hbuf := msgPayload[:nc.ps.ma.hdr] @@ -2772,16 +2776,16 @@ func (nc *Conn) processMsg(data []byte) { if jsi != nil && hasHBs { // Store the ACK metadata from the message to // compare later on with the received heartbeat. - jsi.trackSequences(m.Reply) + sub.trackSequences(m.Reply) } } else if hasFC && m.Reply != _EMPTY_ { // This is a flow control message. // If we have no pending, go ahead and send in place. if sub.pMsgs <= 0 { - nc.Publish(m.Reply, nil) + fcReply = m.Reply } else { // Schedule a reply after the previous message is delivered. - jsi.scheduleFlowControlResponse(sub.delivered+uint64(sub.pMsgs), m.Reply) + sub.scheduleFlowControlResponse(sub.delivered+uint64(sub.pMsgs), m.Reply) } } @@ -2789,6 +2793,10 @@ func (nc *Conn) processMsg(data []byte) { sub.sc = false sub.mu.Unlock() + if fcReply != _EMPTY_ { + nc.Publish(fcReply, nil) + } + // Handle control heartbeat messages. 
if ctrlMsg && hasHBs && m.Reply == _EMPTY_ { nc.checkForSequenceMismatch(m, sub, jsi) @@ -3951,13 +3959,12 @@ func (nc *Conn) unsubscribe(sub *Subscription, max int, drainMode bool) error { sub.mu.Unlock() } - // For JetStream consumers, need to clean up ephemeral consumers - // or delete durable ones if called with Unsubscribe. + // For JetStream consumers, need to clean up ephemeral consumers. sub.mu.Lock() jsi := sub.jsi sub.mu.Unlock() if jsi != nil && maxStr == _EMPTY_ { - err := jsi.unsubscribe(drainMode) + err := sub.jsiUnsubscribe(jsi, drainMode) if err != nil { return err } @@ -4100,17 +4107,15 @@ func (s *Subscription) processNextMsgDelivered(msg *Msg) error { nc := s.conn max := s.max + var fcReply string // Update some stats. s.delivered++ delivered := s.delivered if s.jsi != nil { - s.jsi.mu.Lock() - needCheck := s.jsi.fc && len(s.jsi.fcs) > 0 - s.jsi.active = true - s.jsi.mu.Unlock() - if needCheck { - s.checkForFlowControlResponse(delivered) + if s.jsi.fc { + fcReply = s.checkForFlowControlResponse() } + s.jsi.active = true } if s.typ == SyncSubscription { @@ -4119,6 +4124,10 @@ func (s *Subscription) processNextMsgDelivered(msg *Msg) error { } s.mu.Unlock() + if fcReply != _EMPTY_ { + nc.Publish(fcReply, nil) + } + if max > 0 { if delivered > max { return ErrMaxMessages diff --git a/test/js_test.go b/test/js_test.go index 9e48c74bd..8b3ec17c3 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -3631,6 +3631,13 @@ func TestJetStream_Unsubscribe(t *testing.T) { return infos } + deleteAllConsumers := func(t *testing.T) { + t.Helper() + for cn := range js.ConsumerNames("foo") { + js.DeleteConsumer("foo", cn) + } + } + js.Publish("foo.A", []byte("A")) js.Publish("foo.B", []byte("B")) js.Publish("foo.C", []byte("C")) @@ -3654,9 +3661,10 @@ func TestJetStream_Unsubscribe(t *testing.T) { t.Errorf("Unexpected error: %v", err) } fetchConsumers(t, 0) + deleteAllConsumers(t) }) - t.Run("attached pull consumer deleted on unsubscribe", func(t *testing.T) { + 
t.Run("attached pull consumer not deleted on unsubscribe", func(t *testing.T) { // Created by JetStreamManagement if _, err = js.AddConsumer("foo", &nats.ConsumerConfig{ Durable: "wq", @@ -3684,7 +3692,8 @@ func TestJetStream_Unsubscribe(t *testing.T) { t.Errorf("Expected %v, got %v", expected, got) } subC.Unsubscribe() - fetchConsumers(t, 0) + fetchConsumers(t, 1) + deleteAllConsumers(t) }) t.Run("ephemeral consumers deleted on drain", func(t *testing.T) { @@ -3697,6 +3706,7 @@ func TestJetStream_Unsubscribe(t *testing.T) { t.Errorf("Unexpected error: %v", err) } fetchConsumers(t, 0) + deleteAllConsumers(t) }) t.Run("durable consumers not deleted on drain", func(t *testing.T) { @@ -3721,6 +3731,7 @@ func TestJetStream_Unsubscribe(t *testing.T) { t.Errorf("Unexpected error: %v", err) } fetchConsumers(t, 1) + deleteAllConsumers(t) }) } @@ -3895,23 +3906,23 @@ func TestJetStream_UnsubscribeCloseDrain(t *testing.T) { if got != expected { t.Errorf("Expected %v, got: %v", expected, got) } - // Delete durable consumer. + // We do not attempt to delete a JS consumer that the SubscribeX() call + // did not actually create, so that will still leave the JS consumer. err = subB.Unsubscribe() if err != nil { t.Errorf("Unexpected error: %v", err) } - err = dupSub.Unsubscribe() - if err == nil { - t.Fatalf("Unexpected success") - } - if !errors.Is(err, nats.ErrConsumerNotFound) { - t.Errorf("Expected consumer not found error, got: %v", err) + // Since dupSub did not create the JS consumer (already existed), it will + // not try to delete it, so the fact that it is already deleted when + // subB.Unsubscribe() is called, should not call a failure here. + if err := dupSub.Unsubscribe(); err != nil { + t.Fatalf("Error on unsubscribe: %v", err) } // Remains an ephemeral consumer that did not get deleted // when Close() was called. 
- fetchConsumers(t, 1) + fetchConsumers(t, 2) }) } @@ -3947,7 +3958,7 @@ func TestJetStream_UnsubscribeDeleteNoPermissions(t *testing.T) { } defer nc.Close() - js, err := nc.JetStream() + js, err := nc.JetStream(nats.MaxWait(time.Second)) if err != nil { t.Fatal(err) } From edbc1cd8407d025754563d0d6222088454d0b0b3 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Tue, 10 Aug 2021 20:39:41 -0600 Subject: [PATCH 2/2] Added DeleteConsumer() subscription option If set, the JS consumer will be deleted: - On Unsubscribe(), if error occurs, error is returned. - After Drain (connection and/or subscription) completes. If error occurs there, error is reported through async error cb. Signed-off-by: Ivan Kozlovic --- js.go | 56 +++++++++---------- nats.go | 41 ++++++++++---- test/js_test.go | 143 +++++++++++++++++++++++++++++++++++++----------- 3 files changed, 165 insertions(+), 75 deletions(-) diff --git a/js.go b/js.go index 4c1b860e7..9e79c301b 100644 --- a/js.go +++ b/js.go @@ -822,7 +822,7 @@ type jsSub struct { deliver string pull bool durable bool - delCons bool + dc bool // Delete JS consumer // Ordered consumers ordered bool @@ -841,30 +841,20 @@ type jsSub struct { fcd uint64 } -func (sub *Subscription) jsiUnsubscribe(jsi *jsSub, drainMode bool) error { +// Deletes the JS Consumer. +// No connection nor subscription lock must be held on entry. +func (sub *Subscription) deleteConsumer() error { sub.mu.Lock() - durable, delCons := jsi.durable, jsi.delCons + jsi := sub.jsi + if jsi == nil { + sub.mu.Unlock() + return nil + } stream, consumer := jsi.stream, jsi.consumer js := jsi.js - if jsi.hbc != nil { - jsi.hbc.Stop() - jsi.hbc = nil - } sub.mu.Unlock() - // Delete the JS consumer only if the library created the JS consumer, - // in which case delCons==true, but even then, in drain mode if this - // is a durable, do not delete. 
- if !delCons || (drainMode && durable) { - return nil - } - // We don't want to possibly fail a drain because we were not able to - // delete the consumer. - err := js.DeleteConsumer(stream, consumer) - if drainMode { - return nil - } - return err + return js.DeleteConsumer(stream, consumer) } // SubOpt configures options for subscribing to JetStream consumers. @@ -1117,6 +1107,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, dseq: 1, pull: isPullMode, nms: nms, + dc: o.dc, } sub, err = nc.subscribe(deliver, queue, cb, ch, isSync, jsi) @@ -1222,15 +1213,10 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, } return nil, fmt.Errorf("nats: %s", cinfo.Error.Description) } - } else { - // Since we created the JS consumer internally, mark that we should - // delete it on Unsubscribe(). - sub.mu.Lock() - sub.jsi.delCons = true + } else if consumer == _EMPTY_ { // Update our consumer name here which is filled in when we create the consumer. - if consumer == _EMPTY_ { - sub.jsi.consumer = info.Name - } + sub.mu.Lock() + sub.jsi.consumer = info.Name sub.mu.Unlock() } } @@ -1451,8 +1437,7 @@ func (sub *Subscription) activityCheck() { if !active && !closed { nc.mu.Lock() - errCB := nc.Opts.AsyncErrorCB - if errCB != nil { + if errCB := nc.Opts.AsyncErrorCB; errCB != nil { nc.ach.push(func() { errCB(nc, sub, ErrConsumerNotActive) }) } nc.mu.Unlock() @@ -1578,6 +1563,8 @@ type subOpts struct { mack bool // For an ordered consumer. ordered bool + // User wants the library to delete the JS consumer on sub.Unsubscribe() + dc bool } // OrderedConsumer will create a fifo direct/ephemeral consumer for in order delivery of messages. @@ -1775,6 +1762,15 @@ func IdleHeartbeat(duration time.Duration) SubOpt { }) } +// DeleteConsumer instructs the library to delete the JetStream consumer +// when calling Subscription.Unsubscribe(). 
+func DeleteConsumer() SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.dc = true + return nil + }) +} + func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) { sub.mu.Lock() // TODO(dlc) - Better way to mark especially if we attach. diff --git a/nats.go b/nats.go index 413468fdb..106e579db 100644 --- a/nats.go +++ b/nats.go @@ -3808,6 +3808,12 @@ func (nc *Conn) removeSub(s *Subscription) { } s.mch = nil + // If JS subscription the stop HB timer. + if jsi := s.jsi; jsi != nil && jsi.hbc != nil { + jsi.hbc.Stop() + jsi.hbc = nil + } + // Mark as invalid s.closed = true if s.pCond != nil { @@ -3878,6 +3884,7 @@ func (s *Subscription) Unsubscribe() error { s.mu.Lock() conn := s.conn closed := s.closed + dc := s.jsi != nil && s.jsi.dc s.mu.Unlock() if conn == nil || conn.IsClosed() { return ErrConnectionClosed @@ -3888,7 +3895,13 @@ func (s *Subscription) Unsubscribe() error { if conn.IsDraining() { return ErrConnectionDraining } - return conn.unsubscribe(s, 0, false) + if err := conn.unsubscribe(s, 0, false); err != nil { + return err + } + if dc { + return s.deleteConsumer() + } + return nil } // checkDrained will watch for a subscription to be fully drained @@ -3902,6 +3915,12 @@ func (nc *Conn) checkDrained(sub *Subscription) { // is correct and the server will not send additional information. nc.Flush() + sub.mu.Lock() + // For JS subscriptions, check if we are going to delete the + // JS consumer when drain completes. + dc := sub.jsi != nil && sub.jsi.dc + sub.mu.Unlock() + // Once we are here we just wait for Pending to reach 0 or // any other state to exit this go routine. 
for { @@ -3921,6 +3940,15 @@ func (nc *Conn) checkDrained(sub *Subscription) { nc.mu.Lock() nc.removeSub(sub) nc.mu.Unlock() + if dc { + if err := sub.deleteConsumer(); err != nil { + nc.mu.Lock() + if errCB := nc.Opts.AsyncErrorCB; errCB != nil { + nc.ach.push(func() { errCB(nc, sub, err) }) + } + nc.mu.Unlock() + } + } return } @@ -3959,17 +3987,6 @@ func (nc *Conn) unsubscribe(sub *Subscription, max int, drainMode bool) error { sub.mu.Unlock() } - // For JetStream consumers, need to clean up ephemeral consumers. - sub.mu.Lock() - jsi := sub.jsi - sub.mu.Unlock() - if jsi != nil && maxStr == _EMPTY_ { - err := sub.jsiUnsubscribe(jsi, drainMode) - if err != nil { - return err - } - } - nc.mu.Lock() // ok here, but defer is expensive defer nc.mu.Unlock() diff --git a/test/js_test.go b/test/js_test.go index 8b3ec17c3..519b47fa6 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -372,7 +372,7 @@ func TestJetStreamSubscribe(t *testing.T) { if received == toSend { done <- true } - }) + }, nats.DeleteConsumer()) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -3618,17 +3618,18 @@ func TestJetStream_Unsubscribe(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - fetchConsumers := func(t *testing.T, expected int) []*nats.ConsumerInfo { + fetchConsumers := func(t *testing.T, expected int) { t.Helper() - var infos []*nats.ConsumerInfo - for info := range js.ConsumersInfo("foo") { - infos = append(infos, info) - } - if len(infos) != expected { - t.Fatalf("Expected %d consumers, got: %d", expected, len(infos)) - } - - return infos + checkFor(t, time.Second, 15*time.Millisecond, func() error { + var infos []*nats.ConsumerInfo + for info := range js.ConsumersInfo("foo") { + infos = append(infos, info) + } + if len(infos) != expected { + return fmt.Errorf("Expected %d consumers, got: %d", expected, len(infos)) + } + return nil + }) } deleteAllConsumers := func(t *testing.T) { @@ -3643,7 +3644,7 @@ func TestJetStream_Unsubscribe(t *testing.T) { 
js.Publish("foo.C", []byte("C")) t.Run("consumers deleted on unsubscribe", func(t *testing.T) { - subA, err := js.SubscribeSync("foo.A") + subA, err := js.SubscribeSync("foo.A", nats.DeleteConsumer()) if err != nil { t.Fatal(err) } @@ -3652,7 +3653,7 @@ func TestJetStream_Unsubscribe(t *testing.T) { t.Errorf("Unexpected error: %v", err) } - subB, err := js.SubscribeSync("foo.B", nats.Durable("B")) + subB, err := js.SubscribeSync("foo.B", nats.Durable("B"), nats.DeleteConsumer()) if err != nil { t.Fatal(err) } @@ -3664,7 +3665,7 @@ func TestJetStream_Unsubscribe(t *testing.T) { deleteAllConsumers(t) }) - t.Run("attached pull consumer not deleted on unsubscribe", func(t *testing.T) { + t.Run("not deleted on unsubscribe if option not set", func(t *testing.T) { // Created by JetStreamManagement if _, err = js.AddConsumer("foo", &nats.ConsumerConfig{ Durable: "wq", @@ -3697,7 +3698,7 @@ func TestJetStream_Unsubscribe(t *testing.T) { }) t.Run("ephemeral consumers deleted on drain", func(t *testing.T) { - subA, err := js.SubscribeSync("foo.A") + subA, err := js.Subscribe("foo.A", func(_ *nats.Msg) {}, nats.DeleteConsumer()) if err != nil { t.Fatal(err) } @@ -3709,8 +3710,8 @@ func TestJetStream_Unsubscribe(t *testing.T) { deleteAllConsumers(t) }) - t.Run("durable consumers not deleted on drain", func(t *testing.T) { - subB, err := js.SubscribeSync("foo.B", nats.Durable("B")) + t.Run("durable consumers deleted on drain", func(t *testing.T) { + subB, err := js.Subscribe("foo.B", func(_ *nats.Msg) {}, nats.Durable("B"), nats.DeleteConsumer()) if err != nil { t.Fatal(err) } @@ -3718,10 +3719,10 @@ func TestJetStream_Unsubscribe(t *testing.T) { if err != nil { t.Errorf("Unexpected error: %v", err) } - fetchConsumers(t, 1) + fetchConsumers(t, 0) }) - t.Run("reattached durable consumers not deleted on drain", func(t *testing.T) { + t.Run("durable consumers not deleted on drain if option not set", func(t *testing.T) { subB, err := js.SubscribeSync("foo.B", nats.Durable("B")) if 
err != nil { t.Fatal(err) @@ -3788,7 +3789,7 @@ func TestJetStream_UnsubscribeCloseDrain(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - _, err = js.SubscribeSync("foo.C") + _, err = js.SubscribeSync("foo.C", nats.DeleteConsumer()) if err != nil { t.Fatal(err) } @@ -3815,11 +3816,11 @@ func TestJetStream_UnsubscribeCloseDrain(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - _, err = js.SubscribeSync("foo.A") + _, err = js.SubscribeSync("foo.A", nats.DeleteConsumer()) if err != nil { t.Fatalf("Unexpected error: %v", err) } - subB, err := js.SubscribeSync("foo.B", nats.Durable("B")) + subB, err := js.SubscribeSync("foo.B", nats.Durable("B"), nats.DeleteConsumer()) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -3861,7 +3862,7 @@ func TestJetStream_UnsubscribeCloseDrain(t *testing.T) { // The durable interest remains so have to attach now, // otherwise would get a stream already used error. - subB, err := js.SubscribeSync("foo.B", nats.Durable("B")) + subB, err := js.SubscribeSync("foo.B", nats.Durable("B"), nats.DeleteConsumer()) if err != nil { t.Fatal(err) } @@ -3883,7 +3884,7 @@ func TestJetStream_UnsubscribeCloseDrain(t *testing.T) { jsm.Publish("foo.B", []byte("B.3")) // Attach again to the same subject with the durable. - dupSub, err := js.SubscribeSync("foo.B", nats.Durable("B")) + dupSub, err := js.SubscribeSync("foo.B", nats.Durable("B"), nats.DeleteConsumer()) if err != nil { t.Fatal(err) } @@ -3906,23 +3907,23 @@ func TestJetStream_UnsubscribeCloseDrain(t *testing.T) { if got != expected { t.Errorf("Expected %v, got: %v", expected, got) } - // We do not attempt to delete a JS consumer that the SubscribeX() call - // did not actually create, so that will still leave the JS consumer. + // Delete durable consumer. 
err = subB.Unsubscribe() if err != nil { t.Errorf("Unexpected error: %v", err) } - // Since dupSub did not create the JS consumer (already existed), it will - // not try to delete it, so the fact that it is already deleted when - // subB.Unsubscribe() is called, should not call a failure here. - if err := dupSub.Unsubscribe(); err != nil { - t.Fatalf("Error on unsubscribe: %v", err) + err = dupSub.Unsubscribe() + if err == nil { + t.Fatalf("Unexpected success") + } + if !errors.Is(err, nats.ErrConsumerNotFound) { + t.Errorf("Expected consumer not found error, got: %v", err) } // Remains an ephemeral consumer that did not get deleted // when Close() was called. - fetchConsumers(t, 2) + fetchConsumers(t, 1) }) } @@ -3968,7 +3969,7 @@ func TestJetStream_UnsubscribeDeleteNoPermissions(t *testing.T) { }) js.Publish("foo", []byte("test")) - sub, err := js.SubscribeSync("foo") + sub, err := js.SubscribeSync("foo", nats.DeleteConsumer()) if err != nil { t.Fatal(err) } @@ -6428,3 +6429,79 @@ func TestJetStreamMaxMsgsPerSubject(t *testing.T) { }) } } + +func TestJetStreamDrainFailsToDeleteConsumer(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + errCh := make(chan error, 1) + nc, err := nats.Connect(s.ClientURL(), nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, err error) { + select { + case errCh <- err: + default: + } + })) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + js, err := nc.JetStream() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + if _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + js.Publish("foo", []byte("hi")) + + // Normally, we would not call AddConsumer externally and then + // create a subscription with DeleteConsumer. 
But here we do so + // in order to be able to delete the consumer while the Drain + // is in process and make sure that an async error is returned. + if _, err := js.AddConsumer("TEST", &nats.ConsumerConfig{ + Durable: "dur", + FilterSubject: "foo", + DeliverSubject: "bar", + }); err != nil { + t.Fatalf("Error adding consumer: %v", err) + } + + blockCh := make(chan struct{}) + sub, err := js.Subscribe("foo", func(m *nats.Msg) { + <-blockCh + }, nats.Bind("TEST", "dur"), nats.Durable("dur"), nats.DeleteConsumer()) + if err != nil { + t.Fatalf("Error subscribing: %v", err) + } + + // Initiate the drain... it won't complete because we have blocked the + // message callback. + sub.Drain() + + // Now delete the JS consumer + if err := js.DeleteConsumer("TEST", "dur"); err != nil { + t.Fatalf("Error deleting consumer: %v", err) + } + + // Now unblock and make sure we get the async error + close(blockCh) + + select { + case err := <-errCh: + if !strings.Contains(err.Error(), "consumer not found") { + t.Fatalf("Unexpected async error: %v", err) + } + case <-time.After(time.Second): + t.Fatal("Did not get async error") + } +}