From 6cc1ed47caa52466d0f3d3d011e4a7e39c17a868 Mon Sep 17 00:00:00 2001 From: Hugon Sknadaj Date: Wed, 7 Sep 2022 16:30:54 +0200 Subject: [PATCH 1/2] Support key in MockFetchResponse. --- consumer_test.go | 89 ++++++++++++++++++++++++++++++++++++++++++++++-- mockresponses.go | 31 +++++++++++++---- 2 files changed, 111 insertions(+), 9 deletions(-) diff --git a/consumer_test.go b/consumer_test.go index 68b6398fc..5e02925aa 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -1,6 +1,7 @@ package sarama import ( + "bytes" "errors" "log" "os" @@ -13,6 +14,7 @@ import ( ) var testMsg = StringEncoder("Foo") +var testKey = StringEncoder("Bar") // If a particular offset is provided then messages are consumed starting from // that offset. @@ -27,10 +29,10 @@ func TestConsumerOffsetManual(t *testing.T) { mockFetchResponse := NewMockFetchResponse(t, 1) // skipped because parseRecords(): offset < child.offset - mockFetchResponse.SetMessage("my_topic", 0, manualOffset-1, testMsg) + mockFetchResponse.SetMessageWithKey("my_topic", 0, manualOffset-1, testKey, testMsg) for i := int64(0); i < 10; i++ { - mockFetchResponse.SetMessage("my_topic", 0, i+manualOffset, testMsg) + mockFetchResponse.SetMessageWithKey("my_topic", 0, i+manualOffset, testKey, testMsg) } mockFetchResponse.SetHighWaterMark("my_topic", 0, offsetNewestAfterFetchRequest) @@ -78,6 +80,71 @@ func TestConsumerOffsetManual(t *testing.T) { broker0.Close() } +// If a message is given a key, it can be correctly collected while consuming. +func TestConsumerMessageWithKey(t *testing.T) { + // Given + broker0 := NewMockBroker(t, 0) + + manualOffset := int64(1234) + offsetNewest := int64(2345) + offsetNewestAfterFetchRequest := int64(3456) + + mockFetchResponse := NewMockFetchResponse(t, 1) + + // skipped because parseRecords(): offset < child.offset + mockFetchResponse.SetMessageWithKey("my_topic", 0, manualOffset-1, testKey, testMsg) + + for i := int64(0); i < 10; i++ { + mockFetchResponse.SetMessageWithKey("my_topic", 0, i+manualOffset, testKey, testMsg) + } + + mockFetchResponse.SetHighWaterMark("my_topic", 0, offsetNewestAfterFetchRequest) + + broker0.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetBroker(broker0.Addr(), broker0.BrokerID()). + SetLeader("my_topic", 0, broker0.BrokerID()), + "OffsetRequest": NewMockOffsetResponse(t). + SetOffset("my_topic", 0, OffsetOldest, 0). + SetOffset("my_topic", 0, OffsetNewest, offsetNewest), + "FetchRequest": mockFetchResponse, + }) + + // When + master, err := NewConsumer([]string{broker0.Addr()}, NewTestConfig()) + if err != nil { + t.Fatal(err) + } + + consumer, err := master.ConsumePartition("my_topic", 0, manualOffset) + if err != nil { + t.Fatal(err) + } + + // Then + if hwmo := consumer.HighWaterMarkOffset(); hwmo != offsetNewest { + t.Errorf("Expected high water mark offset %d, found %d", offsetNewest, hwmo) + } + for i := int64(0); i < 10; i++ { + select { + case message := <-consumer.Messages(): + assertMessageOffset(t, message, i+manualOffset) + assertMessageKey(t, message, testKey) + assertMessageValue(t, message, testMsg) + case err := <-consumer.Errors(): + t.Error(err) + } + } + + if hwmo := consumer.HighWaterMarkOffset(); hwmo != offsetNewestAfterFetchRequest { + t.Errorf("Expected high water mark offset %d, found %d", offsetNewestAfterFetchRequest, hwmo) + } + + safeClose(t, consumer) + safeClose(t, master) + broker0.Close() +} + func TestPauseResumeConsumption(t *testing.T) { // Given broker0 := NewMockBroker(t, 0) @@ -1795,6 +1862,24 @@ func TestExcludeUncommitted(t *testing.T) { broker0.Close() } +func assertMessageKey(t *testing.T, msg *ConsumerMessage, expectedKey Encoder) { + t.Helper() + + wantKey, _ := expectedKey.Encode() + if bytes.Compare(msg.Key, wantKey) != 0 { + t.Fatalf("Incorrect key for message. expected=%s, actual=%s", expectedKey, msg.Key) + } +} + +func assertMessageValue(t *testing.T, msg *ConsumerMessage, expectedValue Encoder) { + t.Helper() + + wantValue, _ := expectedValue.Encode() + if bytes.Compare(msg.Value, wantValue) != 0 { + t.Fatalf("Incorrect value for message. expected=%s, actual=%s", expectedValue, msg.Key) + } +} + func assertMessageOffset(t *testing.T, msg *ConsumerMessage, expectedOffset int64) { t.Helper() if msg.Offset != expectedOffset { diff --git a/mockresponses.go b/mockresponses.go index a9c8519a7..7cfd8094b 100644 --- a/mockresponses.go +++ b/mockresponses.go @@ -256,9 +256,22 @@ func (mor *MockOffsetResponse) getOffset(topic string, partition int32, time int return offset } +// mockMessage is a message that used to be mocked for `FetchResponse` +type mockMessage struct { + key Encoder + msg Encoder +} + +func newMockMessage(key, msg Encoder) *mockMessage { + return &mockMessage{ + key: key, + msg: msg, + } +} + // MockFetchResponse is a `FetchResponse` builder. type MockFetchResponse struct { - messages map[string]map[int32]map[int64]Encoder + messages map[string]map[int32]map[int64]*mockMessage messagesLock *sync.RWMutex highWaterMarks map[string]map[int32]int64 t TestReporter @@ -267,7 +280,7 @@ type MockFetchResponse struct { func NewMockFetchResponse(t TestReporter, batchSize int) *MockFetchResponse { return &MockFetchResponse{ - messages: make(map[string]map[int32]map[int64]Encoder), + messages: make(map[string]map[int32]map[int64]*mockMessage), messagesLock: &sync.RWMutex{}, highWaterMarks: make(map[string]map[int32]int64), t: t, @@ -276,19 +289,23 @@ func NewMockFetchResponse(t TestReporter, batchSize int) *MockFetchResponse { } func (mfr *MockFetchResponse) SetMessage(topic string, partition int32, offset int64, msg Encoder) *MockFetchResponse { + return mfr.SetMessageWithKey(topic, partition, offset, nil, msg) +} + +func (mfr *MockFetchResponse) SetMessageWithKey(topic string, partition int32, offset int64, key, msg Encoder) *MockFetchResponse { mfr.messagesLock.Lock() defer mfr.messagesLock.Unlock() partitions := mfr.messages[topic] if partitions == nil { - partitions = make(map[int32]map[int64]Encoder) + partitions = make(map[int32]map[int64]*mockMessage) mfr.messages[topic] = partitions } messages := partitions[partition] if messages == nil { - messages = make(map[int64]Encoder) + messages = make(map[int64]*mockMessage) partitions[partition] = messages } - messages[offset] = msg + messages[offset] = newMockMessage(key, msg) return mfr } @@ -315,7 +332,7 @@ func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoderWithHeader { for i := 0; i < mfr.batchSize && offset < maxOffset; { msg := mfr.getMessage(topic, partition, offset) if msg != nil { - res.AddMessage(topic, partition, nil, msg, offset) + res.AddMessage(topic, partition, msg.key, msg.msg, offset) i++ } offset++ @@ -331,7 +348,7 @@ func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoderWithHeader { return res } -func (mfr *MockFetchResponse) getMessage(topic string, partition int32, offset int64) Encoder { +func (mfr *MockFetchResponse) getMessage(topic string, partition int32, offset int64) *mockMessage { mfr.messagesLock.RLock() defer mfr.messagesLock.RUnlock() partitions := mfr.messages[topic] From 29be3d94b979929e6e696b852c79d33d529bccf4 Mon Sep 17 00:00:00 2001 From: Hugon Sknadaj Date: Wed, 7 Sep 2022 16:39:38 +0200 Subject: [PATCH 2/2] Revert old test changes. --- consumer_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/consumer_test.go b/consumer_test.go index 5e02925aa..1c44cb005 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -29,10 +29,10 @@ func TestConsumerOffsetManual(t *testing.T) { mockFetchResponse := NewMockFetchResponse(t, 1) // skipped because parseRecords(): offset < child.offset - mockFetchResponse.SetMessageWithKey("my_topic", 0, manualOffset-1, testKey, testMsg) + mockFetchResponse.SetMessage("my_topic", 0, manualOffset-1, testMsg) for i := int64(0); i < 10; i++ { - mockFetchResponse.SetMessageWithKey("my_topic", 0, i+manualOffset, testKey, testMsg) + mockFetchResponse.SetMessage("my_topic", 0, i+manualOffset, testMsg) } mockFetchResponse.SetHighWaterMark("my_topic", 0, offsetNewestAfterFetchRequest)