Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

message: Sequencer incrementally tracks pending partial sequences #345

Merged
merged 1 commit into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 24 additions & 22 deletions message/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ import (
// sufficient to directly read committed messages. When recovering from a
// checkpoint, or if a very long sequence or old producer is acknowledged, it
// may be necessary to start a replay of already-read messages. In this case:
// * QueueUncommitted will return QueueAckCommitReplay.
// * The client calls ReplayRange to determine the exact offset range required.
// * The client must then supply an appropriate Iterator to StartReplay.
// - QueueUncommitted will return QueueAckCommitReplay.
// - The client calls ReplayRange to determine the exact offset range required.
// - The client must then supply an appropriate Iterator to StartReplay.
//
// Having done this, calls to Step may resume to drain messages.
type Sequencer struct {
// Dequeued is non-nil if (and only if) the Sequencer is in the
Expand All @@ -50,10 +51,11 @@ type Sequencer struct {
dequeuedClock Clock
// Offsets of the next (uncommitted) message to process in each journal.
offsets pb.Offsets
// Offsets read through as-of the last Checkpoint taken.
lastOffsets pb.Offsets
// partials is partial and un-acknowledged sequences of JournalProducers.
partials map[JournalProducer]*partialSeq
// pending is partial sequences which have been started or extended
// since the last Checkpoint was taken.
pending map[*partialSeq]struct{}
// emit is an acknowledged message sequence which is ready for dequeue.
emit *partialSeq
replayIt Iterator // Iterator supplied to StartReplay.
Expand All @@ -74,13 +76,13 @@ func NewSequencer(offsets pb.Offsets, states []ProducerState, buffer int) *Seque
}

var s = &Sequencer{
offsets: offsets,
lastOffsets: offsets.Copy(),
partials: make(map[JournalProducer]*partialSeq, len(states)),
emit: nil,
ring: make([]Envelope, 0, buffer),
next: make([]int, 0, buffer),
head: 0,
offsets: offsets,
partials: make(map[JournalProducer]*partialSeq, len(states)),
pending: make(map[*partialSeq]struct{}),
emit: nil,
ring: make([]Envelope, 0, buffer),
next: make([]int, 0, buffer),
head: 0,
}
for _, state := range states {
s.partials[state.JournalProducer] = &partialSeq{
Expand Down Expand Up @@ -280,6 +282,7 @@ func (w *Sequencer) QueueUncommitted(env Envelope) QueueOutcome {
// Track an uncommitted, transactional span.
if outcome == QueueContinueBeginSpan {
partial.begin = env.Begin
w.pending[partial] = struct{}{} // Mark as pending since the last commit.
}
w.addAtHead(env, partial)
partial.maxClock = clock
Expand All @@ -297,6 +300,7 @@ func (w *Sequencer) QueueUncommitted(env Envelope) QueueOutcome {
ringStart: -1,
ringStop: -1,
}
delete(w.pending, partial) // No longer pending.
w.offsets[env.Journal.Name] = env.End

case QueueAckEmpty,
Expand Down Expand Up @@ -438,6 +442,7 @@ func (w *Sequencer) Step() error {
ringStart: -1,
ringStop: -1,
}
delete(w.pending, w.emit)

w.Dequeued = nil
w.emit = nil
Expand Down Expand Up @@ -475,12 +480,7 @@ func (w *Sequencer) StartReplay(it Iterator) {
// Assuming liveness of producers, it hints that further messages are
// forthcoming.
func (w *Sequencer) HasPending() bool {
for jp, partial := range w.partials {
if partial.begin >= w.lastOffsets[jp.Journal] {
return true
}
}
return false
return len(w.pending) != 0
}

// Checkpoint returns a snapshot of read-through offsets, journal producers,
Expand Down Expand Up @@ -518,10 +518,12 @@ func (w *Sequencer) Checkpoint(pruneHorizon time.Duration) (pb.Offsets, []Produc
}
}

// Retain offsets at which this checkpoint was produced,
// for future evaluations of HasPending.
for j, o := range w.offsets {
w.lastOffsets[j] = o
// Clear partial sequences which were marked as pending between the last
// Checkpoint and now such that HasPending will no longer return true.
// A future consumer transaction will thus not block to wait for their
// completion, since they already didn't complete in *this* transaction.
for p := range w.pending {
delete(w.pending, p)
}

return w.offsets, states
Expand Down
Loading
Loading