Add support to cut an active block based on total bytes (#477)
* Add max_block_bytes setting for ingester to cut blocks based on data size

* Update docs

* Remove traces_per_block setting. Update examples and docs

* Update changelog and troubleshooting guide

* Update docs/tempo/website/configuration/_index.md

Co-authored-by: achatterjee-grafana <70489351+achatterjee-grafana@users.noreply.github.com>

* Fix to close unused file handle in AppendBlock creation immediately

* Reset integration e2e tests to really low block limits

* Remove unused AppendBlock.Length

Co-authored-by: achatterjee-grafana <70489351+achatterjee-grafana@users.noreply.github.com>
mdisibio and achatterjee-grafana authored Jan 28, 2021
1 parent 6b5c3b8 commit d7bc9b1
Showing 19 changed files with 124 additions and 28 deletions.
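
For operators, the practical effect of this commit is that the ingester now cuts its head block by data size rather than by trace count. A minimal before/after sketch of the ingester block, using the values from the updated docs (illustrative, not a sizing recommendation):

```yaml
ingester:
  trace_idle_period: 20s            # unchanged
  # traces_per_block: 100000        # removed by this commit (breaking change)
  max_block_bytes: 1_000_000_000    # new: cut the head block once it holds ~1GB of data
  max_block_duration: 1h            # age-based cutting still applies
```
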
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -1,5 +1,6 @@
## master / unreleased
* [CHANGE] Fixed ingester latency spikes on read [#461](https://github.com/grafana/tempo/pull/461)
* [CHANGE] Ingester cut blocks based on size instead of trace count. Replace ingester `traces_per_block` setting with `max_block_bytes`. This is a **breaking change**. [#474](https://github.com/grafana/tempo/issues/474)
* [ENHANCEMENT] Serve config at the "/config" endpoint. [#446](https://github.com/grafana/tempo/pull/446)
* [BUGFIX] Upgrade cortex dependency to 1.6 to address issue with forgetting ring membership [#442](https://github.com/grafana/tempo/pull/442)
* [BUGFIX] No longer raise the `tempodb_blocklist_poll_errors_total` metric if a block doesn't have meta or compacted meta. [#481](https://github.com/grafana/tempo/pull/481)
3 changes: 2 additions & 1 deletion docs/tempo/website/configuration/_index.md
@@ -52,7 +52,8 @@ ingester:
ring:
replication_factor: 2 # number of replicas of each span to make while pushing to the backend
trace_idle_period: 20s # amount of time before considering a trace complete and flushing it to a block
traces_per_block: 100000 # maximum number of traces in a block before cutting it
max_block_bytes: 1_000_000_000 # maximum size of a block before cutting it
max_block_duration: 1h # maximum length of time before cutting a block
```

## Query Frontend
4 changes: 2 additions & 2 deletions docs/tempo/website/troubleshooting/_index.md
@@ -106,6 +106,6 @@ If this metric is greater than zero (0), check the logs of the compactor for an
- For detailed information, check - https://grafana.com/docs/tempo/latest/configuration/s3/#permissions
- If there’s a compactor sitting idle while others are running, port-forward to the compactor’s http endpoint. Then go to `/compactor/ring` and click **Forget** on the inactive compactor.
- Check the following configuration parameters to ensure that there are correct settings:
- `traces_per_block` to determine when the ingester cuts blocks.
- `max_compaction_objects` to determine the max number of objects in a compacted block (this should be higher than `tracer_per_block` for the compactor to actually combine multiple blocks together). This can run up to `100,000`
- `max_block_bytes` to determine when the ingester cuts blocks. A good number is anywhere from 100MB to 2GB depending on the workload.
- `max_compaction_objects` to determine the max number of objects in a compacted block. This should be relatively high, generally in the millions.
- `retention_duration` for how long traces should be retained in the backend.
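
To make the checklist above concrete, here is a hedged configuration sketch. The values are assumptions for a moderate workload, and the exact nesting of the compaction and retention keys may differ between Tempo versions, so treat it as a starting point rather than a recommendation:

```yaml
ingester:
  max_block_bytes: 500_000_000        # cut head blocks at ~500MB (within the 100MB-2GB range above)

compactor:
  compaction:
    max_compaction_objects: 3000000   # in the millions, so several ingester blocks can be merged
    retention_duration: 336h          # keep traces in the backend for ~14 days
```
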
2 changes: 1 addition & 1 deletion example/docker-compose/etc/tempo-azure.yaml
@@ -20,7 +20,7 @@ distributor:

ingester:
trace_idle_period: 10s # the length of time after a trace has not received spans to consider it complete and flush it
traces_per_block: 100 # cut the head block when it his this number of traces or ...
max_block_bytes: 1_000_000 # cut the head block when it hits this size or ...
max_block_duration: 5m # this much time passes

compactor:
2 changes: 1 addition & 1 deletion example/docker-compose/etc/tempo-gcs-fake.yaml
@@ -20,7 +20,7 @@ distributor:

ingester:
trace_idle_period: 10s # the length of time after a trace has not received spans to consider it complete and flush it
traces_per_block: 100 # cut the head block when it his this number of traces or ...
max_block_bytes: 1_000_000 # cut the head block when it hits this size or ...
max_block_duration: 5m # this much time passes

compactor:
2 changes: 1 addition & 1 deletion example/docker-compose/etc/tempo-local.yaml
@@ -20,7 +20,7 @@ distributor:

ingester:
trace_idle_period: 10s # the length of time after a trace has not received spans to consider it complete and flush it
traces_per_block: 100 # cut the head block when it his this number of traces or ...
max_block_bytes: 1_000_000 # cut the head block when it hits this size or ...
max_block_duration: 5m # this much time passes

compactor:
2 changes: 1 addition & 1 deletion example/docker-compose/etc/tempo-s3-minio.yaml
@@ -20,7 +20,7 @@ distributor:

ingester:
trace_idle_period: 10s # the length of time after a trace has not received spans to consider it complete and flush it
traces_per_block: 100 # cut the head block when it his this number of traces or ...
max_block_bytes: 1_000_000 # cut the head block when it hits this size or ...
max_block_duration: 5m # this much time passes

compactor:
1 change: 0 additions & 1 deletion integration/bench/config.yaml
@@ -19,7 +19,6 @@ ingester:
replication_factor: 1
final_sleep: 0s
trace_idle_period: 1s
traces_per_block: 10000
max_block_duration: 10m

storage:
2 changes: 1 addition & 1 deletion integration/e2e/config-all-in-one-azurite.yaml
@@ -20,7 +20,7 @@ ingester:
replication_factor: 1
final_sleep: 0s
trace_idle_period: 100ms
traces_per_block: 1
max_block_bytes: 1
max_block_duration: 2s
complete_block_timeout: 5s
flush_check_period: 1s
2 changes: 1 addition & 1 deletion integration/e2e/config-all-in-one.yaml
@@ -20,7 +20,7 @@ ingester:
replication_factor: 1
final_sleep: 0s
trace_idle_period: 100ms
traces_per_block: 1
max_block_bytes: 1
max_block_duration: 2s
complete_block_timeout: 5s
flush_check_period: 1s
1 change: 0 additions & 1 deletion integration/e2e/config-microservices.yaml
@@ -16,7 +16,6 @@ ingester:
store: memberlist
replication_factor: 2
trace_idle_period: 100ms
traces_per_block: 1
max_block_duration: 2s
complete_block_timeout: 5s
flush_check_period: 1s
4 changes: 2 additions & 2 deletions modules/ingester/config.go
@@ -17,8 +17,8 @@ type Config struct {
FlushCheckPeriod time.Duration `yaml:"flush_check_period"`
FlushOpTimeout time.Duration `yaml:"flush_op_timeout"`
MaxTraceIdle time.Duration `yaml:"trace_idle_period"`
MaxTracesPerBlock int `yaml:"traces_per_block"`
MaxBlockDuration time.Duration `yaml:"max_block_duration"`
MaxBlockBytes uint64 `yaml:"max_block_bytes"`
CompleteBlockTimeout time.Duration `yaml:"complete_block_timeout"`
OverrideRingKey string `yaml:"override_ring_key"`
}
@@ -36,8 +36,8 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
cfg.FlushOpTimeout = 5 * time.Minute

f.DurationVar(&cfg.MaxTraceIdle, "ingester.trace-idle-period", 30*time.Second, "Duration after which to consider a trace complete if no spans have been received")
f.IntVar(&cfg.MaxTracesPerBlock, "ingester.traces-per-block", 50000, "Maximum number of traces allowed in the head block before cutting it")
f.DurationVar(&cfg.MaxBlockDuration, "ingester.max-block-duration", time.Hour, "Maximum duration which the head block can be appended to before cutting it.")
f.Uint64Var(&cfg.MaxBlockBytes, "ingester.max-block-bytes", 1024*1024*1024, "Maximum size of the head block before cutting it.")
f.DurationVar(&cfg.CompleteBlockTimeout, "ingester.complete-block-timeout", time.Minute+storage.DefaultBlocklistPoll, "Duration to keep head blocks in the ingester after they have been cut.")
cfg.OverrideRingKey = ring.IngesterRingKey
}
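
Read together with the yaml tags in the struct above, the defaults registered here correspond to an ingester config block roughly like the following (a sketch of the defaults only; 1024*1024*1024 bytes is written out in decimal):

```yaml
ingester:
  trace_idle_period: 30s        # default of ingester.trace-idle-period
  max_block_duration: 1h        # default of ingester.max-block-duration
  max_block_bytes: 1073741824   # default of ingester.max-block-bytes (1GiB)
```
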
2 changes: 1 addition & 1 deletion modules/ingester/flush.go
@@ -90,7 +90,7 @@ func (i *Ingester) sweepInstance(instance *instance, immediate bool) {
}

// see if it's ready to cut a block?
err = instance.CutBlockIfReady(i.cfg.MaxTracesPerBlock, i.cfg.MaxBlockDuration, immediate)
err = instance.CutBlockIfReady(i.cfg.MaxBlockDuration, i.cfg.MaxBlockBytes, immediate)
if err != nil {
level.Error(util.WithUserID(instance.instanceID, util.Logger)).Log("msg", "failed to cut block", "err", err)
return
6 changes: 3 additions & 3 deletions modules/ingester/instance.go
@@ -131,16 +131,16 @@ func (i *instance) CutCompleteTraces(cutoff time.Duration, immediate bool) error
return nil
}

func (i *instance) CutBlockIfReady(maxTracesPerBlock int, maxBlockLifetime time.Duration, immediate bool) error {
func (i *instance) CutBlockIfReady(maxBlockLifetime time.Duration, maxBlockBytes uint64, immediate bool) error {
i.blocksMtx.Lock()
defer i.blocksMtx.Unlock()

if i.headBlock == nil || i.headBlock.Length() == 0 {
if i.headBlock == nil || i.headBlock.DataLength() == 0 {
return nil
}

now := time.Now()
if i.headBlock.Length() >= maxTracesPerBlock || i.lastBlockCut.Add(maxBlockLifetime).Before(now) || immediate {
if i.lastBlockCut.Add(maxBlockLifetime).Before(now) || i.headBlock.DataLength() >= maxBlockBytes || immediate {
if i.completingBlock != nil {
return fmt.Errorf("unable to complete head block for %s b/c there is already a completing block. Will try again next cycle", i.instanceID)
}
96 changes: 91 additions & 5 deletions modules/ingester/instance_test.go
@@ -147,11 +147,13 @@ func TestInstanceDoesNotRace(t *testing.T) {
}
go concurrent(func() {
request := test.MakeRequest(10, []byte{})
_ = i.Push(context.Background(), request)
err := i.Push(context.Background(), request)
assert.NoError(t, err, "error pushing traces")
})

go concurrent(func() {
_ = i.CutCompleteTraces(0, true)
err := i.CutCompleteTraces(0, true)
assert.NoError(t, err, "error cutting complete traces")
})

go concurrent(func() {
@@ -161,20 +163,26 @@
go concurrent(func() {
block := i.GetBlockToBeFlushed()
if block != nil {
_ = ingester.store.WriteBlock(context.Background(), block)
err := ingester.store.WriteBlock(context.Background(), block)
assert.NoError(t, err, "error writing block")
}
})

go concurrent(func() {
_ = i.ClearFlushedBlocks(0)
err := i.ClearFlushedBlocks(0)
assert.NoError(t, err, "error clearing flushed blocks")
})

go concurrent(func() {
_, _ = i.FindTraceByID([]byte{0x01})
_, err := i.FindTraceByID([]byte{0x01})
assert.NoError(t, err, "error finding trace by id")
})

time.Sleep(100 * time.Millisecond)
close(end)
// Wait for go funcs to quit before
// exiting and cleaning up
time.Sleep(100 * time.Millisecond)
}

func TestInstanceLimits(t *testing.T) {
@@ -341,6 +349,84 @@ func TestInstanceCutCompleteTraces(t *testing.T) {
}
}

func TestInstanceCutBlockIfReady(t *testing.T) {
tempDir, _ := ioutil.TempDir("/tmp", "")
defer os.RemoveAll(tempDir)

tt := []struct {
name string
maxBlockLifetime time.Duration
maxBlockBytes uint64
immediate bool
pushCount int
expectedToCutBlock bool
}{
{
name: "empty",
expectedToCutBlock: false,
},
{
name: "doesnt cut anything",
pushCount: 1,
expectedToCutBlock: false,
},
{
name: "cut immediate",
immediate: true,
pushCount: 1,
expectedToCutBlock: true,
},
{
name: "cut based on block lifetime",
maxBlockLifetime: time.Microsecond,
pushCount: 1,
expectedToCutBlock: true,
},
{
name: "cut based on block size",
maxBlockBytes: 10,
pushCount: 10,
expectedToCutBlock: true,
},
}

for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
instance := defaultInstance(t, tempDir)

for i := 0; i < tc.pushCount; i++ {
request := test.MakeRequest(10, []byte{})
err := instance.Push(context.Background(), request)
require.NoError(t, err)
}

// Defaults
if tc.maxBlockBytes == 0 {
tc.maxBlockBytes = 1000
}
if tc.maxBlockLifetime == 0 {
tc.maxBlockLifetime = time.Hour
}

lastCutTime := instance.lastBlockCut

// Cut all traces to headblock for testing
err := instance.CutCompleteTraces(0, true)
require.NoError(t, err)

err = instance.CutBlockIfReady(tc.maxBlockLifetime, tc.maxBlockBytes, tc.immediate)
require.NoError(t, err)

// Wait for goroutine to finish flushing to avoid test flakiness
if tc.expectedToCutBlock {
time.Sleep(time.Millisecond * 250)
}

assert.Equal(t, tc.expectedToCutBlock, instance.lastBlockCut.After(lastCutTime))
})
}
}

func defaultInstance(t assert.TestingT, tempDir string) *instance {
limits, err := overrides.NewOverrides(overrides.Limits{})
assert.NoError(t, err, "unexpected error creating limits")
1 change: 1 addition & 0 deletions tempodb/encoding/common/types.go
@@ -31,6 +31,7 @@ type Appender interface {
Complete()
Records() []*Record
Length() int
DataLength() uint64
}

// ObjectCombiner is used to combine two objects in the backend
10 changes: 7 additions & 3 deletions tempodb/encoding/v0/appender.go
@@ -11,7 +11,7 @@ import (
type appender struct {
writer io.Writer
records []*common.Record
currentOffset int
currentOffset uint64
}

// NewAppender returns an appender. This appender simply appends new objects
@@ -37,11 +37,11 @@ func (a *appender) Append(id common.ID, b []byte) error {
copy(a.records[i+1:], a.records[i:])
a.records[i] = &common.Record{
ID: id,
Start: uint64(a.currentOffset),
Start: a.currentOffset,
Length: uint32(length),
}

a.currentOffset += length
a.currentOffset += uint64(length)
return nil
}

@@ -53,6 +53,10 @@ func (a *appender) Length() int {
return len(a.records)
}

func (a *appender) DataLength() uint64 {
return a.currentOffset
}

func (a *appender) Complete() {

}
4 changes: 4 additions & 0 deletions tempodb/encoding/v0/appender_buffered.go
@@ -61,6 +61,10 @@ func (a *bufferedAppender) Length() int {
return a.totalObjects
}

func (a *bufferedAppender) DataLength() uint64 {
return a.currentOffset
}

func (a *bufferedAppender) Complete() {
if a.currentRecord == nil {
return
7 changes: 4 additions & 3 deletions tempodb/wal/append_block.go
@@ -28,10 +28,11 @@ func newAppendBlock(id uuid.UUID, tenantID string, filepath string) (*AppendBloc
}

name := h.fullFilename()
_, err := os.Create(name)
unused, err := os.Create(name)
if err != nil {
return nil, err
}
unused.Close()

f, err := os.OpenFile(name, os.O_APPEND|os.O_WRONLY, 0644)
if err != nil {
@@ -52,8 +53,8 @@ func (h *AppendBlock) Write(id common.ID, b []byte) error {
return nil
}

func (h *AppendBlock) Length() int {
return h.appender.Length()
func (h *AppendBlock) DataLength() uint64 {
return h.appender.DataLength()
}

// Complete should be called when you are done with the block. This method will write and return a new CompleteBlock which