diff --git a/CHANGELOG.md b/CHANGELOG.md index 33cd0f54285..5425a82c494 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ * [ENHANCEMENT] Add docker-compose example for GCS along with new backend options [#397](https://github.com/grafana/tempo/pull/397) * [ENHANCEMENT] tempo-cli list blocks usability improvements [#403](https://github.com/grafana/tempo/pull/403) * [ENHANCEMENT] Add Query Frontend module to allow scaling the query path [#400](https://github.com/grafana/tempo/pull/400) +* [ENHANCEMENT] Reduce active traces locking time. [#449](https://github.com/grafana/tempo/pull/449) * [ENHANCEMENT] Added `tempo_distributor_bytes_received_total` as a per tenant counter of uncompressed bytes received. [#453](https://github.com/grafana/tempo/pull/453) * [BUGFIX] Compactor without GCS permissions fail silently [#379](https://github.com/grafana/tempo/issues/379) * [BUGFIX] Prevent race conditions between querier polling and ingesters clearing complete blocks [#421](https://github.com/grafana/tempo/issues/421) diff --git a/modules/ingester/instance.go b/modules/ingester/instance.go index 094859bfe5f..b1a2b1d2252 100644 --- a/modules/ingester/instance.go +++ b/modules/ingester/instance.go @@ -113,26 +113,21 @@ func (i *instance) PushBytes(ctx context.Context, id tempodb_encoding.ID, object // Moves any complete traces out of the map to complete traces func (i *instance) CutCompleteTraces(cutoff time.Duration, immediate bool) error { - i.tracesMtx.Lock() - defer i.tracesMtx.Unlock() + tracesToCut := i.tracesToCut(cutoff, immediate) i.blocksMtx.Lock() defer i.blocksMtx.Unlock() - now := time.Now() - for key, trace := range i.traces { - if now.Add(cutoff).After(trace.lastAppend) || immediate { - out, err := proto.Marshal(trace.trace) - if err != nil { - return err - } - err = i.headBlock.Write(trace.traceID, out) - if err != nil { - return err - } - i.bytesWrittenTotal.Add(float64(len(out))) - delete(i.traces, key) + for _, t := range tracesToCut { + out, err := proto.Marshal(t.trace) + if err != nil { + return err + } + err = i.headBlock.Write(t.traceID, out) + if err != nil { + return err } + i.bytesWrittenTotal.Add(float64(len(out))) } return nil @@ -277,6 +272,8 @@ func (i *instance) FindTraceByID(id []byte) (*tempopb.Trace, error) { return nil, nil } +// getOrCreateTrace will return a new trace object for the given request +// It must be called under the i.tracesMtx lock func (i *instance) getOrCreateTrace(req *tempopb.PushRequest) (*trace, error) { traceID, err := pushRequestTraceID(req) if err != nil { @@ -317,6 +314,23 @@ func (i *instance) resetHeadBlock() error { return err } +func (i *instance) tracesToCut(cutoff time.Duration, immediate bool) []*trace { + i.tracesMtx.Lock() + defer i.tracesMtx.Unlock() + + cutoffTime := time.Now().Add(cutoff) + tracesToCut := make([]*trace, 0, len(i.traces)) + + for key, trace := range i.traces { + if cutoffTime.After(trace.lastAppend) || immediate { + tracesToCut = append(tracesToCut, trace) + delete(i.traces, key) + } + } + + return tracesToCut +} + func (i *instance) Combine(objA []byte, objB []byte) []byte { return util.CombineTraces(objA, objB) } diff --git a/modules/ingester/instance_test.go b/modules/ingester/instance_test.go index a52e8713c1b..91de03d7e39 100644 --- a/modules/ingester/instance_test.go +++ b/modules/ingester/instance_test.go @@ -4,6 +4,7 @@ import ( "context" "encoding/binary" "io/ioutil" + "math/rand" "os" "testing" "time" @@ -14,6 +15,7 @@ import ( "github.com/grafana/tempo/tempodb/wal" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) type ringCountMock struct { @@ -255,6 +257,90 @@ func TestInstanceLimits(t *testing.T) { } } +func TestInstanceCutCompleteTraces(t *testing.T) { + tempDir, _ := ioutil.TempDir("/tmp", "") + defer os.RemoveAll(tempDir) + + id := make([]byte, 16) + rand.Read(id) + tracepb := test.MakeTrace(10, id) + pastTrace := &trace{ + traceID: id, + trace: tracepb, + lastAppend: time.Now().Add(-time.Hour), + } + + id = make([]byte, 16) + rand.Read(id) + nowTrace := &trace{ + traceID: id, + trace: tracepb, + lastAppend: time.Now().Add(time.Hour), + } + + tt := []struct { + name string + cutoff time.Duration + immediate bool + input []*trace + expectedExist []*trace + expectedNotExist []*trace + }{ + { + name: "empty", + cutoff: 0, + immediate: false, + }, + { + name: "cut immediate", + cutoff: 0, + immediate: true, + input: []*trace{pastTrace, nowTrace}, + expectedNotExist: []*trace{pastTrace, nowTrace}, + }, + { + name: "cut recent", + cutoff: 0, + immediate: false, + input: []*trace{pastTrace, nowTrace}, + expectedExist: []*trace{nowTrace}, + expectedNotExist: []*trace{pastTrace}, + }, + { + name: "cut all time", + cutoff: 2 * time.Hour, + immediate: false, + input: []*trace{pastTrace, nowTrace}, + expectedNotExist: []*trace{pastTrace, nowTrace}, + }, + } + + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + instance := defaultInstance(t, tempDir) + + for _, trace := range tc.input { + fp := instance.tokenForTraceID(trace.traceID) + instance.traces[fp] = trace + } + + err := instance.CutCompleteTraces(tc.cutoff, tc.immediate) + require.NoError(t, err) + + assert.Equal(t, len(tc.expectedExist), len(instance.traces)) + for _, expectedExist := range tc.expectedExist { + _, ok := instance.traces[instance.tokenForTraceID(expectedExist.traceID)] + assert.True(t, ok) + } + + for _, expectedNotExist := range tc.expectedNotExist { + _, ok := instance.traces[instance.tokenForTraceID(expectedNotExist.traceID)] + assert.False(t, ok) + } + }) + } +} + func defaultInstance(t assert.TestingT, tempDir string) *instance { limits, err := overrides.NewOverrides(overrides.Limits{}) assert.NoError(t, err, "unexpected error creating limits")