diff --git a/message/sequencer.go b/message/sequencer.go index f280a7ca..71b999c4 100644 --- a/message/sequencer.go +++ b/message/sequencer.go @@ -29,9 +29,10 @@ import ( // sufficient to directly read committed messages. When recovering from a // checkpoint, or if a very long sequence or old producer is acknowledged, it // may be necessary to start a replay of already-read messages. In this case: -// * QueueUncommitted will return QueueAckCommitReplay. -// * The client calls ReplayRange to determine the exact offset range required. -// * The client must then supply an appropriate Iterator to StartReplay. +// - QueueUncommitted will return QueueAckCommitReplay. +// - The client calls ReplayRange to determine the exact offset range required. +// - The client must then supply an appropriate Iterator to StartReplay. +// // Having done this, calls to Step may resume to drain messages. type Sequencer struct { // Dequeued is non-nil if (and only if) the Sequencer is in the @@ -50,10 +51,11 @@ type Sequencer struct { dequeuedClock Clock // Offsets of the next (uncommitted) message to process in each journal. offsets pb.Offsets - // Offsets read through as-of the last Checkpoint taken. - lastOffsets pb.Offsets // partials is partial and un-acknowledged sequences of JournalProducers. partials map[JournalProducer]*partialSeq + // pending is partial sequences which have been started or extended + // since the last Checkpoint was taken. + pending map[*partialSeq]struct{} // emit is an acknowledged message sequence which is ready for dequeue. emit *partialSeq replayIt Iterator // Iterator supplied to StartReplay. @@ -74,13 +76,13 @@ func NewSequencer(offsets pb.Offsets, states []ProducerState, buffer int) *Seque } var s = &Sequencer{ - offsets: offsets, - lastOffsets: offsets.Copy(), - partials: make(map[JournalProducer]*partialSeq, len(states)), - emit: nil, - ring: make([]Envelope, 0, buffer), - next: make([]int, 0, buffer), - head: 0, + offsets: offsets, + partials: make(map[JournalProducer]*partialSeq, len(states)), + pending: make(map[*partialSeq]struct{}), + emit: nil, + ring: make([]Envelope, 0, buffer), + next: make([]int, 0, buffer), + head: 0, } for _, state := range states { s.partials[state.JournalProducer] = &partialSeq{ @@ -280,6 +282,7 @@ func (w *Sequencer) QueueUncommitted(env Envelope) QueueOutcome { // Track an uncommitted, transactional span. if outcome == QueueContinueBeginSpan { partial.begin = env.Begin + w.pending[partial] = struct{}{} // Mark as pending since the last commit. } w.addAtHead(env, partial) partial.maxClock = clock @@ -297,6 +300,7 @@ func (w *Sequencer) QueueUncommitted(env Envelope) QueueOutcome { ringStart: -1, ringStop: -1, } + delete(w.pending, partial) // No longer pending. w.offsets[env.Journal.Name] = env.End case QueueAckEmpty, @@ -438,6 +442,7 @@ func (w *Sequencer) Step() error { ringStart: -1, ringStop: -1, } + delete(w.pending, w.emit) w.Dequeued = nil w.emit = nil @@ -475,12 +480,7 @@ func (w *Sequencer) StartReplay(it Iterator) { // Assuming liveness of producers, it hints that further messages are // forthcoming. func (w *Sequencer) HasPending() bool { - for jp, partial := range w.partials { - if partial.begin >= w.lastOffsets[jp.Journal] { - return true - } - } - return false + return len(w.pending) != 0 } // Checkpoint returns a snapshot of read-through offsets, journal producers, @@ -518,10 +518,12 @@ func (w *Sequencer) Checkpoint(pruneHorizon time.Duration) (pb.Offsets, []Produc } } - // Retain offsets at which this checkpoint was produced, - // for future evaluations of HasPending. - for j, o := range w.offsets { - w.lastOffsets[j] = o + // Clear partial sequences which were marked as pending between the last + // Checkpoint and now such that HasPending will no longer return true. + // A future consumer transaction will thus not block to wait for their + // completion, since they already didn't complete in *this* transaction. + for p := range w.pending { + delete(w.pending, p) } return w.offsets, states diff --git a/message/sequencer_test.go b/message/sequencer_test.go index a80871a8..c15688e5 100644 --- a/message/sequencer_test.go +++ b/message/sequencer_test.go @@ -11,7 +11,7 @@ import ( pb "go.gazette.dev/core/broker/protocol" ) -func TestSequencerRingAddAndEvictTwo(t *testing.T) { +func TestSequencerRingAddAndEvict(t *testing.T) { var ( seq = NewSequencer(nil, nil, 5) generate = newTestMsgGenerator() @@ -57,6 +57,7 @@ func TestSequencerRingAddAndEvictTwo(t *testing.T) { jpA: {jp: jpA, begin: e1.Begin, ringStart: 0, ringStop: 0, minClock: 99, maxClock: 100}, jpB: {jp: jpB, begin: e2.Begin, ringStart: 1, ringStop: 1, minClock: 199, maxClock: 200}, }, seq.partials) + require.Len(t, seq.pending, 2) // There are now two pending sequences. require.Equal(t, QueueContinueExtendSpan, seq.QueueUncommitted(e3)) // A. require.Equal(t, []Envelope{e1, e2, e3}, seq.ring) @@ -135,7 +136,7 @@ func TestSequencerRingAddAndEvictTwo(t *testing.T) { require.Equal(t, pb.Offsets{"test/journal": e11.End}, seq.offsets) } -func TestSequencerTxnSequenceCasesTwo(t *testing.T) { +func TestSequencerTxnSequenceCases(t *testing.T) { var ( generate = newTestMsgGenerator() seq = NewSequencer(nil, nil, 3) @@ -160,14 +161,14 @@ func TestSequencerTxnSequenceCasesTwo(t *testing.T) { QueueContinueTxnClockLarger, QueueAckCommitRing, }, - queueFoo(seq, a1, a2, a1Dup, a2Dup, a3ACK)) - expectDequeFoo(t, seq, a1, a2, a3ACK) + queue(seq, a1, a2, a1Dup, a2Dup, a3ACK)) + expectDeque(t, seq, a1, a2, a3ACK) require.False(t, seq.HasPending()) // Case: ACK w/o preceding CONTINUE. Unusual but allowed. var a4ACK = generate(A, 4, Flag_ACK_TXN) - require.Equal(t, []QueueOutcome{QueueAckEmpty}, queueFoo(seq, a4ACK)) - expectDequeFoo(t, seq, a4ACK) + require.Equal(t, []QueueOutcome{QueueAckEmpty}, queue(seq, a4ACK)) + expectDeque(t, seq, a4ACK) // Case: Partial ACK of preceding messages. var ( @@ -181,8 +182,9 @@ func TestSequencerTxnSequenceCasesTwo(t *testing.T) { QueueContinueExtendSpan, QueueAckCommitRing, }, - queueFoo(seq, a5, a7NoACK, a6ACK)) - expectDequeFoo(t, seq, a5, a6ACK) + queue(seq, a5, a7NoACK, a6ACK)) + expectDeque(t, seq, a5, a6ACK) + require.False(t, seq.HasPending()) // Case: Rollback with interleaved producer B. var ( @@ -201,8 +203,9 @@ func TestSequencerTxnSequenceCasesTwo(t *testing.T) { QueueContinueExtendSpan, QueueAckRollback, }, - queueFoo(seq, a7Rollback, b1, a7Rollback, a8Rollback, b2, a6Abort)) - require.Nil(t, seq.emit) // No messages to dequeue. + queue(seq, a7Rollback, b1, a7Rollback, a8Rollback, b2, a6Abort)) + require.Nil(t, seq.emit) // No messages to dequeue. + require.True(t, seq.HasPending()) // B is still pending. require.Equal(t, map[JournalProducer]*partialSeq{ jpA: {jp: jpA, begin: -1, ringStart: -1, ringStop: -1, minClock: 6, maxClock: 6}, @@ -211,9 +214,10 @@ func TestSequencerTxnSequenceCasesTwo(t *testing.T) { // Case: Interleaved producer ACKs. A replay is required due to eviction. var b3ACK = generate(B, 3, Flag_ACK_TXN) - require.Equal(t, []QueueOutcome{QueueAckCommitReplay}, queueFoo(seq, b3ACK)) - expectReplayFoo(t, seq, b1.Begin, b2.Begin, b1, a7Rollback, a8Rollback) - expectDequeFoo(t, seq, b1, b2, b3ACK) + require.Equal(t, []QueueOutcome{QueueAckCommitReplay}, queue(seq, b3ACK)) + expectReplay(t, seq, b1.Begin, b2.Begin, b1, a7Rollback, a8Rollback) + expectDeque(t, seq, b1, b2, b3ACK) + require.False(t, seq.HasPending()) // Case: Sequence which requires replay, with duplicates internal // to the sequence and from before it, which are encountered in @@ -239,9 +243,10 @@ func TestSequencerTxnSequenceCasesTwo(t *testing.T) { QueueContinueExtendSpan, QueueAckCommitReplay, }, - queueFoo(seq, b4, b1Dup, b4Dup, b5, b6, b2Dup, b7, b8ACK)) - expectReplayFoo(t, seq, b4.Begin, b6.Begin, b4, b1Dup, b4Dup, b5) - expectDequeFoo(t, seq, b4, b5, b6, b7, b8ACK) + queue(seq, b4, b1Dup, b4Dup, b5, b6, b2Dup, b7, b8ACK)) + expectReplay(t, seq, b4.Begin, b6.Begin, b4, b1Dup, b4Dup, b5) + expectDeque(t, seq, b4, b5, b6, b7, b8ACK) + require.False(t, seq.HasPending()) // Case: Partial rollback where all ring entries are skipped. var ( @@ -259,9 +264,9 @@ func TestSequencerTxnSequenceCasesTwo(t *testing.T) { QueueContinueExtendSpan, QueueAckCommitReplay, }, - queueFoo(seq, b9, b11NoACK, b12NoACK, b13NoACK, b10ACK)) - expectReplayFoo(t, seq, b9.Begin, b12NoACK.Begin, b9, b11NoACK) - expectDequeFoo(t, seq, b9, b10ACK) + queue(seq, b9, b11NoACK, b12NoACK, b13NoACK, b10ACK)) + expectReplay(t, seq, b9.Begin, b12NoACK.Begin, b9, b11NoACK) + expectDeque(t, seq, b9, b10ACK) require.False(t, seq.HasPending()) // Case: Interleaved ACK'd sequences requiring two replays. @@ -280,14 +285,14 @@ func TestSequencerTxnSequenceCasesTwo(t *testing.T) { QueueContinueExtendSpan, QueueAckCommitReplay, }, - queueFoo(seq, b11, a7, a8, b12, a9ACK)) - expectReplayFoo(t, seq, a7.Begin, a8.Begin, a7) - expectDequeFoo(t, seq, a7, a8, a9ACK) + queue(seq, b11, a7, a8, b12, a9ACK)) + expectReplay(t, seq, a7.Begin, a8.Begin, a7) + expectDeque(t, seq, a7, a8, a9ACK) require.True(t, seq.HasPending()) - require.Equal(t, []QueueOutcome{QueueAckCommitReplay}, queueFoo(seq, b13ACK)) - expectReplayFoo(t, seq, b11.Begin, b12.Begin, b11, a7, a8) - expectDequeFoo(t, seq, b11, b12, b13ACK) + require.Equal(t, []QueueOutcome{QueueAckCommitReplay}, queue(seq, b13ACK)) + expectReplay(t, seq, b11.Begin, b12.Begin, b11, a7, a8) + expectDeque(t, seq, b11, b12, b13ACK) require.False(t, seq.HasPending()) // Case: Reset to earlier ACK, followed by re-use of SeqNos. @@ -303,11 +308,12 @@ func TestSequencerTxnSequenceCasesTwo(t *testing.T) { QueueContinueBeginSpan, QueueAckCommitRing, }, - queueFoo(seq, b8ACKReset, b9Reuse, b10ACKReuse)) - expectDequeFoo(t, seq, b9Reuse, b10ACKReuse) + queue(seq, b8ACKReset, b9Reuse, b10ACKReuse)) + expectDeque(t, seq, b9Reuse, b10ACKReuse) + require.False(t, seq.HasPending()) } -func TestSequencerTxnWithoutBufferTwo(t *testing.T) { +func TestSequencerTxnWithoutBuffer(t *testing.T) { var ( generate = newTestMsgGenerator() seq = NewSequencer(nil, nil, 0) @@ -331,17 +337,19 @@ func TestSequencerTxnWithoutBufferTwo(t *testing.T) { QueueContinueTxnClockLarger, QueueAckCommitReplay, }, - queueFoo(seq, a1, a2, b1, a1Dup, a2Dup, a3ACK)) - expectReplayFoo(t, seq, a1.Begin, a3ACK.Begin, a1, a2, b1, a1Dup, a2Dup) - expectDequeFoo(t, seq, a1, a2, a3ACK) + queue(seq, a1, a2, b1, a1Dup, a2Dup, a3ACK)) + expectReplay(t, seq, a1.Begin, a3ACK.Begin, a1, a2, b1, a1Dup, a2Dup) + expectDeque(t, seq, a1, a2, a3ACK) + require.True(t, seq.HasPending()) // B still pending. require.Equal(t, []QueueOutcome{QueueContinueExtendSpan, QueueAckCommitReplay}, - queueFoo(seq, b2, b3ACK)) - expectReplayFoo(t, seq, b1.Begin, b3ACK.Begin, b1, a1Dup, a2Dup, a3ACK, b2) - expectDequeFoo(t, seq, b1, b2, b3ACK) + queue(seq, b2, b3ACK)) + expectReplay(t, seq, b1.Begin, b3ACK.Begin, b1, a1Dup, a2Dup, a3ACK, b2) + expectDeque(t, seq, b1, b2, b3ACK) + require.False(t, seq.HasPending()) } -func TestSequencerOutsideTxnCasesTwo(t *testing.T) { +func TestSequencerOutsideTxnCases(t *testing.T) { var ( generate = newTestMsgGenerator() seq = NewSequencer(nil, nil, 0) @@ -354,10 +362,11 @@ func TestSequencerOutsideTxnCasesTwo(t *testing.T) { a1 = generate(A, 1, Flag_OUTSIDE_TXN) a2 = generate(A, 2, Flag_OUTSIDE_TXN) ) - require.Equal(t, []QueueOutcome{QueueOutsideCommit}, queueFoo(seq, a1)) - expectDequeFoo(t, seq, a1) - require.Equal(t, []QueueOutcome{QueueOutsideCommit}, queueFoo(seq, a2)) - expectDequeFoo(t, seq, a2) + require.Equal(t, []QueueOutcome{QueueOutsideCommit}, queue(seq, a1)) + expectDeque(t, seq, a1) + require.Equal(t, []QueueOutcome{QueueOutsideCommit}, queue(seq, a2)) + expectDeque(t, seq, a2) + require.False(t, seq.HasPending()) require.Equal(t, pb.Offsets{"test/journal": a2.End}, seq.offsets) @@ -370,7 +379,7 @@ func TestSequencerOutsideTxnCasesTwo(t *testing.T) { []QueueOutcome{ QueueOutsideAlreadyAcked, QueueOutsideAlreadyAcked, - }, queueFoo(seq, a1Dup, a2Dup)) + }, queue(seq, a1Dup, a2Dup)) require.Nil(t, seq.emit) // No messages to dequeue. require.Equal(t, pb.Offsets{"test/journal": a2Dup.End}, seq.offsets) @@ -386,8 +395,9 @@ func TestSequencerOutsideTxnCasesTwo(t *testing.T) { QueueContinueBeginSpan, QueueContinueExtendSpan, QueueOutsideCommit, - }, queueFoo(seq, a3Discard, a4Discard, a5)) - expectDequeFoo(t, seq, a5) + }, queue(seq, a3Discard, a4Discard, a5)) + expectDeque(t, seq, a5) + require.False(t, seq.HasPending()) require.Equal(t, pb.Offsets{"test/journal": a5.End}, seq.offsets) @@ -401,10 +411,10 @@ func TestSequencerOutsideTxnCasesTwo(t *testing.T) { []QueueOutcome{ QueueContinueBeginSpan, QueueOutsideCommit, - }, queueFoo(seq, a6Discard, a7BadBits)) - expectDequeFoo(t, seq, a7BadBits) + }, queue(seq, a6Discard, a7BadBits)) + expectDeque(t, seq, a7BadBits) - require.Equal(t, []QueueOutcome{QueueOutsideAlreadyAcked}, queueFoo(seq, a7BadBitsDup)) + require.Equal(t, []QueueOutcome{QueueOutsideAlreadyAcked}, queue(seq, a7BadBitsDup)) require.Nil(t, seq.emit) // No messages to dequeue. // Case: Messages with a zero UUID always dequeue. @@ -415,10 +425,11 @@ func TestSequencerOutsideTxnCasesTwo(t *testing.T) { z1.SetUUID(UUID{}) z2.SetUUID(UUID{}) - require.Equal(t, []QueueOutcome{QueueOutsideCommit}, queueFoo(seq, z1)) - expectDequeFoo(t, seq, z1) - require.Equal(t, []QueueOutcome{QueueOutsideCommit}, queueFoo(seq, z2)) - expectDequeFoo(t, seq, z2) + require.Equal(t, []QueueOutcome{QueueOutsideCommit}, queue(seq, z1)) + expectDeque(t, seq, z1) + require.Equal(t, []QueueOutcome{QueueOutsideCommit}, queue(seq, z2)) + expectDeque(t, seq, z2) + require.False(t, seq.HasPending()) // A producer for the zero-valued UUID is not tracked, but it still updates offsets. require.Equal(t, map[JournalProducer]*partialSeq{ @@ -427,7 +438,7 @@ func TestSequencerOutsideTxnCasesTwo(t *testing.T) { require.Equal(t, pb.Offsets{"test/journal": z2.End}, seq.offsets) } -func TestSequencerProducerStatesRoundTripTwo(t *testing.T) { +func TestSequencerProducerStatesRoundTrip(t *testing.T) { var ( generate = newTestMsgGenerator() seq1 = NewSequencer(nil, nil, 12) @@ -455,13 +466,14 @@ func TestSequencerProducerStatesRoundTripTwo(t *testing.T) { QueueContinueBeginSpan, QueueContinueExtendSpan, QueueAckEmpty, - }, queueFoo(seq1, a1, a2, b1, b2, c1ACK)) - expectDequeFoo(t, seq1, c1ACK) + }, queue(seq1, a1, a2, b1, b2, c1ACK)) + expectDeque(t, seq1, c1ACK) + require.True(t, seq1.HasPending()) require.Equal(t, []QueueOutcome{ QueueOutsideCommit, - }, queueFoo(seq1, z1)) - expectDequeFoo(t, seq1, z1) + }, queue(seq1, z1)) + expectDeque(t, seq1, z1) // Take a checkpoint. The act of taking one causes pending // sequences to no longer be pending. To be pending, they must @@ -473,6 +485,7 @@ func TestSequencerProducerStatesRoundTripTwo(t *testing.T) { // Recover a new Sequencer from persisted states & offsets. var seq2 = NewSequencer(offsets, states, 12) var _, states2 = seq2.Checkpoint(0) + require.False(t, seq2.HasPending()) // Expect |seq1| and |seq2| now produce identical states. var expect = []ProducerState{ @@ -493,26 +506,26 @@ func TestSequencerProducerStatesRoundTripTwo(t *testing.T) { // Expect both Sequencers produce the same output from here, // though |seq2| requires replays while |seq1| does not. - require.Equal(t, []QueueOutcome{QueueAckCommitRing}, queueFoo(seq1, b3ACK)) - require.Equal(t, []QueueOutcome{QueueAckCommitReplay}, queueFoo(seq2, b3ACK)) - expectReplayFoo(t, seq2, b1.Begin, b3ACK.Begin, b1, b2, c1ACK) + require.Equal(t, []QueueOutcome{QueueAckCommitRing}, queue(seq1, b3ACK)) + require.Equal(t, []QueueOutcome{QueueAckCommitReplay}, queue(seq2, b3ACK)) + expectReplay(t, seq2, b1.Begin, b3ACK.Begin, b1, b2, c1ACK) - expectDequeFoo(t, seq1, b1, b2, b3ACK) - expectDequeFoo(t, seq2, b1, b2, b3ACK) + expectDeque(t, seq1, b1, b2, b3ACK) + expectDeque(t, seq2, b1, b2, b3ACK) - require.Equal(t, []QueueOutcome{QueueContinueBeginSpan, QueueAckRollback}, queueFoo(seq1, c2, c1Rollback)) - require.Equal(t, []QueueOutcome{QueueContinueBeginSpan, QueueAckRollback}, queueFoo(seq2, c2, c1Rollback)) + require.Equal(t, []QueueOutcome{QueueContinueBeginSpan, QueueAckRollback}, queue(seq1, c2, c1Rollback)) + require.Equal(t, []QueueOutcome{QueueContinueBeginSpan, QueueAckRollback}, queue(seq2, c2, c1Rollback)) // No messages to dequeue. require.Nil(t, seq1.emit) require.Nil(t, seq2.emit) - require.Equal(t, []QueueOutcome{QueueAckCommitRing}, queueFoo(seq1, a3ACK)) - require.Equal(t, []QueueOutcome{QueueAckCommitReplay}, queueFoo(seq2, a3ACK)) - expectReplayFoo(t, seq2, a1.Begin, a3ACK.Begin, a1, a2, b1, b2, c1ACK) + require.Equal(t, []QueueOutcome{QueueAckCommitRing}, queue(seq1, a3ACK)) + require.Equal(t, []QueueOutcome{QueueAckCommitReplay}, queue(seq2, a3ACK)) + expectReplay(t, seq2, a1.Begin, a3ACK.Begin, a1, a2, b1, b2, c1ACK) - expectDequeFoo(t, seq1, a1, a2, a3ACK) - expectDequeFoo(t, seq2, a1, a2, a3ACK) + expectDeque(t, seq1, a1, a2, a3ACK) + expectDeque(t, seq2, a1, a2, a3ACK) } func TestSequencerProducerStatesRoundTripDuringDequeue(t *testing.T) { @@ -534,7 +547,7 @@ func TestSequencerProducerStatesRoundTripDuringDequeue(t *testing.T) { QueueContinueBeginSpan, QueueContinueExtendSpan, QueueAckCommitRing, - }, queueFoo(seq1, a1, a2, a3ACK)) + }, queue(seq1, a1, a2, a3ACK)) require.NotNil(t, seq1.emit) require.NoError(t, seq1.Step()) // Step to a1. @@ -558,14 +571,14 @@ func TestSequencerProducerStatesRoundTripDuringDequeue(t *testing.T) { } // |seq2| begins by reading a3ACK again. - require.Equal(t, []QueueOutcome{QueueAckCommitReplay}, queueFoo(seq2, a3ACK)) - expectReplayFoo(t, seq2, a1.Begin, a3ACK.Begin, a1, a2) + require.Equal(t, []QueueOutcome{QueueAckCommitReplay}, queue(seq2, a3ACK)) + expectReplay(t, seq2, a1.Begin, a3ACK.Begin, a1, a2) // Now both sequencers are ready for dequeue. require.NoError(t, seq1.Step()) // Step to a2. require.NoError(t, seq1.Step()) // Step to a3ACK. require.Equal(t, io.EOF, seq1.Step()) // Done. - expectDequeFoo(t, seq2, a1, a2, a3ACK) + expectDeque(t, seq2, a1, a2, a3ACK) // Suppose |seq1| generates a checkpoint partway through // dequeue of the next sequence. @@ -573,7 +586,7 @@ func TestSequencerProducerStatesRoundTripDuringDequeue(t *testing.T) { QueueContinueBeginSpan, QueueContinueExtendSpan, QueueAckCommitRing, - }, queueFoo(seq1, a4, a5, a6ACK)) + }, queue(seq1, a4, a5, a6ACK)) require.NotNil(t, seq1.emit) require.NoError(t, seq1.Step()) // Step to a4. @@ -596,12 +609,12 @@ func TestSequencerProducerStatesRoundTripDuringDequeue(t *testing.T) { } // |seq2| begins by reading a6ACK again, then dequeues from a5 (and not a4). - require.Equal(t, []QueueOutcome{QueueAckCommitReplay}, queueFoo(seq2, a6ACK)) - expectReplayFoo(t, seq2, a5.Begin, a6ACK.Begin, a5) - expectDequeFoo(t, seq2, a5, a6ACK) + require.Equal(t, []QueueOutcome{QueueAckCommitReplay}, queue(seq2, a6ACK)) + expectReplay(t, seq2, a5.Begin, a6ACK.Begin, a5) + expectDeque(t, seq2, a5, a6ACK) } -func TestSequencerProducerPruningFoo(t *testing.T) { +func TestSequencerProducerPruning(t *testing.T) { var ( generate = newTestMsgGenerator() seq1 = NewSequencer(nil, nil, 12) @@ -620,10 +633,10 @@ func TestSequencerProducerPruningFoo(t *testing.T) { QueueContinueBeginSpan, QueueContinueBeginSpan, QueueAckCommitRing, - }, queueFoo(seq1, aCont, bCont, bACK)) - expectDequeFoo(t, seq1, bCont, bACK) + }, queue(seq1, aCont, bCont, bACK)) + expectDeque(t, seq1, bCont, bACK) - require.Equal(t, []QueueOutcome{QueueContinueBeginSpan}, queueFoo(seq1, cCont)) + require.Equal(t, []QueueOutcome{QueueContinueBeginSpan}, queue(seq1, cCont)) var expect = func(a, b []ProducerState) { sort.Slice(a, func(i, j int) bool { @@ -660,7 +673,7 @@ func TestSequencerProducerPruningFoo(t *testing.T) { require.Len(t, seq1.partials, 1) } -func TestSequencerReplayReaderErrorsTwo(t *testing.T) { +func TestSequencerReplayReaderErrors(t *testing.T) { var A, B = NewProducerID(), NewProducerID() var cases = []struct { wrap func(Iterator) func() (Envelope, error) @@ -721,8 +734,8 @@ func TestSequencerReplayReaderErrorsTwo(t *testing.T) { QueueContinueBeginSpan, QueueContinueBeginSpan, QueueAckCommitReplay, - }, queueFoo(seq, b1, a2, a3ACK)) - expectReplayFoo(t, seq, a2.Begin, a3ACK.Begin, a2) + }, queue(seq, b1, a2, a3ACK)) + expectReplay(t, seq, a2.Begin, a3ACK.Begin, a2) seq.replayIt = fnIterator(tc.wrap(seq.replayIt)) var err = seq.Step() @@ -739,7 +752,7 @@ type fnIterator func() (Envelope, error) func (fn fnIterator) Next() (Envelope, error) { return fn() } -func queueFoo(seq *Sequencer, envs ...Envelope) []QueueOutcome { +func queue(seq *Sequencer, envs ...Envelope) []QueueOutcome { var out = make([]QueueOutcome, len(envs)) for i, e := range envs { out[i] = seq.QueueUncommitted(e) @@ -747,7 +760,7 @@ func queueFoo(seq *Sequencer, envs ...Envelope) []QueueOutcome { return out } -func expectDequeFoo(t *testing.T, seq *Sequencer, expect ...Envelope) { +func expectDeque(t *testing.T, seq *Sequencer, expect ...Envelope) { for i := 0; i != len(expect); i++ { require.NoError(t, seq.Step()) require.Equal(t, &expect[i], seq.Dequeued) @@ -785,7 +798,7 @@ func expectDequeFoo(t *testing.T, seq *Sequencer, expect ...Envelope) { require.Equal(t, last.End, seq.offsets[last.Journal.Name]) } -func expectReplayFoo(t *testing.T, seq *Sequencer, expectBegin, expectEnd pb.Offset, envs ...Envelope) { +func expectReplay(t *testing.T, seq *Sequencer, expectBegin, expectEnd pb.Offset, envs ...Envelope) { var journal, begin, end = seq.ReplayRange() require.Equal(t, envs[0].Journal.Name, journal) require.Equal(t, expectBegin, begin)