Pool buffers when sending write request #5195
Conversation
I think the flag should be renamed (with "requests") to correspond to the YAML parameter.
Generally LGTM, but please see questions/suggestions :)
Thanks Arve, I've fixed the problems raised in your comments.
Thank you @pstibrany, I will look again :)
LGTM, just leaving some suggestions for readability.
Good job! I left a few minor comments. My major comment is that I strongly recommend keeping this experimental feature disabled for now. Contrary to other lower-risk optimizations, I want to roll out this optimization more slowly and don't want to enable it by default just yet.
// Size of "slab" when using pooled buffers for marshaling write requests. When handling single Push request | ||
// buffers for multiple write requests sent to ingesters will be allocated from single "slab", if there is enough space. | ||
writeRequestSlabPoolSize = 512 * 1024 |
Can you clarify how this number was picked?
I checked average message size received by ingester Push handlers using this query:
sum by (namespace) (increase(cortex_request_message_bytes_sum{route="/cortex.Ingester/Push"}[$__rate_interval]))
/
sum by (namespace) (increase(cortex_request_message_bytes_count{route="/cortex.Ingester/Push"}[$__rate_interval]))
(Screenshot: average ingester Push message size per environment.)
We see wildly different numbers across our environments. I picked a number that's big enough to handle large messages (for the environment where the average is 300 KB), but also provides enough space to hold many smaller messages.
@@ -133,6 +136,9 @@ type Distributor struct {
	metadataValidationMetrics *validation.MetadataValidationMetrics

	PushWithMiddlewares push.Func

	// Pool of []byte used when marshalling write requests.
	writeRequestBytePool sync.Pool
[nit] We typically have pools as global vars. Any pro of having it on the Distributor?
I don't see why it would need to be a global variable when we have a logical place to put it. I understand that's not always the case, but I think it fits into the distributor nicely.
pkg/distributor/distributor.go
@@ -1121,6 +1131,11 @@ func (d *Distributor) push(ctx context.Context, pushReq *push.Request) (*mimirpb
	// so set this flag false and pass cleanup() to DoBatch.
	cleanupInDefer = false

	if d.cfg.WriteRequestsBufferPoolingEnabled {
		slabPool := pool2.NewFastReleasingSlabPool[byte](&d.writeRequestBytePool, writeRequestSlabPoolSize)
Why do you need the fast releasing slab pool if you create a new slab pool for each request? Couldn't we just use the "base" one and release all slabs in the cleanup function passed to ring.DoBatch()?
I mean, I can see the fast releasing slab pool is an optimization because slabs may be released sooner, but can you confirm it's not strictly required and the base one could be used as well? I just want to make sure I understand the rationale.
Your understanding is correct. It's an optimization to release slabs sooner.
I think this may actually be an interesting optimization in environments where the average size of write requests to ingesters is high, and we're also sending requests to many ingesters at once.
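To illustrate the pattern being discussed, here is a rough sketch (the SlabPool interface and function names are illustrative assumptions, not Mimir's actual API): each per-ingester request releases its slab reference as soon as its own RPC finishes, so with big requests and many ingesters a slab can go back to the pool while other RPCs from the same Push are still in flight.

```go
package main

import (
	"context"

	"golang.org/x/sync/errgroup"
)

// SlabPool is a stand-in for the fast-releasing slab pool discussed here;
// the method shapes are assumptions for the sake of the example.
type SlabPool interface {
	Get(size int) (buf []byte, slabID int) // buffer carved from a shared slab
	Release(slabID int)                    // drop one reference to that slab
}

// sendBatch sends one pre-encoded payload per ingester. Each goroutine returns
// its slab reference when its own RPC completes ("fast release"), instead of
// waiting for a single cleanup after the whole batch, as the base pool would.
func sendBatch(ctx context.Context, pool SlabPool, payloads [][]byte, send func(context.Context, []byte) error) error {
	g, ctx := errgroup.WithContext(ctx)
	for _, p := range payloads {
		p := p
		g.Go(func() error {
			buf, slabID := pool.Get(len(p))
			defer pool.Release(slabID) // per-request release, not per-batch

			copy(buf, p) // in the real code the protobuf message is marshalled into buf
			return send(ctx, buf)
		})
	}
	return g.Wait()
}
```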
	slabPool: p,
}
// We can return all buffers back to slabPool when this method finishes.
defer wr.ReturnBuffersToPool()
The hard question is: is this safe, or could the message buffer be retained by gRPC for longer? Can you share more details about the arguments for "this is safe to do"? I still struggle a lot with this, because each time one of us looks at it we come to a slightly different conclusion.
My question is not just about gRPC itself (for which you have added a nice test) but about the chain of middlewares we have. Our middlewares shouldn't retain it, but logging may be async, so it's worth double checking.
One thing to realize is that we're doing this buffering on the client side. The "gRPC" we're talking about here is the gRPC connection to the ingester.
Re middlewares... We set up our gRPC connection with the following interceptors:
- grpcclient.Instrument, which in turn adds:
  - otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer())
  - middleware.ClientUserHeaderInterceptor
  - middleware.UnaryClientInstrumentInterceptor(requestDuration)
- Interceptors added by (*Config).DialOption:
Looking at the implementation of these interceptors:
- OpenTracingClientInterceptor has the ability to 1) pass the request to a span inclusion function, 2) log the payload (request), and 3) decorate the span using the request. Note that we don't configure OpenTracingClientInterceptor with any of these options.
- The middleware.ClientUserHeaderInterceptor, middleware.UnaryClientInstrumentInterceptor, BackoffRetry and RateLimiter interceptors don't use the request in any way, apart from passing it further down the chain (see the pass-through sketch below).
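For reference, a minimal sketch of what "passing the request further" means for a unary client interceptor (this is not code from the PR, just the general shape): the interceptor keeps no reference to req after the invoker returns, so it cannot extend the lifetime of the marshalled buffer.

```go
import (
	"context"

	"google.golang.org/grpc"
)

// passThroughInterceptor only forwards the request. It keeps no reference to
// req once invoker returns, so it does not prevent reusing the buffer the
// request was marshalled into.
func passThroughInterceptor(
	ctx context.Context,
	method string,
	req, reply interface{},
	cc *grpc.ClientConn,
	invoker grpc.UnaryInvoker,
	opts ...grpc.CallOption,
) error {
	// Metrics, headers or tracing could be added here without touching req.
	return invoker(ctx, method, req, reply, cc, opts...)
}
```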
More places where our buffer could be passed around:

Let's start with binlogs. This is functionality configured by the env variable GRPC_BINARY_LOG_FILTER. When configured, gRPC logs each RPC in the format defined by the GrpcLogEntry proto. The design for this feature is at https://github.com/grpc/proposal/blob/master/A16-binary-logging.md.

When used, the request is passed to the logging method inside a binarylog.ClientMessage struct. This struct has toProto(), which will call another Marshal on our message. Once binarylog.ClientMessage is converted to binlogpb.GrpcLogEntry (now it contains our buffer from the pool), it's forwarded to the "sink" for writing.

There are three sink implementations in grpc: bufferedSink, noopSink and writerSink. writerSink marshals *binlogpb.GrpcLogEntry and writes it to some writer. bufferedSink simply wraps writerSink but gives it a buffer to write output to, and also runs a goroutine to periodically flush the buffer. The important point is that it doesn't keep *binlogpb.GrpcLogEntry around, but marshals it immediately.

My conclusion is: using the binary logging feature is safe when using our message buffering.
statsHandlers for the grpc connection are configured via dial options. We don't use that feature in our code.

It's possible that I've missed something, but it seems to me that it's safe to reuse the buffer used to Marshal as soon as (*grpc.ClientConn).Invoke returns.
Thanks for this analysis Peter. This is too valuable to get lost in a GitHub comment. WDYT about moving this to a docs/contributing/... page, where we can keep contributing to it as we learn more in the future?
I've copied the notes above into a doc: #5216
The above analysis considered the "happy path" only: the message gets sent to the server and the server replies. However, in case of errors, messages can still be queued locally and sent later. #5830 fixes this.
What this PR does
When the distributor receives a write request, it splits it into multiple smaller requests and forwards each of them to a single ingester. In this step the distributor needs to marshal protobuf messages. This PR adds pooling of the buffers used for this marshalling.

The actual pooling logic is implemented in a wrapper around IngesterClient (the gRPC interface for communicating with ingesters). Our benchmark in a dev environment shows about a 7% reduction of CPU time with this change.
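A rough sketch of the buffer lifecycle (illustrative only; the names and the getBuf/putBuf/invoke indirection are assumptions, and the actual wiring into gRPC lives in the IngesterClient wrapper added by this PR): the request is marshalled into a buffer taken from the pool, and the buffer is handed back once the RPC call has returned.

```go
import "context"

// gogoMarshaler captures the shape of gogo/protobuf-generated messages,
// which expose Size and MarshalTo.
type gogoMarshaler interface {
	Size() int
	MarshalTo(buf []byte) (int, error)
}

// pushWithPooledBuffer marshals req into a pooled buffer, performs the call,
// and releases the buffer afterwards. getBuf/putBuf stand in for the slab
// pool, invoke for the actual gRPC call; all three are placeholders.
func pushWithPooledBuffer(
	ctx context.Context,
	req gogoMarshaler,
	getBuf func(size int) (buf []byte, slabID int),
	putBuf func(slabID int),
	invoke func(ctx context.Context, data []byte) error,
) error {
	size := req.Size()
	buf, slabID := getBuf(size)
	defer putBuf(slabID) // safe once invoke has returned (see the buffer-retention analysis in the conversation)

	n, err := req.MarshalTo(buf[:size])
	if err != nil {
		return err
	}
	return invoke(ctx, buf[:n])
}
```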
The benchmark in this PR shows a reduction of allocations per Push call (for the purpose of this benchmark, Push only calls the message's Marshal).

This PR also adds a "fast releasing slab pool". Compared to the "slab pool" implementation that already exists in the Mimir codebase, the "fast releasing" one can release individual slabs (pooled objects) as soon as they are no longer used, instead of waiting for the final Release call on the slab pool.
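To make the difference concrete, here is a minimal, illustrative (non-generic, byte-only) take on the idea; it is not the code added by this PR. Each slab counts how many buffers carved from it are still in use and goes back to the underlying sync.Pool as soon as that count drops to zero, rather than on a single final Release of the whole pool.

```go
package pool

import "sync"

type slab struct {
	buf  []byte
	used int // bytes handed out from this slab
	refs int // buffers handed out and not yet released
}

// FastReleasingSlabPool is an illustrative sketch of the concept.
type FastReleasingSlabPool struct {
	mtx      sync.Mutex
	delegate *sync.Pool // holds released []byte slabs
	slabSize int
	slabs    map[int]*slab
	nextID   int
	current  int // id of the slab new buffers are carved from; 0 = none
}

func NewFastReleasingSlabPool(delegate *sync.Pool, slabSize int) *FastReleasingSlabPool {
	return &FastReleasingSlabPool{delegate: delegate, slabSize: slabSize, slabs: map[int]*slab{}}
}

// Get returns a buffer of the given size and the id of the slab it was carved
// from. Requests bigger than the slab size bypass pooling and get slab id 0.
func (p *FastReleasingSlabPool) Get(size int) ([]byte, int) {
	if size > p.slabSize {
		return make([]byte, size), 0
	}

	p.mtx.Lock()
	defer p.mtx.Unlock()

	s := p.slabs[p.current]
	if s == nil || p.slabSize-s.used < size {
		// Retire the current slab; if nothing carved from it is still in use,
		// return it to the delegate pool right away.
		if s != nil && s.refs == 0 {
			delete(p.slabs, p.current)
			p.delegate.Put(s.buf)
		}
		buf, ok := p.delegate.Get().([]byte)
		if !ok || len(buf) < p.slabSize {
			buf = make([]byte, p.slabSize)
		}
		p.nextID++
		s = &slab{buf: buf}
		p.slabs[p.nextID] = s
		p.current = p.nextID
	}

	out := s.buf[s.used : s.used+size : s.used+size]
	s.used += size
	s.refs++
	return out, p.current
}

// Release drops one reference to the given slab. A retired slab returns to the
// delegate pool as soon as its last buffer is released, without waiting for
// the rest of the batch to finish.
func (p *FastReleasingSlabPool) Release(slabID int) {
	if slabID == 0 {
		return // the buffer was not pooled
	}

	p.mtx.Lock()
	defer p.mtx.Unlock()

	s := p.slabs[slabID]
	s.refs--
	if s.refs == 0 && slabID != p.current {
		delete(p.slabs, slabID)
		p.delegate.Put(s.buf)
	}
}
```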
Checklist

- CHANGELOG.md updated - the order of entries should be [CHANGE], [FEATURE], [ENHANCEMENT], [BUGFIX]