From 31a86934aa4064be2b260467d2d23211679d66fc Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Fri, 11 Aug 2023 22:40:51 +0100 Subject: [PATCH 1/2] fix(fvt): disable keepalive on toxiproxy client We don't want to keep the tcp conn alive as we only make very few requests and it'll show up as a leaked goroutine in-between tests. Signed-off-by: Dominic Evans --- internal/toxiproxy/client.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/toxiproxy/client.go b/internal/toxiproxy/client.go index e916bcbca..a30557048 100644 --- a/internal/toxiproxy/client.go +++ b/internal/toxiproxy/client.go @@ -24,7 +24,8 @@ func NewClient(endpoint string) *Client { KeepAlive: 30 * time.Second, }).DialContext, ForceAttemptHTTP2: true, - MaxIdleConns: 100, + MaxIdleConns: -1, + DisableKeepAlives: true, IdleConnTimeout: 90 * time.Second, TLSHandshakeTimeout: 10 * time.Second, ExpectContinueTimeout: 1 * time.Second, From b0363d1b3c6d13f201df360514bb6fa8357c0f55 Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Fri, 11 Aug 2023 22:22:10 +0100 Subject: [PATCH 2/2] fix(test): ensure some more clients are closed Signed-off-by: Dominic Evans --- client_test.go | 3 ++- functional_producer_test.go | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/client_test.go b/client_test.go index 4bb70a78a..c9d5b56ad 100644 --- a/client_test.go +++ b/client_test.go @@ -13,6 +13,7 @@ import ( ) func safeClose(t testing.TB, c io.Closer) { + t.Helper() err := c.Close() if err != nil { t.Error(err) @@ -51,6 +52,7 @@ func TestCachedPartitions(t *testing.T) { if err != nil { t.Fatal(err) } + defer safeClose(t, c) client := c.(*client) // Verify they aren't cached the same @@ -69,7 +71,6 @@ func TestCachedPartitions(t *testing.T) { } seedBroker.Close() - safeClose(t, client) } func TestClientDoesntCachePartitionsForTopicsWithErrors(t *testing.T) { diff --git a/functional_producer_test.go b/functional_producer_test.go index 8f7101580..003a48c12 100644 --- a/functional_producer_test.go +++ b/functional_producer_test.go @@ -587,6 +587,7 @@ func TestFuncTxnAbortedProduce(t *testing.T) { client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config) require.NoError(t, err) + defer client.Close() consumer, err := NewConsumerFromClient(client) require.NoError(t, err) @@ -745,6 +746,7 @@ func TestInterceptors(t *testing.T) { if err != nil { t.Fatal(err) } + defer safeClose(t, client) initialOffset, err := client.GetOffset("test.1", 0, OffsetNewest) if err != nil { @@ -800,7 +802,6 @@ func TestInterceptors(t *testing.T) { } } safeClose(t, consumer) - safeClose(t, client) } func testProducingMessages(t *testing.T, config *Config) { @@ -821,6 +822,7 @@ func testProducingMessages(t *testing.T, config *Config) { if err != nil { t.Fatal(err) } + defer safeClose(t, client) // Keep in mind the current offset initialOffset, err := client.GetOffset("test.1", 0, OffsetNewest) @@ -885,7 +887,6 @@ func testProducingMessages(t *testing.T, config *Config) { validateConsumerMetrics(t, client) safeClose(t, consumer) - safeClose(t, client) } // TestAsyncProducerRemoteBrokerClosed ensures that the async producer can