Add support to cut an active block based on total bytes #477

Merged (8 commits) on Jan 28, 2021
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -1,5 +1,6 @@
## master / unreleased
* [CHANGE] Fixed ingester latency spikes on read [#461](https://github.com/grafana/tempo/pull/461)
* [CHANGE] Ingester cut blocks based on size instead of trace count. Replace ingester `traces_per_block` setting with `max_block_bytes`. This is a **breaking change**. [#474](https://github.com/grafana/tempo/issues/474)
* [ENHANCEMENT] Serve config at the "/config" endpoint. [#446](https://github.com/grafana/tempo/pull/446)
* [BUGFIX] Upgrade cortex dependency to 1.6 to address issue with forgetting ring membership [#442](https://github.com/grafana/tempo/pull/442)
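
The `[CHANGE]` entry above removes `traces_per_block` in favor of `max_block_bytes`, which is a breaking config change for existing ingester configs. A minimal before/after sketch (the byte value mirrors the updated docs and is illustrative, not a recommendation):

```yaml
# before: no longer accepted once ingesters cut blocks by size
ingester:
  traces_per_block: 100000

# after: cut the head block once it reaches roughly 1GB
ingester:
  max_block_bytes: 1_000_000_000
```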

3 changes: 2 additions & 1 deletion docs/tempo/website/configuration/_index.md
@@ -52,7 +52,8 @@ ingester:
ring:
replication_factor: 2 # number of replicas of each span to make while pushing to the backend
trace_idle_period: 20s # amount of time before considering a trace complete and flushing it to a block
traces_per_block: 100000 # maximum number of traces in a block before cutting it
max_block_bytes: 1_000_000_000 # maximum size of a block before cutting it
max_block_duration: 1h # maximum length of time before cutting a block
```

## Query Frontend
4 changes: 2 additions & 2 deletions docs/tempo/website/troubleshooting/_index.md
@@ -106,6 +106,6 @@ If this metric is greater than zero (0), check the logs of the compactor for an
- For detailed information, see https://grafana.com/docs/tempo/latest/configuration/s3/#permissions
- If there’s a compactor sitting idle while others are running, port-forward to the compactor’s http endpoint. Then go to `/compactor/ring` and click **Forget** on the inactive compactor.
- Check the following configuration parameters to ensure they are set correctly:
- `traces_per_block` to determine when the ingester cuts blocks.
- `max_compaction_objects` to determine the max number of objects in a compacted block (this should be higher than `tracer_per_block` for the compactor to actually combine multiple blocks together). This can run up to `100,000`
- `max_block_bytes` to determine when the ingester cuts blocks. A good number is anywhere from 100MB to 2GB depending on the workload.
- `max_compaction_objects` to determine the max number of objects in a compacted block. This should be relatively high, generally in the millions (see the combined sketch after this list).
- `retention_duration` for how long traces should be retained in the backend.
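
A combined sketch of where these parameters sit, following the ingester examples in this PR; the compactor keys, their nesting, and all values are illustrative assumptions that may differ by Tempo version:

```yaml
ingester:
  trace_idle_period: 10s
  max_block_bytes: 500_000_000       # ~500MB head blocks; anywhere from 100MB to 2GB is reasonable
  max_block_duration: 30m

compactor:
  compaction:
    max_compaction_objects: 3000000  # relatively high, generally in the millions
    retention_duration: 336h         # how long traces are kept in the backend (14 days here)
```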
2 changes: 1 addition & 1 deletion example/docker-compose/etc/tempo-azure.yaml
@@ -20,7 +20,7 @@ distributor:

ingester:
trace_idle_period: 10s # the length of time after a trace has not received spans to consider it complete and flush it
traces_per_block: 100 # cut the head block when it his this number of traces or ...
max_block_bytes: 1_000_000 # cut the head block when it hits this size or ...
max_block_duration: 5m # this much time passes

compactor:
2 changes: 1 addition & 1 deletion example/docker-compose/etc/tempo-gcs-fake.yaml
@@ -20,7 +20,7 @@ distributor:

ingester:
trace_idle_period: 10s # the length of time after a trace has not received spans to consider it complete and flush it
traces_per_block: 100 # cut the head block when it his this number of traces or ...
max_block_bytes: 1_000_000 # cut the head block when it hits this size or ...
max_block_duration: 5m # this much time passes

compactor:
2 changes: 1 addition & 1 deletion example/docker-compose/etc/tempo-local.yaml
@@ -20,7 +20,7 @@ distributor:

ingester:
trace_idle_period: 10s # the length of time after a trace has not received spans to consider it complete and flush it
traces_per_block: 100 # cut the head block when it his this number of traces or ...
max_block_bytes: 1_000_000 # cut the head block when it hits this size or ...
max_block_duration: 5m # this much time passes

compactor:
2 changes: 1 addition & 1 deletion example/docker-compose/etc/tempo-s3-minio.yaml
@@ -20,7 +20,7 @@ distributor:

ingester:
trace_idle_period: 10s # the length of time after a trace has not received spans to consider it complete and flush it
traces_per_block: 100 # cut the head block when it his this number of traces or ...
max_block_bytes: 1_000_000 # cut the head block when it hits this size or ...
max_block_duration: 5m # this much time passes

compactor:
1 change: 0 additions & 1 deletion integration/bench/config.yaml
@@ -19,7 +19,6 @@ ingester:
replication_factor: 1
final_sleep: 0s
trace_idle_period: 1s
traces_per_block: 10000
max_block_duration: 10m

storage:
2 changes: 1 addition & 1 deletion integration/e2e/config-all-in-one-azurite.yaml
@@ -20,7 +20,7 @@ ingester:
replication_factor: 1
final_sleep: 0s
trace_idle_period: 100ms
traces_per_block: 1
max_block_bytes: 1
max_block_duration: 2s
complete_block_timeout: 5s
flush_check_period: 1s
2 changes: 1 addition & 1 deletion integration/e2e/config-all-in-one.yaml
@@ -20,7 +20,7 @@ ingester:
replication_factor: 1
final_sleep: 0s
trace_idle_period: 100ms
traces_per_block: 1
max_block_bytes: 1
max_block_duration: 2s
complete_block_timeout: 5s
flush_check_period: 1s
1 change: 0 additions & 1 deletion integration/e2e/config-microservices.yaml
@@ -16,7 +16,6 @@ ingester:
store: memberlist
replication_factor: 2
trace_idle_period: 100ms
traces_per_block: 1
max_block_duration: 2s
complete_block_timeout: 5s
flush_check_period: 1s
4 changes: 2 additions & 2 deletions modules/ingester/config.go
@@ -17,8 +17,8 @@ type Config struct {
FlushCheckPeriod time.Duration `yaml:"flush_check_period"`
FlushOpTimeout time.Duration `yaml:"flush_op_timeout"`
MaxTraceIdle time.Duration `yaml:"trace_idle_period"`
MaxTracesPerBlock int `yaml:"traces_per_block"`
MaxBlockDuration time.Duration `yaml:"max_block_duration"`
MaxBlockBytes uint64 `yaml:"max_block_bytes"`
CompleteBlockTimeout time.Duration `yaml:"complete_block_timeout"`
OverrideRingKey string `yaml:"override_ring_key"`
}
@@ -36,8 +36,8 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
cfg.FlushOpTimeout = 5 * time.Minute

f.DurationVar(&cfg.MaxTraceIdle, "ingester.trace-idle-period", 30*time.Second, "Duration after which to consider a trace complete if no spans have been received")
f.IntVar(&cfg.MaxTracesPerBlock, "ingester.traces-per-block", 50000, "Maximum number of traces allowed in the head block before cutting it")
f.DurationVar(&cfg.MaxBlockDuration, "ingester.max-block-duration", time.Hour, "Maximum duration which the head block can be appended to before cutting it.")
f.Uint64Var(&cfg.MaxBlockBytes, "ingester.max-block-bytes", 1024*1024*1024, "Maximum size of the head block before cutting it.")
f.DurationVar(&cfg.CompleteBlockTimeout, "ingester.complete-block-timeout", time.Minute+storage.DefaultBlocklistPoll, "Duration to keep head blocks in the ingester after they have been cut.")
cfg.OverrideRingKey = ring.IngesterRingKey
}
2 changes: 1 addition & 1 deletion modules/ingester/flush.go
@@ -90,7 +90,7 @@ func (i *Ingester) sweepInstance(instance *instance, immediate bool) {
}

// see if it's ready to cut a block?
err = instance.CutBlockIfReady(i.cfg.MaxTracesPerBlock, i.cfg.MaxBlockDuration, immediate)
err = instance.CutBlockIfReady(i.cfg.MaxBlockDuration, i.cfg.MaxBlockBytes, immediate)
if err != nil {
level.Error(util.WithUserID(instance.instanceID, util.Logger)).Log("msg", "failed to cut block", "err", err)
return
6 changes: 3 additions & 3 deletions modules/ingester/instance.go
@@ -131,16 +131,16 @@ func (i *instance) CutCompleteTraces(cutoff time.Duration, immediate bool) error
return nil
}

func (i *instance) CutBlockIfReady(maxTracesPerBlock int, maxBlockLifetime time.Duration, immediate bool) error {
func (i *instance) CutBlockIfReady(maxBlockLifetime time.Duration, maxBlockBytes uint64, immediate bool) error {
i.blocksMtx.Lock()
defer i.blocksMtx.Unlock()

if i.headBlock == nil || i.headBlock.Length() == 0 {
if i.headBlock == nil || i.headBlock.DataLength() == 0 {
return nil
}

now := time.Now()
if i.headBlock.Length() >= maxTracesPerBlock || i.lastBlockCut.Add(maxBlockLifetime).Before(now) || immediate {
if i.lastBlockCut.Add(maxBlockLifetime).Before(now) || i.headBlock.DataLength() >= maxBlockBytes || immediate {
if i.completingBlock != nil {
return fmt.Errorf("unable to complete head block for %s b/c there is already a completing block. Will try again next cycle", i.instanceID)
}
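
The new cut condition checks block age, then total bytes via `DataLength()`, with `immediate` forcing a cut regardless. A self-contained sketch of that decision as a pure function; `shouldCutBlock` and the example values are illustrative, and the real method additionally guards against an in-flight completing block while holding `blocksMtx`:

```go
package main

import (
	"fmt"
	"time"
)

// shouldCutBlock mirrors the policy in CutBlockIfReady: cut when the head block
// is older than maxBlockLifetime, has grown to at least maxBlockBytes, or when
// an immediate cut is requested.
func shouldCutBlock(lastBlockCut time.Time, dataLength, maxBlockBytes uint64, maxBlockLifetime time.Duration, immediate bool) bool {
	if dataLength == 0 {
		return false // empty head block: nothing to cut
	}
	tooOld := time.Since(lastBlockCut) > maxBlockLifetime
	tooBig := dataLength >= maxBlockBytes
	return tooOld || tooBig || immediate
}

func main() {
	oneGiB := uint64(1024 * 1024 * 1024)
	fmt.Println(shouldCutBlock(time.Now().Add(-2*time.Hour), 50_000_000, oneGiB, time.Hour, false)) // true: block too old
	fmt.Println(shouldCutBlock(time.Now(), 2*oneGiB, oneGiB, time.Hour, false))                     // true: block too big
	fmt.Println(shouldCutBlock(time.Now(), 10_000, oneGiB, time.Hour, false))                       // false: keep appending
}
```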
96 changes: 91 additions & 5 deletions modules/ingester/instance_test.go
@@ -147,11 +147,13 @@ func TestInstanceDoesNotRace(t *testing.T) {
}
go concurrent(func() {
request := test.MakeRequest(10, []byte{})
_ = i.Push(context.Background(), request)
err := i.Push(context.Background(), request)
assert.NoError(t, err, "error pushing traces")
})

go concurrent(func() {
_ = i.CutCompleteTraces(0, true)
err := i.CutCompleteTraces(0, true)
assert.NoError(t, err, "error cutting complete traces")
})

go concurrent(func() {
@@ -161,20 +163,26 @@
go concurrent(func() {
block := i.GetBlockToBeFlushed()
if block != nil {
_ = ingester.store.WriteBlock(context.Background(), block)
err := ingester.store.WriteBlock(context.Background(), block)
assert.NoError(t, err, "error writing block")
}
})

go concurrent(func() {
_ = i.ClearFlushedBlocks(0)
err := i.ClearFlushedBlocks(0)
assert.NoError(t, err, "error clearing flushed blocks")
})

go concurrent(func() {
_, _ = i.FindTraceByID([]byte{0x01})
_, err := i.FindTraceByID([]byte{0x01})
assert.NoError(t, err, "error finding trace by id")
})

time.Sleep(100 * time.Millisecond)
close(end)
// Wait for go funcs to quit before
// exiting and cleaning up
time.Sleep(100 * time.Millisecond)
}

func TestInstanceLimits(t *testing.T) {
@@ -341,6 +349,84 @@ func TestInstanceCutCompleteTraces(t *testing.T) {
}
}

func TestInstanceCutBlockIfReady(t *testing.T) {
tempDir, _ := ioutil.TempDir("/tmp", "")
defer os.RemoveAll(tempDir)

tt := []struct {
name string
maxBlockLifetime time.Duration
maxBlockBytes uint64
immediate bool
pushCount int
expectedToCutBlock bool
}{
{
name: "empty",
expectedToCutBlock: false,
},
{
name: "doesnt cut anything",
pushCount: 1,
expectedToCutBlock: false,
},
{
name: "cut immediate",
immediate: true,
pushCount: 1,
expectedToCutBlock: true,
},
{
name: "cut based on block lifetime",
maxBlockLifetime: time.Microsecond,
pushCount: 1,
expectedToCutBlock: true,
},
{
name: "cut based on block size",
maxBlockBytes: 10,
pushCount: 10,
expectedToCutBlock: true,
},
}

for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
instance := defaultInstance(t, tempDir)

for i := 0; i < tc.pushCount; i++ {
request := test.MakeRequest(10, []byte{})
err := instance.Push(context.Background(), request)
require.NoError(t, err)
}

// Defaults
if tc.maxBlockBytes == 0 {
tc.maxBlockBytes = 1000
}
if tc.maxBlockLifetime == 0 {
tc.maxBlockLifetime = time.Hour
}

lastCutTime := instance.lastBlockCut

// Cut all traces to headblock for testing
err := instance.CutCompleteTraces(0, true)
require.NoError(t, err)

err = instance.CutBlockIfReady(tc.maxBlockLifetime, tc.maxBlockBytes, tc.immediate)
require.NoError(t, err)

// Wait for goroutine to finish flushing to avoid test flakiness
if tc.expectedToCutBlock {
time.Sleep(time.Millisecond * 250)
}

assert.Equal(t, tc.expectedToCutBlock, instance.lastBlockCut.After(lastCutTime))
})
}
}

func defaultInstance(t assert.TestingT, tempDir string) *instance {
limits, err := overrides.NewOverrides(overrides.Limits{})
assert.NoError(t, err, "unexpected error creating limits")
1 change: 1 addition & 0 deletions tempodb/encoding/common/types.go
@@ -31,6 +31,7 @@ type Appender interface {
Complete()
Records() []*Record
Length() int
DataLength() uint64
}

// ObjectCombiner is used to combine two objects in the backend
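
With this addition the Appender exposes two size notions: `Length()` counts appended records while the new `DataLength()` reports total bytes written, which is what the size-based block cut consumes. A toy stand-in to illustrate the distinction; the type and method bodies here are simplified examples, not the tempodb implementations:

```go
package main

import "fmt"

// toyAppender tracks both size notions from the Appender interface:
// a record count for Length and cumulative bytes for DataLength.
type toyAppender struct {
	records    int
	totalBytes uint64
}

func (a *toyAppender) Append(b []byte) {
	a.records++
	a.totalBytes += uint64(len(b))
}

func (a *toyAppender) Length() int        { return a.records }
func (a *toyAppender) DataLength() uint64 { return a.totalBytes }

func main() {
	a := &toyAppender{}
	a.Append(make([]byte, 1500))
	a.Append(make([]byte, 300))
	fmt.Println(a.Length(), a.DataLength()) // 2 1800
}
```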
10 changes: 7 additions & 3 deletions tempodb/encoding/v0/appender.go
@@ -11,7 +11,7 @@ import (
type appender struct {
writer io.Writer
records []*common.Record
currentOffset int
currentOffset uint64
}

// NewAppender returns an appender. This appender simply appends new objects
@@ -37,11 +37,11 @@ func (a *appender) Append(id common.ID, b []byte) error {
copy(a.records[i+1:], a.records[i:])
a.records[i] = &common.Record{
ID: id,
Start: uint64(a.currentOffset),
Start: a.currentOffset,
Length: uint32(length),
}

a.currentOffset += length
a.currentOffset += uint64(length)
return nil
}

@@ -53,6 +53,10 @@ func (a *appender) Length() int {
return len(a.records)
}

func (a *appender) DataLength() uint64 {
return a.currentOffset
}

func (a *appender) Complete() {

}
4 changes: 4 additions & 0 deletions tempodb/encoding/v0/appender_buffered.go
@@ -61,6 +61,10 @@ func (a *bufferedAppender) Length() int {
return a.totalObjects
}

func (a *bufferedAppender) DataLength() uint64 {
return a.currentOffset
}

func (a *bufferedAppender) Complete() {
if a.currentRecord == nil {
return
7 changes: 4 additions & 3 deletions tempodb/wal/append_block.go
@@ -28,10 +28,11 @@ func newAppendBlock(id uuid.UUID, tenantID string, filepath string) (*AppendBloc
}

name := h.fullFilename()
_, err := os.Create(name)
unused, err := os.Create(name)
if err != nil {
return nil, err
}
unused.Close()

f, err := os.OpenFile(name, os.O_APPEND|os.O_WRONLY, 0644)
if err != nil {
@@ -52,8 +53,8 @@ func (h *AppendBlock) Write(id common.ID, b []byte) error {
return nil
}

func (h *AppendBlock) Length() int {
return h.appender.Length()
func (h *AppendBlock) DataLength() uint64 {
return h.appender.DataLength()
}

// Complete should be called when you are done with the block. This method will write and return a new CompleteBlock which
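
Besides renaming `Length` to `DataLength`, this hunk also closes the handle returned by `os.Create` before reopening the file for appends, rather than discarding it. A minimal sketch of that create-then-reopen pattern; the helper name and path are illustrative:

```go
package main

import (
	"log"
	"os"
)

// openAppendOnly creates (or truncates) the file, closes the creation handle so
// it is not leaked, then reopens the file in append-only mode.
func openAppendOnly(name string) (*os.File, error) {
	created, err := os.Create(name)
	if err != nil {
		return nil, err
	}
	if err := created.Close(); err != nil {
		return nil, err
	}
	return os.OpenFile(name, os.O_APPEND|os.O_WRONLY, 0644)
}

func main() {
	f, err := openAppendOnly("/tmp/example-append-block")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()
	if _, err := f.Write([]byte("example bytes")); err != nil {
		log.Fatal(err)
	}
}
```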