diff --git a/client_test.go b/client_test.go index 4bb70a78a0..c9d5b56ada 100644 --- a/client_test.go +++ b/client_test.go @@ -13,6 +13,7 @@ import ( ) func safeClose(t testing.TB, c io.Closer) { + t.Helper() err := c.Close() if err != nil { t.Error(err) @@ -51,6 +52,7 @@ func TestCachedPartitions(t *testing.T) { if err != nil { t.Fatal(err) } + defer safeClose(t, c) client := c.(*client) // Verify they aren't cached the same @@ -69,7 +71,6 @@ func TestCachedPartitions(t *testing.T) { } seedBroker.Close() - safeClose(t, client) } func TestClientDoesntCachePartitionsForTopicsWithErrors(t *testing.T) { diff --git a/functional_producer_test.go b/functional_producer_test.go index 8f7101580f..541b5642df 100644 --- a/functional_producer_test.go +++ b/functional_producer_test.go @@ -587,6 +587,7 @@ func TestFuncTxnAbortedProduce(t *testing.T) { client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config) require.NoError(t, err) + defer client.Close() consumer, err := NewConsumerFromClient(client) require.NoError(t, err) @@ -745,6 +746,7 @@ func TestInterceptors(t *testing.T) { if err != nil { t.Fatal(err) } + defer safeClose(t, client) initialOffset, err := client.GetOffset("test.1", 0, OffsetNewest) if err != nil { @@ -821,6 +823,7 @@ func testProducingMessages(t *testing.T, config *Config) { if err != nil { t.Fatal(err) } + defer safeClose(t, client) // Keep in mind the current offset initialOffset, err := client.GetOffset("test.1", 0, OffsetNewest) @@ -885,7 +888,6 @@ func testProducingMessages(t *testing.T, config *Config) { validateConsumerMetrics(t, client) safeClose(t, consumer) - safeClose(t, client) } // TestAsyncProducerRemoteBrokerClosed ensures that the async producer can