From 8a9ae2699eb2b9f8d740f1abdddeb6dddb173311 Mon Sep 17 00:00:00 2001 From: Martin Disibio Date: Wed, 9 Dec 2020 13:59:47 -0500 Subject: [PATCH 1/5] Update list blocks command to not show compacted blocks unless flag is specified. When flag specified then include compacted y/n column. Sort by block end time and add age column. --- cmd/tempo-cli/cmd-list-block.go | 16 ++++---- cmd/tempo-cli/cmd-list-blocks.go | 63 +++++++++++++++----------------- cmd/tempo-cli/main.go | 47 +++++++++++++++++++++--- 3 files changed, 78 insertions(+), 48 deletions(-) diff --git a/cmd/tempo-cli/cmd-list-block.go b/cmd/tempo-cli/cmd-list-block.go index d38bd3444c0..a782d3a1f2b 100644 --- a/cmd/tempo-cli/cmd-list-block.go +++ b/cmd/tempo-cli/cmd-list-block.go @@ -47,14 +47,14 @@ func dumpBlock(r tempodb_backend.Reader, c tempodb_backend.Compactor, tenantID s return nil } - objects, lvl, window, start, end := blockStats(meta, compactedMeta, windowRange) - - fmt.Println("ID : ", id) - fmt.Println("Total Objects : ", objects) - fmt.Println("Level : ", lvl) - fmt.Println("Window : ", window) - fmt.Println("Start : ", start) - fmt.Println("End : ", end) + unifiedMeta := getMeta(meta, compactedMeta, windowRange) + + fmt.Println("ID : ", unifiedMeta.id) + fmt.Println("Total Objects : ", unifiedMeta.objects) + fmt.Println("Level : ", unifiedMeta.compactionLevel) + fmt.Println("Window : ", unifiedMeta.window) + fmt.Println("Start : ", unifiedMeta.start) + fmt.Println("End : ", unifiedMeta.end) if checkDupes { fmt.Println("Searching for dupes ...") diff --git a/cmd/tempo-cli/cmd-list-blocks.go b/cmd/tempo-cli/cmd-list-blocks.go index 70f823a9dc2..bc3bb618dc8 100644 --- a/cmd/tempo-cli/cmd-list-blocks.go +++ b/cmd/tempo-cli/cmd-list-blocks.go @@ -9,15 +9,15 @@ import ( "strconv" "time" - "github.com/google/uuid" tempodb_backend "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/encoding" "github.com/olekukonko/tablewriter" ) type listBlocksCmd struct { - TenantID string 
`arg:"" help:"tenant-id within the bucket"` - LoadIndex bool `help:"load block indexes and display additional information"` + TenantID string `arg:"" help:"tenant-id within the bucket"` + LoadIndex bool `help:"load block indexes and display additional information"` + IncludeCompacted bool `help:"include compacted blocks"` backendOptions } @@ -30,41 +30,38 @@ func (l *listBlocksCmd) Run(ctx *globalOptions) error { windowDuration := time.Hour - results, err := loadBucket(r, c, l.TenantID, windowDuration, l.LoadIndex) + results, err := loadBucket(r, c, l.TenantID, windowDuration, l.LoadIndex, l.IncludeCompacted) if err != nil { return err } - displayResults(results, windowDuration, l.LoadIndex) + displayResults(results, windowDuration, l.LoadIndex, l.IncludeCompacted) return nil } -type bucketStats struct { - id uuid.UUID - compactionLevel uint8 - objects int - window int64 - start time.Time - end time.Time +type blockStats struct { + unifiedBlockMeta totalIDs int duplicateIDs int } -func loadBucket(r tempodb_backend.Reader, c tempodb_backend.Compactor, tenantID string, windowRange time.Duration, loadIndex bool) ([]bucketStats, error) { +func loadBucket(r tempodb_backend.Reader, c tempodb_backend.Compactor, tenantID string, windowRange time.Duration, loadIndex bool, includeCompacted bool) ([]blockStats, error) { blockIDs, err := r.Blocks(context.Background(), tenantID) if err != nil { return nil, err } fmt.Println("total blocks: ", len(blockIDs)) - results := make([]bucketStats, 0) + results := make([]blockStats, 0) for _, id := range blockIDs { fmt.Print(".") meta, err := r.BlockMeta(context.Background(), id, tenantID) - if err != nil && err != tempodb_backend.ErrMetaDoesNotExist { + if err == tempodb_backend.ErrMetaDoesNotExist && !includeCompacted { + continue + } else if err != nil && err != tempodb_backend.ErrMetaDoesNotExist { return nil, err } @@ -93,15 +90,8 @@ func loadBucket(r tempodb_backend.Reader, c tempodb_backend.Compactor, tenantID } } - objects, lvl, 
window, start, end := blockStats(meta, compactedMeta, windowRange) - - results = append(results, bucketStats{ - id: id, - compactionLevel: lvl, - objects: objects, - window: window, - start: start, - end: end, + results = append(results, blockStats{ + unifiedBlockMeta: getMeta(meta, compactedMeta, windowRange), totalIDs: totalIDs, duplicateIDs: duplicateIDs, @@ -109,25 +99,21 @@ func loadBucket(r tempodb_backend.Reader, c tempodb_backend.Compactor, tenantID } sort.Slice(results, func(i, j int) bool { - bI := results[i] - bJ := results[j] - - if bI.window == bJ.window { - return bI.compactionLevel < bJ.compactionLevel - } - - return bI.window < bJ.window + return results[i].end.Before(results[j].end) }) return results, nil } -func displayResults(results []bucketStats, windowDuration time.Duration, includeIndexInfo bool) { +func displayResults(results []blockStats, windowDuration time.Duration, includeIndexInfo bool, includeCompacted bool) { - columns := []string{"id", "lvl", "count", "window", "start", "end", "duration"} + columns := []string{"id", "lvl", "count", "window", "start", "end", "duration", "age"} if includeIndexInfo { columns = append(columns, "idx", "dupe") } + if includeCompacted { + columns = append(columns, "cmp") + } totalObjects := 0 out := make([][]string, 0) @@ -155,12 +141,21 @@ func displayResults(results []bucketStats, windowDuration time.Duration, include case "duration": // Time range included in bucket s = fmt.Sprint(r.end.Sub(r.start).Round(time.Second)) + case "age": + s = fmt.Sprint(time.Since(r.end).Round(time.Second)) case "idx": // Number of entries in the index (may not be the same as the block when index downsampling enabled) s = strconv.Itoa(r.totalIDs) case "dupe": // Number of duplicate IDs found in the index s = strconv.Itoa(r.duplicateIDs) + case "cmp": + // Compacted? 
+ if r.compacted { + s = "Y" + } else { + s = " " + } } line = append(line, s) diff --git a/cmd/tempo-cli/main.go b/cmd/tempo-cli/main.go index 6c692648cde..ad26852d6d8 100644 --- a/cmd/tempo-cli/main.go +++ b/cmd/tempo-cli/main.go @@ -6,6 +6,7 @@ import ( "io/ioutil" "time" + "github.com/google/uuid" "github.com/grafana/tempo/cmd/tempo/app" tempodb_backend "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/backend/local" @@ -107,12 +108,46 @@ func loadBackend(b *backendOptions, g *globalOptions) (tempodb_backend.Reader, t return r, c, nil } -func blockStats(meta *encoding.BlockMeta, compactedMeta *encoding.CompactedBlockMeta, windowRange time.Duration) (int, uint8, int64, time.Time, time.Time) { +type unifiedBlockMeta struct { + id uuid.UUID + compactionLevel uint8 + objects int + window int64 + start time.Time + end time.Time + compacted bool +} + +func getMeta(meta *encoding.BlockMeta, compactedMeta *encoding.CompactedBlockMeta, windowRange time.Duration) unifiedBlockMeta { if meta != nil { - return meta.TotalObjects, meta.CompactionLevel, meta.EndTime.Unix() / int64(windowRange/time.Second), meta.StartTime, meta.EndTime - } else if compactedMeta != nil { - return compactedMeta.TotalObjects, compactedMeta.CompactionLevel, compactedMeta.EndTime.Unix() / int64(windowRange/time.Second), compactedMeta.StartTime, compactedMeta.EndTime + return unifiedBlockMeta{ + id: meta.BlockID, + compactionLevel: meta.CompactionLevel, + objects: meta.TotalObjects, + window: meta.EndTime.Unix() / int64(windowRange/time.Second), + start: meta.StartTime, + end: meta.EndTime, + compacted: false, + } + } + if compactedMeta != nil { + return unifiedBlockMeta{ + id: compactedMeta.BlockID, + compactionLevel: compactedMeta.CompactionLevel, + objects: compactedMeta.TotalObjects, + window: compactedMeta.EndTime.Unix() / int64(windowRange/time.Second), + start: compactedMeta.StartTime, + end: compactedMeta.EndTime, + compacted: true, + } + } + return 
unifiedBlockMeta{ + id: uuid.UUID{}, + compactionLevel: 0, + objects: -1, + window: -1, + start: time.Unix(0, 0), + end: time.Unix(0, 0), + compacted: false, } - - return -1, 0, -1, time.Unix(0, 0), time.Unix(0, 0) } From b36f2136f5f6d829a65045cc61337f7937f8f92a Mon Sep 17 00:00:00 2001 From: Martin Disibio Date: Wed, 9 Dec 2020 14:06:06 -0500 Subject: [PATCH 2/5] Add new --include-compacted flag and columns to cli documentation --- docs/tempo/website/cli/_index.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/tempo/website/cli/_index.md b/docs/tempo/website/cli/_index.md index 0a1b108f990..328b2247de6 100644 --- a/docs/tempo/website/cli/_index.md +++ b/docs/tempo/website/cli/_index.md @@ -74,6 +74,7 @@ Arguments: - `tenant-id` The tenant ID. Use `single-tenant` for single tenant setups. Options: +- `--include-compacted` Include blocks that have been compacted. Default behavior is to display only active blocks. - `--load-index` Also load the block indexes and perform integrity checks for duplicates. **Note:** can be intense. **Output:** @@ -84,9 +85,11 @@ Explanation of output: - `Window` The time window considered for compaction purposes. - `Start` The earliest timestamp stored in the block. - `End` The latest timestamp stored in the block. +- `Age` The age of the block. - `Duration` Time duration between start and end. - `Idx` Number of records stored in the index (present when --load-index is specified). - `Dupe` Number of duplicate entries in the index. Should be zero. (present when --load-index is specified). +- `Cmp` Whether the block has been compacted (present when --include-compacted is specified). 
**Example:** ```bash From 0e42b53573c3d40790667a715f978afcf88560b3 Mon Sep 17 00:00:00 2001 From: Martin Disibio Date: Wed, 9 Dec 2020 15:08:17 -0500 Subject: [PATCH 3/5] List blocks: load meta in parallel to speed up command --- cmd/tempo-cli/cmd-list-blocks.go | 100 ++++++++++++++++++++----------- cmd/tempo-cli/main.go | 29 +++++++++ 2 files changed, 94 insertions(+), 35 deletions(-) diff --git a/cmd/tempo-cli/cmd-list-blocks.go b/cmd/tempo-cli/cmd-list-blocks.go index bc3bb618dc8..7fcb71512b7 100644 --- a/cmd/tempo-cli/cmd-list-blocks.go +++ b/cmd/tempo-cli/cmd-list-blocks.go @@ -9,6 +9,7 @@ import ( "strconv" "time" + "github.com/google/uuid" tempodb_backend "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/encoding" "github.com/olekukonko/tablewriter" @@ -53,49 +54,35 @@ func loadBucket(r tempodb_backend.Reader, c tempodb_backend.Compactor, tenantID } fmt.Println("total blocks: ", len(blockIDs)) - results := make([]blockStats, 0) + + // Load in parallel + wg := newBoundedWaitGroup(10) + resultsCh := make(chan blockStats, len(blockIDs)) for _, id := range blockIDs { - fmt.Print(".") + wg.Add(1) - meta, err := r.BlockMeta(context.Background(), id, tenantID) - if err == tempodb_backend.ErrMetaDoesNotExist && !includeCompacted { - continue - } else if err != nil && err != tempodb_backend.ErrMetaDoesNotExist { - return nil, err - } + go func(id2 uuid.UUID) { + defer wg.Done() - compactedMeta, err := c.CompactedBlockMeta(id, tenantID) - if err != nil && err != tempodb_backend.ErrMetaDoesNotExist { - return nil, err - } - - totalIDs := -1 - duplicateIDs := -1 + b, err := loadBlock(r, c, tenantID, id2, windowRange, loadIndex, includeCompacted) + if err != nil { + fmt.Println("Error loading block:", id2, err) + return + } - if loadIndex { - indexBytes, err := r.Index(context.Background(), id, tenantID) - if err == nil { - records, err := encoding.UnmarshalRecords(indexBytes) - if err != nil { - return nil, err - } - duplicateIDs = 0 - 
totalIDs = len(records) - for i := 1; i < len(records); i++ { - if bytes.Equal(records[i-1].ID, records[i].ID) { - duplicateIDs++ - } - } + if b != nil { + resultsCh <- *b } - } + }(id) + } - results = append(results, blockStats{ - unifiedBlockMeta: getMeta(meta, compactedMeta, windowRange), + wg.Wait() + close(resultsCh) - totalIDs: totalIDs, - duplicateIDs: duplicateIDs, - }) + results := make([]blockStats, 0) + for b := range resultsCh { + results = append(results, b) } sort.Slice(results, func(i, j int) bool { @@ -105,6 +92,49 @@ func loadBucket(r tempodb_backend.Reader, c tempodb_backend.Compactor, tenantID return results, nil } +func loadBlock(r tempodb_backend.Reader, c tempodb_backend.Compactor, tenantID string, id uuid.UUID, windowRange time.Duration, loadIndex bool, includeCompacted bool) (*blockStats, error) { + fmt.Print(".") + + meta, err := r.BlockMeta(context.Background(), id, tenantID) + if err == tempodb_backend.ErrMetaDoesNotExist && !includeCompacted { + return nil, nil + } else if err != nil && err != tempodb_backend.ErrMetaDoesNotExist { + return nil, err + } + + compactedMeta, err := c.CompactedBlockMeta(id, tenantID) + if err != nil && err != tempodb_backend.ErrMetaDoesNotExist { + return nil, err + } + + totalIDs := -1 + duplicateIDs := -1 + + if loadIndex { + indexBytes, err := r.Index(context.Background(), id, tenantID) + if err == nil { + records, err := encoding.UnmarshalRecords(indexBytes) + if err != nil { + return nil, err + } + duplicateIDs = 0 + totalIDs = len(records) + for i := 1; i < len(records); i++ { + if bytes.Equal(records[i-1].ID, records[i].ID) { + duplicateIDs++ + } + } + } + } + + return &blockStats{ + unifiedBlockMeta: getMeta(meta, compactedMeta, windowRange), + + totalIDs: totalIDs, + duplicateIDs: duplicateIDs, + }, nil +} + func displayResults(results []blockStats, windowDuration time.Duration, includeIndexInfo bool, includeCompacted bool) { columns := []string{"id", "lvl", "count", "window", "start", "end", 
"duration", "age"} diff --git a/cmd/tempo-cli/main.go b/cmd/tempo-cli/main.go index ad26852d6d8..66cc5748bca 100644 --- a/cmd/tempo-cli/main.go +++ b/cmd/tempo-cli/main.go @@ -4,6 +4,7 @@ import ( "flag" "fmt" "io/ioutil" + "sync" "time" "github.com/google/uuid" @@ -151,3 +152,31 @@ func getMeta(meta *encoding.BlockMeta, compactedMeta *encoding.CompactedBlockMet compacted: false, } } + +// boundedWaitGroup like a normal wait group except limits number of active goroutines to given capacity. +type boundedWaitGroup struct { + wg sync.WaitGroup + ch chan struct{} // Chan buffer size is used to limit concurrency. +} + +func newBoundedWaitGroup(cap int) boundedWaitGroup { + return boundedWaitGroup{ch: make(chan struct{}, cap)} +} + +func (bwg *boundedWaitGroup) Add(delta int) { + for i := 0; i > delta; i-- { + <-bwg.ch + } + for i := 0; i < delta; i++ { + bwg.ch <- struct{}{} + } + bwg.wg.Add(delta) +} + +func (bwg *boundedWaitGroup) Done() { + bwg.Add(-1) +} + +func (bwg *boundedWaitGroup) Wait() { + bwg.wg.Wait() +} From cb9e3f0bec6ab7a84f2e886b9c3a28fa47485288 Mon Sep 17 00:00:00 2001 From: Martin Disibio Date: Thu, 10 Dec 2020 08:51:18 -0500 Subject: [PATCH 4/5] Apply copy/edit suggestions Co-authored-by: achatterjee-grafana <70489351+achatterjee-grafana@users.noreply.github.com> --- docs/tempo/website/cli/_index.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/tempo/website/cli/_index.md b/docs/tempo/website/cli/_index.md index 328b2247de6..e4f50b961b0 100644 --- a/docs/tempo/website/cli/_index.md +++ b/docs/tempo/website/cli/_index.md @@ -75,20 +75,20 @@ Arguments: Options: - `--include-compacted` Include blocks that have been compacted. Default behavior is to display only active blocks. -- `--load-index` Also load the block indexes and perform integrity checks for duplicates. **Note:** can be intense. +- `--load-index` Also load the block indexes and perform integrity checks for duplicates. 
**Note:** This can be a resource-intensive process. **Output:** Explanation of output: - `ID` Block ID. - `Lvl` Compaction level of the block. - `Count` Number of objects stored in the block. -- `Window` The time window considered for compaction purposes. +- `Window` The window of time that was considered for compaction purposes. - `Start` The earliest timestamp stored in the block. - `End` The latest timestamp stored in the block. - `Age` The age of the block. -- `Duration` Time duration between start and end. +- `Duration` Duration between the start and end time. - `Idx` Number of records stored in the index (present when --load-index is specified). -- `Dupe` Number of duplicate entries in the index. Should be zero. (present when --load-index is specified). +- `Dupe` Number of duplicate entries in the index (present when --load-index is specified). Should be zero. - `Cmp` Whether the block has been compacted (present when --include-compacted is specified). **Example:** From 62d11e14fd708b6ae02fd3aa2c2f04538de8752b Mon Sep 17 00:00:00 2001 From: Martin Disibio Date: Thu, 10 Dec 2020 09:23:14 -0500 Subject: [PATCH 5/5] Update changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9321001a0ce..0b3f9964338 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ * [CHANGE] Redo tempo-cli with basic command structure and improvements [#385](https://github.com/grafana/tempo/pull/385) * [CHANGE] Add content negotiation support and sharding parameters to Querier [#375](https://github.com/grafana/tempo/pull/375) * [ENHANCEMENT] Add docker-compose example for GCS along with new backend options [#397](https://github.com/grafana/tempo/pull/397) +* [ENHANCEMENT] tempo-cli list blocks usability improvements [#403](https://github.com/grafana/tempo/pull/403) * [BUGFIX] Compactor without GCS permissions fail silently [#379](https://github.com/grafana/tempo/issues/379) ## v0.4.0