From 3f87d8da2872e7f9d0493ca97f766dca18f9bf66 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?= Date: Thu, 24 Aug 2023 14:46:16 +0200 Subject: [PATCH 1/3] Fix data race when using buffer pooling ingester client. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/ingester/client/buffering_client.go | 16 ++++++--- pkg/ingester/client/buffering_client_test.go | 35 ++++++++++++++++++++ 2 files changed, 47 insertions(+), 4 deletions(-) diff --git a/pkg/ingester/client/buffering_client.go b/pkg/ingester/client/buffering_client.go index aa5d10240bf..0a8d077cccf 100644 --- a/pkg/ingester/client/buffering_client.go +++ b/pkg/ingester/client/buffering_client.go @@ -51,10 +51,18 @@ func (c *bufferPoolingIngesterClient) Push(ctx context.Context, in *mimirpb.Writ WriteRequest: in, slabPool: p, } - // We can return all buffers back to slabPool when this method finishes. - defer wr.ReturnBuffersToPool() - - return c.pushRawFn(ctx, c.conn, wr, opts...) + resp, err := c.pushRawFn(ctx, c.conn, wr, opts...) + if err == nil { + // We can return all buffers back to slabPool when push method finishes. + // However, we can only do that if we have actually received a reply from the server. + // It means that our buffers were sent out successfully, and are not used anymore. + // + // If there was an error (e.g. context cancellation), our buffers can still be in + // use by gRPC client (e.g. enqueued to be sent via network connection), and we can't + // return them to the pool.
+ wr.ReturnBuffersToPool() + } + return resp, err } type poolKey int diff --git a/pkg/ingester/client/buffering_client_test.go b/pkg/ingester/client/buffering_client_test.go index 311d481ef82..1004f184dca 100644 --- a/pkg/ingester/client/buffering_client_test.go +++ b/pkg/ingester/client/buffering_client_test.go @@ -92,6 +92,41 @@ func TestWriteRequestBufferingClient_Push(t *testing.T) { }) } +func TestWriteRequestBufferingClient_PushWithCancelContext(t *testing.T) { + serv, conn := setupGrpc(t) + + bufferingClient := IngesterClient(newBufferPoolingIngesterClient(NewIngesterClient(conn), conn)) + + var requestsToSend []*mimirpb.WriteRequest + for i := 0; i < 100; i++ { + requestsToSend = append(requestsToSend, createRequest("test", 100+10*i)) + } + + serv.clearRequests() + + pool := &pool2.TrackedPool{Parent: &sync.Pool{}} + slabPool := pool2.NewFastReleasingSlabPool[byte](pool, 512*1024) + + ctx := WithSlabPool(context.Background(), slabPool) + + for _, r := range requestsToSend { + started := make(chan bool) + + // Start a background goroutine to cancel the context. We want to hit the moment after the data frame is enqueued, but before it's sent.
+ cc, cancel := context.WithCancel(ctx) + defer cancel() + go func() { + close(started) + time.Sleep(1 * time.Millisecond) + cancel() + }() + + <-started + + _, _ = bufferingClient.Push(cc, r) + } +} + func TestWriteRequestBufferingClient_Push_WithMultipleMarshalCalls(t *testing.T) { serv, conn := setupGrpc(t) From f85283204961f89b28cbff865fec5e98690f4e8a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?= Date: Thu, 24 Aug 2023 14:47:03 +0200 Subject: [PATCH 2/3] CHANGELOG.md MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4f000460371..dab52f6788b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -49,7 +49,7 @@ * `-.initial-connection-window-size` * [ENHANCEMENT] Query-frontend: added "response_size_bytes" field to "query stats" log. #5196 * [ENHANCEMENT] Querier: Refine error messages for per-tenant query limits, informing the user of the preferred strategy for not hitting the limit, in addition to how they may tweak the limit. #5059 -* [ENHANCEMENT] Distributor: optimize sending of requests to ingesters by reusing memory buffers for marshalling requests. This optimization can be enabled by setting `-distributor.write-requests-buffer-pooling-enabled` to `true`. #5195 #5805 +* [ENHANCEMENT] Distributor: optimize sending of requests to ingesters by reusing memory buffers for marshalling requests. This optimization can be enabled by setting `-distributor.write-requests-buffer-pooling-enabled` to `true`. #5195 #5805 #5830 * [ENHANCEMENT] Querier: add experimental `-querier.minimize-ingester-requests` option to initially query only the minimum set of ingesters required to reach quorum. #5202 #5259 #5263 * [ENHANCEMENT] Querier: improve error message when streaming chunks from ingesters to queriers and a query limit is reached. 
#5245 * [ENHANCEMENT] Use new data structure for labels, to reduce memory consumption. #3555 #5731 From ff2f174f6ccbd7453bf34a4db14b33bc74f53775 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?= Date: Thu, 24 Aug 2023 14:59:56 +0200 Subject: [PATCH 3/3] Remove unnecessary code. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/ingester/client/buffering_client_test.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pkg/ingester/client/buffering_client_test.go b/pkg/ingester/client/buffering_client_test.go index 1004f184dca..0878f15bd04 100644 --- a/pkg/ingester/client/buffering_client_test.go +++ b/pkg/ingester/client/buffering_client_test.go @@ -93,7 +93,7 @@ func TestWriteRequestBufferingClient_Push(t *testing.T) { } func TestWriteRequestBufferingClient_PushWithCancelContext(t *testing.T) { - serv, conn := setupGrpc(t) + _, conn := setupGrpc(t) bufferingClient := IngesterClient(newBufferPoolingIngesterClient(NewIngesterClient(conn), conn)) @@ -102,8 +102,6 @@ func TestWriteRequestBufferingClient_PushWithCancelContext(t *testing.T) { requestsToSend = append(requestsToSend, createRequest("test", 100+10*i)) } - serv.clearRequests() - pool := &pool2.TrackedPool{Parent: &sync.Pool{}} slabPool := pool2.NewFastReleasingSlabPool[byte](pool, 512*1024)