Skip to content

Commit

Permalink
Merge pull request #2575 from IBM/dnwe/mockbroker
Browse files Browse the repository at this point in the history
chore(test): ensure MockBroker closed within test
  • Loading branch information
dnwe authored Aug 8, 2023
2 parents e8808a6 + f4f435c commit 8a09ef3
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 33 deletions.
3 changes: 3 additions & 0 deletions broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1405,6 +1405,7 @@ func validateBrokerMetrics(t *testing.T, broker *Broker, mockBrokerMetrics broke

func BenchmarkBroker_Open(b *testing.B) {
mb := NewMockBroker(nil, 0)
defer mb.Close()
broker := NewBroker(mb.Addr())
// Set the broker id in order to validate local broker metrics
broker.id = 0
Expand All @@ -1422,6 +1423,7 @@ func BenchmarkBroker_Open(b *testing.B) {

func BenchmarkBroker_No_Metrics_Open(b *testing.B) {
mb := NewMockBroker(nil, 0)
defer mb.Close()
broker := NewBroker(mb.Addr())
broker.id = 0
metrics.UseNilMetrics = true
Expand All @@ -1438,6 +1440,7 @@ func BenchmarkBroker_No_Metrics_Open(b *testing.B) {

func Test_handleThrottledResponse(t *testing.T) {
mb := NewMockBroker(nil, 0)
defer mb.Close()
broker := NewBroker(mb.Addr())
broker.id = 0
conf := NewTestConfig()
Expand Down
1 change: 1 addition & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1101,6 +1101,7 @@ func TestInitProducerIDConnectionRefused(t *testing.T) {

func TestMetricsCleanup(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()
seedBroker.Returns(new(MetadataResponse))

config := NewTestConfig()
Expand Down
1 change: 1 addition & 0 deletions consumer_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ func TestConsumerGroupSessionDoesNotRetryForever(t *testing.T) {
config.Consumer.Group.Rebalance.Retry.Backoff = 0

broker0 := NewMockBroker(t, 0)
defer broker0.Close()

broker0.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
Expand Down
67 changes: 34 additions & 33 deletions offset_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import (
"time"
)

func initOffsetManagerWithBackoffFunc(t *testing.T, retention time.Duration,
backoffFunc func(retries, maxRetries int) time.Duration, config *Config) (om OffsetManager,
testClient Client, broker, coordinator *MockBroker) {
func initOffsetManagerWithBackoffFunc(
t *testing.T,
retention time.Duration,
backoffFunc func(retries, maxRetries int) time.Duration, config *Config,
) (om OffsetManager, testClient Client, broker, coordinator *MockBroker) {
config.Metadata.Retry.Max = 1
if backoffFunc != nil {
config.Metadata.Retry.BackoffFunc = backoffFunc
Expand Down Expand Up @@ -50,12 +52,14 @@ func initOffsetManagerWithBackoffFunc(t *testing.T, retention time.Duration,
}

func initOffsetManager(t *testing.T, retention time.Duration) (om OffsetManager,
testClient Client, broker, coordinator *MockBroker) {
testClient Client, broker, coordinator *MockBroker,
) {
return initOffsetManagerWithBackoffFunc(t, retention, nil, NewTestConfig())
}

func initPartitionOffsetManager(t *testing.T, om OffsetManager,
coordinator *MockBroker, initialOffset int64, metadata string) PartitionOffsetManager {
coordinator *MockBroker, initialOffset int64, metadata string,
) PartitionOffsetManager {
fetchResponse := new(OffsetFetchResponse)
fetchResponse.AddBlock("my_topic", 0, &OffsetFetchResponseBlock{
Err: ErrNoError,
Expand Down Expand Up @@ -127,6 +131,8 @@ func TestNewOffsetManagerOffsetsAutoCommit(t *testing.T) {
config.Consumer.Offsets.AutoCommit.Enable = tt.enable
}
om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, nil, config)
defer broker.Close()
defer coordinator.Close()
pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")

// Wait long enough for the test not to fail..
Expand Down Expand Up @@ -160,9 +166,6 @@ func TestNewOffsetManagerOffsetsAutoCommit(t *testing.T) {
}
}

broker.Close()
coordinator.Close()

// !! om must be closed before the pom so pom.release() is called before pom.Close()
safeClose(t, om)
safeClose(t, pom)
Expand All @@ -177,6 +180,8 @@ func TestNewOffsetManagerOffsetsManualCommit(t *testing.T) {
config.Consumer.Offsets.AutoCommit.Enable = false

om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, nil, config)
defer broker.Close()
defer coordinator.Close()
pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")

// Wait long enough for the test not to fail..
Expand Down Expand Up @@ -219,10 +224,6 @@ func TestNewOffsetManagerOffsetsManualCommit(t *testing.T) {
t.Errorf("No request received for after waiting for %v", timeout)
}

// Close up
broker.Close()
coordinator.Close()

// !! om must be closed before the pom so pom.release() is called before pom.Close()
safeClose(t, om)
safeClose(t, pom)
Expand All @@ -233,6 +234,8 @@ func TestNewOffsetManagerOffsetsManualCommit(t *testing.T) {
// on first fetchInitialOffset call
func TestOffsetManagerFetchInitialFail(t *testing.T) {
om, testClient, broker, coordinator := initOffsetManager(t, 0)
defer broker.Close()
defer coordinator.Close()

// Error on first fetchInitialOffset call
responseBlock := OffsetFetchResponseBlock{
Expand All @@ -247,6 +250,7 @@ func TestOffsetManagerFetchInitialFail(t *testing.T) {

// Refresh coordinator
newCoordinator := NewMockBroker(t, 3)
defer newCoordinator.Close()
broker.Returns(&ConsumerMetadataResponse{
CoordinatorID: newCoordinator.BrokerID(),
CoordinatorHost: "127.0.0.1",
Expand All @@ -265,9 +269,6 @@ func TestOffsetManagerFetchInitialFail(t *testing.T) {
t.Error(err)
}

broker.Close()
coordinator.Close()
newCoordinator.Close()
safeClose(t, pom)
safeClose(t, om)
safeClose(t, testClient)
Expand All @@ -281,6 +282,8 @@ func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) {
return 0
}
om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, backoff, NewTestConfig())
defer broker.Close()
defer coordinator.Close()

// Error on first fetchInitialOffset call
responseBlock := OffsetFetchResponseBlock{
Expand All @@ -305,8 +308,6 @@ func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) {
t.Error(err)
}

broker.Close()
coordinator.Close()
safeClose(t, pom)
safeClose(t, om)
safeClose(t, testClient)
Expand All @@ -318,6 +319,8 @@ func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) {

func TestPartitionOffsetManagerInitialOffset(t *testing.T) {
om, testClient, broker, coordinator := initOffsetManager(t, 0)
defer broker.Close()
defer coordinator.Close()
testClient.Config().Consumer.Offsets.Initial = OffsetOldest

// Kafka returns -1 if no offset has been stored for this partition yet.
Expand All @@ -333,13 +336,13 @@ func TestPartitionOffsetManagerInitialOffset(t *testing.T) {

safeClose(t, pom)
safeClose(t, om)
broker.Close()
coordinator.Close()
safeClose(t, testClient)
}

func TestPartitionOffsetManagerNextOffset(t *testing.T) {
om, testClient, broker, coordinator := initOffsetManager(t, 0)
defer broker.Close()
defer coordinator.Close()
pom := initPartitionOffsetManager(t, om, coordinator, 5, "test_meta")

offset, meta := pom.NextOffset()
Expand All @@ -352,13 +355,13 @@ func TestPartitionOffsetManagerNextOffset(t *testing.T) {

safeClose(t, pom)
safeClose(t, om)
broker.Close()
coordinator.Close()
safeClose(t, testClient)
}

func TestPartitionOffsetManagerResetOffset(t *testing.T) {
om, testClient, broker, coordinator := initOffsetManager(t, 0)
defer broker.Close()
defer coordinator.Close()
pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")

ocResponse := new(OffsetCommitResponse)
Expand All @@ -379,12 +382,12 @@ func TestPartitionOffsetManagerResetOffset(t *testing.T) {
safeClose(t, pom)
safeClose(t, om)
safeClose(t, testClient)
broker.Close()
coordinator.Close()
}

func TestPartitionOffsetManagerResetOffsetWithRetention(t *testing.T) {
om, testClient, broker, coordinator := initOffsetManager(t, time.Hour)
defer broker.Close()
defer coordinator.Close()
pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")

ocResponse := new(OffsetCommitResponse)
Expand Down Expand Up @@ -415,12 +418,12 @@ func TestPartitionOffsetManagerResetOffsetWithRetention(t *testing.T) {
safeClose(t, pom)
safeClose(t, om)
safeClose(t, testClient)
broker.Close()
coordinator.Close()
}

func TestPartitionOffsetManagerMarkOffset(t *testing.T) {
om, testClient, broker, coordinator := initOffsetManager(t, 0)
defer broker.Close()
defer coordinator.Close()
pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")

ocResponse := new(OffsetCommitResponse)
Expand All @@ -440,12 +443,12 @@ func TestPartitionOffsetManagerMarkOffset(t *testing.T) {
safeClose(t, pom)
safeClose(t, om)
safeClose(t, testClient)
broker.Close()
coordinator.Close()
}

func TestPartitionOffsetManagerMarkOffsetWithRetention(t *testing.T) {
om, testClient, broker, coordinator := initOffsetManager(t, time.Hour)
defer broker.Close()
defer coordinator.Close()
pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")

ocResponse := new(OffsetCommitResponse)
Expand Down Expand Up @@ -475,12 +478,12 @@ func TestPartitionOffsetManagerMarkOffsetWithRetention(t *testing.T) {
safeClose(t, pom)
safeClose(t, om)
safeClose(t, testClient)
broker.Close()
coordinator.Close()
}

func TestPartitionOffsetManagerCommitErr(t *testing.T) {
om, testClient, broker, coordinator := initOffsetManager(t, 0)
defer broker.Close()
defer coordinator.Close()
pom := initPartitionOffsetManager(t, om, coordinator, 5, "meta")

// Error on one partition
Expand All @@ -490,6 +493,7 @@ func TestPartitionOffsetManagerCommitErr(t *testing.T) {
coordinator.Returns(ocResponse)

newCoordinator := NewMockBroker(t, 3)
defer newCoordinator.Close()

// For RefreshCoordinator()
broker.Returns(&ConsumerMetadataResponse{
Expand Down Expand Up @@ -535,16 +539,14 @@ func TestPartitionOffsetManagerCommitErr(t *testing.T) {
t.Error(err)
}

broker.Close()
coordinator.Close()
newCoordinator.Close()
safeClose(t, om)
safeClose(t, testClient)
}

// Test of recovery from abort
func TestAbortPartitionOffsetManager(t *testing.T) {
om, testClient, broker, coordinator := initOffsetManager(t, 0)
defer broker.Close()
pom := initPartitionOffsetManager(t, om, coordinator, 5, "meta")

// this triggers an error in the CommitOffset request,
Expand All @@ -568,6 +570,5 @@ func TestAbortPartitionOffsetManager(t *testing.T) {

safeClose(t, pom)
safeClose(t, om)
broker.Close()
safeClose(t, testClient)
}
19 changes: 19 additions & 0 deletions sarama_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
//go:build !functional
// +build !functional

package sarama

import (
"flag"
"log"
"os"
"testing"
)

func TestMain(m *testing.M) {
flag.Parse()
if f := flag.Lookup("test.v"); f != nil && f.Value.String() == "true" {
Logger = log.New(os.Stderr, "[Sarama] ", log.LstdFlags)
}
os.Exit(m.Run())
}

0 comments on commit 8a09ef3

Please sign in to comment.