diff --git a/modules/ingester/instance.go b/modules/ingester/instance.go index 60c3d710c3f..abcd44c6397 100644 --- a/modules/ingester/instance.go +++ b/modules/ingester/instance.go @@ -55,7 +55,7 @@ var ( type instance struct { tracesMtx sync.Mutex - traces map[uint32]*trace + traces map[uint32]trace traceCount atomic.Int32 blocksMtx sync.RWMutex @@ -77,7 +77,7 @@ type instance struct { func newInstance(instanceID string, limiter *Limiter, writer tempodb.Writer, l *local.Backend) (*instance, error) { i := &instance{ - traces: map[uint32]*trace{}, + traces: map[uint32]trace{}, instanceID: instanceID, tracesCreatedTotal: metricTracesCreatedTotal.WithLabelValues(instanceID), @@ -105,15 +105,11 @@ func (i *instance) Push(ctx context.Context, req *tempopb.PushRequest) error { i.tracesMtx.Lock() defer i.tracesMtx.Unlock() - trace, err := i.getOrCreateTrace(req) + err = i.updateTrace(req) if err != nil { return err } - if err := trace.Push(ctx, req); err != nil { - return err - } - return nil } @@ -331,27 +327,29 @@ func (i *instance) AddCompletingBlock(b *wal.AppendBlock) { i.completingBlocks = append(i.completingBlocks, b) } -// getOrCreateTrace will return a new trace object for the given request -// It must be called under the i.tracesMtx lock -func (i *instance) getOrCreateTrace(req *tempopb.PushRequest) (*trace, error) { +// updateTrace updates the trace with the provided PushRequest +func (i *instance) updateTrace(req *tempopb.PushRequest) error { traceID, err := pushRequestTraceID(req) if err != nil { - return nil, status.Errorf(codes.InvalidArgument, "unable to extract traceID: %v", err) + return status.Errorf(codes.InvalidArgument, "unable to extract traceID: %v", err) } fp := i.tokenForTraceID(traceID) trace, ok := i.traces[fp] - if ok { - return trace, nil + if !ok { + maxBytes := i.limiter.limits.MaxBytesPerTrace(i.instanceID) + trace = newTrace(maxBytes, fp, traceID) + i.tracesCreatedTotal.Inc() + i.traceCount.Inc() + } + + if err := trace.Push(req); err != nil { + return err } - maxBytes := i.limiter.limits.MaxBytesPerTrace(i.instanceID) - trace = newTrace(maxBytes, fp, traceID) i.traces[fp] = trace - i.tracesCreatedTotal.Inc() - i.traceCount.Inc() - return trace, nil + return nil } // tokenForTraceID hash trace ID, should be called under lock @@ -369,12 +367,12 @@ func (i *instance) resetHeadBlock() error { return err } -func (i *instance) tracesToCut(cutoff time.Duration, immediate bool) []*trace { +func (i *instance) tracesToCut(cutoff time.Duration, immediate bool) []trace { i.tracesMtx.Lock() defer i.tracesMtx.Unlock() cutoffTime := time.Now().Add(cutoff) - tracesToCut := make([]*trace, 0, len(i.traces)) + tracesToCut := make([]trace, 0, len(i.traces)) for key, trace := range i.traces { if cutoffTime.After(trace.lastAppend) || immediate { diff --git a/modules/ingester/instance_test.go b/modules/ingester/instance_test.go index 0f478b02cc0..a0a11e6f90b 100644 --- a/modules/ingester/instance_test.go +++ b/modules/ingester/instance_test.go @@ -322,7 +322,7 @@ func TestInstanceCutCompleteTraces(t *testing.T) { id := make([]byte, 16) rand.Read(id) tracepb := test.MakeTrace(10, id) - pastTrace := &trace{ + pastTrace := trace{ traceID: id, trace: tracepb, lastAppend: time.Now().Add(-time.Hour), @@ -330,7 +330,7 @@ func TestInstanceCutCompleteTraces(t *testing.T) { id = make([]byte, 16) rand.Read(id) - nowTrace := &trace{ + nowTrace := trace{ traceID: id, trace: tracepb, lastAppend: time.Now().Add(time.Hour), @@ -340,9 +340,9 @@ func TestInstanceCutCompleteTraces(t *testing.T) { name string cutoff time.Duration immediate bool - input []*trace - expectedExist []*trace - expectedNotExist []*trace + input []trace + expectedExist []trace + expectedNotExist []trace }{ { name: "empty", @@ -353,23 +353,23 @@ func TestInstanceCutCompleteTraces(t *testing.T) { name: "cut immediate", cutoff: 0, immediate: true, - input: []*trace{pastTrace, nowTrace}, - expectedNotExist: []*trace{pastTrace, nowTrace}, + input: []trace{pastTrace, nowTrace}, + expectedNotExist: []trace{pastTrace, nowTrace}, }, { name: "cut recent", cutoff: 0, immediate: false, - input: []*trace{pastTrace, nowTrace}, - expectedExist: []*trace{nowTrace}, - expectedNotExist: []*trace{pastTrace}, + input: []trace{pastTrace, nowTrace}, + expectedExist: []trace{nowTrace}, + expectedNotExist: []trace{pastTrace}, }, { name: "cut all time", cutoff: 2 * time.Hour, immediate: false, - input: []*trace{pastTrace, nowTrace}, - expectedNotExist: []*trace{pastTrace, nowTrace}, + input: []trace{pastTrace, nowTrace}, + expectedNotExist: []trace{pastTrace, nowTrace}, }, } diff --git a/modules/ingester/trace.go b/modules/ingester/trace.go index 0734d1bab39..8f67f5fb450 100644 --- a/modules/ingester/trace.go +++ b/modules/ingester/trace.go @@ -1,7 +1,6 @@ package ingester import ( - "context" "time" "github.com/gogo/status" @@ -19,8 +18,8 @@ type trace struct { currentBytes int } -func newTrace(maxBytes int, token uint32, traceID []byte) *trace { - return &trace{ +func newTrace(maxBytes int, token uint32, traceID []byte) trace { + return trace{ token: token, trace: &tempopb.Trace{}, lastAppend: time.Now(), @@ -29,7 +28,7 @@ func newTrace(maxBytes int, token uint32, traceID []byte) *trace { } } -func (t *trace) Push(_ context.Context, req *tempopb.PushRequest) error { +func (t *trace) Push(req *tempopb.PushRequest) error { t.lastAppend = time.Now() if t.maxBytes != 0 { reqSize := req.Size()