diff --git a/js.go b/js.go index 6f843bef0..9e79c301b 100644 --- a/js.go +++ b/js.go @@ -812,7 +812,6 @@ type nextRequest struct { // jsSub includes JetStream subscription info. type jsSub struct { - mu sync.RWMutex js *js // For pull subscribers, this is the next message subject to send requests to. @@ -823,7 +822,7 @@ type jsSub struct { deliver string pull bool durable bool - attached bool + dc bool // Delete JS consumer // Ordered consumers ordered bool @@ -838,25 +837,22 @@ type jsSub struct { active bool fc bool cmeta string - fcs map[uint64]string + fcr string + fcd uint64 } -func (jsi *jsSub) unsubscribe(drainMode bool) error { - jsi.mu.Lock() - durable, attached := jsi.durable, jsi.attached - stream, consumer := jsi.stream, jsi.consumer - js := jsi.js - if jsi.hbc != nil { - jsi.hbc.Stop() - jsi.hbc = nil - } - jsi.mu.Unlock() - - if drainMode && (durable || attached) { - // Skip deleting consumer for durables/attached - // consumers when using drain mode. +// Deletes the JS Consumer. +// No connection nor subscription lock must be held on entry. +func (sub *Subscription) deleteConsumer() error { + sub.mu.Lock() + jsi := sub.jsi + if jsi == nil { + sub.mu.Unlock() return nil } + stream, consumer := jsi.stream, jsi.consumer + js := jsi.js + sub.mu.Unlock() return js.DeleteConsumer(stream, consumer) } @@ -941,7 +937,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, ccfg *ConsumerConfig info *ConsumerInfo deliver string - attached bool stream = o.stream consumer = o.consumer isDurable = o.cfg.Durable != _EMPTY_ @@ -1019,7 +1014,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, case info != nil: // Attach using the found consumer config. ccfg = &info.Config - attached = true // Make sure this new subject matches or is a subset. if ccfg.FilterSubject != _EMPTY_ && subj != ccfg.FilterSubject { @@ -1104,7 +1098,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, stream: stream, consumer: consumer, durable: isDurable, - attached: attached, deliver: deliver, hbs: hasHeartbeats, hbi: o.cfg.Heartbeat, @@ -1114,6 +1107,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, dseq: 1, pull: isPullMode, nms: nms, + dc: o.dc, } sub, err = nc.subscribe(deliver, queue, cb, ch, isSync, jsi) @@ -1196,9 +1190,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, return nil, ErrSubjectMismatch } - // Update attached status. - jsi.attached = true - // Use the deliver subject from latest consumer config to attach. if info.Config.DeliverSubject != _EMPTY_ { // We can't reuse the channel, so if one was passed, we need to create a new one. @@ -1267,10 +1258,11 @@ func isControlMessage(msg *Msg) bool { return len(msg.Data) == 0 && msg.Header.Get(statusHdr) == controlMsg } -func (jsi *jsSub) trackSequences(reply string) { - jsi.mu.Lock() - jsi.cmeta = reply - jsi.mu.Unlock() +// Keeps track of the incoming message's reply subject so that the consumer's +// state (deliver sequence, etc..) can be checked against heartbeats. +// Runs under the subscription lock +func (sub *Subscription) trackSequences(reply string) { + sub.jsi.cmeta = reply } // Check to make sure messages are arriving in order. @@ -1290,16 +1282,12 @@ func (sub *Subscription) checkOrderedMsgs(m *Msg) bool { sseq, dseq := uint64(parseNum(tokens[5])), uint64(parseNum(tokens[6])) jsi := sub.jsi - jsi.mu.Lock() if dseq != jsi.dseq { - rseq := jsi.sseq + 1 - jsi.mu.Unlock() - sub.resetOrderedConsumer(rseq) + sub.resetOrderedConsumer(jsi.sseq + 1) return true } // Update our tracking here. jsi.dseq, jsi.sseq = dseq+1, sseq - jsi.mu.Unlock() return false } @@ -1328,8 +1316,7 @@ func (sub *Subscription) applyNewSID() (osid int64) { // Lock should be held. func (sub *Subscription) resetOrderedConsumer(sseq uint64) { nc := sub.conn - closed := sub.closed - if sub.jsi == nil || nc == nil || closed { + if sub.jsi == nil || nc == nil || sub.closed { return } @@ -1340,8 +1327,8 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) { newDeliver := nc.newInbox() sub.Subject = newDeliver - // Snapshot jsi under sub lock here. - jsi := sub.jsi + // Snapshot the new sid under sub lock. + nsid := sub.sid // We are still in the low level readloop for the connection so we need // to spin a go routine to try to create the new consumer. @@ -1351,7 +1338,7 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) { // This is done here in this go routine to prevent lock inversion. nc.mu.Lock() nc.bw.appendString(fmt.Sprintf(unsubProto, osid, _EMPTY_)) - nc.bw.appendString(fmt.Sprintf(subProto, newDeliver, _EMPTY_, sub.sid)) + nc.bw.appendString(fmt.Sprintf(subProto, newDeliver, _EMPTY_, nsid)) nc.kickFlusher() nc.mu.Unlock() @@ -1360,11 +1347,12 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) { nc.unsubscribe(sub, 0, true) } - jsi.mu.Lock() + sub.mu.Lock() + jsi := sub.jsi // Reset some items in jsi. jsi.dseq = 1 jsi.cmeta = _EMPTY_ - jsi.fcs = nil + jsi.fcr, jsi.fcd = _EMPTY_, 0 jsi.deliver = newDeliver // Reset consumer request for starting policy. cfg := jsi.ccreq.Config @@ -1375,7 +1363,7 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) { ccSubj := fmt.Sprintf(apiConsumerCreateT, jsi.stream) j, err := json.Marshal(jsi.ccreq) js := jsi.js - jsi.mu.Unlock() + sub.mu.Unlock() if err != nil { pushErr(err) @@ -1403,86 +1391,70 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) { return } - jsi.mu.Lock() + sub.mu.Lock() jsi.consumer = cinfo.Name - jsi.mu.Unlock() + sub.mu.Unlock() }() } // checkForFlowControlResponse will check to see if we should send a flow control response -// based on the delivered index. -// Lock should be held. -func (sub *Subscription) checkForFlowControlResponse(delivered uint64) { - jsi, nc := sub.jsi, sub.conn - if jsi == nil { - return - } - - jsi.mu.Lock() - defer jsi.mu.Unlock() - - if len(jsi.fcs) == 0 { - return - } - - if reply := jsi.fcs[delivered]; reply != _EMPTY_ { - delete(jsi.fcs, delivered) - nc.Publish(reply, nil) +// based on the subscription current delivered index and the target. +// Runs under subscription lock +func (sub *Subscription) checkForFlowControlResponse() string { + // Caller has verified that there is a sub.jsi and fc + jsi := sub.jsi + if jsi.fcd == sub.delivered { + fcr := jsi.fcr + jsi.fcr, jsi.fcd = _EMPTY_, 0 + return fcr } + return _EMPTY_ } // Record an inbound flow control message. -func (jsi *jsSub) scheduleFlowControlResponse(dfuture uint64, reply string) { - jsi.mu.Lock() - if jsi.fcs == nil { - jsi.fcs = make(map[uint64]string) - } - jsi.fcs[dfuture] = reply - jsi.mu.Unlock() +// Runs under subscription lock +func (sub *Subscription) scheduleFlowControlResponse(dfuture uint64, reply string) { + jsi := sub.jsi + jsi.fcr, jsi.fcd = reply, dfuture } // Checks for activity from our consumer. // If we do not think we are active send an async error. func (sub *Subscription) activityCheck() { + sub.mu.Lock() jsi := sub.jsi if jsi == nil { + sub.mu.Unlock() return } - jsi.mu.Lock() active := jsi.active jsi.hbc.Reset(jsi.hbi) jsi.active = false - jsi.mu.Unlock() - - if !active { - sub.mu.Lock() - nc := sub.conn - closed := sub.closed - sub.mu.Unlock() + nc := sub.conn + closed := sub.closed + sub.mu.Unlock() - if !closed { - nc.mu.Lock() - errCB := nc.Opts.AsyncErrorCB - if errCB != nil { - nc.ach.push(func() { errCB(nc, sub, ErrConsumerNotActive) }) - } - nc.mu.Unlock() + if !active && !closed { + nc.mu.Lock() + if errCB := nc.Opts.AsyncErrorCB; errCB != nil { + nc.ach.push(func() { errCB(nc, sub, ErrConsumerNotActive) }) } + nc.mu.Unlock() } } // scheduleHeartbeatCheck sets up the timer check to make sure we are active // or receiving idle heartbeats.. func (sub *Subscription) scheduleHeartbeatCheck() { + sub.mu.Lock() + defer sub.mu.Unlock() + jsi := sub.jsi if jsi == nil { return } - jsi.mu.Lock() - defer jsi.mu.Unlock() - if jsi.hbc == nil { jsi.hbc = time.AfterFunc(jsi.hbi*hbcThresh, sub.activityCheck) } else { @@ -1503,10 +1475,10 @@ func (nc *Conn) handleConsumerSequenceMismatch(sub *Subscription, err error) { // checkForSequenceMismatch will make sure we have not missed any messages since last seen. func (nc *Conn) checkForSequenceMismatch(msg *Msg, s *Subscription, jsi *jsSub) { // Process heartbeat received, get latest control metadata if present. - jsi.mu.Lock() + s.mu.Lock() ctrl, ordered := jsi.cmeta, jsi.ordered jsi.active = true - jsi.mu.Unlock() + s.mu.Unlock() if ctrl == _EMPTY_ { return @@ -1591,6 +1563,8 @@ type subOpts struct { mack bool // For an ordered consumer. ordered bool + // User wants the library to delete the JS consumer on sub.Unsubscribe() + dc bool } // OrderedConsumer will create a fifo direct/ephemeral consumer for in order delivery of messages. @@ -1788,6 +1762,15 @@ func IdleHeartbeat(duration time.Duration) SubOpt { }) } +// DeleteConsumer instructs the library to delete the JetStream consumer +// when calling Subscription.Unsubscribe(). +func DeleteConsumer() SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.dc = true + return nil + }) +} + func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) { sub.mu.Lock() // TODO(dlc) - Better way to mark especially if we attach. diff --git a/nats.go b/nats.go index e7b456150..106e579db 100644 --- a/nats.go +++ b/nats.go @@ -2566,21 +2566,24 @@ func (nc *Conn) waitForMsgs(s *Subscription) { mcb := s.mcb max = s.max closed = s.closed + var fcReply string if !s.closed { s.delivered++ delivered = s.delivered if s.jsi != nil { - s.jsi.mu.Lock() - needCheck := s.jsi.fc && len(s.jsi.fcs) > 0 - s.jsi.active = true - s.jsi.mu.Unlock() - if needCheck { - s.checkForFlowControlResponse(delivered) + if s.jsi.fc { + fcReply = s.checkForFlowControlResponse() } + s.jsi.active = true } } s.mu.Unlock() + // Respond to flow control if applicable + if fcReply != _EMPTY_ { + nc.Publish(fcReply, nil) + } + if closed { break } @@ -2682,6 +2685,7 @@ func (nc *Conn) processMsg(data []byte) { var ctrlMsg bool var hasFC bool var hasHBs bool + var fcReply string if nc.ps.ma.hdr > 0 { hbuf := msgPayload[:nc.ps.ma.hdr] @@ -2772,16 +2776,16 @@ func (nc *Conn) processMsg(data []byte) { if jsi != nil && hasHBs { // Store the ACK metadata from the message to // compare later on with the received heartbeat. - jsi.trackSequences(m.Reply) + sub.trackSequences(m.Reply) } } else if hasFC && m.Reply != _EMPTY_ { // This is a flow control message. // If we have no pending, go ahead and send in place. if sub.pMsgs <= 0 { - nc.Publish(m.Reply, nil) + fcReply = m.Reply } else { // Schedule a reply after the previous message is delivered. - jsi.scheduleFlowControlResponse(sub.delivered+uint64(sub.pMsgs), m.Reply) + sub.scheduleFlowControlResponse(sub.delivered+uint64(sub.pMsgs), m.Reply) } } @@ -2789,6 +2793,10 @@ func (nc *Conn) processMsg(data []byte) { sub.sc = false sub.mu.Unlock() + if fcReply != _EMPTY_ { + nc.Publish(fcReply, nil) + } + // Handle control heartbeat messages. if ctrlMsg && hasHBs && m.Reply == _EMPTY_ { nc.checkForSequenceMismatch(m, sub, jsi) @@ -3800,6 +3808,12 @@ func (nc *Conn) removeSub(s *Subscription) { } s.mch = nil + // If JS subscription the stop HB timer. + if jsi := s.jsi; jsi != nil && jsi.hbc != nil { + jsi.hbc.Stop() + jsi.hbc = nil + } + // Mark as invalid s.closed = true if s.pCond != nil { @@ -3870,6 +3884,7 @@ func (s *Subscription) Unsubscribe() error { s.mu.Lock() conn := s.conn closed := s.closed + dc := s.jsi != nil && s.jsi.dc s.mu.Unlock() if conn == nil || conn.IsClosed() { return ErrConnectionClosed @@ -3880,7 +3895,13 @@ func (s *Subscription) Unsubscribe() error { if conn.IsDraining() { return ErrConnectionDraining } - return conn.unsubscribe(s, 0, false) + if err := conn.unsubscribe(s, 0, false); err != nil { + return err + } + if dc { + return s.deleteConsumer() + } + return nil } // checkDrained will watch for a subscription to be fully drained @@ -3894,6 +3915,12 @@ func (nc *Conn) checkDrained(sub *Subscription) { // is correct and the server will not send additional information. nc.Flush() + sub.mu.Lock() + // For JS subscriptions, check if we are going to delete the + // JS consumer when drain completes. + dc := sub.jsi != nil && sub.jsi.dc + sub.mu.Unlock() + // Once we are here we just wait for Pending to reach 0 or // any other state to exit this go routine. for { @@ -3913,6 +3940,15 @@ func (nc *Conn) checkDrained(sub *Subscription) { nc.mu.Lock() nc.removeSub(sub) nc.mu.Unlock() + if dc { + if err := sub.deleteConsumer(); err != nil { + nc.mu.Lock() + if errCB := nc.Opts.AsyncErrorCB; errCB != nil { + nc.ach.push(func() { errCB(nc, sub, err) }) + } + nc.mu.Unlock() + } + } return } @@ -3951,18 +3987,6 @@ func (nc *Conn) unsubscribe(sub *Subscription, max int, drainMode bool) error { sub.mu.Unlock() } - // For JetStream consumers, need to clean up ephemeral consumers - // or delete durable ones if called with Unsubscribe. - sub.mu.Lock() - jsi := sub.jsi - sub.mu.Unlock() - if jsi != nil && maxStr == _EMPTY_ { - err := jsi.unsubscribe(drainMode) - if err != nil { - return err - } - } - nc.mu.Lock() // ok here, but defer is expensive defer nc.mu.Unlock() @@ -4100,17 +4124,15 @@ func (s *Subscription) processNextMsgDelivered(msg *Msg) error { nc := s.conn max := s.max + var fcReply string // Update some stats. s.delivered++ delivered := s.delivered if s.jsi != nil { - s.jsi.mu.Lock() - needCheck := s.jsi.fc && len(s.jsi.fcs) > 0 - s.jsi.active = true - s.jsi.mu.Unlock() - if needCheck { - s.checkForFlowControlResponse(delivered) + if s.jsi.fc { + fcReply = s.checkForFlowControlResponse() } + s.jsi.active = true } if s.typ == SyncSubscription { @@ -4119,6 +4141,10 @@ func (s *Subscription) processNextMsgDelivered(msg *Msg) error { } s.mu.Unlock() + if fcReply != _EMPTY_ { + nc.Publish(fcReply, nil) + } + if max > 0 { if delivered > max { return ErrMaxMessages diff --git a/test/js_test.go b/test/js_test.go index 9e48c74bd..519b47fa6 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -372,7 +372,7 @@ func TestJetStreamSubscribe(t *testing.T) { if received == toSend { done <- true } - }) + }, nats.DeleteConsumer()) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -3618,17 +3618,25 @@ func TestJetStream_Unsubscribe(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - fetchConsumers := func(t *testing.T, expected int) []*nats.ConsumerInfo { + fetchConsumers := func(t *testing.T, expected int) { t.Helper() - var infos []*nats.ConsumerInfo - for info := range js.ConsumersInfo("foo") { - infos = append(infos, info) - } - if len(infos) != expected { - t.Fatalf("Expected %d consumers, got: %d", expected, len(infos)) - } + checkFor(t, time.Second, 15*time.Millisecond, func() error { + var infos []*nats.ConsumerInfo + for info := range js.ConsumersInfo("foo") { + infos = append(infos, info) + } + if len(infos) != expected { + return fmt.Errorf("Expected %d consumers, got: %d", expected, len(infos)) + } + return nil + }) + } - return infos + deleteAllConsumers := func(t *testing.T) { + t.Helper() + for cn := range js.ConsumerNames("foo") { + js.DeleteConsumer("foo", cn) + } } js.Publish("foo.A", []byte("A")) @@ -3636,7 +3644,7 @@ func TestJetStream_Unsubscribe(t *testing.T) { js.Publish("foo.C", []byte("C")) t.Run("consumers deleted on unsubscribe", func(t *testing.T) { - subA, err := js.SubscribeSync("foo.A") + subA, err := js.SubscribeSync("foo.A", nats.DeleteConsumer()) if err != nil { t.Fatal(err) } @@ -3645,7 +3653,7 @@ func TestJetStream_Unsubscribe(t *testing.T) { t.Errorf("Unexpected error: %v", err) } - subB, err := js.SubscribeSync("foo.B", nats.Durable("B")) + subB, err := js.SubscribeSync("foo.B", nats.Durable("B"), nats.DeleteConsumer()) if err != nil { t.Fatal(err) } @@ -3654,9 +3662,10 @@ func TestJetStream_Unsubscribe(t *testing.T) { t.Errorf("Unexpected error: %v", err) } fetchConsumers(t, 0) + deleteAllConsumers(t) }) - t.Run("attached pull consumer deleted on unsubscribe", func(t *testing.T) { + t.Run("not deleted on unsubscribe if option not set", func(t *testing.T) { // Created by JetStreamManagement if _, err = js.AddConsumer("foo", &nats.ConsumerConfig{ Durable: "wq", @@ -3684,11 +3693,12 @@ func TestJetStream_Unsubscribe(t *testing.T) { t.Errorf("Expected %v, got %v", expected, got) } subC.Unsubscribe() - fetchConsumers(t, 0) + fetchConsumers(t, 1) + deleteAllConsumers(t) }) t.Run("ephemeral consumers deleted on drain", func(t *testing.T) { - subA, err := js.SubscribeSync("foo.A") + subA, err := js.Subscribe("foo.A", func(_ *nats.Msg) {}, nats.DeleteConsumer()) if err != nil { t.Fatal(err) } @@ -3697,10 +3707,11 @@ func TestJetStream_Unsubscribe(t *testing.T) { t.Errorf("Unexpected error: %v", err) } fetchConsumers(t, 0) + deleteAllConsumers(t) }) - t.Run("durable consumers not deleted on drain", func(t *testing.T) { - subB, err := js.SubscribeSync("foo.B", nats.Durable("B")) + t.Run("durable consumers deleted on drain", func(t *testing.T) { + subB, err := js.Subscribe("foo.B", func(_ *nats.Msg) {}, nats.Durable("B"), nats.DeleteConsumer()) if err != nil { t.Fatal(err) } @@ -3708,10 +3719,10 @@ func TestJetStream_Unsubscribe(t *testing.T) { if err != nil { t.Errorf("Unexpected error: %v", err) } - fetchConsumers(t, 1) + fetchConsumers(t, 0) }) - t.Run("reattached durable consumers not deleted on drain", func(t *testing.T) { + t.Run("durable consumers not deleted on drain if option not set", func(t *testing.T) { subB, err := js.SubscribeSync("foo.B", nats.Durable("B")) if err != nil { t.Fatal(err) @@ -3721,6 +3732,7 @@ func TestJetStream_Unsubscribe(t *testing.T) { t.Errorf("Unexpected error: %v", err) } fetchConsumers(t, 1) + deleteAllConsumers(t) }) } @@ -3777,7 +3789,7 @@ func TestJetStream_UnsubscribeCloseDrain(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - _, err = js.SubscribeSync("foo.C") + _, err = js.SubscribeSync("foo.C", nats.DeleteConsumer()) if err != nil { t.Fatal(err) } @@ -3804,11 +3816,11 @@ func TestJetStream_UnsubscribeCloseDrain(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - _, err = js.SubscribeSync("foo.A") + _, err = js.SubscribeSync("foo.A", nats.DeleteConsumer()) if err != nil { t.Fatalf("Unexpected error: %v", err) } - subB, err := js.SubscribeSync("foo.B", nats.Durable("B")) + subB, err := js.SubscribeSync("foo.B", nats.Durable("B"), nats.DeleteConsumer()) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -3850,7 +3862,7 @@ func TestJetStream_UnsubscribeCloseDrain(t *testing.T) { // The durable interest remains so have to attach now, // otherwise would get a stream already used error. - subB, err := js.SubscribeSync("foo.B", nats.Durable("B")) + subB, err := js.SubscribeSync("foo.B", nats.Durable("B"), nats.DeleteConsumer()) if err != nil { t.Fatal(err) } @@ -3872,7 +3884,7 @@ func TestJetStream_UnsubscribeCloseDrain(t *testing.T) { jsm.Publish("foo.B", []byte("B.3")) // Attach again to the same subject with the durable. - dupSub, err := js.SubscribeSync("foo.B", nats.Durable("B")) + dupSub, err := js.SubscribeSync("foo.B", nats.Durable("B"), nats.DeleteConsumer()) if err != nil { t.Fatal(err) } @@ -3947,7 +3959,7 @@ func TestJetStream_UnsubscribeDeleteNoPermissions(t *testing.T) { } defer nc.Close() - js, err := nc.JetStream() + js, err := nc.JetStream(nats.MaxWait(time.Second)) if err != nil { t.Fatal(err) } @@ -3957,7 +3969,7 @@ func TestJetStream_UnsubscribeDeleteNoPermissions(t *testing.T) { }) js.Publish("foo", []byte("test")) - sub, err := js.SubscribeSync("foo") + sub, err := js.SubscribeSync("foo", nats.DeleteConsumer()) if err != nil { t.Fatal(err) } @@ -6417,3 +6429,79 @@ func TestJetStreamMaxMsgsPerSubject(t *testing.T) { }) } } + +func TestJetStreamDrainFailsToDeleteConsumer(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + errCh := make(chan error, 1) + nc, err := nats.Connect(s.ClientURL(), nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, err error) { + select { + case errCh <- err: + default: + } + })) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + js, err := nc.JetStream() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + if _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + js.Publish("foo", []byte("hi")) + + // Normally, we would not call AddConsumer externally and then + // create a subscription with DeleteConsumer. But here we do so + // in order to be able to delete the consumer while the Drain + // is in process and make sure that an async error is returned. + if _, err := js.AddConsumer("TEST", &nats.ConsumerConfig{ + Durable: "dur", + FilterSubject: "foo", + DeliverSubject: "bar", + }); err != nil { + t.Fatalf("Error adding consumer: %v", err) + } + + blockCh := make(chan struct{}) + sub, err := js.Subscribe("foo", func(m *nats.Msg) { + <-blockCh + }, nats.Bind("TEST", "dur"), nats.Durable("dur"), nats.DeleteConsumer()) + if err != nil { + t.Fatalf("Error subscribing: %v", err) + } + + // Initiate the drain... it won't complete because we have blocked the + // message callback. + sub.Drain() + + // Now delete the JS consumer + if err := js.DeleteConsumer("TEST", "dur"); err != nil { + t.Fatalf("Error deleting consumer: %v", err) + } + + // Now unblock and make sure we get the async error + close(blockCh) + + select { + case err := <-errCh: + if !strings.Contains(err.Error(), "consumer not found") { + t.Fatalf("Unexpected async error: %v", err) + } + case <-time.After(time.Second): + t.Fatal("Did not get async error") + } +}