diff --git a/CHANGELOG.md b/CHANGELOG.md
index 92e702e65fb..806e0db61fd 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,6 @@
 ## master / unreleased
 * [CHANGE] Fixed ingester latency spikes on read [#461](https://github.com/grafana/tempo/pull/461)
+* [CHANGE] Ingester now cuts blocks based on size instead of trace count, replacing the ingester `traces_per_block` setting with `max_block_bytes`. This is a **breaking change**. [#474](https://github.com/grafana/tempo/issues/474)
 * [ENHANCEMENT] Serve config at the "/config" endpoint. [#446](https://github.com/grafana/tempo/pull/446)
 * [BUGFIX] Upgrade cortex dependency to 1.6 to address issue with forgetting ring membership [#442](https://github.com/grafana/tempo/pull/442)
 * [BUGFIX] No longer raise the `tempodb_blocklist_poll_errors_total` metric if a block doesn't have meta or compacted meta. [#481](https://github.com/grafana/tempo/pull/481)
diff --git a/docs/tempo/website/configuration/_index.md b/docs/tempo/website/configuration/_index.md
index f26ed2ff408..6b2319d059a 100644
--- a/docs/tempo/website/configuration/_index.md
+++ b/docs/tempo/website/configuration/_index.md
@@ -52,7 +52,8 @@ ingester:
     ring:
       replication_factor: 2        # number of replicas of each span to make while pushing to the backend
     trace_idle_period: 20s         # amount of time before considering a trace complete and flushing it to a block
-    traces_per_block: 100000       # maximum number of traces in a block before cutting it
+    max_block_bytes: 1_000_000_000 # maximum size of a block before cutting it
+    max_block_duration: 1h         # maximum length of time before cutting a block
 ```
 
 ## Query Frontend
diff --git a/docs/tempo/website/troubleshooting/_index.md b/docs/tempo/website/troubleshooting/_index.md
index b6d00a8059e..1d93cd6a101 100644
--- a/docs/tempo/website/troubleshooting/_index.md
+++ b/docs/tempo/website/troubleshooting/_index.md
@@ -106,6 +106,6 @@ If this metric is greater than zero (0), check the logs of the compactor for an
 - For detailed information, check - https://grafana.com/docs/tempo/latest/configuration/s3/#permissions
 - If there’s a compactor sitting idle while others are running, port-forward to the compactor’s http endpoint. Then go to `/compactor/ring` and click **Forget** on the inactive compactor.
 - Check the following configuration parameters to ensure that there are correct settings:
-  - `traces_per_block` to determine when the ingester cuts blocks.
-  - `max_compaction_objects` to determine the max number of objects in a compacted block (this should be higher than `tracer_per_block` for the compactor to actually combine multiple blocks together). This can run up to `100,000`
+  - `max_block_bytes` to determine when the ingester cuts blocks. A good number is anywhere from 100MB to 2GB depending on the workload.
+  - `max_compaction_objects` to determine the max number of objects in a compacted block. This should be relatively high, generally in the millions.
 - `retention_duration` for how long traces should be retained in the backend.
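The troubleshooting advice above suggests 100MB to 2GB for `max_block_bytes`. For operators migrating an existing `traces_per_block` value, one rough conversion is the old trace count multiplied by an estimated average encoded trace size. A minimal sketch, assuming a hypothetical 10 KiB average trace — measure your own workload rather than trusting this figure:

```go
package main

import "fmt"

func main() {
	const tracesPerBlock = 100_000  // old traces_per_block setting
	const avgTraceBytes = 10 * 1024 // ASSUMPTION: average encoded trace size; measure your own

	maxBlockBytes := tracesPerBlock * avgTraceBytes

	// ~1_000_000_000 bytes, which lines up with the documented default above
	fmt.Printf("max_block_bytes ~= %d (%.2f GB)\n", maxBlockBytes, float64(maxBlockBytes)/1e9)
}
```

With those assumed numbers, 100,000 traces per block works out to roughly the `1_000_000_000`-byte default shown in the configuration docs above.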
diff --git a/example/docker-compose/etc/tempo-azure.yaml b/example/docker-compose/etc/tempo-azure.yaml
index 7fc58c04b7f..56eecb02468 100644
--- a/example/docker-compose/etc/tempo-azure.yaml
+++ b/example/docker-compose/etc/tempo-azure.yaml
@@ -20,7 +20,7 @@ distributor:
 
 ingester:
   trace_idle_period: 10s     # the length of time after a trace has not received spans to consider it complete and flush it
-  traces_per_block: 100      # cut the head block when it his this number of traces or ...
+  max_block_bytes: 1_000_000 # cut the head block when it hits this size or ...
   max_block_duration: 5m     # this much time passes
 
 compactor:
diff --git a/example/docker-compose/etc/tempo-gcs-fake.yaml b/example/docker-compose/etc/tempo-gcs-fake.yaml
index 1af2b067f3b..609ca486477 100644
--- a/example/docker-compose/etc/tempo-gcs-fake.yaml
+++ b/example/docker-compose/etc/tempo-gcs-fake.yaml
@@ -20,7 +20,7 @@ distributor:
 
 ingester:
   trace_idle_period: 10s     # the length of time after a trace has not received spans to consider it complete and flush it
-  traces_per_block: 100      # cut the head block when it his this number of traces or ...
+  max_block_bytes: 1_000_000 # cut the head block when it hits this size or ...
   max_block_duration: 5m     # this much time passes
 
 compactor:
diff --git a/example/docker-compose/etc/tempo-local.yaml b/example/docker-compose/etc/tempo-local.yaml
index 6d2d31dc5ee..1623f9a16f6 100644
--- a/example/docker-compose/etc/tempo-local.yaml
+++ b/example/docker-compose/etc/tempo-local.yaml
@@ -20,7 +20,7 @@ distributor:
 
 ingester:
   trace_idle_period: 10s     # the length of time after a trace has not received spans to consider it complete and flush it
-  traces_per_block: 100      # cut the head block when it his this number of traces or ...
+  max_block_bytes: 1_000_000 # cut the head block when it hits this size or ...
   max_block_duration: 5m     # this much time passes
 
 compactor:
diff --git a/example/docker-compose/etc/tempo-s3-minio.yaml b/example/docker-compose/etc/tempo-s3-minio.yaml
index 089a0fb198d..4823f471c92 100644
--- a/example/docker-compose/etc/tempo-s3-minio.yaml
+++ b/example/docker-compose/etc/tempo-s3-minio.yaml
@@ -20,7 +20,7 @@ distributor:
 
 ingester:
   trace_idle_period: 10s     # the length of time after a trace has not received spans to consider it complete and flush it
-  traces_per_block: 100      # cut the head block when it his this number of traces or ...
+  max_block_bytes: 1_000_000 # cut the head block when it hits this size or ...
   max_block_duration: 5m     # this much time passes
 
 compactor:
diff --git a/integration/bench/config.yaml b/integration/bench/config.yaml
index 1c82c605b96..1b41b02f194 100644
--- a/integration/bench/config.yaml
+++ b/integration/bench/config.yaml
@@ -19,7 +19,6 @@ ingester:
     replication_factor: 1
     final_sleep: 0s
   trace_idle_period: 1s
-  traces_per_block: 10000
   max_block_duration: 10m
 
 storage:
diff --git a/integration/e2e/config-all-in-one-azurite.yaml b/integration/e2e/config-all-in-one-azurite.yaml
index cef41706299..4853b41bcec 100644
--- a/integration/e2e/config-all-in-one-azurite.yaml
+++ b/integration/e2e/config-all-in-one-azurite.yaml
@@ -20,7 +20,7 @@ ingester:
     replication_factor: 1
     final_sleep: 0s
   trace_idle_period: 100ms
-  traces_per_block: 1
+  max_block_bytes: 1
   max_block_duration: 2s
   complete_block_timeout: 5s
   flush_check_period: 1s
diff --git a/integration/e2e/config-all-in-one.yaml b/integration/e2e/config-all-in-one.yaml
index 24bacb83109..9905d5afd89 100644
--- a/integration/e2e/config-all-in-one.yaml
+++ b/integration/e2e/config-all-in-one.yaml
@@ -20,7 +20,7 @@ ingester:
     replication_factor: 1
     final_sleep: 0s
   trace_idle_period: 100ms
-  traces_per_block: 1
+  max_block_bytes: 1
   max_block_duration: 2s
   complete_block_timeout: 5s
   flush_check_period: 1s
diff --git a/integration/e2e/config-microservices.yaml b/integration/e2e/config-microservices.yaml
index f2eb2aa35a3..463a65f023f 100644
--- a/integration/e2e/config-microservices.yaml
+++ b/integration/e2e/config-microservices.yaml
@@ -16,7 +16,6 @@ ingester:
       store: memberlist
     replication_factor: 2
   trace_idle_period: 100ms
-  traces_per_block: 1
   max_block_duration: 2s
   complete_block_timeout: 5s
   flush_check_period: 1s
diff --git a/modules/ingester/config.go b/modules/ingester/config.go
index 3df53a62c0e..047fb1e40da 100644
--- a/modules/ingester/config.go
+++ b/modules/ingester/config.go
@@ -17,8 +17,8 @@ type Config struct {
 	FlushCheckPeriod     time.Duration `yaml:"flush_check_period"`
 	FlushOpTimeout       time.Duration `yaml:"flush_op_timeout"`
 	MaxTraceIdle         time.Duration `yaml:"trace_idle_period"`
-	MaxTracesPerBlock    int           `yaml:"traces_per_block"`
 	MaxBlockDuration     time.Duration `yaml:"max_block_duration"`
+	MaxBlockBytes        uint64        `yaml:"max_block_bytes"`
 	CompleteBlockTimeout time.Duration `yaml:"complete_block_timeout"`
 	OverrideRingKey      string        `yaml:"override_ring_key"`
 }
@@ -36,8 +36,8 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
 	cfg.FlushOpTimeout = 5 * time.Minute
 
 	f.DurationVar(&cfg.MaxTraceIdle, "ingester.trace-idle-period", 30*time.Second, "Duration after which to consider a trace complete if no spans have been received")
-	f.IntVar(&cfg.MaxTracesPerBlock, "ingester.traces-per-block", 50000, "Maximum number of traces allowed in the head block before cutting it")
 	f.DurationVar(&cfg.MaxBlockDuration, "ingester.max-block-duration", time.Hour, "Maximum duration which the head block can be appended to before cutting it.")
+	f.Uint64Var(&cfg.MaxBlockBytes, "ingester.max-block-bytes", 1024*1024*1024, "Maximum size of the head block before cutting it.")
 	f.DurationVar(&cfg.CompleteBlockTimeout, "ingester.complete-block-timeout", time.Minute+storage.DefaultBlocklistPoll, "Duration to keep head blocks in the ingester after they have been cut.")
 	cfg.OverrideRingKey = ring.IngesterRingKey
 }
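The `config.go` hunk above registers the new setting as a `uint64` flag defaulting to `1024*1024*1024` (1 GiB). A self-contained sketch of how such a flag parses — same name and default, but a toy flag set rather than Tempo's actual wiring:

```go
package main

import (
	"flag"
	"fmt"
)

func main() {
	fs := flag.NewFlagSet("ingester", flag.ContinueOnError)

	var maxBlockBytes uint64
	fs.Uint64Var(&maxBlockBytes, "ingester.max-block-bytes", 1024*1024*1024,
		"Maximum size of the head block before cutting it.")

	// Unset: the 1 GiB default applies.
	_ = fs.Parse(nil)
	fmt.Println(maxBlockBytes) // 1073741824

	// Overridden on the command line, here to roughly 500 MB.
	_ = fs.Parse([]string{"-ingester.max-block-bytes=500000000"})
	fmt.Println(maxBlockBytes) // 500000000
}
```

In Tempo itself the same value is normally set via the `max_block_bytes` YAML key shown in the configuration docs above.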
diff --git a/modules/ingester/flush.go b/modules/ingester/flush.go
index 36bb74d5d5e..6ba1681b22f 100644
--- a/modules/ingester/flush.go
+++ b/modules/ingester/flush.go
@@ -90,7 +90,7 @@ func (i *Ingester) sweepInstance(instance *instance, immediate bool) {
 	}
 
 	// see if it's ready to cut a block?
-	err = instance.CutBlockIfReady(i.cfg.MaxTracesPerBlock, i.cfg.MaxBlockDuration, immediate)
+	err = instance.CutBlockIfReady(i.cfg.MaxBlockDuration, i.cfg.MaxBlockBytes, immediate)
 	if err != nil {
 		level.Error(util.WithUserID(instance.instanceID, util.Logger)).Log("msg", "failed to cut block", "err", err)
 		return
diff --git a/modules/ingester/instance.go b/modules/ingester/instance.go
index b20db0a8ebb..d419a353598 100644
--- a/modules/ingester/instance.go
+++ b/modules/ingester/instance.go
@@ -131,16 +131,16 @@ func (i *instance) CutCompleteTraces(cutoff time.Duration, immediate bool) error
 	return nil
 }
 
-func (i *instance) CutBlockIfReady(maxTracesPerBlock int, maxBlockLifetime time.Duration, immediate bool) error {
+func (i *instance) CutBlockIfReady(maxBlockLifetime time.Duration, maxBlockBytes uint64, immediate bool) error {
 	i.blocksMtx.Lock()
 	defer i.blocksMtx.Unlock()
 
-	if i.headBlock == nil || i.headBlock.Length() == 0 {
+	if i.headBlock == nil || i.headBlock.DataLength() == 0 {
 		return nil
 	}
 
 	now := time.Now()
-	if i.headBlock.Length() >= maxTracesPerBlock || i.lastBlockCut.Add(maxBlockLifetime).Before(now) || immediate {
+	if i.lastBlockCut.Add(maxBlockLifetime).Before(now) || i.headBlock.DataLength() >= maxBlockBytes || immediate {
 		if i.completingBlock != nil {
 			return fmt.Errorf("unable to complete head block for %s b/c there is already a completing block. Will try again next cycle", i.instanceID)
 		}
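`CutBlockIfReady` now keys off wall-clock age and `DataLength()` bytes rather than trace count. The predicate, restated as a standalone function — names here are local to this sketch, not Tempo's API:

```go
package main

import (
	"fmt"
	"time"
)

// shouldCut restates the condition in CutBlockIfReady: cut when the head
// block has outlived maxBlockLifetime, has grown to maxBlockBytes, or an
// immediate cut was requested. Empty blocks are never cut.
func shouldCut(lastCut time.Time, dataLength, maxBlockBytes uint64, maxBlockLifetime time.Duration, immediate bool) bool {
	if dataLength == 0 {
		return false
	}
	return lastCut.Add(maxBlockLifetime).Before(time.Now()) ||
		dataLength >= maxBlockBytes ||
		immediate
}

func main() {
	now := time.Now()
	fmt.Println(shouldCut(now, 2_000_000_000, 1_000_000_000, time.Hour, false))       // true: over the byte limit
	fmt.Println(shouldCut(now.Add(-2*time.Hour), 1, 1_000_000_000, time.Hour, false)) // true: block has outlived its lifetime
	fmt.Println(shouldCut(now, 1, 1_000_000_000, time.Hour, false))                   // false: small and fresh
}
```

Note that the lifetime check comes first in the new condition, so low-volume tenants still cut blocks on schedule even if they never approach the byte limit.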
diff --git a/modules/ingester/instance_test.go b/modules/ingester/instance_test.go
index 91de03d7e39..cdf3412d11c 100644
--- a/modules/ingester/instance_test.go
+++ b/modules/ingester/instance_test.go
@@ -147,11 +147,13 @@ func TestInstanceDoesNotRace(t *testing.T) {
 	}
 	go concurrent(func() {
 		request := test.MakeRequest(10, []byte{})
-		_ = i.Push(context.Background(), request)
+		err := i.Push(context.Background(), request)
+		assert.NoError(t, err, "error pushing traces")
 	})
 
 	go concurrent(func() {
-		_ = i.CutCompleteTraces(0, true)
+		err := i.CutCompleteTraces(0, true)
+		assert.NoError(t, err, "error cutting complete traces")
 	})
 
 	go concurrent(func() {
@@ -161,20 +163,26 @@ func TestInstanceDoesNotRace(t *testing.T) {
 	go concurrent(func() {
 		block := i.GetBlockToBeFlushed()
 		if block != nil {
-			_ = ingester.store.WriteBlock(context.Background(), block)
+			err := ingester.store.WriteBlock(context.Background(), block)
+			assert.NoError(t, err, "error writing block")
 		}
 	})
 
 	go concurrent(func() {
-		_ = i.ClearFlushedBlocks(0)
+		err := i.ClearFlushedBlocks(0)
+		assert.NoError(t, err, "error clearing flushed blocks")
 	})
 
 	go concurrent(func() {
-		_, _ = i.FindTraceByID([]byte{0x01})
+		_, err := i.FindTraceByID([]byte{0x01})
+		assert.NoError(t, err, "error finding trace by id")
 	})
 
 	time.Sleep(100 * time.Millisecond)
 	close(end)
+	// Wait for go funcs to quit before
+	// exiting and cleaning up
+	time.Sleep(100 * time.Millisecond)
 }
 
 func TestInstanceLimits(t *testing.T) {
@@ -341,6 +349,84 @@ func TestInstanceCutCompleteTraces(t *testing.T) {
 	}
 }
 
+func TestInstanceCutBlockIfReady(t *testing.T) {
+	tempDir, _ := ioutil.TempDir("/tmp", "")
+	defer os.RemoveAll(tempDir)
+
+	tt := []struct {
+		name               string
+		maxBlockLifetime   time.Duration
+		maxBlockBytes      uint64
+		immediate          bool
+		pushCount          int
+		expectedToCutBlock bool
+	}{
+		{
+			name:               "empty",
+			expectedToCutBlock: false,
+		},
+		{
+			name:               "doesnt cut anything",
+			pushCount:          1,
+			expectedToCutBlock: false,
+		},
+		{
+			name:               "cut immediate",
+			immediate:          true,
+			pushCount:          1,
+			expectedToCutBlock: true,
+		},
+		{
+			name:               "cut based on block lifetime",
+			maxBlockLifetime:   time.Microsecond,
+			pushCount:          1,
+			expectedToCutBlock: true,
+		},
+		{
+			name:               "cut based on block size",
+			maxBlockBytes:      10,
+			pushCount:          10,
+			expectedToCutBlock: true,
+		},
+	}
+
+	for _, tc := range tt {
+		t.Run(tc.name, func(t *testing.T) {
+			instance := defaultInstance(t, tempDir)
+
+			for i := 0; i < tc.pushCount; i++ {
+				request := test.MakeRequest(10, []byte{})
+				err := instance.Push(context.Background(), request)
+				require.NoError(t, err)
+			}
+
+			// Defaults
+			if tc.maxBlockBytes == 0 {
+				tc.maxBlockBytes = 1000
+			}
+			if tc.maxBlockLifetime == 0 {
+				tc.maxBlockLifetime = time.Hour
+			}
+
+			lastCutTime := instance.lastBlockCut
+
+			// Cut all traces to headblock for testing
+			err := instance.CutCompleteTraces(0, true)
+			require.NoError(t, err)
+
+			err = instance.CutBlockIfReady(tc.maxBlockLifetime, tc.maxBlockBytes, tc.immediate)
+			require.NoError(t, err)
+
+			// Wait for goroutine to finish flushing to avoid test flakiness
+			if tc.expectedToCutBlock {
+				time.Sleep(time.Millisecond * 250)
+			}
+
+			assert.Equal(t, tc.expectedToCutBlock, instance.lastBlockCut.After(lastCutTime))
+		})
+	}
+}
+
 func defaultInstance(t assert.TestingT, tempDir string) *instance {
 	limits, err := overrides.NewOverrides(overrides.Limits{})
 	assert.NoError(t, err, "unexpected error creating limits")
diff --git a/tempodb/encoding/common/types.go b/tempodb/encoding/common/types.go
index 948bff4a0a0..6ad019bd731 100644
--- a/tempodb/encoding/common/types.go
+++ b/tempodb/encoding/common/types.go
@@ -31,6 +31,7 @@ type Appender interface {
 	Complete()
 	Records() []*Record
 	Length() int
+	DataLength() uint64
 }
 
 // ObjectCombiner is used to combine two objects in the backend
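The `Appender` interface now distinguishes `Length()` (object count) from `DataLength()` (bytes appended). A toy implementation of just that pair, to illustrate the contract the ingester's head block relies on — this is not the v0 appender itself:

```go
package main

import "fmt"

type toyAppender struct {
	objects       int
	currentOffset uint64 // running total of appended bytes
}

func (a *toyAppender) Append(b []byte) {
	a.objects++
	a.currentOffset += uint64(len(b))
}

func (a *toyAppender) Length() int        { return a.objects }       // number of objects
func (a *toyAppender) DataLength() uint64 { return a.currentOffset } // bytes written

func main() {
	a := &toyAppender{}
	a.Append(make([]byte, 1024))
	a.Append(make([]byte, 512))
	fmt.Println(a.Length(), a.DataLength()) // 2 1536
}
```

The v0 appenders below satisfy the new method the same way: by returning the running `currentOffset` they already maintain.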
diff --git a/tempodb/encoding/v0/appender.go b/tempodb/encoding/v0/appender.go
index 8343d557ac3..f9587f73421 100644
--- a/tempodb/encoding/v0/appender.go
+++ b/tempodb/encoding/v0/appender.go
@@ -11,7 +11,7 @@ import (
 type appender struct {
 	writer        io.Writer
 	records       []*common.Record
-	currentOffset int
+	currentOffset uint64
 }
 
 // NewAppender returns an appender. This appender simply appends new objects
@@ -37,11 +37,11 @@ func (a *appender) Append(id common.ID, b []byte) error {
 	copy(a.records[i+1:], a.records[i:])
 	a.records[i] = &common.Record{
 		ID:     id,
-		Start:  uint64(a.currentOffset),
+		Start:  a.currentOffset,
 		Length: uint32(length),
 	}
 
-	a.currentOffset += length
+	a.currentOffset += uint64(length)
 
 	return nil
 }
@@ -53,6 +53,10 @@ func (a *appender) Length() int {
 	return len(a.records)
 }
 
+func (a *appender) DataLength() uint64 {
+	return a.currentOffset
+}
+
 func (a *appender) Complete() {
 }
 
diff --git a/tempodb/encoding/v0/appender_buffered.go b/tempodb/encoding/v0/appender_buffered.go
index f4451de4f91..f4227f9915e 100644
--- a/tempodb/encoding/v0/appender_buffered.go
+++ b/tempodb/encoding/v0/appender_buffered.go
@@ -61,6 +61,10 @@ func (a *bufferedAppender) Length() int {
 	return a.totalObjects
 }
 
+func (a *bufferedAppender) DataLength() uint64 {
+	return a.currentOffset
+}
+
 func (a *bufferedAppender) Complete() {
 	if a.currentRecord == nil {
 		return
diff --git a/tempodb/wal/append_block.go b/tempodb/wal/append_block.go
index f2d4c534120..95080d88a62 100644
--- a/tempodb/wal/append_block.go
+++ b/tempodb/wal/append_block.go
@@ -28,10 +28,11 @@ func newAppendBlock(id uuid.UUID, tenantID string, filepath string) (*AppendBloc
 	}
 
 	name := h.fullFilename()
-	_, err := os.Create(name)
+	unused, err := os.Create(name)
 	if err != nil {
 		return nil, err
 	}
+	unused.Close()
 
 	f, err := os.OpenFile(name, os.O_APPEND|os.O_WRONLY, 0644)
 	if err != nil {
@@ -52,8 +53,8 @@ func (h *AppendBlock) Write(id common.ID, b []byte) error {
 	return nil
 }
 
-func (h *AppendBlock) Length() int {
-	return h.appender.Length()
+func (h *AppendBlock) DataLength() uint64 {
+	return h.appender.DataLength()
 }
 
 // Complete should be called when you are done with the block. This method will write and return a new CompleteBlock which
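The `append_block.go` hunk above also fixes a leaked file handle: the `*os.File` returned by `os.Create` was previously discarded while the file was reopened in append mode. A minimal sketch of that create-then-reopen pattern, with a hypothetical path:

```go
package main

import (
	"log"
	"os"
)

func main() {
	name := "/tmp/example-append-block" // hypothetical path, not Tempo's WAL layout

	// Create (or truncate) the file, then release the handle immediately;
	// keeping it open would leak a descriptor once the file is reopened.
	f, err := os.Create(name)
	if err != nil {
		log.Fatal(err)
	}
	f.Close()

	// Reopen in append-only mode, as newAppendBlock does.
	f, err = os.OpenFile(name, os.O_APPEND|os.O_WRONLY, 0644)
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	if _, err := f.Write([]byte("appended record\n")); err != nil {
		log.Fatal(err)
	}
}
```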