Add support for Redis caching #354

Merged
merged 17 commits on Nov 25, 2020
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -10,6 +10,7 @@
* [ENHANCEMENT] Add tempodb_compaction_bytes_written metric. [#360](https://github.com/grafana/tempo/pull/360)
* [ENHANCEMENT] Add tempodb_compaction_blocks_total metric. [#360](https://github.com/grafana/tempo/pull/360)
* [ENHANCEMENT] Add support for S3 V2 signatures. [#352](https://github.com/grafana/tempo/pull/352)
* [ENHANCEMENT] Add support for Redis caching. [#354](https://github.com/grafana/tempo/pull/354)
* [BUGFIX] Frequent errors logged by compactor regarding meta not found [#327](https://github.com/grafana/tempo/pull/327)
* [BUGFIX] Fix distributors panicking on rollout [#343](https://github.com/grafana/tempo/pull/343)
* [BUGFIX] Fix ingesters occasionally double flushing [#364](https://github.com/grafana/tempo/pull/364)
6 changes: 5 additions & 1 deletion docs/tempo/website/configuration/_index.md
@@ -68,10 +68,11 @@ compactor:
```

## [Storage](https://github.com/grafana/tempo/blob/master/tempodb/config.go)
The storage block is used to configure TempoDB. It supports S3, GCS, local file system, and optionally can use memcached for increased query performance.
The storage block is used to configure TempoDB. It supports S3, GCS, local file system, and optionally can use Memcached or Redis for increased query performance.

The following example shows common options. For platform-specific options refer to the following:
* [S3](s3/)
* [Redis](redis/)

```
storage:
@@ -86,6 +87,9 @@
host: memcached
service: memcached-client
timeout: 500ms
redis: # optional redis configuration
endpoint: redis
timeout: 500ms
pool: # the worker pool is used primarily when finding traces by id, but is also used by other
max_workers: 50 # total number of workers pulling jobs from the queue
queue_depth: 2000 # length of job queue
23 changes: 23 additions & 0 deletions docs/tempo/website/configuration/redis.md
@@ -0,0 +1,23 @@
---
title: Redis
---

# Redis Configuration
Redis caching is configured in the storage block.

```
storage:
trace:
redis:
endpoint: redis # redis endpoint to use when caching.
timeout: 500ms # optional. maximum time to wait before giving up on redis requests. (default 100ms)
master-name: redis-master # optional. redis Sentinel master name. (default "")
db: 0 # optional. database index. (default 0)
expiration: 0s # optional. how long keys stay in redis. (default 0)
tls-enabled: false # optional. enable connecting to redis with TLS. (default false)
tls-insecure-skip-verify: false # optional. skip validating server certificate. (default false)
pool-size: 0 # optional. maximum number of connections in the pool. (default 0)
password: ... # optional. password to use when connecting to redis. (default "")
idle-timeout: 0s # optional. close connections after remaining idle for this duration. (default 0s)
max-connection-age: 0s # optional. close connections older than this duration. (default 0s)
```
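Most fields above fall back to their documented defaults, so a minimal setup only needs the endpoint. For example (the `redis:6379` address is illustrative):

```
storage:
  trace:
    redis:
      endpoint: redis:6379
      timeout: 500ms
```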
1 change: 1 addition & 0 deletions go.mod
@@ -5,6 +5,7 @@ go 1.15
require (
cloud.google.com/go/storage v1.6.0
contrib.go.opencensus.io/exporter/prometheus v0.2.0
github.com/alicebob/miniredis v2.5.0+incompatible
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
github.com/cortexproject/cortex v1.4.0
github.com/go-kit/kit v0.10.0
147 changes: 147 additions & 0 deletions tempodb/backend/redis/cache.go
@@ -0,0 +1,147 @@
package redis
Contributor commented:
There seems to be a ton of code that's pretty much the same as memcached/cache.go .. This can get tough to maintain. I would suggest we split out the cache section into a separate module and pass it a Cache interface which looks something like this

type Cache interface {
    get(ctx context.Context, key string) []byte
    set(ctx context.Context, key string, val []byte)
}

The different cache backends can then implement this small interface instead of the entire backend.Reader / backend.Writer interfaces. It will also help to keep all the tests in one place.

@dgzlopes (Member Author) replied on Nov 24, 2020:

diskcache should live inside this separate module too? Probably not, I suppose 😕

Contributor replied:

diskcache should also live there, but if that turns out to be tough we can always address it in a different PR :)


import (
"context"
"strconv"
"time"

"github.com/cortexproject/cortex/pkg/chunk/cache"
"github.com/go-kit/kit/log"
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/encoding"

"github.com/google/uuid"
)

const (
typeBloom = "bloom"
typeIndex = "index"
)

type Config struct {
ClientConfig cache.RedisConfig `yaml:",inline"`

TTL time.Duration `yaml:"ttl"`
}

type readerWriter struct {
nextReader backend.Reader
nextWriter backend.Writer
client *cache.RedisCache
logger log.Logger
}

func New(nextReader backend.Reader, nextWriter backend.Writer, cfg *Config, logger log.Logger) (backend.Reader, backend.Writer, error) {
if cfg.ClientConfig.Timeout == 0 {
cfg.ClientConfig.Timeout = 100 * time.Millisecond
}
if cfg.ClientConfig.Expiration == 0 {
cfg.ClientConfig.Expiration = cfg.TTL
}

client := cache.NewRedisClient(&cfg.ClientConfig)

rw := &readerWriter{
client: cache.NewRedisCache("tempo", client, logger),
nextReader: nextReader,
nextWriter: nextWriter,
logger: logger,
}

return rw, rw, nil
}

// Reader
func (r *readerWriter) Tenants(ctx context.Context) ([]string, error) {
return r.nextReader.Tenants(ctx)
}

func (r *readerWriter) Blocks(ctx context.Context, tenantID string) ([]uuid.UUID, error) {
return r.nextReader.Blocks(ctx, tenantID)
}

func (r *readerWriter) BlockMeta(ctx context.Context, blockID uuid.UUID, tenantID string) (*encoding.BlockMeta, error) {
return r.nextReader.BlockMeta(ctx, blockID, tenantID)
}

func (r *readerWriter) Bloom(ctx context.Context, blockID uuid.UUID, tenantID string, shardNum int) ([]byte, error) {
key := bloomKey(blockID, tenantID, shardNum)
val := r.get(ctx, key)
if val != nil {
return val, nil
}

val, err := r.nextReader.Bloom(ctx, blockID, tenantID, shardNum)
if err == nil {
r.set(ctx, key, val)
}

return val, err
}

func (r *readerWriter) Index(ctx context.Context, blockID uuid.UUID, tenantID string) ([]byte, error) {
key := key(blockID, tenantID)
val := r.get(ctx, key)
if val != nil {
return val, nil
}

val, err := r.nextReader.Index(ctx, blockID, tenantID)
if err == nil {
r.set(ctx, key, val)
}

return val, err
}

func (r *readerWriter) Object(ctx context.Context, blockID uuid.UUID, tenantID string, start uint64, buffer []byte) error {
return r.nextReader.Object(ctx, blockID, tenantID, start, buffer)
}

func (r *readerWriter) Shutdown() {
r.nextReader.Shutdown()
r.client.Stop()
}

// Writer
func (r *readerWriter) Write(ctx context.Context, meta *encoding.BlockMeta, bBloom [][]byte, bIndex []byte, objectFilePath string) error {
for i, b := range bBloom {
r.set(ctx, bloomKey(meta.BlockID, meta.TenantID, i), b)
}
r.set(ctx, key(meta.BlockID, meta.TenantID), bIndex)

return r.nextWriter.Write(ctx, meta, bBloom, bIndex, objectFilePath)
}

func (r *readerWriter) WriteBlockMeta(ctx context.Context, tracker backend.AppendTracker, meta *encoding.BlockMeta, bBloom [][]byte, bIndex []byte) error {
for i, b := range bBloom {
r.set(ctx, bloomKey(meta.BlockID, meta.TenantID, i), b)
}
r.set(ctx, key(meta.BlockID, meta.TenantID), bIndex)

return r.nextWriter.WriteBlockMeta(ctx, tracker, meta, bBloom, bIndex)
}

func (r *readerWriter) AppendObject(ctx context.Context, tracker backend.AppendTracker, meta *encoding.BlockMeta, bObject []byte) (backend.AppendTracker, error) {
return r.nextWriter.AppendObject(ctx, tracker, meta, bObject)
}

func (r *readerWriter) get(ctx context.Context, key string) []byte {
found, vals, _ := r.client.Fetch(ctx, []string{key})
if len(found) > 0 {
return vals[0]
}
return nil
}

func (r *readerWriter) set(ctx context.Context, key string, val []byte) {
r.client.Store(ctx, []string{key}, [][]byte{val})
}

func key(blockID uuid.UUID, tenantID string) string {
return blockID.String() + ":" + tenantID + ":" + typeIndex
}

func bloomKey(blockID uuid.UUID, tenantID string, shardNum int) string {
return blockID.String() + ":" + tenantID + ":" + typeBloom + strconv.Itoa(shardNum)
}
169 changes: 169 additions & 0 deletions tempodb/backend/redis/cache_test.go
@@ -0,0 +1,169 @@
package redis

import (
"context"
"testing"

"github.com/alicebob/miniredis"
"github.com/cortexproject/cortex/pkg/chunk/cache"
"github.com/go-kit/kit/log"
"github.com/google/uuid"
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/encoding"
"github.com/stretchr/testify/assert"
)

type mockReader struct {
tenants []string
blocks []uuid.UUID
meta *encoding.BlockMeta
bloom []byte
index []byte
object []byte
}

func (m *mockReader) Tenants(ctx context.Context) ([]string, error) {
return m.tenants, nil
}
func (m *mockReader) Blocks(ctx context.Context, tenantID string) ([]uuid.UUID, error) {
return m.blocks, nil
}
func (m *mockReader) BlockMeta(ctx context.Context, blockID uuid.UUID, tenantID string) (*encoding.BlockMeta, error) {
return m.meta, nil
}
func (m *mockReader) Bloom(ctx context.Context, blockID uuid.UUID, tenantID string, shardNum int) ([]byte, error) {
return m.bloom, nil
}
func (m *mockReader) Index(ctx context.Context, blockID uuid.UUID, tenantID string) ([]byte, error) {
return m.index, nil
}
func (m *mockReader) Object(ctx context.Context, blockID uuid.UUID, tenantID string, offset uint64, buffer []byte) error {
copy(buffer, m.object)
return nil
}
func (m *mockReader) Shutdown() {}

type mockWriter struct {
}

func (m *mockWriter) Write(ctx context.Context, meta *encoding.BlockMeta, bBloom [][]byte, bIndex []byte, objectFilePath string) error {
return nil
}
func (m *mockWriter) WriteBlockMeta(ctx context.Context, tracker backend.AppendTracker, meta *encoding.BlockMeta, bBloom [][]byte, bIndex []byte) error {
return nil
}
func (m *mockWriter) AppendObject(ctx context.Context, tracker backend.AppendTracker, meta *encoding.BlockMeta, bObject []byte) (backend.AppendTracker, error) {
return nil, nil
}

func TestCache(t *testing.T) {
tenantID := "test"
blockID := uuid.New()
shardNum := 0

tests := []struct {
name string
readerTenants []string
readerBlocks []uuid.UUID
readerMeta *encoding.BlockMeta
readerBloom []byte
readerIndex []byte
readerObject []byte
expectedTenants []string
expectedBlocks []uuid.UUID
expectedMeta *encoding.BlockMeta
expectedBloom []byte
expectedIndex []byte
expectedObject []byte
}{
{
name: "tenants passthrough",
expectedTenants: []string{"1"},
readerTenants: []string{"1"},
},
{
name: "blocks passthrough",
expectedBlocks: []uuid.UUID{blockID},
readerBlocks: []uuid.UUID{blockID},
},
{
name: "index",
expectedIndex: []byte{0x01},
readerIndex: []byte{0x01},
},
{
name: "bloom",
expectedBloom: []byte{0x02},
readerBloom: []byte{0x02},
},
{
name: "object",
expectedObject: []byte{0x03},
readerObject: []byte{0x03},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mockR := &mockReader{
tenants: tt.readerTenants,
blocks: tt.readerBlocks,
meta: tt.readerMeta,
bloom: tt.readerBloom,
index: tt.readerIndex,
object: tt.readerObject,
}
mockW := &mockWriter{}
mr, _ := miniredis.Run()
@annanay25 (Contributor) commented on Nov 25, 2020:

A simple map cache can be implemented like in the memcached tests, but I saw Cortex is using this too, so OK.

mockC := cache.NewRedisClient(&cache.RedisConfig{
Endpoint: mr.Addr(),
})
logger := log.NewNopLogger()
rw := &readerWriter{
client: cache.NewRedisCache("tempo", mockC, logger),
nextReader: mockR,
nextWriter: mockW,
logger: logger,
}

ctx := context.Background()
tenants, _ := rw.Tenants(ctx)
assert.Equal(t, tt.expectedTenants, tenants)
blocks, _ := rw.Blocks(ctx, tenantID)
assert.Equal(t, tt.expectedBlocks, blocks)
meta, _ := rw.BlockMeta(ctx, blockID, tenantID)
assert.Equal(t, tt.expectedMeta, meta)
bloom, _ := rw.Bloom(ctx, blockID, tenantID, shardNum)
assert.Equal(t, tt.expectedBloom, bloom)
index, _ := rw.Index(ctx, blockID, tenantID)
assert.Equal(t, tt.expectedIndex, index)

if tt.expectedObject != nil {
object := make([]byte, 1)
_ = rw.Object(ctx, blockID, tenantID, 0, object)
assert.Equal(t, tt.expectedObject, object)
}

// clear reader and re-request. things should be cached!
mockR.bloom = nil
mockR.index = nil
mockR.tenants = nil
mockR.blocks = nil
mockR.meta = nil

bloom, _ = rw.Bloom(ctx, blockID, tenantID, shardNum)
assert.Equal(t, tt.expectedBloom, bloom)
index, _ = rw.Index(ctx, blockID, tenantID)
assert.Equal(t, tt.expectedIndex, index)

// others should be nil
tenants, _ = rw.Tenants(ctx)
assert.Nil(t, tenants)
blocks, _ = rw.Blocks(ctx, tenantID)
assert.Nil(t, blocks)
meta, _ = rw.BlockMeta(ctx, blockID, tenantID)
assert.Nil(t, meta)
})
}
}
2 changes: 2 additions & 0 deletions tempodb/config.go
@@ -7,6 +7,7 @@ import (
"github.com/grafana/tempo/tempodb/backend/gcs"
"github.com/grafana/tempo/tempodb/backend/local"
"github.com/grafana/tempo/tempodb/backend/memcached"
"github.com/grafana/tempo/tempodb/backend/redis"
"github.com/grafana/tempo/tempodb/backend/s3"
"github.com/grafana/tempo/tempodb/pool"
"github.com/grafana/tempo/tempodb/wal"
@@ -22,6 +23,7 @@ type Config struct {

Diskcache *diskcache.Config `yaml:"disk_cache"`
Memcached *memcached.Config `yaml:"memcached"`
Redis *redis.Config `yaml:"redis"`

BlocklistPoll time.Duration `yaml:"blocklist_poll"`
}