Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ingester: Rate limit max trace logs and drop too many live traces #4418

Merged
merged 2 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
* [CHANGE] Return 422 for TRACE_TOO_LARGE queries [#4160](https://github.com/grafana/tempo/pull/4160) (@zalegrala)
* [CHANGE] Upgrade OTEL sdk to reduce allocs [#4243](https://github.com/grafana/tempo/pull/4243) (@joe-elliott)
* [CHANGE] Tighten file permissions [#4251](https://github.com/grafana/tempo/pull/4251) (@zalegrala)
* [CHANGE] Drop the max-live-traces log message and rate-limit the trace-too-large log message. [#4418](https://github.com/grafana/tempo/pull/4418) (@joe-elliott)
* [FEATURE] tempo-cli: support dropping multiple traces in a single operation [#4266](https://github.com/grafana/tempo/pull/4266) (@ndk)
* [FEATURE] Discarded span logging `log_discarded_spans` [#3957](https://github.com/grafana/tempo/issues/3957) (@dastrobu)
* [FEATURE] TraceQL support for instrumentation scope [#3967](https://github.com/grafana/tempo/pull/3967) (@ie-pham)
Expand Down
40 changes: 20 additions & 20 deletions modules/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"sync"
"time"

kitlog "github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gogo/status"
"github.com/google/uuid"
Expand All @@ -38,19 +39,9 @@ var (
errMaxLiveTraces = errors.New(overrides.ErrorPrefixLiveTracesExceeded)
)

// newTraceTooLargeError emits a warning describing the oversized trace and
// returns the shared errTraceTooLarge sentinel so callers can compare with
// errors.Is.
func newTraceTooLargeError(traceID common.ID, instanceID string, maxBytes, reqSize int) error {
	// Build the message first so the Log call stays on one line.
	msg := fmt.Sprintf("%s: max size of trace (%d) exceeded while adding %d bytes to trace %s for tenant %s",
		overrides.ErrorPrefixTraceTooLarge, maxBytes, reqSize, hex.EncodeToString(traceID), instanceID)
	level.Warn(log.Logger).Log("msg", msg)
	return errTraceTooLarge
}

func newMaxLiveTracesError(instanceID string, limit string) error {
level.Warn(log.Logger).Log("msg", fmt.Sprintf("%s: max live traces exceeded for tenant %s: %v", overrides.ErrorPrefixLiveTracesExceeded, instanceID, limit))
return errMaxLiveTraces
}

const (
traceDataType = "trace"
traceDataType = "trace"
maxTraceLogLinesPerSecond = 10
)

var (
Expand Down Expand Up @@ -115,9 +106,14 @@ type instance struct {
localWriter backend.Writer

hash hash.Hash32

logger kitlog.Logger
maxTraceLogger *log.RateLimitedLogger
}

func newInstance(instanceID string, limiter *Limiter, overrides ingesterOverrides, writer tempodb.Writer, l *local.Backend, dedicatedColumns backend.DedicatedColumns) (*instance, error) {
logger := kitlog.With(log.Logger, "tenant", instanceID)

i := &instance{
traces: map[uint32]*liveTrace{},
traceSizes: tracesizes.New(),
Expand All @@ -136,6 +132,9 @@ func newInstance(instanceID string, limiter *Limiter, overrides ingesterOverride
localWriter: backend.NewWriter(l),

hash: fnv.New32(),

logger: logger,
maxTraceLogger: log.NewRateLimitedLogger(maxTraceLogLinesPerSecond, level.Warn(logger)),
}
err := i.resetHeadBlock()
if err != nil {
Expand Down Expand Up @@ -176,7 +175,7 @@ func (i *instance) addTraceError(errorsByTrace []tempopb.PushErrorReason, pushEr
}

// error is not either MaxLiveTraces or TraceTooLarge
level.Error(log.Logger).Log("msg", "Unexpected error during PushBytes", "tenant", i.instanceID, "error", pushError)
level.Error(i.logger).Log("msg", "Unexpected error during PushBytes", "error", pushError)
errorsByTrace = append(errorsByTrace, tempopb.PushErrorReason_UNKNOWN_ERROR)
return errorsByTrace

Expand Down Expand Up @@ -204,14 +203,15 @@ func (i *instance) push(ctx context.Context, id, traceBytes []byte) error {

err := i.limiter.AssertMaxTracesPerUser(i.instanceID, len(i.traces))
if err != nil {
return newMaxLiveTracesError(i.instanceID, err.Error())
return errMaxLiveTraces
}

maxBytes := i.limiter.limits.MaxBytesPerTrace(i.instanceID)
reqSize := len(traceBytes)

if maxBytes > 0 && !i.traceSizes.Allow(id, reqSize, maxBytes) {
return newTraceTooLargeError(id, i.instanceID, maxBytes, reqSize)
i.maxTraceLogger.Log("msg", overrides.ErrorPrefixTraceTooLarge, "max", maxBytes, "size", reqSize, "trace", hex.EncodeToString(id))
return errTraceTooLarge
}

tkn := i.tokenForTraceID(id)
Expand Down Expand Up @@ -529,7 +529,7 @@ func (i *instance) getDedicatedColumns() backend.DedicatedColumns {
if cols := i.overrides.DedicatedColumns(i.instanceID); cols != nil {
err := cols.Validate()
if err != nil {
level.Error(log.Logger).Log("msg", "Unable to apply overrides for dedicated attribute columns. Columns invalid.", "tenant", i.instanceID, "error", err)
level.Error(i.logger).Log("msg", "Unable to apply overrides for dedicated attribute columns. Columns invalid.", "error", err)
return i.dedicatedColumns
}

Expand Down Expand Up @@ -608,14 +608,14 @@ func (i *instance) rediscoverLocalBlocks(ctx context.Context) ([]*LocalBlock, er
if err != nil {
if errors.Is(err, backend.ErrDoesNotExist) {
// Partial/incomplete block found, remove, it will be recreated from data in the wal.
level.Warn(log.Logger).Log("msg", "Unable to reload meta for local block. This indicates an incomplete block and will be deleted", "tenant", i.instanceID, "block", id.String())
level.Warn(i.logger).Log("msg", "Unable to reload meta for local block. This indicates an incomplete block and will be deleted", "block", id.String())
err = i.local.ClearBlock(id, i.instanceID)
if err != nil {
return nil, fmt.Errorf("deleting bad local block tenant %v block %v: %w", i.instanceID, id.String(), err)
}
} else {
// Block with unknown error
level.Error(log.Logger).Log("msg", "Unexpected error reloading meta for local block. Ignoring and continuing. This block should be investigated.", "tenant", i.instanceID, "block", id.String(), "error", err)
level.Error(i.logger).Log("msg", "Unexpected error reloading meta for local block. Ignoring and continuing. This block should be investigated.", "block", id.String(), "error", err)
metricReplayErrorsTotal.WithLabelValues(i.instanceID).Inc()
}

Expand All @@ -631,7 +631,7 @@ func (i *instance) rediscoverLocalBlocks(ctx context.Context) ([]*LocalBlock, er
// level corruption
err = b.Validate(ctx)
if err != nil && !errors.Is(err, common.ErrUnsupported) {
level.Error(log.Logger).Log("msg", "local block failed validation, dropping", "tenantID", i.instanceID, "block", id.String(), "error", err)
level.Error(i.logger).Log("msg", "local block failed validation, dropping", "block", id.String(), "error", err)
metricReplayErrorsTotal.WithLabelValues(i.instanceID).Inc()

err = i.local.ClearBlock(id, i.instanceID)
Expand All @@ -645,7 +645,7 @@ func (i *instance) rediscoverLocalBlocks(ctx context.Context) ([]*LocalBlock, er
ib := NewLocalBlock(ctx, b, i.local)
rediscoveredBlocks = append(rediscoveredBlocks, ib)

level.Info(log.Logger).Log("msg", "reloaded local block", "tenantID", i.instanceID, "block", id.String(), "flushed", ib.FlushedTime())
level.Info(i.logger).Log("msg", "reloaded local block", "block", id.String(), "flushed", ib.FlushedTime())
}

i.blocksMtx.Lock()
Expand Down
11 changes: 11 additions & 0 deletions pkg/util/log/rate_limited_logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"time"

gkLog "github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"golang.org/x/time/rate"
)

Expand All @@ -12,6 +14,14 @@ type RateLimitedLogger struct {
logger gkLog.Logger
}

// metricDropedLines counts log lines suppressed by the rate-limited logger
// (incremented in RateLimitedLogger.Log when the limiter denies a token).
// NOTE(review): identifier is misspelled ("Droped"); rename to
// metricDroppedLines together with its usage in Log in the same change.
var metricDropedLines = promauto.NewCounter(prometheus.CounterOpts{
	Namespace: "tempo",
	Name:      "dropped_log_lines_total",
	Help:      "The total number of log lines dropped by the rate limited logger.",
})

// NewRateLimitedLogger returns a new RateLimitedLogger that logs at most logsPerSecond messages per second.
// TODO: migrate to the dskit rate limited logger
func NewRateLimitedLogger(logsPerSecond int, logger gkLog.Logger) *RateLimitedLogger {
return &RateLimitedLogger{
limiter: rate.NewLimiter(rate.Limit(logsPerSecond), 1),
Expand All @@ -21,6 +31,7 @@ func NewRateLimitedLogger(logsPerSecond int, logger gkLog.Logger) *RateLimitedLo

func (l *RateLimitedLogger) Log(keyvals ...interface{}) {
if !l.limiter.AllowN(time.Now(), 1) {
metricDropedLines.Inc()
return
}

Expand Down
Loading