From 1c4079e7e3fcd97a677a83c14d50ef9f80cc4292 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Ch=C3=A1bek?= Date: Sun, 5 Dec 2021 12:07:33 +0100 Subject: [PATCH 1/2] Add test for partitionConsumer HWMO before consumption This will fail now, because HWMO is not set correctly --- consumer_test.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/consumer_test.go b/consumer_test.go index 8d8c33b8c..0f752c4e0 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -57,6 +57,9 @@ func TestConsumerOffsetManual(t *testing.T) { } // Then + if hwmo := consumer.HighWaterMarkOffset(); hwmo != offsetNewest { + t.Errorf("Expected high water mark offset %d, found %d", offsetNewest, hwmo) + } for i := int64(0); i < 10; i++ { select { case message := <-consumer.Messages(): @@ -109,6 +112,9 @@ func TestConsumerOffsetNewest(t *testing.T) { } // Then + if hwmo := consumer.HighWaterMarkOffset(); hwmo != offsetNewest { + t.Errorf("Expected high water mark offset %d, found %d", offsetNewest, hwmo) + } assertMessageOffset(t, <-consumer.Messages(), 10) if hwmo := consumer.HighWaterMarkOffset(); hwmo != offsetNewestAfterFetchRequest { t.Errorf("Expected high water mark offset %d, found %d", offsetNewestAfterFetchRequest, hwmo) @@ -154,6 +160,9 @@ func TestConsumerOffsetOldest(t *testing.T) { } // Then + if hwmo := consumer.HighWaterMarkOffset(); hwmo != offsetNewest { + t.Errorf("Expected high water mark offset %d, found %d", offsetNewest, hwmo) + } assertMessageOffset(t, <-consumer.Messages(), int64(7)) if hwmo := consumer.HighWaterMarkOffset(); hwmo != offsetNewest { t.Errorf("Expected high water mark offset %d, found %d", offsetNewest, hwmo) From 179dc9d28899a06b7afab22781a3ed3d1a7c1a10 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Ch=C3=A1bek?= Date: Sun, 5 Dec 2021 12:14:17 +0100 Subject: [PATCH 2/2] Set HWMO during creation of partitionConsumer --- consumer.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/consumer.go b/consumer.go index 1cb910deb..87a135072 100644 --- a/consumer.go +++ b/consumer.go @@ -400,6 +400,9 @@ func (child *partitionConsumer) chooseStartingOffset(offset int64) error { if err != nil { return err } + + child.highWaterMarkOffset = newestOffset + oldestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetOldest) if err != nil { return err