diff --git a/CHANGELOG.md b/CHANGELOG.md
index 34910ebdf43..0e59cb4b6a6 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -27,6 +27,7 @@
 * [ENHANCEMENT] Reduce search data file sizes by optimizing contents [#1165](https://github.com/grafana/tempo/pull/1165) (@mdisibio)
 * [ENHANCEMENT] Add `tempo_ingester_live_traces` metric [#1170](https://github.com/grafana/tempo/pull/1170) (@mdisibio)
 * [ENHANCEMENT] Update compactor ring to automatically forget unhealthy entries [#1178](https://github.com/grafana/tempo/pull/1178) (@mdisibio)
+* [ENHANCEMENT] Prevent writes to large traces even after flushing to disk [#1199](https://github.com/grafana/tempo/pull/1199) (@mdisibio)
 * [BUGFIX] Add process name to vulture traces to work around display issues [#1127](https://github.com/grafana/tempo/pull/1127) (@mdisibio)
 * [BUGFIX] Fixed issue where compaction sometimes dropped spans. [#1130](https://github.com/grafana/tempo/pull/1130) (@joe-elliott)
 * [BUGFIX] Ensure that the admin client jsonnet has correct S3 bucket property. (@hedss)
diff --git a/modules/ingester/instance.go b/modules/ingester/instance.go
index b1525c55395..f69c024f37c 100644
--- a/modules/ingester/instance.go
+++ b/modules/ingester/instance.go
@@ -34,6 +34,25 @@ import (
 	"github.com/grafana/tempo/tempodb/wal"
 )
 
+type traceTooLargeError struct {
+	traceID           common.ID
+	maxBytes, reqSize int
+}
+
+func newTraceTooLargeError(traceID common.ID, maxBytes, reqSize int) *traceTooLargeError {
+	return &traceTooLargeError{
+		traceID:  traceID,
+		maxBytes: maxBytes,
+		reqSize:  reqSize,
+	}
+}
+
+func (e traceTooLargeError) Error() string {
+	return fmt.Sprintf(
+		"%s max size of trace (%d) exceeded while adding %d bytes to trace %s",
+		overrides.ErrorPrefixTraceTooLarge, e.maxBytes, e.reqSize, hex.EncodeToString(e.traceID))
+}
+
 // Errors returned on Query.
 var (
 	ErrTraceMissing = errors.New("Trace missing")
@@ -68,9 +87,10 @@ var (
 )
 
 type instance struct {
-	tracesMtx  sync.Mutex
-	traces     map[uint32]*trace
-	traceCount atomic.Int32
+	tracesMtx   sync.Mutex
+	traces      map[uint32]*trace
+	largeTraces map[uint32]int // maxBytes that trace exceeded
+	traceCount  atomic.Int32
 
 	blocksMtx sync.RWMutex
 	headBlock *wal.AppendBlock
@@ -109,6 +129,7 @@ type searchLocalBlockEntry struct {
 func newInstance(instanceID string, limiter *Limiter, writer tempodb.Writer, l *local.Backend) (*instance, error) {
 	i := &instance{
 		traces:               map[uint32]*trace{},
+		largeTraces:          map[uint32]int{},
 		searchAppendBlocks:   map[*wal.AppendBlock]*searchStreamingBlockEntry{},
 		searchCompleteBlocks: map[*wal.LocalBlock]*searchLocalBlockEntry{},
 
@@ -140,9 +161,6 @@ func (i *instance) Push(ctx context.Context, req *tempopb.PushRequest) error {
 		return status.Errorf(codes.FailedPrecondition, "%s max live traces exceeded for tenant %s: %v", overrides.ErrorPrefixLiveTracesExceeded, i.instanceID, err)
 	}
 
-	i.tracesMtx.Lock()
-	defer i.tracesMtx.Unlock()
-
 	id, err := pushRequestTraceID(req)
 	if err != nil {
 		return err
@@ -161,8 +179,7 @@ func (i *instance) Push(ctx context.Context, req *tempopb.PushRequest) error {
 		return err
 	}
 
-	trace := i.getOrCreateTrace(id)
-	return trace.Push(ctx, i.instanceID, buffer, nil)
+	return i.push(ctx, id, buffer, nil)
 }
 
 // PushBytes is used to push an unmarshalled tempopb.Trace to the instance
@@ -179,11 +196,29 @@ func (i *instance) PushBytes(ctx context.Context, id []byte, traceBytes []byte,
 		return status.Errorf(codes.FailedPrecondition, "%s max live traces exceeded for tenant %s: %v", overrides.ErrorPrefixLiveTracesExceeded, i.instanceID, err)
 	}
 
+	return i.push(ctx, id, traceBytes, searchData)
+}
+
+func (i *instance) push(ctx context.Context, id, traceBytes, searchData []byte) error {
 	i.tracesMtx.Lock()
 	defer i.tracesMtx.Unlock()
 
+	tkn := i.tokenForTraceID(id)
+
+	if maxBytes, ok := i.largeTraces[tkn]; ok {
+		return status.Errorf(codes.FailedPrecondition, (newTraceTooLargeError(id, maxBytes, len(traceBytes)).Error()))
+	}
+
 	trace := i.getOrCreateTrace(id)
-	return trace.Push(ctx, i.instanceID, traceBytes, searchData)
+	err := trace.Push(ctx, i.instanceID, traceBytes, searchData)
+	if err != nil {
+		if e, ok := err.(*traceTooLargeError); ok {
+			i.largeTraces[tkn] = trace.maxBytes
+			return status.Errorf(codes.FailedPrecondition, e.Error())
+		}
+	}
+
+	return err
 }
 
 func (i *instance) measureReceivedBytes(traceBytes []byte, searchData []byte) {
@@ -486,6 +521,12 @@ func (i *instance) tokenForTraceID(id []byte) uint32 {
 
 // resetHeadBlock() should be called under lock
 func (i *instance) resetHeadBlock() error {
+
+	// Clear large traces when cutting block
+	i.tracesMtx.Lock()
+	i.largeTraces = map[uint32]int{}
+	i.tracesMtx.Unlock()
+
 	oldHeadBlock := i.headBlock
 	var err error
 	newHeadBlock, err := i.writer.WAL().NewBlock(uuid.New(), i.instanceID, model.CurrentEncoding)
diff --git a/modules/ingester/instance_test.go b/modules/ingester/instance_test.go
index 15b54bc1e0e..34db6377d82 100644
--- a/modules/ingester/instance_test.go
+++ b/modules/ingester/instance_test.go
@@ -25,6 +25,8 @@ import (
 	"github.com/grafana/tempo/tempodb/wal"
 )
 
+const testTenantID = "fake"
+
 type ringCountMock struct {
 	count int
 }
@@ -45,7 +47,7 @@ func TestInstance(t *testing.T) {
 	ingester, _, _ := defaultIngester(t, tempDir)
 	request := test.MakeRequest(10, []byte{})
 
-	i, err := newInstance("fake", limiter, ingester.store, ingester.local)
+	i, err := newInstance(testTenantID, limiter, ingester.store, ingester.local)
 	assert.NoError(t, err, "unexpected error creating new instance")
 	err = i.Push(context.Background(), request)
 	assert.NoError(t, err)
@@ -94,7 +96,7 @@ func TestInstanceFind(t *testing.T) {
 	defer os.RemoveAll(tempDir)
 
 	ingester, _, _ := defaultIngester(t, tempDir)
-	i, err := newInstance("fake", limiter, ingester.store, ingester.local)
+	i, err := newInstance(testTenantID, limiter, ingester.store, ingester.local)
 	assert.NoError(t, err, "unexpected error creating new instance")
 
 	numTraces := 500
@@ -177,7 +179,7 @@ func TestInstanceDoesNotRace(t *testing.T) {
 
 	ingester, _, _ := defaultIngester(t, tempDir)
 
-	i, err := newInstance("fake", limiter, ingester.store, ingester.local)
+	i, err := newInstance(testTenantID, limiter, ingester.store, ingester.local)
 	assert.NoError(t, err, "unexpected error creating new instance")
 
 	end := make(chan struct{})
@@ -321,7 +323,7 @@ func TestInstanceLimits(t *testing.T) {
 
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			i, err := newInstance("fake", limiter, ingester.store, ingester.local)
+			i, err := newInstance(testTenantID, limiter, ingester.store, ingester.local)
 			require.NoError(t, err, "unexpected error creating new instance")
 
 			for j, push := range tt.pushes {
@@ -501,8 +503,6 @@ func TestInstanceCutBlockIfReady(t *testing.T) {
 }
 
 func TestInstanceMetrics(t *testing.T) {
-	tenantID := "fake"
-
 	limits, err := overrides.NewOverrides(overrides.Limits{})
 	assert.NoError(t, err, "unexpected error creating limits")
 	limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)
@@ -513,14 +513,14 @@ func TestInstanceMetrics(t *testing.T) {
 
 	ingester, _, _ := defaultIngester(t, tempDir)
 
-	i, err := newInstance(tenantID, limiter, ingester.store, ingester.local)
+	i, err := newInstance(testTenantID, limiter, ingester.store, ingester.local)
 	assert.NoError(t, err, "unexpected error creating new instance")
 
 	cutAndVerify := func(v int) {
 		err := i.CutCompleteTraces(0, true)
 		require.NoError(t, err)
 
-		liveTraces, err := test.GetGaugeVecValue(metricLiveTraces, tenantID)
+		liveTraces, err := test.GetGaugeVecValue(metricLiveTraces, testTenantID)
 		require.NoError(t, err)
 		require.Equal(t, v, int(liveTraces))
 	}
@@ -539,6 +539,51 @@
 	cutAndVerify(0)
 }
 
+func TestInstanceFailsLargeTracesEvenAfterFlushing(t *testing.T) {
+	ctx := context.Background()
+	maxTraceBytes := 100
+	id := []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}
+
+	tempDir, err := os.MkdirTemp("/tmp", "")
+	require.NoError(t, err)
+	defer os.RemoveAll(tempDir)
+
+	ingester, _, _ := defaultIngester(t, tempDir)
+
+	limits, err := overrides.NewOverrides(overrides.Limits{
+		MaxBytesPerTrace: maxTraceBytes,
+	})
+	require.NoError(t, err)
+	limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)
+
+	i, err := newInstance(testTenantID, limiter, ingester.store, ingester.local)
+	require.NoError(t, err)
+
+	pushFn := func(byteCount int) error {
+		return i.PushBytes(ctx, id, make([]byte, byteCount), nil)
+	}
+
+	// Fill up trace to max
+	err = pushFn(maxTraceBytes)
+	require.NoError(t, err)
+
+	// Pushing again fails
+	err = pushFn(3)
+	require.Contains(t, err.Error(), (newTraceTooLargeError(id, maxTraceBytes, 3)).Error())
+
+	// Pushing still fails after flush
+	err = i.CutCompleteTraces(0, true)
+	require.NoError(t, err)
+	err = pushFn(5)
+	require.Contains(t, err.Error(), (newTraceTooLargeError(id, maxTraceBytes, 5)).Error())
+
+	// Cut block and then pushing works again
+	_, err = i.CutBlockIfReady(0, 0, true)
+	require.NoError(t, err)
+	err = pushFn(maxTraceBytes)
+	require.NoError(t, err)
+}
+
 func defaultInstance(t require.TestingT, tmpDir string) *instance {
 	limits, err := overrides.NewOverrides(overrides.Limits{})
 	assert.NoError(t, err, "unexpected error creating limits")
@@ -570,7 +615,7 @@ func defaultInstance(t require.TestingT, tmpDir string) *instance {
 	}, log.NewNopLogger())
 	assert.NoError(t, err, "unexpected error creating store")
 
-	instance, err := newInstance("fake", limiter, s, l)
+	instance, err := newInstance(testTenantID, limiter, s, l)
 	assert.NoError(t, err, "unexpected error creating new instance")
 
 	return instance
diff --git a/modules/ingester/trace.go b/modules/ingester/trace.go
index 63918187f78..49d9392eb74 100644
--- a/modules/ingester/trace.go
+++ b/modules/ingester/trace.go
@@ -2,17 +2,13 @@ package ingester
 
 import (
 	"context"
-	"encoding/hex"
 	"time"
 
 	cortex_util "github.com/cortexproject/cortex/pkg/util/log"
 	"github.com/go-kit/log/level"
-	"github.com/gogo/status"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
-	"google.golang.org/grpc/codes"
 
-	"github.com/grafana/tempo/modules/overrides"
 	"github.com/grafana/tempo/pkg/tempopb"
 )
 
@@ -54,7 +50,7 @@ func (t *trace) Push(_ context.Context, instanceID string, trace []byte, searchD
 	if t.maxBytes != 0 {
 		reqSize := len(trace)
 		if t.currentBytes+reqSize > t.maxBytes {
-			return status.Errorf(codes.FailedPrecondition, "%s max size of trace (%d) exceeded while adding %d bytes to trace %s", overrides.ErrorPrefixTraceTooLarge, t.maxBytes, reqSize, hex.EncodeToString(t.traceID))
+			return newTraceTooLargeError(t.traceID, t.maxBytes, reqSize)
 		}
 
 		t.currentBytes += reqSize
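
For readers who want the mechanism at a glance, below is a self-contained sketch of the behavior the patch adds. It is illustrative only: the instance, token, and limit plumbing are simplified stand-ins rather than Tempo's actual types. Once a push would take a trace past its per-trace byte limit, the trace's token is remembered in largeTraces and every later push for that trace fails fast; flushing live traces (CutCompleteTraces) does not clear that state, only cutting the head block does.

// Illustrative sketch only: simplified stand-ins for the ingester's instance,
// not Tempo code. It models the largeTraces mechanism added by this patch.
package main

import (
	"errors"
	"fmt"
	"hash/fnv"
)

type traceTooLargeError struct {
	maxBytes, reqSize int
}

func (e *traceTooLargeError) Error() string {
	return fmt.Sprintf("max size of trace (%d) exceeded while adding %d bytes", e.maxBytes, e.reqSize)
}

type instance struct {
	maxBytes    int            // per-trace byte limit (MaxBytesPerTrace)
	sizes       map[uint32]int // bytes pushed per live trace
	largeTraces map[uint32]int // traces already known to be too large
}

func newInstance(maxBytes int) *instance {
	return &instance{
		maxBytes:    maxBytes,
		sizes:       map[uint32]int{},
		largeTraces: map[uint32]int{},
	}
}

// token stands in for instance.tokenForTraceID.
func token(id []byte) uint32 {
	h := fnv.New32()
	_, _ = h.Write(id)
	return h.Sum32()
}

// push mirrors instance.push in the diff: reject immediately if the trace is
// already marked too large, otherwise enforce the limit and remember offenders.
func (i *instance) push(id []byte, n int) error {
	tkn := token(id)
	if maxBytes, ok := i.largeTraces[tkn]; ok {
		return &traceTooLargeError{maxBytes: maxBytes, reqSize: n}
	}
	if i.sizes[tkn]+n > i.maxBytes {
		i.largeTraces[tkn] = i.maxBytes
		return &traceTooLargeError{maxBytes: i.maxBytes, reqSize: n}
	}
	i.sizes[tkn] += n
	return nil
}

// cutCompleteTraces models flushing live traces to the head block: per-trace
// sizes reset, but largeTraces is deliberately left intact.
func (i *instance) cutCompleteTraces() { i.sizes = map[uint32]int{} }

// resetHeadBlock models cutting the head block, which clears largeTraces.
func (i *instance) resetHeadBlock() { i.largeTraces = map[uint32]int{} }

func main() {
	inst := newInstance(100)
	id := []byte{1, 2, 3, 4}

	fmt.Println(inst.push(id, 100)) // <nil>: fills the trace to its limit
	fmt.Println(inst.push(id, 3))   // too-large error; trace is now blocklisted

	inst.cutCompleteTraces()
	err := inst.push(id, 5) // still rejected after the flush
	var ttl *traceTooLargeError
	fmt.Println(errors.As(err, &ttl)) // true

	inst.resetHeadBlock()
	fmt.Println(inst.push(id, 100)) // <nil>: writable again after the block is cut
}

The trade-off shown here matches the patch: rejection state is scoped to the current head block (resetHeadBlock clears largeTraces under tracesMtx), so an oversized trace stops consuming ingester resources without being blocked forever.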