Skip to content

Commit

Permalink
Add test for partitionConsumer HWMO before consumption
Browse files Browse the repository at this point in the history
This will fail now, because HWMO is not set correctly
  • Loading branch information
grongor committed Dec 5, 2021
1 parent aca9acd commit 1c4079e
Showing 1 changed file with 9 additions and 0 deletions.
9 changes: 9 additions & 0 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ func TestConsumerOffsetManual(t *testing.T) {
}

// Then
if hwmo := consumer.HighWaterMarkOffset(); hwmo != offsetNewest {
t.Errorf("Expected high water mark offset %d, found %d", offsetNewest, hwmo)
}
for i := int64(0); i < 10; i++ {
select {
case message := <-consumer.Messages():
Expand Down Expand Up @@ -109,6 +112,9 @@ func TestConsumerOffsetNewest(t *testing.T) {
}

// Then
if hwmo := consumer.HighWaterMarkOffset(); hwmo != offsetNewest {
t.Errorf("Expected high water mark offset %d, found %d", offsetNewest, hwmo)
}
assertMessageOffset(t, <-consumer.Messages(), 10)
if hwmo := consumer.HighWaterMarkOffset(); hwmo != offsetNewestAfterFetchRequest {
t.Errorf("Expected high water mark offset %d, found %d", offsetNewestAfterFetchRequest, hwmo)
Expand Down Expand Up @@ -154,6 +160,9 @@ func TestConsumerOffsetOldest(t *testing.T) {
}

// Then
if hwmo := consumer.HighWaterMarkOffset(); hwmo != offsetNewest {
t.Errorf("Expected high water mark offset %d, found %d", offsetNewest, hwmo)
}
assertMessageOffset(t, <-consumer.Messages(), int64(7))
if hwmo := consumer.HighWaterMarkOffset(); hwmo != offsetNewest {
t.Errorf("Expected high water mark offset %d, found %d", offsetNewest, hwmo)
Expand Down

0 comments on commit 1c4079e

Please sign in to comment.