Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Possible lock inversion and incorrect delete of JS consumer #792

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 72 additions & 85 deletions js.go
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,6 @@ type nextRequest struct {

// jsSub includes JetStream subscription info.
type jsSub struct {
mu sync.RWMutex
js *js

// For pull subscribers, this is the next message subject to send requests to.
Expand All @@ -823,7 +822,7 @@ type jsSub struct {
deliver string
pull bool
durable bool
attached bool
kozlovic marked this conversation as resolved.
Show resolved Hide resolved
delCons bool

// Ordered consumers
ordered bool
Expand All @@ -838,27 +837,34 @@ type jsSub struct {
active bool
fc bool
cmeta string
fcs map[uint64]string
fcr string
fcd uint64
}

func (jsi *jsSub) unsubscribe(drainMode bool) error {
jsi.mu.Lock()
durable, attached := jsi.durable, jsi.attached
func (sub *Subscription) jsiUnsubscribe(jsi *jsSub, drainMode bool) error {
sub.mu.Lock()
durable, delCons := jsi.durable, jsi.delCons
stream, consumer := jsi.stream, jsi.consumer
js := jsi.js
if jsi.hbc != nil {
jsi.hbc.Stop()
jsi.hbc = nil
}
jsi.mu.Unlock()
sub.mu.Unlock()

if drainMode && (durable || attached) {
// Skip deleting consumer for durables/attached
// consumers when using drain mode.
// Delete the JS consumer only if the library created the JS consumer,
kozlovic marked this conversation as resolved.
Show resolved Hide resolved
// in which case delCons==true, but even then, in drain mode if this
// is a durable, do not delete.
if !delCons || (drainMode && durable) {
return nil
}

return js.DeleteConsumer(stream, consumer)
// We don't want to possibly fail a drain because we were not able to
// delete the consumer.
err := js.DeleteConsumer(stream, consumer)
if drainMode {
return nil
}
return err
}

// SubOpt configures options for subscribing to JetStream consumers.
Expand Down Expand Up @@ -941,7 +947,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
ccfg *ConsumerConfig
info *ConsumerInfo
deliver string
attached bool
stream = o.stream
consumer = o.consumer
isDurable = o.cfg.Durable != _EMPTY_
Expand Down Expand Up @@ -1019,7 +1024,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
case info != nil:
// Attach using the found consumer config.
ccfg = &info.Config
attached = true

// Make sure this new subject matches or is a subset.
if ccfg.FilterSubject != _EMPTY_ && subj != ccfg.FilterSubject {
Expand Down Expand Up @@ -1104,7 +1108,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
stream: stream,
consumer: consumer,
durable: isDurable,
attached: attached,
deliver: deliver,
hbs: hasHeartbeats,
hbi: o.cfg.Heartbeat,
Expand Down Expand Up @@ -1196,9 +1199,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
return nil, ErrSubjectMismatch
}

// Update attached status.
jsi.attached = true

// Use the deliver subject from latest consumer config to attach.
if info.Config.DeliverSubject != _EMPTY_ {
// We can't reuse the channel, so if one was passed, we need to create a new one.
Expand All @@ -1222,10 +1222,15 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
}
return nil, fmt.Errorf("nats: %s", cinfo.Error.Description)
}
} else if consumer == _EMPTY_ {
// Update our consumer name here which is filled in when we create the consumer.
} else {
// Since we created the JS consumer internally, mark that we should
// delete it on Unsubscribe().
kozlovic marked this conversation as resolved.
Show resolved Hide resolved
sub.mu.Lock()
sub.jsi.consumer = info.Name
sub.jsi.delCons = true
// Update our consumer name here which is filled in when we create the consumer.
if consumer == _EMPTY_ {
sub.jsi.consumer = info.Name
}
sub.mu.Unlock()
}
}
Expand Down Expand Up @@ -1267,10 +1272,11 @@ func isControlMessage(msg *Msg) bool {
return len(msg.Data) == 0 && msg.Header.Get(statusHdr) == controlMsg
}

func (jsi *jsSub) trackSequences(reply string) {
jsi.mu.Lock()
jsi.cmeta = reply
jsi.mu.Unlock()
// Keeps track of the incoming message's reply subject so that the consumer's
// state (deliver sequence, etc.) can be checked against heartbeats.
// Runs under the subscription lock.
func (sub *Subscription) trackSequences(reply string) {
sub.jsi.cmeta = reply
}

// Check to make sure messages are arriving in order.
Expand All @@ -1290,16 +1296,12 @@ func (sub *Subscription) checkOrderedMsgs(m *Msg) bool {
sseq, dseq := uint64(parseNum(tokens[5])), uint64(parseNum(tokens[6]))

jsi := sub.jsi
jsi.mu.Lock()
if dseq != jsi.dseq {
rseq := jsi.sseq + 1
jsi.mu.Unlock()
sub.resetOrderedConsumer(rseq)
sub.resetOrderedConsumer(jsi.sseq + 1)
return true
}
// Update our tracking here.
jsi.dseq, jsi.sseq = dseq+1, sseq
jsi.mu.Unlock()
return false
}

Expand Down Expand Up @@ -1328,8 +1330,7 @@ func (sub *Subscription) applyNewSID() (osid int64) {
// Lock should be held.
func (sub *Subscription) resetOrderedConsumer(sseq uint64) {
nc := sub.conn
closed := sub.closed
if sub.jsi == nil || nc == nil || closed {
if sub.jsi == nil || nc == nil || sub.closed {
return
}

Expand All @@ -1340,8 +1341,8 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) {
newDeliver := nc.newInbox()
sub.Subject = newDeliver

// Snapshot jsi under sub lock here.
jsi := sub.jsi
// Snapshot the new sid under sub lock.
nsid := sub.sid

// We are still in the low level readloop for the connection so we need
// to spin a go routine to try to create the new consumer.
Expand All @@ -1351,7 +1352,7 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) {
// This is done here in this go routine to prevent lock inversion.
nc.mu.Lock()
nc.bw.appendString(fmt.Sprintf(unsubProto, osid, _EMPTY_))
nc.bw.appendString(fmt.Sprintf(subProto, newDeliver, _EMPTY_, sub.sid))
nc.bw.appendString(fmt.Sprintf(subProto, newDeliver, _EMPTY_, nsid))
nc.kickFlusher()
nc.mu.Unlock()

Expand All @@ -1360,11 +1361,12 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) {
nc.unsubscribe(sub, 0, true)
}

jsi.mu.Lock()
sub.mu.Lock()
jsi := sub.jsi
// Reset some items in jsi.
jsi.dseq = 1
jsi.cmeta = _EMPTY_
jsi.fcs = nil
jsi.fcr, jsi.fcd = _EMPTY_, 0
jsi.deliver = newDeliver
// Reset consumer request for starting policy.
cfg := jsi.ccreq.Config
Expand All @@ -1375,7 +1377,7 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) {
ccSubj := fmt.Sprintf(apiConsumerCreateT, jsi.stream)
j, err := json.Marshal(jsi.ccreq)
js := jsi.js
jsi.mu.Unlock()
sub.mu.Unlock()

if err != nil {
pushErr(err)
Expand Down Expand Up @@ -1403,86 +1405,71 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) {
return
}

jsi.mu.Lock()
sub.mu.Lock()
jsi.consumer = cinfo.Name
jsi.mu.Unlock()
sub.mu.Unlock()
}()
}

// checkForFlowControlResponse will check to see if we should send a flow control response
// based on the delivered index.
// Lock should be held.
func (sub *Subscription) checkForFlowControlResponse(delivered uint64) {
jsi, nc := sub.jsi, sub.conn
if jsi == nil {
return
}

jsi.mu.Lock()
defer jsi.mu.Unlock()

if len(jsi.fcs) == 0 {
return
}

if reply := jsi.fcs[delivered]; reply != _EMPTY_ {
delete(jsi.fcs, delivered)
nc.Publish(reply, nil)
// based on the subscription's current delivered index and the target.
// Runs under the subscription lock.
func (sub *Subscription) checkForFlowControlResponse() string {
// Caller has verified that there is a sub.jsi and fc
jsi := sub.jsi
if jsi.fcd == sub.delivered {
fcr := jsi.fcr
jsi.fcr, jsi.fcd = _EMPTY_, 0
return fcr
}
return _EMPTY_
}

// Record an inbound flow control message.
func (jsi *jsSub) scheduleFlowControlResponse(dfuture uint64, reply string) {
jsi.mu.Lock()
if jsi.fcs == nil {
jsi.fcs = make(map[uint64]string)
}
jsi.fcs[dfuture] = reply
jsi.mu.Unlock()
// Runs under subscription lock
func (sub *Subscription) scheduleFlowControlResponse(dfuture uint64, reply string) {
jsi := sub.jsi
jsi.fcr, jsi.fcd = reply, dfuture
}

// Checks for activity from our consumer.
// If we do not think we are active send an async error.
func (sub *Subscription) activityCheck() {
sub.mu.Lock()
jsi := sub.jsi
if jsi == nil {
sub.mu.Unlock()
return
}

jsi.mu.Lock()
active := jsi.active
jsi.hbc.Reset(jsi.hbi)
jsi.active = false
jsi.mu.Unlock()

if !active {
sub.mu.Lock()
nc := sub.conn
closed := sub.closed
sub.mu.Unlock()
nc := sub.conn
closed := sub.closed
sub.mu.Unlock()

if !closed {
nc.mu.Lock()
errCB := nc.Opts.AsyncErrorCB
if errCB != nil {
nc.ach.push(func() { errCB(nc, sub, ErrConsumerNotActive) })
}
nc.mu.Unlock()
if !active && !closed {
nc.mu.Lock()
errCB := nc.Opts.AsyncErrorCB
kozlovic marked this conversation as resolved.
Show resolved Hide resolved
if errCB != nil {
nc.ach.push(func() { errCB(nc, sub, ErrConsumerNotActive) })
}
nc.mu.Unlock()
}
}

// scheduleHeartbeatCheck sets up the timer check to make sure we are active
// or receiving idle heartbeats.
func (sub *Subscription) scheduleHeartbeatCheck() {
sub.mu.Lock()
defer sub.mu.Unlock()

jsi := sub.jsi
if jsi == nil {
return
}

jsi.mu.Lock()
defer jsi.mu.Unlock()

if jsi.hbc == nil {
jsi.hbc = time.AfterFunc(jsi.hbi*hbcThresh, sub.activityCheck)
} else {
Expand All @@ -1503,10 +1490,10 @@ func (nc *Conn) handleConsumerSequenceMismatch(sub *Subscription, err error) {
// checkForSequenceMismatch will make sure we have not missed any messages since last seen.
func (nc *Conn) checkForSequenceMismatch(msg *Msg, s *Subscription, jsi *jsSub) {
// Process heartbeat received, get latest control metadata if present.
jsi.mu.Lock()
s.mu.Lock()
ctrl, ordered := jsi.cmeta, jsi.ordered
jsi.active = true
jsi.mu.Unlock()
s.mu.Unlock()

if ctrl == _EMPTY_ {
return
Expand Down
Loading