Skip to content

Commit

Permalink
fix(mock consumer): HighWaterMarkOffset (#2447)
Browse files Browse the repository at this point in the history
  • Loading branch information
gr8web authored Mar 23, 2023
1 parent 7dbf0b5 commit 2f8dcd0
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 5 deletions.
2 changes: 1 addition & 1 deletion mocks/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ func (pc *PartitionConsumer) Messages() <-chan *sarama.ConsumerMessage {
}

func (pc *PartitionConsumer) HighWaterMarkOffset() int64 {
return atomic.LoadInt64(&pc.highWaterMarkOffset) + 1
return atomic.LoadInt64(&pc.highWaterMarkOffset)
}

// Pause implements the Pause method from the sarama.PartitionConsumer interface.
Expand Down
13 changes: 9 additions & 4 deletions mocks/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func TestConsumerHandlesExpectationsPausingResuming(t *testing.T) {
t.Error("Expected sarama.ErrOutOfBrokers, found:", test0_err.Err)
}

if pc_test0.HighWaterMarkOffset() != 1 {
if pc_test0.HighWaterMarkOffset() != 0 {
t.Error("High water mark offset with value different from the expected: ", pc_test0.HighWaterMarkOffset())
}

Expand All @@ -112,7 +112,7 @@ func TestConsumerHandlesExpectationsPausingResuming(t *testing.T) {
t.Error("Message was not as expected:", test1_msg)
}

if pc_test1.HighWaterMarkOffset() != 2 {
if pc_test1.HighWaterMarkOffset() != 1 {
t.Error("High water mark offset with value different from the expected: ", pc_test1.HighWaterMarkOffset())
}

Expand All @@ -125,7 +125,7 @@ func TestConsumerHandlesExpectationsPausingResuming(t *testing.T) {
t.Error("Message was not as expected:", other0_msg)
}

if pc_other0.HighWaterMarkOffset() != AnyOffset+2 {
if pc_other0.HighWaterMarkOffset() != AnyOffset+1 {
t.Error("High water mark offset with value different from the expected: ", pc_other0.HighWaterMarkOffset())
}

Expand All @@ -140,7 +140,7 @@ func TestConsumerHandlesExpectationsPausingResuming(t *testing.T) {
t.Error("Message was not as expected:", test0_msg2)
}

if pc_test0.HighWaterMarkOffset() != 3 {
if pc_test0.HighWaterMarkOffset() != 2 {
t.Error("High water mark offset with value different from the expected: ", pc_test0.HighWaterMarkOffset())
}
}
Expand Down Expand Up @@ -395,6 +395,11 @@ func TestConsumerOffsetsAreManagedCorrectlyWithSpecifiedOffset(t *testing.T) {
if len(trm.errors) != 0 {
t.Errorf("Expected to not report any errors, found: %v", trm.errors)
}

if pc.HighWaterMarkOffset() != message2.Offset+1 {
diff := pc.HighWaterMarkOffset() - message2.Offset
t.Errorf("Difference between highwatermarkoffset and last message offset greater than 1, got: %v", diff)
}
}

func TestConsumerInvalidConfiguration(t *testing.T) {
Expand Down

0 comments on commit 2f8dcd0

Please sign in to comment.