Skip to content

Commit

Permalink
map[]trace
Browse files Browse the repository at this point in the history
Signed-off-by: Joe Elliott <number101010@gmail.com>
  • Loading branch information
joe-elliott committed Apr 29, 2021
1 parent 021a9b4 commit 6247a77
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 36 deletions.
38 changes: 18 additions & 20 deletions modules/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ var (

type instance struct {
tracesMtx sync.Mutex
traces map[uint32]*trace
traces map[uint32]trace
traceCount atomic.Int32

blocksMtx sync.RWMutex
Expand All @@ -77,7 +77,7 @@ type instance struct {

func newInstance(instanceID string, limiter *Limiter, writer tempodb.Writer, l *local.Backend) (*instance, error) {
i := &instance{
traces: map[uint32]*trace{},
traces: map[uint32]trace{},

instanceID: instanceID,
tracesCreatedTotal: metricTracesCreatedTotal.WithLabelValues(instanceID),
Expand Down Expand Up @@ -105,15 +105,11 @@ func (i *instance) Push(ctx context.Context, req *tempopb.PushRequest) error {
i.tracesMtx.Lock()
defer i.tracesMtx.Unlock()

trace, err := i.getOrCreateTrace(req)
err = i.updateTrace(req)
if err != nil {
return err
}

if err := trace.Push(ctx, req); err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -331,27 +327,29 @@ func (i *instance) AddCompletingBlock(b *wal.AppendBlock) {
i.completingBlocks = append(i.completingBlocks, b)
}

// getOrCreateTrace will return a new trace object for the given request
// It must be called under the i.tracesMtx lock
func (i *instance) getOrCreateTrace(req *tempopb.PushRequest) (*trace, error) {
// updateTrace updates the trace with the provided PushRequest
func (i *instance) updateTrace(req *tempopb.PushRequest) error {
traceID, err := pushRequestTraceID(req)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "unable to extract traceID: %v", err)
return status.Errorf(codes.InvalidArgument, "unable to extract traceID: %v", err)
}

fp := i.tokenForTraceID(traceID)
trace, ok := i.traces[fp]
if ok {
return trace, nil
if !ok {
maxBytes := i.limiter.limits.MaxBytesPerTrace(i.instanceID)
trace = newTrace(maxBytes, fp, traceID)
i.tracesCreatedTotal.Inc()
i.traceCount.Inc()
}

if err := trace.Push(req); err != nil {
return err
}

maxBytes := i.limiter.limits.MaxBytesPerTrace(i.instanceID)
trace = newTrace(maxBytes, fp, traceID)
i.traces[fp] = trace
i.tracesCreatedTotal.Inc()
i.traceCount.Inc()

return trace, nil
return nil
}

// tokenForTraceID hash trace ID, should be called under lock
Expand All @@ -369,12 +367,12 @@ func (i *instance) resetHeadBlock() error {
return err
}

func (i *instance) tracesToCut(cutoff time.Duration, immediate bool) []*trace {
func (i *instance) tracesToCut(cutoff time.Duration, immediate bool) []trace {
i.tracesMtx.Lock()
defer i.tracesMtx.Unlock()

cutoffTime := time.Now().Add(cutoff)
tracesToCut := make([]*trace, 0, len(i.traces))
tracesToCut := make([]trace, 0, len(i.traces))

for key, trace := range i.traces {
if cutoffTime.After(trace.lastAppend) || immediate {
Expand Down
24 changes: 12 additions & 12 deletions modules/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,15 +322,15 @@ func TestInstanceCutCompleteTraces(t *testing.T) {
id := make([]byte, 16)
rand.Read(id)
tracepb := test.MakeTrace(10, id)
pastTrace := &trace{
pastTrace := trace{
traceID: id,
trace: tracepb,
lastAppend: time.Now().Add(-time.Hour),
}

id = make([]byte, 16)
rand.Read(id)
nowTrace := &trace{
nowTrace := trace{
traceID: id,
trace: tracepb,
lastAppend: time.Now().Add(time.Hour),
Expand All @@ -340,9 +340,9 @@ func TestInstanceCutCompleteTraces(t *testing.T) {
name string
cutoff time.Duration
immediate bool
input []*trace
expectedExist []*trace
expectedNotExist []*trace
input []trace
expectedExist []trace
expectedNotExist []trace
}{
{
name: "empty",
Expand All @@ -353,23 +353,23 @@ func TestInstanceCutCompleteTraces(t *testing.T) {
name: "cut immediate",
cutoff: 0,
immediate: true,
input: []*trace{pastTrace, nowTrace},
expectedNotExist: []*trace{pastTrace, nowTrace},
input: []trace{pastTrace, nowTrace},
expectedNotExist: []trace{pastTrace, nowTrace},
},
{
name: "cut recent",
cutoff: 0,
immediate: false,
input: []*trace{pastTrace, nowTrace},
expectedExist: []*trace{nowTrace},
expectedNotExist: []*trace{pastTrace},
input: []trace{pastTrace, nowTrace},
expectedExist: []trace{nowTrace},
expectedNotExist: []trace{pastTrace},
},
{
name: "cut all time",
cutoff: 2 * time.Hour,
immediate: false,
input: []*trace{pastTrace, nowTrace},
expectedNotExist: []*trace{pastTrace, nowTrace},
input: []trace{pastTrace, nowTrace},
expectedNotExist: []trace{pastTrace, nowTrace},
},
}

Expand Down
7 changes: 3 additions & 4 deletions modules/ingester/trace.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package ingester

import (
"context"
"time"

"github.com/gogo/status"
Expand All @@ -19,8 +18,8 @@ type trace struct {
currentBytes int
}

func newTrace(maxBytes int, token uint32, traceID []byte) *trace {
return &trace{
func newTrace(maxBytes int, token uint32, traceID []byte) trace {
return trace{
token: token,
trace: &tempopb.Trace{},
lastAppend: time.Now(),
Expand All @@ -29,7 +28,7 @@ func newTrace(maxBytes int, token uint32, traceID []byte) *trace {
}
}

func (t *trace) Push(_ context.Context, req *tempopb.PushRequest) error {
func (t *trace) Push(req *tempopb.PushRequest) error {
t.lastAppend = time.Now()
if t.maxBytes != 0 {
reqSize := req.Size()
Expand Down

0 comments on commit 6247a77

Please sign in to comment.