
Pool buffers when sending write request #5195

Merged · 19 commits · Jun 8, 2023
Changes from 16 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -16,6 +16,7 @@
* `-<prefix>.initial-connection-window-size`
* [ENHANCEMENT] Query-frontend: added "response_size_bytes" field to "query stats" log. #5196
* [ENHANCEMENT] Querier: Refine error messages for per-tenant query limits, informing the user of the preferred strategy for not hitting the limit, in addition to how they may tweak the limit. #5059
* [ENHANCEMENT] Distributor: optimize sending of requests to ingesters by reusing memory buffers for marshalling requests. For now this optimization can be disabled by setting `-distributor.write-requests-buffer-pooling-enabled` to `false`. #5195
* [BUGFIX] Querier: don't leak memory when processing query requests from query-frontends (i.e. when the query-scheduler is disabled). #5199

### Mixin
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
@@ -1476,6 +1476,17 @@
],
"fieldValue": null,
"fieldDefaultValue": null
},
{
"kind": "field",
"name": "write_requests_buffer_pooling_enabled",
"required": false,
"desc": "Enable pooling of buffers used for marshaling write requests.",
"fieldValue": null,
"fieldDefaultValue": true,
"fieldFlag": "distributor.write-requests-buffer-pooling-enabled",
"fieldType": "boolean",
"fieldCategory": "experimental"
}
],
"fieldValue": null,
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
@@ -1159,6 +1159,8 @@ Usage of ./cmd/mimir/mimir:
The prefix for the keys in the store. Should end with a /. (default "collectors/")
-distributor.ring.store string
Backend storage to use for the ring. Supported values are: consul, etcd, inmemory, memberlist, multi. (default "memberlist")
-distributor.write-requests-buffer-pooling-enabled
[experimental] Enable pooling of buffers used for marshaling write requests. (default true)
-enable-go-runtime-metrics
Set to true to enable all Go runtime metrics, such as go_sched_* and go_memstats_*.
-flusher.exit-after-flush
1 change: 1 addition & 0 deletions docs/sources/mimir/configure/about-versioning.md
@@ -114,6 +114,7 @@ The following features are currently experimental:
- Per-tenant Results cache TTL (`-query-frontend.results-cache-ttl`, `-query-frontend.results-cache-ttl-for-out-of-order-time-window`)
- Fetching TLS secrets from Vault for various clients (`-vault.enabled`)
- Timeseries Unmarshal caching optimization in distributor (`-timeseries-unmarshal-caching-optimization-enabled`)
- Reusing buffers for marshalling write requests in distributors (`-distributor.write-requests-buffer-pooling-enabled`)

## Deprecated features

@@ -768,6 +768,10 @@ instance_limits:
# per-tenant. Additional requests will be rejected. 0 = unlimited.
# CLI flag: -distributor.instance-limits.max-inflight-push-requests-bytes
[max_inflight_push_requests_bytes: <int> | default = 0]

# (experimental) Enable pooling of buffers used for marshaling write requests.
# CLI flag: -distributor.write-requests-buffer-pooling-enabled
[write_requests_buffer_pooling_enabled: <boolean> | default = true]
```

### ingester
20 changes: 18 additions & 2 deletions pkg/distributor/distributor.go
@@ -45,6 +45,7 @@ import (
"github.com/grafana/mimir/pkg/querier/stats"
"github.com/grafana/mimir/pkg/util"
util_math "github.com/grafana/mimir/pkg/util/math"
pool2 "github.com/grafana/mimir/pkg/util/pool"
"github.com/grafana/mimir/pkg/util/push"
"github.com/grafana/mimir/pkg/util/validation"
)
@@ -64,10 +65,12 @@ const (

// metaLabelTenantID is the name of the metric_relabel_configs label with tenant ID.
metaLabelTenantID = model.MetaLabelPrefix + "tenant_id"
)

const (
instanceIngestionRateTickInterval = time.Second

// Size of "slab" when using pooled buffers for marshaling write requests. When handling single Push request
// buffers for multiple write requests sent to ingesters will be allocated from single "slab", if there is enough space.
writeRequestSlabPoolSize = 512 * 1024
Collaborator: Can you clarify how this number was picked?

Member Author: I checked the average message size received by ingester Push handlers using this query:

sum by (namespace) (increase(cortex_request_message_bytes_sum{route="/cortex.Ingester/Push"}[$__rate_interval]))
/
sum by (namespace) (increase(cortex_request_message_bytes_count{route="/cortex.Ingester/Push"}[$__rate_interval]))

[Screenshot 2023-06-08 at 13:46:52: average Push message size per environment]

We see wildly different numbers across our environments. I picked a number that's big enough to handle a big message size (for the environment where the average is 300 KB), but that also provides enough space to keep many smaller messages.
)

// Distributor forwards appends and queries to individual ingesters.
@@ -133,6 +136,9 @@ type Distributor struct {
metadataValidationMetrics *validation.MetadataValidationMetrics

PushWithMiddlewares push.Func

// Pool of []byte used when marshalling write requests.
writeRequestBytePool sync.Pool
Collaborator: [nit] We typically have pools as global vars. Any pro of having it on the Distributor?

Member Author: I don't see why it would need to be a global variable when we have a logical place to put it. I understand that's not always the case, but I think it fits into the distributor nicely.

}

// Config contains the configuration required to
@@ -168,6 +174,8 @@ type Config struct {
// and access the deserialized write requests before/after they are pushed.
// These functions will only receive samples that don't get dropped by HA deduplication.
PushWrappers []PushWrapper `yaml:"-"`

WriteRequestsBufferPoolingEnabled bool `yaml:"write_requests_buffer_pooling_enabled" category:"experimental"`
}

// PushWrapper wraps around a push. It is similar to middleware.Interface.
@@ -181,6 +189,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {

f.IntVar(&cfg.MaxRecvMsgSize, "distributor.max-recv-msg-size", 100<<20, "Max message size in bytes that the distributors will accept for incoming push requests to the remote write API. If exceeded, the request will be rejected.")
f.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 2*time.Second, "Timeout for downstream ingesters.")
f.BoolVar(&cfg.WriteRequestsBufferPoolingEnabled, "distributor.write-requests-buffer-pooling-enabled", true, "Enable pooling of buffers used for marshaling write requests.")

cfg.DefaultLimits.RegisterFlags(f)
}
@@ -204,6 +213,7 @@ const (
func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, activeGroupsCleanupService *util.ActiveGroupsCleanupService, ingestersRing ring.ReadRing, canJoinDistributorsRing bool, reg prometheus.Registerer, log log.Logger) (*Distributor, error) {
if cfg.IngesterClientFactory == nil {
cfg.IngesterClientFactory = func(addr string) (ring_client.PoolClient, error) {
clientConfig.WriteRequestsBufferPoolingEnabled = cfg.WriteRequestsBufferPoolingEnabled
return ingester_client.MakeIngesterClient(addr, clientConfig)
}
}
@@ -1121,6 +1131,11 @@ func (d *Distributor) push(ctx context.Context, pushReq *push.Request) (*mimirpb
// so set this flag false and pass cleanup() to DoBatch.
cleanupInDefer = false

if d.cfg.WriteRequestsBufferPoolingEnabled {
slabPool := pool2.NewFastReleasingSlabPool[byte](&d.writeRequestBytePool, writeRequestSlabPoolSize)
Collaborator: Why do you need the fast releasing slab pool if you create a new slab pool for each request? Couldn't you just use the "base" one and release all slabs in the cleanup function passed to ring.DoBatch()?

Collaborator: I mean, I can see the fast releasing slab is an optimization because slabs may be released sooner, but can you confirm it's not strictly required and the base one could be used as well? I just want to make sure I understand the rationale.

Member Author: Your understanding is correct. It's an optimization to release slabs sooner.

I think this may actually be an interesting optimization in environments where the average size of write requests to ingesters is high and we're also sending requests to many ingesters at once.

localCtx = ingester_client.WithSlabPool(localCtx, slabPool)
}

err = ring.DoBatch(ctx, ring.WriteNoExtend, subRing, keys, func(ingester ring.InstanceDesc, indexes []int) error {
var timeseriesCount, metadataCount int
for _, i := range indexes {
@@ -1203,6 +1218,7 @@ func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, time
Metadata: metadata,
Source: source,
}

_, err = c.Push(ctx, &req)
if resp, ok := httpgrpc.HTTPResponseFromError(err); ok {
// Wrap HTTP gRPC error with more explanatory message.
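To make the pooling pattern in the hunk above easier to follow in isolation, here is a minimal standalone sketch: a long-lived sync.Pool of backing slabs (the Distributor's writeRequestBytePool in this PR) feeding a per-request FastReleasingSlabPool. It relies only on the API visible in this diff (NewFastReleasingSlabPool, Get returning a buffer plus a slab ID, Release taking that ID); the variable names, the 64 KiB request size and the standalone main are illustrative assumptions, not code from the change.

```go
package main

import (
	"fmt"
	"sync"

	"github.com/grafana/mimir/pkg/util/pool"
)

func main() {
	// Long-lived pool shared across push requests; in this PR it is the
	// Distributor's writeRequestBytePool field.
	var backing sync.Pool

	// Per-request pool: buffers are carved out of shared 512 KiB slabs taken
	// from the backing pool, and every Get also returns the ID of the slab
	// the buffer came from.
	slabPool := pool.NewFastReleasingSlabPool[byte](&backing, 512*1024)

	// One buffer per marshalled write request; several requests share the
	// same slab while there is space left in it.
	size := 64 * 1024
	buf, slabID := slabPool.Get(size)
	copy(buf[:size], "<marshalled WriteRequest bytes>")

	// Releasing by slab ID lets the slab go back to the backing pool as soon
	// as all buffers taken from it are done, instead of waiting for the whole
	// batch of ingester requests to finish ("fast releasing"). The zero check
	// mirrors ReturnBuffersToPool in the new buffering client below.
	if slabID != 0 {
		slabPool.Release(slabID)
	}
	fmt.Println("released slab", slabID)
}
```

In the actual push path the distributor does not use the slab pool directly like this; it attaches the pool to the per-ingester request context via WithSlabPool, and the new buffering client below takes buffers from it while marshalling and returns them once the gRPC call finishes.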
111 changes: 111 additions & 0 deletions pkg/ingester/client/buffering_client.go
@@ -0,0 +1,111 @@
package client

import (
"context"

"google.golang.org/grpc"

"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/util/pool"
)

// This is a copy of the (*ingesterClient).Push method, but accepting any message type.
func pushRaw(ctx context.Context, conn *grpc.ClientConn, msg interface{}, opts ...grpc.CallOption) (*mimirpb.WriteResponse, error) {
out := new(mimirpb.WriteResponse)
err := conn.Invoke(ctx, "/cortex.Ingester/Push", msg, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}

// bufferPoolingIngesterClient implements IngesterClient, but overrides Push method to add pooling of buffers used to marshal write requests.
type bufferPoolingIngesterClient struct {
IngesterClient

conn *grpc.ClientConn

// This refers to the pushRaw function, but it is overridden in the benchmark to avoid doing actual gRPC calls.
pushRawFn func(ctx context.Context, conn *grpc.ClientConn, msg interface{}, opts ...grpc.CallOption) (*mimirpb.WriteResponse, error)
}

func newBufferPoolingIngesterClient(client IngesterClient, conn *grpc.ClientConn) *bufferPoolingIngesterClient {
c := &bufferPoolingIngesterClient{
IngesterClient: client,
conn: conn,
pushRawFn: pushRaw,
}
return c
}

// Push wraps WriteRequest to implement buffer pooling.
func (c *bufferPoolingIngesterClient) Push(ctx context.Context, in *mimirpb.WriteRequest, opts ...grpc.CallOption) (*mimirpb.WriteResponse, error) {
p := getPool(ctx)
if p == nil {
return c.IngesterClient.Push(ctx, in, opts...)
}

wr := &wrappedRequest{
WriteRequest: in,
slabPool: p,
}
// We can return all buffers back to slabPool when this method finishes.
defer wr.ReturnBuffersToPool()
Collaborator: The hard question is: is this safe, or could the message buffer be retained by gRPC for longer? Can you share more details about the arguments for "this is safe to do"? I still struggle a lot with this, because each time one of us looks at it, we come to a slightly different conclusion.

My question is not just about gRPC itself (for which you have added a nice test) but about the chain of middlewares we have. Our middlewares shouldn't retain it, but logging may be async, so it's worth double-checking.

Member Author (@pstibrany, Jun 8, 2023): One thing to realize is that we're doing this buffering on the client side. The "gRPC" we talk about here is the gRPC connection to the ingester.

Re middlewares... We set up our gRPC connection with the following interceptors. Looking at the implementation of these interceptors...

  • OpenTracingClientInterceptor has the ability to 1) pass the request to a span inclusion function, 2) log the payload (request), and 3) decorate the span using the request. Note that we don't configure OpenTracingClientInterceptor with any of these options.
  • The middleware.ClientUserHeaderInterceptor, middleware.UnaryClientInstrumentInterceptor, BackoffRetry and RateLimiter interceptors don't use the request in any way (apart from passing it further).

Member Author (@pstibrany, Jun 8, 2023): More places where our buffer could be passed around:

Let's start with binlogs. This is functionality configured by the env variable GRPC_BINARY_LOG_FILTER. When configured, gRPC logs each RPC in the format defined by the GrpcLogEntry proto. The design for this feature is at https://github.com/grpc/proposal/blob/master/A16-binary-logging.md.

When used, the request is passed to the logging method inside a binarylog.ClientMessage struct. This struct has toProto(), which will call another Marshal on our message. Once binarylog.ClientMessage is converted to binlogpb.GrpcLogEntry (now it contains our buffer from the pool), it's forwarded to the "sink" for writing.

There are three sink implementations in grpc: bufferedSink, noopSink and writerSink.

writerSink marshals *binlogpb.GrpcLogEntry and writes it to some writer. bufferedSink simply wraps writerSink but gives it a buffer to write output to, and also runs a goroutine to periodically flush the buffer. The important point is that it doesn't keep *binlogpb.GrpcLogEntry around, but marshals it immediately.

My conclusion is: using the binary logging feature is safe when using our message buffering.

Member Author: statsHandlers for the gRPC connection are configured via dial options. We don't use that feature in our code.

It's possible that I've missed something, but it seems to me that it's safe to reuse the buffer used to Marshal as soon as (*grpc.ClientConn).Invoke returns.

Collaborator: Thanks for this analysis, Peter. This is too valuable to get lost in a GitHub comment. WDYT about moving this to a docs/contributing/... page, where we can keep contributing to it the more we learn in the future?

Member Author: I've copied the notes above into a doc: #5216

Member Author: The above analysis considered the "happy path" only -- the message gets sent to the server and the server replies. However, in case of errors, messages can still be queued locally and sent later. #5830 fixes this.


return c.pushRawFn(ctx, c.conn, wr, opts...)
}

type poolKey int

var poolKeyValue poolKey = 1

func WithSlabPool(ctx context.Context, pool *pool.FastReleasingSlabPool[byte]) context.Context {
if pool != nil {
return context.WithValue(ctx, poolKeyValue, pool)
}
return ctx
}

func getPool(ctx context.Context) *pool.FastReleasingSlabPool[byte] {
v := ctx.Value(poolKeyValue)
if p, ok := v.(*pool.FastReleasingSlabPool[byte]); ok {
return p
}
return nil
}

type wrappedRequest struct {
*mimirpb.WriteRequest

slabPool *pool.FastReleasingSlabPool[byte]
slabID int
moreSlabIDs []int // Used in case Marshal gets called multiple times.
}

func (w *wrappedRequest) Marshal() ([]byte, error) {
size := w.WriteRequest.Size()
buf, slabID := w.slabPool.Get(size)

if w.slabID == 0 {
w.slabID = slabID
} else {
w.moreSlabIDs = append(w.moreSlabIDs, slabID)
}

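// MarshalToSizedBuffer fills the buffer from its end and returns the number of
// bytes written; since the buffer is sliced to exactly Size() bytes, n covers
// the whole marshalled message.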
n, err := w.WriteRequest.MarshalToSizedBuffer(buf[:size])
if err != nil {
return nil, err
}
return buf[:n], nil
}

func (w *wrappedRequest) ReturnBuffersToPool() {
if w.slabID != 0 {
w.slabPool.Release(w.slabID)
w.slabID = 0
}
for _, s := range w.moreSlabIDs {
w.slabPool.Release(s)
}
w.moreSlabIDs = nil
}
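The comment on pushRawFn above mentions a benchmark that overrides it to avoid real gRPC calls. As a rough, in-package sketch of how the pieces fit together (the benchmark name, the fake push function and the sample request contents are assumptions for illustration, not code from this PR), it could look like this:

```go
package client

import (
	"context"
	"sync"
	"testing"

	"google.golang.org/grpc"

	"github.com/grafana/mimir/pkg/mimirpb"
	"github.com/grafana/mimir/pkg/util/pool"
)

func BenchmarkPushWithPooledBuffersSketch(b *testing.B) {
	var backing sync.Pool
	slabPool := pool.NewFastReleasingSlabPool[byte](&backing, 512*1024)
	ctx := WithSlabPool(context.Background(), slabPool)

	c := &bufferPoolingIngesterClient{
		// IngesterClient and conn stay nil: the overridden pushRawFn below never
		// touches the connection, and Push only falls back to the embedded
		// client when no slab pool is present in the context.
		pushRawFn: func(_ context.Context, _ *grpc.ClientConn, msg interface{}, _ ...grpc.CallOption) (*mimirpb.WriteResponse, error) {
			// Marshal the wrapped request, as the gRPC codec would, so the buffer
			// really comes from (and is later returned to) the slab pool.
			_, err := msg.(*wrappedRequest).Marshal()
			return &mimirpb.WriteResponse{}, err
		},
	}

	req := &mimirpb.WriteRequest{
		Metadata: []*mimirpb.MetricMetadata{{MetricFamilyName: "test_metric", Help: "gives the request a non-zero size"}},
	}

	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		if _, err := c.Push(ctx, req); err != nil {
			b.Fatal(err)
		}
	}
}
```

Push attaches the slab pool from the context via a wrappedRequest and defers ReturnBuffersToPool, so every iteration reuses the same backing slabs instead of allocating a fresh marshalling buffer.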