Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support to cut an active block based on total bytes #477

Merged
merged 8 commits into from
Jan 28, 2021
2 changes: 2 additions & 0 deletions docs/tempo/website/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ ingester:
replication_factor: 2 # number of replicas of each span to make while pushing to the backend
trace_idle_period: 20s # amount of time before considering a trace complete and flushing it to a block
traces_per_block: 100000 # maximum number of traces in a block before cutting it
max_block_bytes: 1_000_000_000 # maximum block size before cutting it
mdisibio marked this conversation as resolved.
Show resolved Hide resolved
max_block_duration: 1h # maximum length of time before cutting a block
```

## Query Frontend
Expand Down
2 changes: 2 additions & 0 deletions modules/ingester/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Config struct {
MaxTraceIdle time.Duration `yaml:"trace_idle_period"`
MaxTracesPerBlock int `yaml:"traces_per_block"`
MaxBlockDuration time.Duration `yaml:"max_block_duration"`
MaxBlockBytes uint64 `yaml:"max_block_bytes"`
CompleteBlockTimeout time.Duration `yaml:"complete_block_timeout"`
OverrideRingKey string `yaml:"override_ring_key"`
}
Expand All @@ -38,6 +39,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
f.DurationVar(&cfg.MaxTraceIdle, "ingester.trace-idle-period", 30*time.Second, "Duration after which to consider a trace complete if no spans have been received")
f.IntVar(&cfg.MaxTracesPerBlock, "ingester.traces-per-block", 50000, "Maximum number of traces allowed in the head block before cutting it")
f.DurationVar(&cfg.MaxBlockDuration, "ingester.max-block-duration", time.Hour, "Maximum duration which the head block can be appended to before cutting it.")
f.Uint64Var(&cfg.MaxBlockBytes, "ingester.max-block-bytes", 1024*1024*1024, "Maximum size of the head block before cutting it.")
f.DurationVar(&cfg.CompleteBlockTimeout, "ingester.complete-block-timeout", time.Minute+storage.DefaultBlocklistPoll, "Duration to keep head blocks in the ingester after they have been cut.")
cfg.OverrideRingKey = ring.IngesterRingKey
}
2 changes: 1 addition & 1 deletion modules/ingester/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (i *Ingester) sweepInstance(instance *instance, immediate bool) {
}

// see if it's ready to cut a block?
err = instance.CutBlockIfReady(i.cfg.MaxTracesPerBlock, i.cfg.MaxBlockDuration, immediate)
err = instance.CutBlockIfReady(i.cfg.MaxTracesPerBlock, i.cfg.MaxBlockDuration, i.cfg.MaxBlockBytes, immediate)
if err != nil {
level.Error(util.WithUserID(instance.instanceID, util.Logger)).Log("msg", "failed to cut block", "err", err)
return
Expand Down
4 changes: 2 additions & 2 deletions modules/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (i *instance) CutCompleteTraces(cutoff time.Duration, immediate bool) error
return nil
}

func (i *instance) CutBlockIfReady(maxTracesPerBlock int, maxBlockLifetime time.Duration, immediate bool) error {
func (i *instance) CutBlockIfReady(maxTracesPerBlock int, maxBlockLifetime time.Duration, maxBlockBytes uint64, immediate bool) error {
i.blocksMtx.Lock()
defer i.blocksMtx.Unlock()

Expand All @@ -140,7 +140,7 @@ func (i *instance) CutBlockIfReady(maxTracesPerBlock int, maxBlockLifetime time.
}

now := time.Now()
if i.headBlock.Length() >= maxTracesPerBlock || i.lastBlockCut.Add(maxBlockLifetime).Before(now) || immediate {
if i.headBlock.Length() >= maxTracesPerBlock || i.lastBlockCut.Add(maxBlockLifetime).Before(now) || i.headBlock.DataLength() >= maxBlockBytes || immediate {
if i.completingBlock != nil {
return fmt.Errorf("unable to complete head block for %s b/c there is already a completing block. Will try again next cycle", i.instanceID)
}
Expand Down
94 changes: 91 additions & 3 deletions modules/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestInstance(t *testing.T) {
err = i.CutCompleteTraces(0, true)
assert.NoError(t, err)

err = i.CutBlockIfReady(0, 0, false)
err = i.CutBlockIfReady(0, 0, 0, false)
assert.NoError(t, err, "unexpected error cutting block")

// try a few times while the block gets completed
Expand Down Expand Up @@ -110,7 +110,7 @@ func TestInstanceFind(t *testing.T) {
assert.NotNil(t, trace)
assert.NoError(t, err)

err = i.CutBlockIfReady(0, 0, false)
err = i.CutBlockIfReady(0, 0, 0, false)
assert.NoError(t, err)

trace, err = i.FindTraceByID(traceID)
Expand Down Expand Up @@ -155,7 +155,7 @@ func TestInstanceDoesNotRace(t *testing.T) {
})

go concurrent(func() {
_ = i.CutBlockIfReady(0, 0, false)
_ = i.CutBlockIfReady(0, 0, 0, false)
})

go concurrent(func() {
Expand Down Expand Up @@ -341,6 +341,94 @@ func TestInstanceCutCompleteTraces(t *testing.T) {
}
}

func TestInstanceCutBlockIfReady(t *testing.T) {
tempDir, _ := ioutil.TempDir("/tmp", "")
defer os.RemoveAll(tempDir)

tt := []struct {
name string
maxTracesPerBlock int
maxBlockLifetime time.Duration
maxBlockBytes uint64
immediate bool
pushCount int
expectedToCutBlock bool
}{
{
name: "empty",
expectedToCutBlock: false,
},
{
name: "doesnt cut anything",
pushCount: 1,
expectedToCutBlock: false,
},
{
name: "cut immediate",
immediate: true,
pushCount: 1,
expectedToCutBlock: true,
},
{
name: "cut based on trace count",
maxTracesPerBlock: 1,
pushCount: 2,
expectedToCutBlock: true,
},
{
name: "cut based on block lifetime",
maxBlockLifetime: time.Microsecond,
pushCount: 1,
expectedToCutBlock: true,
},
{
name: "cut based on block size",
maxBlockBytes: 10,
pushCount: 10,
expectedToCutBlock: true,
},
}

for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
instance := defaultInstance(t, tempDir)

for i := 0; i < tc.pushCount; i++ {
request := test.MakeRequest(10, []byte{})
err := instance.Push(context.Background(), request)
require.NoError(t, err)
}

// Defaults
if tc.maxBlockBytes == 0 {
tc.maxBlockBytes = 1000
}
if tc.maxBlockLifetime == 0 {
tc.maxBlockLifetime = time.Hour
}
if tc.maxTracesPerBlock == 0 {
tc.maxTracesPerBlock = 1000
}

lastCutTime := instance.lastBlockCut

// Cut all traces to headblock for testing
err := instance.CutCompleteTraces(0, true)
require.NoError(t, err)

err = instance.CutBlockIfReady(tc.maxTracesPerBlock, tc.maxBlockLifetime, tc.maxBlockBytes, tc.immediate)
require.NoError(t, err)

// Wait for goroutine to finish flushing to avoid test flakiness
if tc.expectedToCutBlock {
time.Sleep(time.Millisecond * 250)
}

assert.Equal(t, tc.expectedToCutBlock, instance.lastBlockCut.After(lastCutTime))
})
}
}

func defaultInstance(t assert.TestingT, tempDir string) *instance {
limits, err := overrides.NewOverrides(overrides.Limits{})
assert.NoError(t, err, "unexpected error creating limits")
Expand Down
1 change: 1 addition & 0 deletions tempodb/encoding/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type Appender interface {
Complete()
Records() []*Record
Length() int
mdisibio marked this conversation as resolved.
Show resolved Hide resolved
DataLength() uint64
}

// ObjectCombiner is used to combine two objects in the backend
Expand Down
10 changes: 7 additions & 3 deletions tempodb/encoding/v0/appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
// appender is the v0 implementation of common.Appender.
type appender struct {
	writer  io.Writer        // destination for appended object bytes
	records []*common.Record // index records, kept sorted by ID
	// currentOffset is the cumulative number of bytes appended so far;
	// it is also the start offset assigned to the next appended record.
	currentOffset uint64
}

// NewAppender returns an appender. This appender simply appends new objects
Expand All @@ -37,11 +37,11 @@ func (a *appender) Append(id common.ID, b []byte) error {
copy(a.records[i+1:], a.records[i:])
a.records[i] = &common.Record{
ID: id,
Start: uint64(a.currentOffset),
Start: a.currentOffset,
Length: uint32(length),
}

a.currentOffset += length
a.currentOffset += uint64(length)
return nil
}

Expand All @@ -53,6 +53,10 @@ func (a *appender) Length() int {
return len(a.records)
}

// DataLength returns the total number of bytes appended so far.
func (a *appender) DataLength() uint64 {
	return a.currentOffset
}

// Complete implements common.Appender. It is intentionally a no-op for
// this appender: there is no buffered state left to flush.
func (a *appender) Complete() {

}
4 changes: 4 additions & 0 deletions tempodb/encoding/v0/appender_buffered.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ func (a *bufferedAppender) Length() int {
return a.totalObjects
}

// DataLength returns the appender's current byte offset, i.e. the number
// of bytes of object data written so far.
func (a *bufferedAppender) DataLength() uint64 {
	return a.currentOffset
}

func (a *bufferedAppender) Complete() {
if a.currentRecord == nil {
return
Expand Down
4 changes: 4 additions & 0 deletions tempodb/wal/append_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ func (h *AppendBlock) Length() int {
return h.appender.Length()
}

// DataLength returns the size in bytes of the data appended to this block,
// delegating to the underlying appender.
func (h *AppendBlock) DataLength() uint64 {
	return h.appender.DataLength()
}

// Complete should be called when you are done with the block. This method will write and return a new CompleteBlock which
// includes an on disk file containing all objects in order.
// Note that calling this method leaves the original file on disk. This file is still considered to be part of the WAL
Expand Down