Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add span on push for distributor and ingester #5319

Merged
merged 4 commits into from
May 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## master / unreleased
* [CHANGE] Alertmanager: Validating new fields on the PagerDuty AM config. #5290
* [CHANGE] Ingester: Creating label `native-histogram-sample` on the `cortex_discarded_samples_total` to keep track of discarded native histogram samples. #5289
* [ENHANCEMENT] Distributor/Ingester: Add span on push path #5319
* [FEATURE] Store Gateway: Add `max_downloaded_bytes_per_request` to limit max bytes to download per store gateway request.
* [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265
* [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286
Expand Down
243 changes: 134 additions & 109 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,9 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
return nil, err
}

span, ctx := opentracing.StartSpanFromContext(ctx, "Distributor.Push")
defer span.Finish()

// We will report *this* request in the error too.
inflight := d.inflightPushRequests.Inc()
defer d.inflightPushRequests.Dec()
Expand All @@ -572,9 +575,6 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
now := time.Now()
d.activeUsers.UpdateUserTimestamp(userID, now)

source := util.GetSourceIPsFromOutgoingCtx(ctx)

var firstPartialErr error
removeReplica := false

numSamples := 0
Expand All @@ -589,16 +589,6 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
// Count the total number of metadata in.
d.incomingMetadata.WithLabelValues(userID).Add(float64(len(req.Metadata)))

// A WriteRequest can only contain series or metadata but not both. This might change in the future.
// For each timeseries or samples, we compute a hash to distribute across ingesters;
// check each sample/metadata and discard if outside limits.
validatedTimeseries := make([]cortexpb.PreallocTimeseries, 0, len(req.Timeseries))
validatedMetadata := make([]*cortexpb.MetricMetadata, 0, len(req.Metadata))
metadataKeys := make([]uint32, 0, len(req.Metadata))
seriesKeys := make([]uint32, 0, len(req.Timeseries))
validatedSamples := 0
validatedExemplars := 0

// Cache user limit with overrides so we spend less CPU doing locking. See issue #4904
limits := d.limits.GetOverridesForUser(userID)

Expand Down Expand Up @@ -628,6 +618,135 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
}
}

// A WriteRequest can only contain series or metadata but not both. This might change in the future.
seriesKeys, validatedTimeseries, validatedSamples, validatedExemplars, firstPartialErr, err := d.prepareSeriesKeys(ctx, req, userID, limits, removeReplica)
if err != nil {
return nil, err
}
metadataKeys, validatedMetadata, firstPartialErr := d.prepareMetadataKeys(req, limits, userID, firstPartialErr)

d.receivedSamples.WithLabelValues(userID).Add(float64(validatedSamples))
d.receivedExemplars.WithLabelValues(userID).Add(float64(validatedExemplars))
d.receivedMetadata.WithLabelValues(userID).Add(float64(len(validatedMetadata)))

if len(seriesKeys) == 0 && len(metadataKeys) == 0 {
// Ensure the request slice is reused if there's no series or metadata passing the validation.
cortexpb.ReuseSlice(req.Timeseries)

return &cortexpb.WriteResponse{}, firstPartialErr
}

totalN := validatedSamples + validatedExemplars + len(validatedMetadata)
if !d.ingestionRateLimiter.AllowN(now, userID, totalN) {
// Ensure the request slice is reused if the request is rate limited.
cortexpb.ReuseSlice(req.Timeseries)

validation.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamples))
validation.DiscardedExemplars.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedExemplars))
validation.DiscardedMetadata.WithLabelValues(validation.RateLimited, userID).Add(float64(len(validatedMetadata)))
// Return a 429 here to tell the client it is going too fast.
// Client may discard the data or slow down and re-send.
// Prometheus v2.26 added a remote-write option 'retry_on_http_429'.
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.ingestionRateLimiter.Limit(now, userID), validatedSamples, len(validatedMetadata))
}

// totalN included samples and metadata. Ingester follows this pattern when computing its ingestion rate.
d.ingestionRate.Add(int64(totalN))

subRing := d.ingestersRing

// Obtain a subring if required.
if d.cfg.ShardingStrategy == util.ShardingStrategyShuffle {
subRing = d.ingestersRing.ShuffleShard(userID, limits.IngestionTenantShardSize)
}

keys := append(seriesKeys, metadataKeys...)
initialMetadataIndex := len(seriesKeys)

err = d.doBatch(ctx, req, subRing, keys, initialMetadataIndex, validatedMetadata, validatedTimeseries, userID)
if err != nil {
return nil, err
}

return &cortexpb.WriteResponse{}, firstPartialErr
}

