Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for caching based on compaction level and block age #805

Merged
merged 15 commits into from
Aug 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* [ENHANCEMENT] Add `tempo_ingester_flush_size_bytes` metric. [#777](https://github.com/grafana/tempo/pull/777) (@bboreham)
* [ENHANCEMENT] Microservices jsonnet: resource requests and limits can be set in `$._config`. [#793](https://github.com/grafana/tempo/pull/793) (@kvrhdn)
* [ENHANCEMENT] Add `-config.expand-env` cli flag to support environment variables expansion in config file. [#796](https://github.com/grafana/tempo/pull/796) (@Ashmita152)
* [ENHANCEMENT] Add ability to control bloom filter caching based on age and/or compaction level. Add new cli command `list cache-summary`. [#805](https://github.com/grafana/tempo/pull/805) (@annanay25)
* [ENHANCEMENT] Emit traces for ingester flush operations. [#812](https://github.com/grafana/tempo/pull/812) (@bboreham)
* [ENHANCEMENT] Add retry middleware in query-frontend. [#814](https://github.com/grafana/tempo/pull/814) (@kvrhdn)
* [CHANGE] Docker images are now prefixed by their branch name [#828](https://github.com/grafana/tempo/pull/828) (@jvrplmlmn)
Expand Down
90 changes: 90 additions & 0 deletions cmd/tempo-cli/cmd-list-cachesummary.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package main

import (
"fmt"
"os"
"time"

"github.com/olekukonko/tablewriter"
)

// listCacheSummaryCmd is the `tempo-cli list cache-summary` command. It prints
// a summary of bloom filter shards per day and per compaction level for a
// single tenant, which helps operators size and tune bloom filter caching.
type listCacheSummaryCmd struct {
	TenantID string `arg:"" help:"tenant-id within the bucket"`
	backendOptions
}

// Run executes the cache-summary command: it connects to the configured
// backend, loads block metadata for the tenant, and prints the bloom filter
// shard summary table.
func (l *listCacheSummaryCmd) Run(ctx *globalOptions) error {
	reader, compactor, err := loadBackend(&l.backendOptions, ctx)
	if err != nil {
		return err
	}

	// Bucket blocks into one-hour windows when scanning the backend.
	const windowDuration = time.Hour

	blocks, err := loadBucket(reader, compactor, l.TenantID, windowDuration, false)
	if err != nil {
		return err
	}

	displayCacheSummary(blocks)
	return nil
}

// displayCacheSummary prints a table of bloom filter shard counts bucketed by
// block age (in whole days) and compaction level. Operators use this output to
// pick values for cache_min_compaction_level and cache_max_block_age.
func displayCacheSummary(results []blockStats) {
	fmt.Println()
	fmt.Println("Bloom filter shards by day and compaction level:")

	columns := []string{"bloom filter age"}
	out := make([][]string, 0, len(results))
	bloomTable := make([][]int, 0)

	for _, r := range results {
		row := int(r.CompactionLevel)
		// extend rows so this compaction level has a slot
		for len(bloomTable)-1 < row {
			bloomTable = append(bloomTable, make([]int, 0))
		}

		// age of the block in whole days. time.Until is negative for past
		// blocks, hence the negation. Clamp at 0: a block whose StartTime is
		// (slightly) in the future due to clock skew would otherwise produce
		// a negative index and panic below.
		column := -1 * (int(time.Until(r.StartTime) / (time.Hour * 24)))
		if column < 0 {
			column = 0
		}

		// extend columns of this row
		for len(bloomTable[row])-1 < column {
			bloomTable[row] = append(bloomTable[row], 0)
		}
		// extend the header so it covers this age bucket (header has one
		// extra leading cell, so data column j maps to columns[j+1])
		for i := len(columns) - 1; i <= column; i++ {
			columns = append(columns, fmt.Sprintf("%d days", i))
		}

		// bounds are guaranteed by the extension loops above
		bloomTable[row][column] += int(r.BloomShardCount)
	}

	fmt.Println()
	columnTotals := make([]int, len(columns)-1)
	for i, row := range bloomTable {
		line := make([]string, 0, len(row)+1)
		line = append(line, fmt.Sprintf("compaction level %d", i))

		for j, count := range row {
			line = append(line, fmt.Sprintf("%d", count))
			columnTotals[j] += count
		}
		out = append(out, line)
	}

	columnTotalsRow := make([]string, 0, len(columns))
	columnTotalsRow = append(columnTotalsRow, "total")
	for _, total := range columnTotals {
		columnTotalsRow = append(columnTotalsRow, fmt.Sprintf("%d", total))
	}

	fmt.Println()
	w := tablewriter.NewWriter(os.Stdout)
	w.SetHeader(columns)
	w.AppendBulk(out)
	w.SetFooter(columnTotalsRow)
	w.Render()
}
8 changes: 6 additions & 2 deletions cmd/tempo-cli/cmd-list-compactionsummary.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,22 +54,24 @@ func displayCompactionSummary(results []blockStats) {

sort.Ints(levels)

columns := []string{"lvl", "blocks", "total", "smallest block", "largest block", "earliest", "latest"}
columns := []string{"lvl", "blocks", "total", "smallest block", "largest block", "earliest", "latest", "bloom shards"}

out := make([][]string, 0)

for _, l := range levels {
sizeSum := uint64(0)
sizeMin := uint64(0)
sizeMax := uint64(0)
countSum := 0
countMin := 0
countMax := 0
countBloomShards := 0

var newest time.Time
var oldest time.Time
for _, r := range resultsByLevel[l] {
sizeSum += r.Size
countSum += r.TotalObjects
countBloomShards += int(r.BloomShardCount)

if r.Size < sizeMin || sizeMin == 0 {
sizeMin = r.Size
Expand Down Expand Up @@ -110,6 +112,8 @@ func displayCompactionSummary(results []blockStats) {
s = fmt.Sprint(time.Since(oldest).Round(time.Second), " ago")
case "latest":
s = fmt.Sprint(time.Since(newest).Round(time.Second), " ago")
case "bloom shards":
s = fmt.Sprint(countBloomShards)
}
line = append(line, s)
}
Expand Down
1 change: 1 addition & 0 deletions cmd/tempo-cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ var cli struct {
Block listBlockCmd `cmd:"" help:"List information about a block"`
Blocks listBlocksCmd `cmd:"" help:"List information about all blocks in a bucket"`
CompactionSummary listCompactionSummaryCmd `cmd:"" help:"List summary of data by compaction level"`
CacheSummary listCacheSummaryCmd `cmd:"" help:"List summary of bloom sizes per day per compaction level"`
Index listIndexCmd `cmd:"" help:"List information about a block index"`
} `cmd:""`

Expand Down
16 changes: 10 additions & 6 deletions cmd/tempo-cli/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sort"
"strconv"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -60,16 +61,16 @@ func loadBucket(r backend.Reader, c backend.Compactor, tenantID string, windowRa
fmt.Println("total blocks: ", len(blockIDs))

// Load in parallel
wg := boundedwaitgroup.New(10)
wg := boundedwaitgroup.New(20)
resultsCh := make(chan blockStats, len(blockIDs))

for _, id := range blockIDs {
for blockNum, id := range blockIDs {
wg.Add(1)

go func(id2 uuid.UUID) {
go func(id2 uuid.UUID, blockNum2 int) {
defer wg.Done()

b, err := loadBlock(r, c, tenantID, id2, windowRange, includeCompacted)
b, err := loadBlock(r, c, tenantID, id2, blockNum2, windowRange, includeCompacted)
if err != nil {
fmt.Println("Error loading block:", id2, err)
return
Expand All @@ -78,7 +79,7 @@ func loadBucket(r backend.Reader, c backend.Compactor, tenantID string, windowRa
if b != nil {
resultsCh <- *b
}
}(id)
}(id, blockNum)
}

wg.Wait()
Expand All @@ -96,8 +97,11 @@ func loadBucket(r backend.Reader, c backend.Compactor, tenantID string, windowRa
return results, nil
}

func loadBlock(r backend.Reader, c backend.Compactor, tenantID string, id uuid.UUID, windowRange time.Duration, includeCompacted bool) (*blockStats, error) {
func loadBlock(r backend.Reader, c backend.Compactor, tenantID string, id uuid.UUID, blockNum int, windowRange time.Duration, includeCompacted bool) (*blockStats, error) {
fmt.Print(".")
if blockNum%100 == 0 {
fmt.Print(strconv.Itoa(blockNum))
}

meta, err := r.BlockMeta(context.Background(), id, tenantID)
if err == backend.ErrDoesNotExist && !includeCompacted {
Expand Down
Binary file modified docs/tempo/website/.DS_Store
Binary file not shown.
10 changes: 10 additions & 0 deletions docs/tempo/website/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,16 @@ storage:
# Example: "cache: memcached"
[cache: <string>]

# Minimum compaction level of block to qualify for bloom filter caching. Default is 0 (disabled), meaning
# that compaction level is not used to determine if the bloom filter should be cached.
# Example: "cache_min_compaction_level: 2"
[cache_min_compaction_level: <int>]

# Max block age to qualify for bloom filter caching. Default is 0 (disabled), meaning that block age is not
# used to determine if the bloom filter should be cached.
# Example: "cache_max_block_age: 48h"
[cache_max_block_age: <duration>]

# Cortex Background cache configuration. Requires having a cache configured.
background_cache:

Expand Down
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
30 changes: 30 additions & 0 deletions docs/tempo/website/operations/caching.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,33 @@ Too many open connections
```

When using the [memcached_exporter](https://github.com/prometheus/memcached_exporter), the number of open connections can be observed at `memcached_current_connections`.

### Cache Size Control

The Tempo querier accesses the bloom filters of all blocks while searching for a trace. This essentially mandates that the
cache be at least as large as the total size of the bloom filters (the working set). However, in larger deployments, the
working set might be larger than the desired size of the cache. When that happens, eviction rates on the cache grow high
and hit rates drop. Not nice!

Tempo provides two config parameters to filter down the set of items stored in cache.

```
# Min compaction level of block to qualify for caching bloom filter
# Example: "cache_min_compaction_level: 2"
[cache_min_compaction_level: <int>]

# Max block age to qualify for caching bloom filter
# Example: "cache_max_block_age: 48h"
[cache_max_block_age: <duration>]
```

Using a combination of these config options, we can narrow down which bloom filters are cached, thereby reducing our
cache eviction rate and increasing our cache hit rate. Nice!

So how do we decide the values of these config parameters? We have added a new command to [tempo-cli](../tempo_cli) that
prints a summary of bloom filter shards per day and per compaction level. The result looks something like this:

<p align="center"><img src="../cache-summary.png" alt="Cache summary"></p>
yvrhdn marked this conversation as resolved.
Show resolved Hide resolved

The above image shows the bloom filter shards over 14 days and 6 compaction levels. This can be used to decide the
above configuration parameters.
16 changes: 16 additions & 0 deletions docs/tempo/website/operations/tempo_cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,22 @@ Arguments:
tempo-cli list compaction-summary -c ./tempo.yaml single-tenant
```

## List Cache Summary
Prints information about the number of bloom filter shards per day per compaction level. This command is useful to
estimate and fine-tune cache storage. Read the [caching topic](../caching) for more information.

```bash
tempo-cli list cache-summary <tenant-id>
```

Arguments:
- `tenant-id` The tenant ID. Use `single-tenant` for single tenant setups.

**Example:**
```bash
tempo-cli list cache-summary -c ./tempo.yaml single-tenant
```

## List Index
Lists basic index info for the given block.

Expand Down
2 changes: 1 addition & 1 deletion tempodb/backend/block_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type BlockMeta struct {
MinID []byte `json:"minID"` // Minimum object id stored in this block
MaxID []byte `json:"maxID"` // Maximum object id stored in this block
TenantID string `json:"tenantID"` // ID of tenant to which this block belongs
StartTime time.Time `json:"startTime"` // Currently mostly meaningless but roughly matches to the time the first obj was written to this block
StartTime time.Time `json:"startTime"` // Roughly matches when the first obj was written to this block. Used to determine block age for different purposes (cacheing, etc)
EndTime time.Time `json:"endTime"` // Currently mostly meaningless but roughly matches to the time the last obj was written to this block
TotalObjects int `json:"totalObjects"` // Total objects in this block
Size uint64 `json:"size"` // Total size in bytes of the data object
Expand Down
4 changes: 3 additions & 1 deletion tempodb/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,9 @@ func appendBlock(rw *readerWriter, tracker backend.AppendTracker, block *encodin
func finishBlock(rw *readerWriter, tracker backend.AppendTracker, block *encoding.StreamingBlock) error {
level.Info(rw.logger).Log("msg", "writing compacted block", "block", fmt.Sprintf("%+v", block.BlockMeta()))

bytesFlushed, err := block.Complete(context.TODO(), tracker, rw.w)
w := rw.getWriterForBlock(block.BlockMeta(), time.Now())

bytesFlushed, err := block.Complete(context.TODO(), tracker, w)
if err != nil {
return err
}
Expand Down
11 changes: 7 additions & 4 deletions tempodb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const DefaultBlocklistPollConcurrency = uint(50)
const DefaultRetentionConcurrency = uint(10)

// Config holds the entirety of tempodb configuration
// Defaults are in modules/storage/config.go
type Config struct {
Pool *pool.Config `yaml:"pool,omitempty"`
WAL *wal.Config `yaml:"wal"`
Expand All @@ -37,10 +38,12 @@ type Config struct {
Azure *azure.Config `yaml:"azure"`

// caches
Cache string `yaml:"cache"`
BackgroundCache *cortex_cache.BackgroundConfig `yaml:"background_cache"`
Memcached *memcached.Config `yaml:"memcached"`
Redis *redis.Config `yaml:"redis"`
Cache string `yaml:"cache"`
CacheMinCompactionLevel uint8 `yaml:"cache_min_compaction_level"`
CacheMaxBlockAge time.Duration `yaml:"cache_max_block_age"`
BackgroundCache *cortex_cache.BackgroundConfig `yaml:"background_cache"`
Memcached *memcached.Config `yaml:"memcached"`
Redis *redis.Config `yaml:"redis"`
}

// CompactorConfig contains compaction configuration options
Expand Down
17 changes: 14 additions & 3 deletions tempodb/encoding/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ func CopyBlock(ctx context.Context, meta *backend.BlockMeta, src backend.Reader,
blockID := meta.BlockID
tenantID := meta.TenantID

copy := func(name string) error {
// Copy streams, efficient but can't cache.
copyStream := func(name string) error {
reader, size, err := src.StreamReader(ctx, name, blockID, tenantID)
if err != nil {
return errors.Wrapf(err, "error reading %s", name)
Expand All @@ -75,8 +76,18 @@ func CopyBlock(ctx context.Context, meta *backend.BlockMeta, src backend.Reader,
return dest.StreamWriter(ctx, name, blockID, tenantID, reader, size)
}

// Read entire object and attempt to cache
copy := func(name string) error {
b, err := src.Read(ctx, name, blockID, tenantID, true)
if err != nil {
return errors.Wrapf(err, "error reading %s", name)
}

return dest.Write(ctx, name, blockID, tenantID, b, true)
}

// Data
err := copy(nameObjects)
err := copyStream(nameObjects)
if err != nil {
return err
}
Expand All @@ -90,7 +101,7 @@ func CopyBlock(ctx context.Context, meta *backend.BlockMeta, src backend.Reader,
}

// Index
err = copy(nameIndex)
err = copyStream(nameIndex)
if err != nil {
return err
}
Expand Down
Loading