Skip to content

Commit

Permalink
Add support for caching based on compaction level and block age (#805)
Browse files Browse the repository at this point in the history
* Checkpoint: Compaction summary enhancements, add block shard count & size histogram

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Experimenting with 2d map of size vs time for blocks

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Clean out histogram library, experiment with per day + per level bloom shard counters

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Actual support for caching based on level and age

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Fix bucket reader

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Add new cache summary command to view bloom filter shards per day per level

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Add docs for new cache summary command and cache estimation

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Tweak/simplify verbiage

Signed-off-by: Martin Disibio <mdisibio@gmail.com>

* Change var names, logic for clarity

Signed-off-by: Martin Disibio <mdisibio@gmail.com>

* changelog

Signed-off-by: Martin Disibio <mdisibio@gmail.com>

* Update ingester and compactor to use cache settings when writing blocks

Signed-off-by: Martin Disibio <mdisibio@gmail.com>

* Fixed docs links and image

Signed-off-by: Martin Disibio <mdisibio@gmail.com>

* Comments from review

Signed-off-by: Martin Disibio <mdisibio@gmail.com>

Co-authored-by: Martin Disibio <mdisibio@gmail.com>
  • Loading branch information
annanay25 and mdisibio authored Aug 6, 2021
1 parent 01f52d3 commit db1209f
Show file tree
Hide file tree
Showing 16 changed files with 300 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* [ENHANCEMENT] Add `tempo_ingester_flush_size_bytes` metric. [#777](https://github.com/grafana/tempo/pull/777) (@bboreham)
* [ENHANCEMENT] Microservices jsonnet: resource requests and limits can be set in `$._config`. [#793](https://github.com/grafana/tempo/pull/793) (@kvrhdn)
* [ENHANCEMENT] Add `-config.expand-env` cli flag to support environment variables expansion in config file. [#796](https://github.com/grafana/tempo/pull/796) (@Ashmita152)
* [ENHANCEMENT] Add ability to control bloom filter caching based on age and/or compaction level. Add new cli command `list cache-summary`. [#805](https://github.com/grafana/tempo/pull/805) (@annanay25)
* [ENHANCEMENT] Emit traces for ingester flush operations. [#812](https://github.com/grafana/tempo/pull/812) (@bboreham)
* [ENHANCEMENT] Add retry middleware in query-frontend. [#814](https://github.com/grafana/tempo/pull/814) (@kvrhdn)
* [ENHANCEMENT] Add `-use-otel-tracer` to use the OpenTelemetry tracer, this will also capture traces emitted by the gcs sdk. Experimental: not all features are supported (i.e. remote sampling). [#842](https://github.com/grafana/tempo/pull/842) (@kvrhdn)
Expand Down
90 changes: 90 additions & 0 deletions cmd/tempo-cli/cmd-list-cachesummary.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package main

import (
"fmt"
"os"
"time"

"github.com/olekukonko/tablewriter"
)

// listCacheSummaryCmd prints a summary of bloom filter shards per day and
// per compaction level for one tenant. It is intended to help estimate and
// tune bloom filter cache sizing (cache_min_compaction_level /
// cache_max_block_age).
type listCacheSummaryCmd struct {
	TenantID string `arg:"" help:"tenant-id within the bucket"`
	backendOptions
}

// Run loads every block of the tenant from the backend and prints a
// per-day, per-compaction-level summary of bloom filter shard counts.
func (l *listCacheSummaryCmd) Run(ctx *globalOptions) error {
	reader, compactor, err := loadBackend(&l.backendOptions, ctx)
	if err != nil {
		return err
	}

	// One-hour window matches the other list commands; compacted blocks
	// are excluded.
	blocks, err := loadBucket(reader, compactor, l.TenantID, time.Hour, false)
	if err != nil {
		return err
	}

	displayCacheSummary(blocks)
	return nil
}

// displayCacheSummary renders a table of bloom filter shard counts bucketed
// by block age in days (columns) and compaction level (rows), plus a footer
// of per-age totals. Used to pick cache_min_compaction_level and
// cache_max_block_age values.
func displayCacheSummary(results []blockStats) {
	fmt.Println()
	fmt.Println("Bloom filter shards by day and compaction level:")

	columns := []string{"bloom filter age"}
	out := make([][]string, 0)
	bloomTable := make([][]int, 0)

	for _, r := range results {
		row := int(r.CompactionLevel)
		// extend rows so this compaction level has an entry
		for len(bloomTable)-1 < row {
			bloomTable = append(bloomTable, make([]int, 0))
		}

		// Block age in whole days. time.Until is negative for blocks that
		// started in the past, hence the -1 factor.
		column := -1 * (int(time.Until(r.StartTime) / (time.Hour * 24)))
		if column < 0 {
			// StartTime is in the future (e.g. clock skew). Previously this
			// produced a negative index that slipped past the bounds check
			// below and panicked; count such blocks as age 0 instead.
			column = 0
		}

		// extend this row's columns out to the block's age
		for len(bloomTable[row])-1 < column {
			bloomTable[row] = append(bloomTable[row], 0)
		}
		// extend the header ("0 days", "1 days", ...) to match
		for i := len(columns) - 1; i <= column; i++ {
			columns = append(columns, fmt.Sprintf("%d days", i))
		}

		if row < len(bloomTable) && column < len(bloomTable[row]) {
			bloomTable[row][column] += int(r.BloomShardCount)
		} else {
			fmt.Println("something wrong with row / column", row, column)
		}
	}

	fmt.Println()
	// one output line per compaction level; accumulate per-age totals as we go
	columnTotals := make([]int, len(columns)-1)
	for i, row := range bloomTable {
		line := make([]string, 0)
		line = append(line, fmt.Sprintf("compaction level %d", i))

		for j, count := range row {
			line = append(line, fmt.Sprintf("%d", count))
			columnTotals[j] += count
		}
		out = append(out, line)
	}

	columnTotalsRow := make([]string, 0, len(columns))
	columnTotalsRow = append(columnTotalsRow, "total")
	for _, total := range columnTotals {
		columnTotalsRow = append(columnTotalsRow, fmt.Sprintf("%d", total))
	}

	fmt.Println()
	w := tablewriter.NewWriter(os.Stdout)
	w.SetHeader(columns)
	w.AppendBulk(out)
	w.SetFooter(columnTotalsRow)
	w.Render()
}
8 changes: 6 additions & 2 deletions cmd/tempo-cli/cmd-list-compactionsummary.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,22 +54,24 @@ func displayCompactionSummary(results []blockStats) {

sort.Ints(levels)

columns := []string{"lvl", "blocks", "total", "smallest block", "largest block", "earliest", "latest"}
columns := []string{"lvl", "blocks", "total", "smallest block", "largest block", "earliest", "latest", "bloom shards"}

out := make([][]string, 0)

for _, l := range levels {
sizeSum := uint64(0)
sizeMin := uint64(0)
sizeMax := uint64(0)
countSum := 0
countMin := 0
countMax := 0
countBloomShards := 0

var newest time.Time
var oldest time.Time
for _, r := range resultsByLevel[l] {
sizeSum += r.Size
countSum += r.TotalObjects
countBloomShards += int(r.BloomShardCount)

if r.Size < sizeMin || sizeMin == 0 {
sizeMin = r.Size
Expand Down Expand Up @@ -110,6 +112,8 @@ func displayCompactionSummary(results []blockStats) {
s = fmt.Sprint(time.Since(oldest).Round(time.Second), " ago")
case "latest":
s = fmt.Sprint(time.Since(newest).Round(time.Second), " ago")
case "bloom shards":
s = fmt.Sprint(countBloomShards)
}
line = append(line, s)
}
Expand Down
1 change: 1 addition & 0 deletions cmd/tempo-cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ var cli struct {
Block listBlockCmd `cmd:"" help:"List information about a block"`
Blocks listBlocksCmd `cmd:"" help:"List information about all blocks in a bucket"`
CompactionSummary listCompactionSummaryCmd `cmd:"" help:"List summary of data by compaction level"`
CacheSummary listCacheSummaryCmd `cmd:"" help:"List summary of bloom sizes per day per compaction level"`
Index listIndexCmd `cmd:"" help:"List information about a block index"`
} `cmd:""`

Expand Down
16 changes: 10 additions & 6 deletions cmd/tempo-cli/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sort"
"strconv"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -60,16 +61,16 @@ func loadBucket(r backend.Reader, c backend.Compactor, tenantID string, windowRa
fmt.Println("total blocks: ", len(blockIDs))

// Load in parallel
wg := boundedwaitgroup.New(10)
wg := boundedwaitgroup.New(20)
resultsCh := make(chan blockStats, len(blockIDs))

for _, id := range blockIDs {
for blockNum, id := range blockIDs {
wg.Add(1)

go func(id2 uuid.UUID) {
go func(id2 uuid.UUID, blockNum2 int) {
defer wg.Done()

b, err := loadBlock(r, c, tenantID, id2, windowRange, includeCompacted)
b, err := loadBlock(r, c, tenantID, id2, blockNum2, windowRange, includeCompacted)
if err != nil {
fmt.Println("Error loading block:", id2, err)
return
Expand All @@ -78,7 +79,7 @@ func loadBucket(r backend.Reader, c backend.Compactor, tenantID string, windowRa
if b != nil {
resultsCh <- *b
}
}(id)
}(id, blockNum)
}

wg.Wait()
Expand All @@ -96,8 +97,11 @@ func loadBucket(r backend.Reader, c backend.Compactor, tenantID string, windowRa
return results, nil
}

func loadBlock(r backend.Reader, c backend.Compactor, tenantID string, id uuid.UUID, windowRange time.Duration, includeCompacted bool) (*blockStats, error) {
func loadBlock(r backend.Reader, c backend.Compactor, tenantID string, id uuid.UUID, blockNum int, windowRange time.Duration, includeCompacted bool) (*blockStats, error) {
fmt.Print(".")
if blockNum%100 == 0 {
fmt.Print(strconv.Itoa(blockNum))
}

meta, err := r.BlockMeta(context.Background(), id, tenantID)
if err == backend.ErrDoesNotExist && !includeCompacted {
Expand Down
Binary file modified docs/tempo/website/.DS_Store
Binary file not shown.
10 changes: 10 additions & 0 deletions docs/tempo/website/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,16 @@ storage:
# Example: "cache: memcached"
[cache: <string>]
# Minimum compaction level of block to qualify for bloom filter caching. Default is 0 (disabled), meaning
# that compaction level is not used to determine if the bloom filter should be cached.
# Example: "cache_min_compaction_level: 2"
[cache_min_compaction_level: <int>]
# Max block age to qualify for bloom filter caching. Default is 0 (disabled), meaning that block age is not
# used to determine if the bloom filter should be cached.
# Example: "cache_max_block_age: 48h"
[cache_max_block_age: <duration>]
# Cortex Background cache configuration. Requires having a cache configured.
background_cache:
Expand Down
Binary file added docs/tempo/website/operations/cache-summary.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
30 changes: 30 additions & 0 deletions docs/tempo/website/operations/caching.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,33 @@ Too many open connections
```

When using the [memcached_exporter](https://github.com/prometheus/memcached_exporter), the number of open connections can be observed at `memcached_current_connections`.

### Cache Size Control

The Tempo querier accesses the bloom filters of all blocks while searching for a trace. This essentially requires the
cache to be at least the total size of the bloom filters (the working set). However, in larger deployments, the
working set might be larger than the desired cache size. When that happens, the eviction rate on the cache grows high,
and the hit rate drops. Not nice!

Tempo provides two config parameters in order to filter down on the items stored in cache.

```
# Min compaction level of block to qualify for caching bloom filter
# Example: "cache_min_compaction_level: 2"
[cache_min_compaction_level: <int>]
# Max block age to qualify for caching bloom filter
# Example: "cache_max_block_age: 48h"
[cache_max_block_age: <duration>]
```

Using a combination of these config options, we can narrow down on which bloom filters are cached, thereby reducing our
cache eviction rate, and increasing our cache hit rate. Nice!

So how do we decide the values of these config parameters? We have added a new command to [tempo-cli](../tempo_cli) that
prints a summary of bloom filter shards per day and per compaction level. The result looks something like this:

<p align="center"><img src="../cache-summary.png" alt="Cache summary"></p>

The above image shows the bloom filter shards over 14 days and 6 compaction levels. This can be used to decide the
above configuration parameters.
16 changes: 16 additions & 0 deletions docs/tempo/website/operations/tempo_cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,22 @@ Arguments:
tempo-cli list compaction-summary -c ./tempo.yaml single-tenant
```

## List Cache Summary
Prints information about the number of bloom filter shards per day per compaction level. This command is useful to
estimate and fine-tune cache storage. Read the [caching topic](../caching) for more information.

```bash
tempo-cli list cache-summary <tenant-id>
```

Arguments:
- `tenant-id` The tenant ID. Use `single-tenant` for single tenant setups.

**Example:**
```bash
tempo-cli list cache-summary -c ./tempo.yaml single-tenant
```

## List Index
Lists basic index info for the given block.

Expand Down
2 changes: 1 addition & 1 deletion tempodb/backend/block_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type BlockMeta struct {
MinID []byte `json:"minID"` // Minimum object id stored in this block
MaxID []byte `json:"maxID"` // Maximum object id stored in this block
TenantID string `json:"tenantID"` // ID of tehant to which this block belongs
StartTime time.Time `json:"startTime"` // Currently mostly meaningless but roughly matches to the time the first obj was written to this block
StartTime time.Time `json:"startTime"` // Roughly matches when the first obj was written to this block. Used to determine block age for different purposes (cacheing, etc)
EndTime time.Time `json:"endTime"` // Currently mostly meaningless but roughly matches to the time the last obj was written to this block
TotalObjects int `json:"totalObjects"` // Total objects in this block
Size uint64 `json:"size"` // Total size in bytes of the data object
Expand Down
4 changes: 3 additions & 1 deletion tempodb/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,9 @@ func appendBlock(rw *readerWriter, tracker backend.AppendTracker, block *encodin
func finishBlock(rw *readerWriter, tracker backend.AppendTracker, block *encoding.StreamingBlock) error {
level.Info(rw.logger).Log("msg", "writing compacted block", "block", fmt.Sprintf("%+v", block.BlockMeta()))

bytesFlushed, err := block.Complete(context.TODO(), tracker, rw.w)
w := rw.getWriterForBlock(block.BlockMeta(), time.Now())

bytesFlushed, err := block.Complete(context.TODO(), tracker, w)
if err != nil {
return err
}
Expand Down
11 changes: 7 additions & 4 deletions tempodb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const DefaultBlocklistPollConcurrency = uint(50)
const DefaultRetentionConcurrency = uint(10)

// Config holds the entirety of tempodb configuration
// Defaults are in modules/storage/config.go
type Config struct {
Pool *pool.Config `yaml:"pool,omitempty"`
WAL *wal.Config `yaml:"wal"`
Expand All @@ -37,10 +38,12 @@ type Config struct {
Azure *azure.Config `yaml:"azure"`

// caches
Cache string `yaml:"cache"`
BackgroundCache *cortex_cache.BackgroundConfig `yaml:"background_cache"`
Memcached *memcached.Config `yaml:"memcached"`
Redis *redis.Config `yaml:"redis"`
Cache string `yaml:"cache"`
CacheMinCompactionLevel uint8 `yaml:"cache_min_compaction_level"`
CacheMaxBlockAge time.Duration `yaml:"cache_max_block_age"`
BackgroundCache *cortex_cache.BackgroundConfig `yaml:"background_cache"`
Memcached *memcached.Config `yaml:"memcached"`
Redis *redis.Config `yaml:"redis"`
}

// CompactorConfig contains compaction configuration options
Expand Down
17 changes: 14 additions & 3 deletions tempodb/encoding/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ func CopyBlock(ctx context.Context, meta *backend.BlockMeta, src backend.Reader,
blockID := meta.BlockID
tenantID := meta.TenantID

copy := func(name string) error {
// Copy streams, efficient but can't cache.
copyStream := func(name string) error {
reader, size, err := src.StreamReader(ctx, name, blockID, tenantID)
if err != nil {
return errors.Wrapf(err, "error reading %s", name)
Expand All @@ -75,8 +76,18 @@ func CopyBlock(ctx context.Context, meta *backend.BlockMeta, src backend.Reader,
return dest.StreamWriter(ctx, name, blockID, tenantID, reader, size)
}

// Read entire object and attempt to cache
copy := func(name string) error {
b, err := src.Read(ctx, name, blockID, tenantID, true)
if err != nil {
return errors.Wrapf(err, "error reading %s", name)
}

return dest.Write(ctx, name, blockID, tenantID, b, true)
}

// Data
err := copy(nameObjects)
err := copyStream(nameObjects)
if err != nil {
return err
}
Expand All @@ -90,7 +101,7 @@ func CopyBlock(ctx context.Context, meta *backend.BlockMeta, src backend.Reader,
}

// Index
err = copy(nameIndex)
err = copyStream(nameIndex)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit db1209f

Please sign in to comment.