Skip to content

Commit

Permalink
Merge pull request #614 from nats-io/jsi
Browse files Browse the repository at this point in the history
Assign jsSub during low level subscription
  • Loading branch information
derekcollison authored Dec 4, 2020
2 parents 0207179 + 090c71e commit 55cabbc
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 18 deletions.
2 changes: 1 addition & 1 deletion context.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (nc *Conn) oldRequestWithContext(ctx context.Context, subj string, hdr, dat
inbox := NewInbox()
ch := make(chan *Msg, RequestChanLen)

s, err := nc.subscribe(inbox, _EMPTY_, nil, ch, true)
s, err := nc.subscribe(inbox, _EMPTY_, nil, ch, true, nil)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion enc.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (c *EncodedConn) subscribe(subject, queue string, cb Handler) (*Subscriptio
cbValue.Call(oV)
}

return c.Conn.subscribe(subject, queue, natsCB, nil, false)
return c.Conn.subscribe(subject, queue, natsCB, nil, false, nil)
}

// FlushTimeout allows a Flush operation to have an associated timeout.
Expand Down
3 changes: 1 addition & 2 deletions js.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,11 +491,10 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts []
cb = func(m *Msg) { ocb(m); m.Ack() }
}

sub, err = js.nc.subscribe(deliver, queue, cb, ch, cb == nil)
sub, err = js.nc.subscribe(deliver, queue, cb, ch, cb == nil, &jsSub{js: js})
if err != nil {
return nil, err
}
sub.jsi = &jsSub{js: js}