func (d *Distributor) doBatch(ctx context.Context, req *cortexpb.WriteRequest, subRing ring.ReadRing, keys []uint32, initialMetadataIndex int, validatedMetadata []*cortexpb.MetricMetadata, validatedTimeseries []cortexpb.PreallocTimeseries, userID string) error {
span, _ := opentracing.StartSpanFromContext(ctx, "doBatch")
defer span.Finish()

// Use a background context to make sure all ingesters get samples even if we return early
localCtx, cancel := context.WithTimeout(context.Background(), d.cfg.RemoteTimeout)
localCtx = user.InjectOrgID(localCtx, userID)
if sp := opentracing.SpanFromContext(ctx); sp != nil {
localCtx = opentracing.ContextWithSpan(localCtx, sp)
}
// Get any HTTP headers that are supposed to be added to logs and add to localCtx for later use
if headerMap := util_log.HeaderMapFromContext(ctx); headerMap != nil {
localCtx = util_log.ContextWithHeaderMap(localCtx, headerMap)
}
// Get clientIP(s) from Context and add it to localCtx
source := util.GetSourceIPsFromOutgoingCtx(ctx)
localCtx = util.AddSourceIPsToOutgoingContext(localCtx, source)

op := ring.WriteNoExtend
if d.cfg.ExtendWrites {
op = ring.Write
}

return ring.DoBatch(ctx, op, subRing, keys, func(ingester ring.InstanceDesc, indexes []int) error {
timeseries := make([]cortexpb.PreallocTimeseries, 0, len(indexes))
var metadata []*cortexpb.MetricMetadata

for _, i := range indexes {
if i >= initialMetadataIndex {
metadata = append(metadata, validatedMetadata[i-initialMetadataIndex])
} else {
timeseries = append(timeseries, validatedTimeseries[i])
}
}

return d.send(localCtx, ingester, timeseries, metadata, req.Source)
}, func() {
cortexpb.ReuseSlice(req.Timeseries)
cancel()
})
}

func (d *Distributor) prepareMetadataKeys(req *cortexpb.WriteRequest, limits *validation.Limits, userID string, firstPartialErr error) ([]uint32, []*cortexpb.MetricMetadata, error) {
validatedMetadata := make([]*cortexpb.MetricMetadata, 0, len(req.Metadata))
metadataKeys := make([]uint32, 0, len(req.Metadata))

for _, m := range req.Metadata {
err := validation.ValidateMetadata(limits, userID, m)

if err != nil {
if firstPartialErr == nil {
firstPartialErr = err
}

continue
}

metadataKeys = append(metadataKeys, d.tokenForMetadata(userID, m.MetricFamilyName))
validatedMetadata = append(validatedMetadata, m)
}
return metadataKeys, validatedMetadata, firstPartialErr
}

func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.WriteRequest, userID string, limits *validation.Limits, removeReplica bool) ([]uint32, []cortexpb.PreallocTimeseries, int, int, error, error) {
pSpan, _ := opentracing.StartSpanFromContext(ctx, "prepareSeriesKeys")
defer pSpan.Finish()

// For each timeseries or samples, we compute a hash to distribute across ingesters;
// check each sample/metadata and discard if outside limits.
validatedTimeseries := make([]cortexpb.PreallocTimeseries, 0, len(req.Timeseries))
seriesKeys := make([]uint32, 0, len(req.Timeseries))
validatedSamples := 0
validatedExemplars := 0

var firstPartialErr error

latestSampleTimestampMs := int64(0)
defer func() {
// Update this metric even in case of errors.
Expand All @@ -638,7 +757,6 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co

// For each timeseries, compute a hash to distribute across ingesters;
// check each sample and discard if outside limits.

skipLabelNameValidation := d.cfg.SkipLabelNameValidation || req.GetSkipLabelNameValidation()
for _, ts := range req.Timeseries {
// Use timestamp of latest sample in the series. If samples for series are not ordered, metric for user may be wrong.
Expand Down Expand Up @@ -690,7 +808,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
// label and dropped labels (if any)
key, err := d.tokenForLabels(userID, ts.Labels)
if err != nil {
return nil, err
return nil, nil, 0, 0, nil, err
}
validatedSeries, validationErr := d.validateSeries(ts, userID, skipLabelNameValidation, limits)

Expand All @@ -712,100 +830,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
validatedSamples += len(ts.Samples)
validatedExemplars += len(ts.Exemplars)
}

for _, m := range req.Metadata {
err := validation.ValidateMetadata(limits, userID, m)

if err != nil {
if firstPartialErr == nil {
firstPartialErr = err
}

continue
}

metadataKeys = append(metadataKeys, d.tokenForMetadata(userID, m.MetricFamilyName))
validatedMetadata = append(validatedMetadata, m)
}

d.receivedSamples.WithLabelValues(userID).Add(float64(validatedSamples))
d.receivedExemplars.WithLabelValues(userID).Add((float64(validatedExemplars)))
d.receivedMetadata.WithLabelValues(userID).Add(float64(len(validatedMetadata)))

if len(seriesKeys) == 0 && len(metadataKeys) == 0 {
// Ensure the request slice is reused if there's no series or metadata passing the validation.
cortexpb.ReuseSlice(req.Timeseries)

return &cortexpb.WriteResponse{}, firstPartialErr
}

totalN := validatedSamples + validatedExemplars + len(validatedMetadata)
if !d.ingestionRateLimiter.AllowN(now, userID, totalN) {
// Ensure the request slice is reused if the request is rate limited.
cortexpb.ReuseSlice(req.Timeseries)

validation.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamples))
validation.DiscardedExemplars.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedExemplars))
validation.DiscardedMetadata.WithLabelValues(validation.RateLimited, userID).Add(float64(len(validatedMetadata)))
// Return a 429 here to tell the client it is going too fast.
// Client may discard the data or slow down and re-send.
// Prometheus v2.26 added a remote-write option 'retry_on_http_429'.
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.ingestionRateLimiter.Limit(now, userID), validatedSamples, len(validatedMetadata))
}

