Versioned WAL #638

Merged · 22 commits · Apr 12, 2021
Changes from 20 commits
5 changes: 3 additions & 2 deletions CHANGELOG.md
@@ -8,12 +8,15 @@
`ingestion_burst_size` limit override has been removed in favour of `ingestion_burst_size_bytes`.
This is a **breaking change** to the overrides config section. [#630](https://github.com/grafana/tempo/pull/630)
* [FEATURE] Add page based access to the index file. [#557](https://github.com/grafana/tempo/pull/557)
* [FEATURE] WAL Compression/checksums. [#638](https://github.com/grafana/tempo/pull/638)
* [ENHANCEMENT] Add a Shutdown handler to flush data to backend, at "/shutdown". [#526](https://github.com/grafana/tempo/pull/526)
* [ENHANCEMENT] Queriers now query all (healthy) ingesters for a trace to mitigate 404s on ingester rollouts/scaleups.
This is a **breaking change** and will likely result in query errors on rollout as the query signature between QueryFrontend & Querier has changed. [#557](https://github.com/grafana/tempo/pull/557)
* [ENHANCEMENT] Add list compaction-summary command to tempo-cli [#588](https://github.com/grafana/tempo/pull/588)
* [ENHANCEMENT] Add list and view index commands to tempo-cli [#611](https://github.com/grafana/tempo/pull/611)
* [ENHANCEMENT] Add a configurable prefix for HTTP endpoints. [#631](https://github.com/grafana/tempo/pull/631)
* [ENHANCEMENT] Add kafka receiver. [#613](https://github.com/grafana/tempo/pull/613)
* [ENHANCEMENT] Upgrade OTel collector to `v0.21.0`. [#627](https://github.com/grafana/tempo/pull/627)
* [BUGFIX] Fixes permissions errors on startup in GCS. [#554](https://github.com/grafana/tempo/pull/554)
* [BUGFIX] Fixes error where Dell ECS cannot list objects. [#561](https://github.com/grafana/tempo/pull/561)
* [BUGFIX] Fixes listing blocks in S3 when the list is truncated. [#567](https://github.com/grafana/tempo/pull/567)
@@ -23,8 +26,6 @@
* [BUGFIX] Fixes issue where Tempo would not parse odd length trace ids [#605](https://github.com/grafana/tempo/pull/605)
* [BUGFIX] Sort traces on flush to reduce unexpected recombination work by compactors [#606](https://github.com/grafana/tempo/pull/606)
* [BUGFIX] Ingester fully persists blocks locally to reduce amount of work done after restart [#628](https://github.com/grafana/tempo/pull/628)
* [ENHANCEMENT] Add kafka receiver. [#613](https://github.com/grafana/tempo/pull/613)
* [ENHANCEMENT] Upgrade OTel collector to `v0.21.0`. [#627](https://github.com/grafana/tempo/pull/627)

## v0.6.0

6 changes: 5 additions & 1 deletion docs/tempo/website/configuration/_index.md
@@ -121,7 +121,6 @@ storage:
backend: gcs # store traces in gcs
gcs:
bucket_name: ops-tools-tracing-ops # store traces in this bucket

blocklist_poll: 5m # how often to repoll the backend for new blocks
blocklist_poll_concurrency: 50 # optional. Number of blocks to process in parallel during polling. Default is 50.
cache: memcached # optional cache configuration
Expand All @@ -138,6 +137,11 @@ storage:
queue_depth: 2000 # length of job queue
wal:
path: /var/tempo/wal # where to store the head blocks while they are being appended to
encoding: none # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
block:
bloom_filter_false_positive: .05 # bloom filter false positive rate. lower values create larger filters but fewer false positives
index_downsample_bytes: 1_000_000 # number of bytes per index record
encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
```

## Memberlist
16 changes: 15 additions & 1 deletion docs/tempo/website/configuration/compression.md
@@ -29,4 +29,18 @@ The following options are supported:

It is important to note that although all of these compression formats are supported in Tempo, at Grafana
we use zstd, and it's possible/probable that the other compression algorithms may have issues at scale. Please
file an issue if you stumble upon any problems!

## WAL

The WAL also supports compression. By default it is turned off because it comes with a small performance penalty.
However, it reduces disk I/O and adds checksums to the WAL, which are valuable in higher-volume installations.

```
storage:
trace:
wal:
encoding: snappy
```

If WAL compression is turned on, snappy is the recommended encoding, although all of the above options are supported.
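
The checksums mentioned above come from the framed compression formats themselves: snappy's framing, for example, carries a CRC per chunk, so a corrupted WAL page surfaces as an explicit read error instead of silently decoding garbage. A minimal sketch of that property (an illustration only, not Tempo's actual write path) using the `golang/snappy` package:

```
package main

import (
	"bytes"
	"fmt"
	"io/ioutil"

	"github.com/golang/snappy"
)

func main() {
	// Write a "WAL page" through the framed snappy writer.
	var buf bytes.Buffer
	w := snappy.NewBufferedWriter(&buf)
	if _, err := w.Write([]byte("trace data")); err != nil {
		panic(err)
	}
	w.Close()

	// Flip one byte to simulate disk corruption.
	buf.Bytes()[buf.Len()-3] ^= 0xFF

	// The framed reader's CRC check turns the corruption into an error.
	_, err := ioutil.ReadAll(snappy.NewReader(&buf))
	fmt.Println(err) // prints something like "snappy: corrupt input"
}
```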
1 change: 1 addition & 0 deletions example/docker-compose/etc/tempo-azure.yaml
@@ -39,6 +39,7 @@ storage:
encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
wal:
path: /tmp/tempo/wal # where to store the wal locally
encoding: snappy # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
azure:
container-name: tempo # how to store data in azure
endpoint-suffix: azurite:10000
1 change: 1 addition & 0 deletions example/docker-compose/etc/tempo-gcs-fake.yaml
@@ -40,6 +40,7 @@ storage:
encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
wal:
path: /tmp/tempo/wal # where to store the wal locally
encoding: snappy # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
gcs:
bucket_name: tempo
endpoint: https://gcs:4443/storage/v1/
1 change: 1 addition & 0 deletions example/docker-compose/etc/tempo-local.yaml
@@ -39,6 +39,7 @@ storage:
encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
wal:
path: /tmp/tempo/wal # where to store the wal locally
encoding: snappy # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
local:
path: /tmp/tempo/blocks
pool:
1 change: 1 addition & 0 deletions example/docker-compose/etc/tempo-s3-minio.yaml
@@ -40,6 +40,7 @@ storage:
encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
wal:
path: /tmp/tempo/wal # where to store the wal locally
encoding: snappy # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
s3:
bucket: tempo # how to store data in s3
endpoint: minio:9000
1 change: 1 addition & 0 deletions modules/storage/config.go
@@ -33,6 +33,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)

cfg.Trace.WAL = &wal.Config{}
f.StringVar(&cfg.Trace.WAL.Filepath, util.PrefixConfig(prefix, "trace.wal.path"), "/var/tempo/wal", "Path at which to store WAL blocks.")
cfg.Trace.WAL.Encoding = backend.EncNone

cfg.Trace.Block = &encoding.BlockConfig{}
f.Float64Var(&cfg.Trace.Block.BloomFP, util.PrefixConfig(prefix, "trace.block.bloom-filter-false-positive"), .05, "Bloom False Positive.")
15 changes: 15 additions & 0 deletions tempodb/backend/readerat.go
@@ -4,13 +4,18 @@ import (
"context"
"io"
"io/ioutil"

"github.com/grafana/tempo/tempodb/encoding/common"
)

// ContextReader is an io.ReaderAt interface that passes context. It is used to simplify access to backend objects
// and abstract away the name/meta and other details so that the data can be accessed directly and simply
type ContextReader interface {
ReadAt(ctx context.Context, p []byte, off int64) (int, error)
ReadAll(ctx context.Context) ([]byte, error)

// Reader returns an io.Reader over the underlying object. May not be supported by all implementations.
Reader() (io.Reader, error)
}

// backendReader is a shim that allows a backend.Reader to be used as a ContextReader
@@ -40,6 +45,11 @@ func (b *backendReader) ReadAll(ctx context.Context) ([]byte, error) {
return b.r.Read(ctx, b.name, b.meta.BlockID, b.meta.TenantID)
}

// Reader implements ContextReader
func (b *backendReader) Reader() (io.Reader, error) {
return nil, common.ErrUnsupported
}

// AllReader is an interface that supports both io.Reader and io.ReaderAt methods
type AllReader interface {
io.Reader
@@ -67,3 +77,8 @@ func (r *allReader) ReadAt(ctx context.Context, p []byte, off int64) (int, error
func (r *allReader) ReadAll(ctx context.Context) ([]byte, error) {
return ioutil.ReadAll(r.r)
}

// Reader implements ContextReader
func (r *allReader) Reader() (io.Reader, error) {
return r.r, nil
}
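
The new `Reader()` escape hatch lets callers stream when the implementation supports it. A minimal sketch (a hypothetical caller, not part of this PR) of the intended usage pattern, falling back to the buffered path when an implementation such as `backendReader` reports `common.ErrUnsupported`:

```
package example

import (
	"context"
	"io/ioutil"

	"github.com/grafana/tempo/tempodb/backend"
	"github.com/grafana/tempo/tempodb/encoding/common"
)

// readAllViaStream prefers the streaming Reader() and falls back to the
// buffered ReadAll when streaming is unsupported.
func readAllViaStream(ctx context.Context, cr backend.ContextReader) ([]byte, error) {
	r, err := cr.Reader()
	if err == common.ErrUnsupported {
		return cr.ReadAll(ctx) // e.g. backendReader cannot stream
	}
	if err != nil {
		return nil, err
	}
	return ioutil.ReadAll(r)
}
```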
12 changes: 9 additions & 3 deletions tempodb/encoding/appender.go
@@ -33,7 +33,12 @@ func NewAppender(dataWriter common.DataWriter) Appender {
// Append appends the id/object to the writer. Note that the caller is giving up ownership of the two byte arrays backing the slices.
// Copies should be made and passed in if this is a problem
func (a *appender) Append(id common.ID, b []byte) error {
length, err := a.dataWriter.Write(id, b)
_, err := a.dataWriter.Write(id, b)
if err != nil {
return err
}

bytesWritten, err := a.dataWriter.CutPage()
if err != nil {
return err
}
@@ -46,10 +51,11 @@
a.records[i] = &common.Record{
ID: id,
Start: a.currentOffset,
Length: uint32(length),
Length: uint32(bytesWritten),
}

a.currentOffset += uint64(length)
a.currentOffset += uint64(bytesWritten)

return nil
}

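The behavioral change in `Append` above: every object is now cut as its own page, and the record length comes from `CutPage` (the size of the encoded, possibly compressed page) rather than from the raw object size, so index records line up with what is actually on disk. A sketch of the `common.DataWriter` contract this relies on, inferred from the call sites (the real interface may differ):

```
// Assumed shape of common.DataWriter, inferred from appender.Append.
type DataWriter interface {
	// Write buffers the object into the current page and returns the raw bytes written.
	Write(id ID, obj []byte) (int, error)
	// CutPage encodes (and optionally compresses/checksums) the buffered objects
	// as one page, flushes it, and returns the page's on-disk size.
	CutPage() (int, error)
	// Complete finalizes the data file. (Assumed; not visible in this diff.)
	Complete() error
}
```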
28 changes: 10 additions & 18 deletions tempodb/encoding/backend_block.go
@@ -14,7 +14,7 @@ import (

// BackendBlock represents a block already in the backend.
type BackendBlock struct {
encoding versionedEncoding
encoding VersionedEncoding

meta *backend.BlockMeta
reader backend.Reader
Expand All @@ -23,17 +23,9 @@ type BackendBlock struct {
// NewBackendBlock returns a BackendBlock for the given backend.BlockMeta
// It is version aware.
func NewBackendBlock(meta *backend.BlockMeta, r backend.Reader) (*BackendBlock, error) {
var encoding versionedEncoding

switch meta.Version {
case "v0":
encoding = v0Encoding{}
case "v1":
encoding = v1Encoding{}
case "v2":
encoding = v2Encoding{}
default:
return nil, fmt.Errorf("%s is not a valid block version", meta.Version)
encoding, err := FromVersion(meta.Version)
if err != nil {
return nil, err
}

return &BackendBlock{
@@ -76,20 +68,20 @@ func (b *BackendBlock) Find(ctx context.Context, id common.ID) ([]byte, error) {
}

indexReaderAt := backend.NewContextReader(b.meta, nameIndex, b.reader)
indexReader, err := b.encoding.newIndexReader(indexReaderAt, int(b.meta.IndexPageSize), int(b.meta.TotalRecords))
indexReader, err := b.encoding.NewIndexReader(indexReaderAt, int(b.meta.IndexPageSize), int(b.meta.TotalRecords))
if err != nil {
return nil, fmt.Errorf("error building index reader (%s, %s): %w", b.meta.TenantID, b.meta.BlockID, err)
}

ra := backend.NewContextReader(b.meta, nameObjects, b.reader)
dataReader, err := b.encoding.newDataReader(ra, b.meta.Encoding)
dataReader, err := b.encoding.NewDataReader(ra, b.meta.Encoding)
if err != nil {
return nil, fmt.Errorf("error building page reader (%s, %s): %w", b.meta.TenantID, b.meta.BlockID, err)
}
defer dataReader.Close()

// passing nil for objectCombiner here. this is fine b/c a backend block should never have dupes
finder := NewPagedFinder(indexReader, dataReader, nil, b.encoding.newObjectReaderWriter())
finder := NewPagedFinder(indexReader, dataReader, nil, b.encoding.NewObjectReaderWriter())
objectBytes, err := finder.Find(ctx, id)

if err != nil {
@@ -103,7 +95,7 @@ func (b *BackendBlock) Find(ctx context.Context, id common.ID) ([]byte, error) {
func (b *BackendBlock) Iterator(chunkSizeBytes uint32) (Iterator, error) {
// read index
ra := backend.NewContextReader(b.meta, nameObjects, b.reader)
dataReader, err := b.encoding.newDataReader(ra, b.meta.Encoding)
dataReader, err := b.encoding.NewDataReader(ra, b.meta.Encoding)
if err != nil {
return nil, fmt.Errorf("failed to create dataReader (%s, %s): %w", b.meta.TenantID, b.meta.BlockID, err)
}
@@ -113,12 +105,12 @@ func (b *BackendBlock) Iterator(chunkSizeBytes uint32) (Iterator, error) {
return nil, err
}

return newPagedIterator(chunkSizeBytes, reader, dataReader, b.encoding.newObjectReaderWriter()), nil
return newPagedIterator(chunkSizeBytes, reader, dataReader, b.encoding.NewObjectReaderWriter()), nil
}

func (b *BackendBlock) NewIndexReader() (common.IndexReader, error) {
indexReaderAt := backend.NewContextReader(b.meta, nameIndex, b.reader)
reader, err := b.encoding.newIndexReader(indexReaderAt, int(b.meta.IndexPageSize), int(b.meta.TotalRecords))
reader, err := b.encoding.NewIndexReader(indexReaderAt, int(b.meta.IndexPageSize), int(b.meta.TotalRecords))
if err != nil {
return nil, fmt.Errorf("failed to create index reader (%s, %s): %w", b.meta.TenantID, b.meta.BlockID, err)
}
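The version switch deleted from `NewBackendBlock` presumably moved into the new exported `FromVersion` constructor, whose body is not shown in this diff. A sketch consistent with the removed code:

```
// FromVersion returns the VersionedEncoding matching a block's version string.
// Sketch only: reconstructed from the switch removed above.
func FromVersion(v string) (VersionedEncoding, error) {
	switch v {
	case "v0":
		return v0Encoding{}, nil
	case "v1":
		return v1Encoding{}, nil
	case "v2":
		return v2Encoding{}, nil
	}
	return nil, fmt.Errorf("%s is not a valid block version", v)
}
```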
7 changes: 7 additions & 0 deletions tempodb/encoding/common/types.go
@@ -2,11 +2,15 @@ package common

import (
"context"
"fmt"
"io"
)

// This file contains types that need to be referenced by both the ./encoding and ./encoding/vX packages.
// It primarily exists here to break dependency loops.
var (
ErrUnsupported = fmt.Errorf("unsupported")
)

// ID in TempoDB
type ID []byte
@@ -31,6 +35,9 @@ type ObjectCombiner interface {
type DataReader interface {
Read(context.Context, []*Record) ([][]byte, error)
Close()

// NextPage can be used to iterate a page at a time. May return ErrUnsupported for older formats
NextPage() ([]byte, error)
}

// IndexReader is used to abstract away the details of an index. Currently
21 changes: 12 additions & 9 deletions tempodb/encoding/iterator_record.go
@@ -3,26 +3,27 @@ package encoding
import (
"bytes"
"context"
"io"
"errors"

"github.com/grafana/tempo/tempodb/encoding/common"
)

type recordIterator struct {
records []*common.Record
ra io.ReaderAt
records []*common.Record

objectRW common.ObjectReaderWriter
dataR common.DataReader

currentIterator Iterator
}

// NewRecordIterator returns a recordIterator. This iterator is used for iterating through
// a series of objects by reading them one at a time from Records.
func NewRecordIterator(r []*common.Record, ra io.ReaderAt, objectRW common.ObjectReaderWriter) Iterator {
func NewRecordIterator(r []*common.Record, dataR common.DataReader, objectRW common.ObjectReaderWriter) Iterator {
return &recordIterator{
records: r,
ra: ra,
objectRW: objectRW,
dataR: dataR,
}
}

@@ -39,14 +40,15 @@ func (i *recordIterator) Next(ctx context.Context) (common.ID, []byte, error) {

// read the next record and create an iterator
if len(i.records) > 0 {
record := i.records[0]

buff := make([]byte, record.Length)
_, err := i.ra.ReadAt(buff, int64(record.Start))
pages, err := i.dataR.Read(ctx, i.records[:1])
if err != nil {
return nil, nil, err
}
if len(pages) == 0 {
return nil, nil, errors.New("unexpected 0 length pages from dataReader")
}

buff := pages[0]
i.currentIterator = NewIterator(bytes.NewReader(buff), i.objectRW)
i.records = i.records[1:]

@@ -58,4 +60,5 @@
}

func (i *recordIterator) Close() {
i.dataR.Close()
}
44 changes: 44 additions & 0 deletions tempodb/encoding/iterator_wal.go
@@ -0,0 +1,44 @@
package encoding

import (
"context"
"io"

"github.com/grafana/tempo/tempodb/encoding/common"
)

type walIterator struct {
o common.ObjectReaderWriter
d common.DataReader

currentPage []byte
}

// NewWALIterator returns an Iterator that reads pages directly from a DataReader, as one would for the WAL
func NewWALIterator(d common.DataReader, o common.ObjectReaderWriter) Iterator {
return &walIterator{
o: o,
d: d,
}
}

func (i *walIterator) Next(_ context.Context) (common.ID, []byte, error) {
var (
id common.ID
obj []byte
err error
)
i.currentPage, id, obj, err = i.o.UnmarshalAndAdvanceBuffer(i.currentPage)
if err == io.EOF {
i.currentPage, err = i.d.NextPage()
if err != nil {
return nil, nil, err
}
i.currentPage, id, obj, err = i.o.UnmarshalAndAdvanceBuffer(i.currentPage)
}
return id, obj, err
}

func (i *walIterator) Close() {
i.d.Close()
}
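
A hypothetical consumer of the new iterator (assuming, per `Next` above, that the underlying `DataReader` signals end-of-data by returning `io.EOF` from `NextPage`):

```
// Drain a WAL block. dataReader and objectRW would come from the WAL block's
// versioned encoding; the constructors that produce them are placeholders here.
iter := encoding.NewWALIterator(dataReader, objectRW)
defer iter.Close()

for {
	id, obj, err := iter.Next(context.Background())
	if err == io.EOF {
		break // all pages consumed
	}
	if err != nil {
		return err
	}
	handle(id, obj) // hypothetical per-object callback
}
```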