// If we are creating or updating let's process that request.
if shouldCreate {
Expand Down
26 changes: 13 additions & 13 deletions nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -3082,7 +3082,7 @@ func (nc *Conn) createNewRequestAndSend(subj string, hdr, data []byte) (chan *Ms
// Create the response subscription we will use for all new style responses.
// This will be on an _INBOX with an additional terminal token. The subscription
// will be on a wildcard.
s, err := nc.subscribeLocked(nc.respSub, _EMPTY_, nc.respHandler, nil, false)
s, err := nc.subscribeLocked(nc.respSub, _EMPTY_, nc.respHandler, nil, false, nil)
if err != nil {
nc.mu.Unlock()
return nil, token, err
Expand Down Expand Up @@ -3186,7 +3186,7 @@ func (nc *Conn) oldRequest(subj string, hdr, data []byte, timeout time.Duration)
inbox := NewInbox()
ch := make(chan *Msg, RequestChanLen)

s, err := nc.subscribe(inbox, _EMPTY_, nil, ch, true)
s, err := nc.subscribe(inbox, _EMPTY_, nil, ch, true, nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -3274,14 +3274,14 @@ func (nc *Conn) respToken(respInbox string) string {
// can have wildcards (partial:*, full:>). Messages will be delivered
// to the associated MsgHandler.
func (nc *Conn) Subscribe(subj string, cb MsgHandler) (*Subscription, error) {
return nc.subscribe(subj, _EMPTY_, cb, nil, false)
return nc.subscribe(subj, _EMPTY_, cb, nil, false, nil)
}

// ChanSubscribe will express interest in the given subject and place
// all messages received on the channel.
// You should not close the channel until sub.Unsubscribe() has been called.
func (nc *Conn) ChanSubscribe(subj string, ch chan *Msg) (*Subscription, error) {
return nc.subscribe(subj, _EMPTY_, nil, ch, false)
return nc.subscribe(subj, _EMPTY_, nil, ch, false, nil)
}

// ChanQueueSubscribe will express interest in the given subject.
Expand All @@ -3291,7 +3291,7 @@ func (nc *Conn) ChanSubscribe(subj string, ch chan *Msg) (*Subscription, error)
// You should not close the channel until sub.Unsubscribe() has been called.
// Note: This is the same than QueueSubscribeSyncWithChan.
func (nc *Conn) ChanQueueSubscribe(subj, group string, ch chan *Msg) (*Subscription, error) {
return nc.subscribe(subj, group, nil, ch, false)
return nc.subscribe(subj, group, nil, ch, false, nil)
}

// SubscribeSync will express interest on the given subject. Messages will
Expand All @@ -3301,7 +3301,7 @@ func (nc *Conn) SubscribeSync(subj string) (*Subscription, error) {
return nil, ErrInvalidConnection
}
mch := make(chan *Msg, nc.Opts.SubChanLen)
s, e := nc.subscribe(subj, _EMPTY_, nil, mch, true)
s, e := nc.subscribe(subj, _EMPTY_, nil, mch, true, nil)
return s, e
}

Expand All @@ -3310,7 +3310,7 @@ func (nc *Conn) SubscribeSync(subj string) (*Subscription, error) {
// only one member of the group will be selected to receive any given
// message asynchronously.
func (nc *Conn) QueueSubscribe(subj, queue string, cb MsgHandler) (*Subscription, error) {
return nc.subscribe(subj, queue, cb, nil, false)
return nc.subscribe(subj, queue, cb, nil, false, nil)
}

// QueueSubscribeSync creates a synchronous queue subscriber on the given
Expand All @@ -3319,7 +3319,7 @@ func (nc *Conn) QueueSubscribe(subj, queue string, cb MsgHandler) (*Subscription
// given message synchronously using Subscription.NextMsg().
func (nc *Conn) QueueSubscribeSync(subj, queue string) (*Subscription, error) {
mch := make(chan *Msg, nc.Opts.SubChanLen)
s, e := nc.subscribe(subj, queue, nil, mch, true)
s, e := nc.subscribe(subj, queue, nil, mch, true, nil)
return s, e
}

Expand All @@ -3330,7 +3330,7 @@ func (nc *Conn) QueueSubscribeSync(subj, queue string) (*Subscription, error) {
// You should not close the channel until sub.Unsubscribe() has been called.
// Note: This is the same than ChanQueueSubscribe.
func (nc *Conn) QueueSubscribeSyncWithChan(subj, queue string, ch chan *Msg) (*Subscription, error) {
return nc.subscribe(subj, queue, nil, ch, false)
return nc.subscribe(subj, queue, nil, ch, false, nil)
}

// badSubject will do quick test on whether a subject is acceptable.
Expand All @@ -3354,16 +3354,16 @@ func badQueue(qname string) bool {
}

// subscribe is the internal subscribe function that indicates interest in a subject.
func (nc *Conn) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync bool) (*Subscription, error) {
func (nc *Conn) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync bool, js *jsSub) (*Subscription, error) {
if nc == nil {
return nil, ErrInvalidConnection
}
nc.mu.Lock()
defer nc.mu.Unlock()
return nc.subscribeLocked(subj, queue, cb, ch, isSync)
return nc.subscribeLocked(subj, queue, cb, ch, isSync, js)
}

func (nc *Conn) subscribeLocked(subj, queue string, cb MsgHandler, ch chan *Msg, isSync bool) (*Subscription, error) {
func (nc *Conn) subscribeLocked(subj, queue string, cb MsgHandler, ch chan *Msg, isSync bool, js *jsSub) (*Subscription, error) {
if nc == nil {
return nil, ErrInvalidConnection
}
Expand All @@ -3386,7 +3386,7 @@ func (nc *Conn) subscribeLocked(subj, queue string, cb MsgHandler, ch chan *Msg,
return nil, ErrBadSubscription
}

sub := &Subscription{Subject: subj, Queue: queue, mcb: cb, conn: nc}
sub := &Subscription{Subject: subj, Queue: queue, mcb: cb, conn: nc, jsi: js}
// Set pending limits.
if ch != nil {
sub.pMsgsLimit = cap(ch)
Expand Down
2 changes: 1 addition & 1 deletion netchan.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,5 +107,5 @@ func (c *EncodedConn) bindRecvChan(subject, queue string, channel interface{}) (
chVal.Send(oPtr)
}

return c.Conn.subscribe(subject, queue, cb, nil, false)
return c.Conn.subscribe(subject, queue, cb, nil, false, nil)
}

0 comments on commit 55cabbc

Please sign in to comment.