
Fix buffer-pooling ingester client #5830

Merged (3 commits) on Aug 24, 2023

CHANGELOG.md (1 addition, 1 deletion)

@@ -49,7 +49,7 @@
 * `-<prefix>.initial-connection-window-size`
 * [ENHANCEMENT] Query-frontend: added "response_size_bytes" field to "query stats" log. #5196
 * [ENHANCEMENT] Querier: Refine error messages for per-tenant query limits, informing the user of the preferred strategy for not hitting the limit, in addition to how they may tweak the limit. #5059
-* [ENHANCEMENT] Distributor: optimize sending of requests to ingesters by reusing memory buffers for marshalling requests. This optimization can be enabled by setting `-distributor.write-requests-buffer-pooling-enabled` to `true`. #5195 #5805
+* [ENHANCEMENT] Distributor: optimize sending of requests to ingesters by reusing memory buffers for marshalling requests. This optimization can be enabled by setting `-distributor.write-requests-buffer-pooling-enabled` to `true`. #5195 #5805 #5830
 * [ENHANCEMENT] Querier: add experimental `-querier.minimize-ingester-requests` option to initially query only the minimum set of ingesters required to reach quorum. #5202 #5259 #5263
 * [ENHANCEMENT] Querier: improve error message when streaming chunks from ingesters to queriers and a query limit is reached. #5245
 * [ENHANCEMENT] Use new data structure for labels, to reduce memory consumption. #3555 #5731
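For context, the optimization this PR fixes is opt-in. Based on the changelog entry above, a distributor would enable it roughly as shown below; the `mimir` binary name and the `-target=distributor` flag are assumptions about the usual way of running a distributor and are not part of this change.

```
mimir -target=distributor -distributor.write-requests-buffer-pooling-enabled=true
```
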
pkg/ingester/client/buffering_client.go (12 additions, 4 deletions)

@@ -51,10 +51,18 @@ func (c *bufferPoolingIngesterClient) Push(ctx context.Context, in *mimirpb.Writ
 		WriteRequest: in,
 		slabPool:     p,
 	}
-	// We can return all buffers back to slabPool when this method finishes.
-	defer wr.ReturnBuffersToPool()
-
-	return c.pushRawFn(ctx, c.conn, wr, opts...)
+	resp, err := c.pushRawFn(ctx, c.conn, wr, opts...)
+	if err == nil {
+		// We can return all buffers back to slabPool when the push method finishes.
+		// However, we can only do that if we have actually received a reply from the
+		// server, which means that our buffers were sent out successfully and are not
+		// used anymore.
+		//
+		// If there was an error (e.g. context cancellation), our buffers can still be
+		// in use by the gRPC client (e.g. enqueued to be sent over the network
+		// connection), and we can't return them to the pool.
+		wr.ReturnBuffersToPool()
+	}
+	return resp, err

@jhalterman (Member) commented on Aug 24, 2023:

Do buffers ever get returned to the pool in the case of an error? If not, is that a problem?

@pstibrany (Member, Author) commented on Aug 24, 2023:

No, they don't. When this optimization (buffering client) is disabled, there's no buffer reuse at all, so overall this is still an improvement.

@jhalterman (Member) commented on Aug 24, 2023:

If enough buffers are not returned to the pool, could it eventually become empty? Maybe instead of returning a buffer to the pool on error, you could inform the pool that it has an empty slot?

@pstibrany (Member, Author) commented on Aug 24, 2023:

The pool also starts empty. If an existing buffer cannot be retrieved from the pool, a new buffer is allocated and then put into the pool when it is no longer used.

 }
 
 type poolKey int
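To make the pool behaviour discussed in the exchange above concrete, here is a minimal, self-contained Go sketch using the standard library's sync.Pool directly (not Mimir's TrackedPool or slab pool types): the pool starts empty, Get allocates a fresh buffer whenever nothing is pooled, and a buffer that is never returned is simply reclaimed by the garbage collector, so losing buffers on errors costs extra allocations later rather than breaking the pool.

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	allocations := 0
	bufferPool := sync.Pool{
		// New is called whenever Get finds the pool empty, which is also its initial state.
		New: func() any {
			allocations++
			return make([]byte, 0, 512*1024)
		},
	}

	b1 := bufferPool.Get().([]byte) // pool is empty: allocates a new buffer
	b2 := bufferPool.Get().([]byte) // still empty: allocates again

	bufferPool.Put(b1[:0]) // b1 becomes reusable
	_ = b2                 // b2 is never returned (like the error case above); the GC reclaims it

	b3 := bufferPool.Get().([]byte) // typically reuses b1's buffer; allocates again if the pool was cleared
	_ = b3

	fmt.Println("total allocations:", allocations)
}
```
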
pkg/ingester/client/buffering_client_test.go (33 additions)

@@ -92,6 +92,39 @@ func TestWriteRequestBufferingClient_Push(t *testing.T) {
 	})
 }
 
+func TestWriteRequestBufferingClient_PushWithCancelContext(t *testing.T) {
+	_, conn := setupGrpc(t)
+
+	bufferingClient := IngesterClient(newBufferPoolingIngesterClient(NewIngesterClient(conn), conn))
+
+	var requestsToSend []*mimirpb.WriteRequest
+	for i := 0; i < 100; i++ {
+		requestsToSend = append(requestsToSend, createRequest("test", 100+10*i))
+	}
+
+	pool := &pool2.TrackedPool{Parent: &sync.Pool{}}
+	slabPool := pool2.NewFastReleasingSlabPool[byte](pool, 512*1024)
+
+	ctx := WithSlabPool(context.Background(), slabPool)
+
+	for _, r := range requestsToSend {
+		started := make(chan bool)
+
+		// start background goroutine to cancel context. We want to hit the moment after enqueuing data frame, but before it's sent.
+		cc, cancel := context.WithCancel(ctx)
+		defer cancel()
+		go func() {
+			close(started)
+			time.Sleep(1 * time.Millisecond)
+			cancel()
+		}()
+
+		<-started
+
+		_, _ = bufferingClient.Push(cc, r)
+	}
+}
+
 func TestWriteRequestBufferingClient_Push_WithMultipleMarshalCalls(t *testing.T) {
 	serv, conn := setupGrpc(t)
 
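For reference, here is a condensed sketch of the caller-side pattern these tests exercise, written as if it lived in the same package so that identifiers such as newBufferPoolingIngesterClient, NewIngesterClient, WithSlabPool and pool2.NewFastReleasingSlabPool resolve exactly as in the test file; the pushWithPooledBuffers helper name is made up for illustration, and the *grpc.ClientConn is assumed to be the same kind of connection setupGrpc returns.

```go
// pushWithPooledBuffers is a hypothetical helper sketching the intended usage:
// attach a slab pool to the request context and push through the buffering client.
// The buffering client returns marshalling buffers to the slab pool on its own,
// but only when the push completed without error; on error (e.g. context
// cancellation) the buffers are deliberately left for the garbage collector.
func pushWithPooledBuffers(ctx context.Context, conn *grpc.ClientConn, req *mimirpb.WriteRequest) error {
	trackedPool := &pool2.TrackedPool{Parent: &sync.Pool{}}
	slabPool := pool2.NewFastReleasingSlabPool[byte](trackedPool, 512*1024)

	client := IngesterClient(newBufferPoolingIngesterClient(NewIngesterClient(conn), conn))

	_, err := client.Push(WithSlabPool(ctx, slabPool), req)
	return err
}
```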