// totalN included samples and metadata. Ingester follows this pattern when computing its ingestion rate.
d.ingestionRate.Add(int64(totalN))

subRing := d.ingestersRing

// Obtain a subring if required.
if d.cfg.ShardingStrategy == util.ShardingStrategyShuffle {
subRing = d.ingestersRing.ShuffleShard(userID, limits.IngestionTenantShardSize)
}

// Use a background context to make sure all ingesters get samples even if we return early
localCtx, cancel := context.WithTimeout(context.Background(), d.cfg.RemoteTimeout)
localCtx = user.InjectOrgID(localCtx, userID)
if sp := opentracing.SpanFromContext(ctx); sp != nil {
localCtx = opentracing.ContextWithSpan(localCtx, sp)
}
// Get any HTTP headers that are supposed to be added to logs and add to localCtx for later use
if headerMap := util_log.HeaderMapFromContext(ctx); headerMap != nil {
localCtx = util_log.ContextWithHeaderMap(localCtx, headerMap)
}
// Get clientIP(s) from Context and add it to localCtx
localCtx = util.AddSourceIPsToOutgoingContext(localCtx, source)

keys := append(seriesKeys, metadataKeys...)
initialMetadataIndex := len(seriesKeys)

op := ring.WriteNoExtend
if d.cfg.ExtendWrites {
op = ring.Write
}

err = ring.DoBatch(ctx, op, subRing, keys, func(ingester ring.InstanceDesc, indexes []int) error {
timeseries := make([]cortexpb.PreallocTimeseries, 0, len(indexes))
var metadata []*cortexpb.MetricMetadata

for _, i := range indexes {
if i >= initialMetadataIndex {
metadata = append(metadata, validatedMetadata[i-initialMetadataIndex])
} else {
timeseries = append(timeseries, validatedTimeseries[i])
}
}

return d.send(localCtx, ingester, timeseries, metadata, req.Source)
}, func() {
cortexpb.ReuseSlice(req.Timeseries)
cancel()
})

if err != nil {
return nil, err
}
return &cortexpb.WriteResponse{}, firstPartialErr
return seriesKeys, validatedTimeseries, validatedSamples, validatedExemplars, firstPartialErr, nil
}

func sortLabelsIfNeeded(labels []cortexpb.LabelAdapter) {
Expand Down
4 changes: 4 additions & 0 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/go-kit/log/level"
"github.com/gogo/status"
"github.com/oklog/ulid"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand Down Expand Up @@ -936,6 +937,9 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
return nil, err
}

span, ctx := opentracing.StartSpanFromContext(ctx, "Ingester.Push")
defer span.Finish()

// We will report *this* request in the error too.
inflight := i.inflightPushRequests.Inc()
defer i.inflightPushRequests.Dec()
Expand Down
1 change: 1 addition & 0 deletions pkg/ring/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func DoBatch(ctx context.Context, op Operation, r ReadRing, keys []uint32, callb
cleanup()
return fmt.Errorf("DoBatch: InstancesCount <= 0")
}

expectedTrackers := len(keys) * (r.ReplicationFactor() + 1) / r.InstancesCount()
itemTrackers := make([]itemTracker, len(keys))
instances := make(map[string]instance, r.InstancesCount())
Expand Down