-
Notifications
You must be signed in to change notification settings - Fork 708
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[IMPROVED] Fetch and FetchBatch for draining and closed subscriptions #1582
Conversation
7458bb1
to
4daf90f
Compare
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
4daf90f
to
f069202
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
@@ -2861,7 +2861,13 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) { | |||
} | |||
var hbTimer *time.Timer | |||
var hbErr error | |||
if err == nil && len(msgs) < batch { | |||
sub.mu.Lock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe extract following logic
sub.mu.Lock()
subClosed := sub.closed || sub.draining
sub.mu.Unlock()
in a new function inside Subscription e.g. isClosedOrDraining()
or similar
and reuse it in both places: Fetch
and FetchBatch
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's such a tiny piece of logic only used in 2 places, I would honestly just leave it inlined for clarity :)
With those kinds of checks I usually wait until I have to repeat it at least 3 times before extracting to a separate method, but that's just preference.
|
||
// now drain the subscription, messages should be in the buffer | ||
sub.Drain() | ||
msgs, err := sub.Fetch(100) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Calling Fetch
immediately after Drain
returns messages instead of an error. Is this normal behaviour and why?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Drain
is not blocking, so calling sub.Drain()
essentially just starts the draining process without waiting for completion. Fetch()
first checks if there are any messages in the buffer (those messages should be accessible while draining) and returns if there are any - it will never send a new request when draining though.
Only when we're fully drained/closed and there are no more messages waiting in the buffer we're returning an error. This tests simulates this behavior.
…#1582) Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
…#1582) Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
…#1582) Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
…#1582) Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
Signed-off-by: Piotr Piotrowski piotr@synadia.com