Skip to content

Commit

Permalink
Add shouldCache param to backend interface read/write methods (#818)
Browse files Browse the repository at this point in the history
* Clean backend and raw interfaces, add shouldCache bool param

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Azure

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* GCS

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* local

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Migrate more backends to new interface

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Revert changes to backend.Reader/Writer, move shouldCache() into cache pkg

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Fix tests, lint

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Revert "Revert changes to backend.Reader/Writer, move shouldCache() into cache pkg"

This reverts commit b81e23b.

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Tidy everything up

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Address comments

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Fix test names

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Address comments

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Lint

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Don't error in ReadAllWithEstimate if estimated is negative

Signed-off-by: Martin Disibio <mdisibio@gmail.com>

Co-authored-by: Martin Disibio <mdisibio@gmail.com>
  • Loading branch information
annanay25 and mdisibio committed Aug 3, 2021
1 parent 6e8cec9 commit a0975ac
Show file tree
Hide file tree
Showing 19 changed files with 164 additions and 197 deletions.
6 changes: 4 additions & 2 deletions pkg/io/read.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package io

import "io"
import (
"io"
)

// ReadAllWithEstimate is a fork of https://go.googlesource.com/go/+/go1.16.3/src/io/io.go#626
// with a starting buffer size. If none is provided it uses the existing default of 512
func ReadAllWithEstimate(r io.Reader, estimatedBytes int64) ([]byte, error) {
if estimatedBytes == 0 {
if estimatedBytes <= 0 {
estimatedBytes = 512
}

Expand Down
20 changes: 6 additions & 14 deletions tempodb/backend/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"encoding/base64"
"encoding/binary"
"io"
"io/ioutil"
"path"
"strings"

Expand Down Expand Up @@ -58,12 +59,7 @@ func New(cfg *Config) (backend.RawReader, backend.RawWriter, backend.Compactor,
}

// Write implements backend.Writer
func (rw *readerWriter) Write(ctx context.Context, name string, keypath backend.KeyPath, buffer []byte) error {
return rw.StreamWriter(ctx, name, keypath, bytes.NewBuffer(buffer), int64(len(buffer)))
}

// StreamWriter implements backend.Writer
func (rw *readerWriter) StreamWriter(ctx context.Context, name string, keypath backend.KeyPath, data io.Reader, _ int64) error {
func (rw *readerWriter) Write(ctx context.Context, name string, keypath backend.KeyPath, data io.Reader, _ int64, _ bool) error {
return rw.writer(ctx, bufio.NewReader(data), backend.ObjectFileName(keypath, name))
}

Expand Down Expand Up @@ -128,21 +124,17 @@ func (rw *readerWriter) List(ctx context.Context, keypath backend.KeyPath) ([]st
}

// Read implements backend.Reader
func (rw *readerWriter) Read(ctx context.Context, name string, keypath backend.KeyPath) ([]byte, error) {
func (rw *readerWriter) Read(ctx context.Context, name string, keypath backend.KeyPath, _ bool) (io.ReadCloser, int64, error) {
span, derivedCtx := opentracing.StartSpanFromContext(ctx, "Read")
defer span.Finish()

object := backend.ObjectFileName(keypath, name)
bytes, err := rw.readAll(derivedCtx, object)
b, err := rw.readAll(derivedCtx, object)
if err != nil {
return nil, readError(err)
return nil, 0, readError(err)
}

return bytes, nil
}

func (rw *readerWriter) StreamReader(ctx context.Context, name string, keypath backend.KeyPath) (io.ReadCloser, int64, error) {
panic("StreamReader is not yet supported for Azure backend")
return ioutil.NopCloser(bytes.NewReader(b)), int64(len(b)), nil
}

// ReadRange implements backend.Reader
Expand Down
7 changes: 4 additions & 3 deletions tempodb/backend/azure/azure_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package azure

import (
"bytes"
"context"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -56,12 +57,12 @@ func TestHedge(t *testing.T) {

// the first call on each client initiates an extra http request
// clearing that here
_, _ = r.Read(ctx, "object", backend.KeyPathForBlock(uuid.New(), "tenant"))
_, _, _ = r.Read(ctx, "object", backend.KeyPathForBlock(uuid.New(), "tenant"), false)
time.Sleep(tc.returnIn)
atomic.StoreInt32(&count, 0)

// calls that should hedge
_, _ = r.Read(ctx, "object", backend.KeyPathForBlock(uuid.New(), "tenant"))
_, _, _ = r.Read(ctx, "object", backend.KeyPathForBlock(uuid.New(), "tenant"), false)
time.Sleep(tc.returnIn)
assert.Equal(t, tc.expectedHedgedRequests*2, atomic.LoadInt32(&count)) // *2 b/c reads execute a HEAD and GET
atomic.StoreInt32(&count, 0)
Expand All @@ -77,7 +78,7 @@ func TestHedge(t *testing.T) {
assert.Equal(t, int32(1), atomic.LoadInt32(&count))
atomic.StoreInt32(&count, 0)

_ = w.Write(ctx, "object", backend.KeyPathForBlock(uuid.New(), "tenant"), make([]byte, 10))
_ = w.Write(ctx, "object", backend.KeyPathForBlock(uuid.New(), "tenant"), bytes.NewReader(make([]byte, 10)), 10, false)
assert.Equal(t, int32(1), atomic.LoadInt32(&count))
atomic.StoreInt32(&count, 0)
})
Expand Down
8 changes: 4 additions & 4 deletions tempodb/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ type AppendTracker interface{}

// Writer is a collection of methods to write data to tempodb backends
type Writer interface {
// Write is for in memory data. It is expected that this data will be cached.
Write(ctx context.Context, name string, blockID uuid.UUID, tenantID string, buffer []byte) error
// Write is for in memory data. shouldCache specifies whether or not caching should be attempted.
Write(ctx context.Context, name string, blockID uuid.UUID, tenantID string, buffer []byte, shouldCache bool) error
// StreamWriter is for larger data payloads streamed through an io.Reader. It is expected this will _not_ be cached.
StreamWriter(ctx context.Context, name string, blockID uuid.UUID, tenantID string, data io.Reader, size int64) error
// WriteBlockMeta writes a block meta to its blocks
Expand All @@ -33,8 +33,8 @@ type Writer interface {

// Reader is a collection of methods to read data from tempodb backends
type Reader interface {
// Reader is for reading entire objects from the backend. It is expected that there will be an attempt to retrieve this from cache
Read(ctx context.Context, name string, blockID uuid.UUID, tenantID string) ([]byte, error)
// Reader is for reading entire objects from the backend. There will be an attempt to retrieve this from cache if shouldCache is true.
Read(ctx context.Context, name string, blockID uuid.UUID, tenantID string, shouldCache bool) ([]byte, error)
// StreamReader is for streaming entire objects from the backend. It is expected this will _not_ be cached.
StreamReader(ctx context.Context, name string, blockID uuid.UUID, tenantID string) (io.ReadCloser, int64, error)
// ReadRange is for reading parts of large objects from the backend. It is expected this will _not_ be cached.
Expand Down
54 changes: 27 additions & 27 deletions tempodb/backend/cache/cache.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package cache

import (
"bytes"
"context"
"io"
"io/ioutil"
"strings"

cortex_cache "github.com/cortexproject/cortex/pkg/chunk/cache"

tempo_io "github.com/grafana/tempo/pkg/io"
"github.com/grafana/tempo/tempodb/backend"
)

Expand All @@ -25,57 +29,57 @@ func NewCache(nextReader backend.RawReader, nextWriter backend.RawWriter, cache
return rw, rw, nil
}

// List implements backend.Reader
// List implements backend.RawReader
func (r *readerWriter) List(ctx context.Context, keypath backend.KeyPath) ([]string, error) {
return r.nextReader.List(ctx, keypath)
}

// Read implements backend.Reader
func (r *readerWriter) Read(ctx context.Context, name string, keypath backend.KeyPath) ([]byte, error) {
// Read implements backend.RawReader
func (r *readerWriter) Read(ctx context.Context, name string, keypath backend.KeyPath, shouldCache bool) (io.ReadCloser, int64, error) {
var k string
if shouldCache(name) {
if shouldCache {
k = key(keypath, name)
found, vals, _ := r.cache.Fetch(ctx, []string{k})
if len(found) > 0 {
return vals[0], nil
return ioutil.NopCloser(bytes.NewReader(vals[0])), int64(len(vals[0])), nil
}
}

val, err := r.nextReader.Read(ctx, name, keypath)
if err == nil && shouldCache(name) {
r.cache.Store(ctx, []string{k}, [][]byte{val})
object, size, err := r.nextReader.Read(ctx, name, keypath, false)
if err != nil {
return nil, 0, err
}

return val, err
}
b, err := tempo_io.ReadAllWithEstimate(object, size)
if err == nil && shouldCache {
r.cache.Store(ctx, []string{k}, [][]byte{b})
}

func (r *readerWriter) StreamReader(ctx context.Context, name string, keypath backend.KeyPath) (io.ReadCloser, int64, error) {
panic("StreamReader is not yet supported for cache")
return ioutil.NopCloser(bytes.NewReader(b)), size, err
}

// ReadRange implements backend.Reader
// ReadRange implements backend.RawReader
func (r *readerWriter) ReadRange(ctx context.Context, name string, keypath backend.KeyPath, offset uint64, buffer []byte) error {
return r.nextReader.ReadRange(ctx, name, keypath, offset, buffer)
}

// Shutdown implements backend.Reader
// Shutdown implements backend.RawReader
func (r *readerWriter) Shutdown() {
r.nextReader.Shutdown()
r.cache.Stop()
}

// Write implements backend.Writer
func (r *readerWriter) Write(ctx context.Context, name string, keypath backend.KeyPath, buffer []byte) error {
if shouldCache(name) {
r.cache.Store(ctx, []string{key(keypath, name)}, [][]byte{buffer})
func (r *readerWriter) Write(ctx context.Context, name string, keypath backend.KeyPath, data io.Reader, size int64, shouldCache bool) error {
b, err := tempo_io.ReadAllWithEstimate(data, size)
if err != nil {
return err
}

return r.nextWriter.Write(ctx, name, keypath, buffer)
}

// Write implements backend.Writer
func (r *readerWriter) StreamWriter(ctx context.Context, name string, keypath backend.KeyPath, data io.Reader, size int64) error {
return r.nextWriter.StreamWriter(ctx, name, keypath, data, size)
if shouldCache {
r.cache.Store(ctx, []string{key(keypath, name)}, [][]byte{b})
}
return r.nextWriter.Write(ctx, name, keypath, bytes.NewReader(b), int64(len(b)), false)
}

// Append implements backend.Writer
Expand All @@ -91,7 +95,3 @@ func (r *readerWriter) CloseAppend(ctx context.Context, tracker backend.AppendTr
func key(keypath backend.KeyPath, name string) string {
return strings.Join(keypath, ":") + ":" + name
}

func shouldCache(name string) bool {
return name != backend.MetaName && name != backend.CompactedMetaName && name != backend.BlockIndexName
}
40 changes: 18 additions & 22 deletions tempodb/backend/cache/cache_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package cache

import (
"bytes"
"context"
"io/ioutil"
"testing"

cortex_cache "github.com/cortexproject/cortex/pkg/chunk/cache"
Expand Down Expand Up @@ -46,31 +48,22 @@ func TestReadWrite(t *testing.T) {
name string
readerRead []byte
readerName string
shouldCache bool
expectedRead []byte
expectedCache []byte
}{
{
name: "read",
readerName: "test-thing",
name: "should cache",
readerName: "foo",
readerRead: []byte{0x02},
shouldCache: true,
expectedRead: []byte{0x02},
expectedCache: []byte{0x02},
},
{
name: "block meta",
readerName: "meta.json",
readerRead: []byte{0x02},
expectedRead: []byte{0x02},
},
{
name: "compacted block meta",
readerName: "meta.compacted.json",
readerRead: []byte{0x02},
expectedRead: []byte{0x02},
},
{
name: "block index",
readerName: "blockindex.json.gz",
name: "should not cache",
readerName: "bar",
shouldCache: false,
readerRead: []byte{0x02},
expectedRead: []byte{0x02},
},
Expand All @@ -87,20 +80,23 @@ func TestReadWrite(t *testing.T) {
r, _, _ := NewCache(mockR, mockW, NewMockClient())

ctx := context.Background()
read, _ := r.Read(ctx, tt.readerName, backend.KeyPathForBlock(blockID, tenantID))
reader, _, _ := r.Read(ctx, tt.readerName, backend.KeyPathForBlock(blockID, tenantID), tt.shouldCache)
read, _ := ioutil.ReadAll(reader)
assert.Equal(t, tt.expectedRead, read)

// clear reader and re-request
mockR.R = nil

read, _ = r.Read(ctx, tt.readerName, backend.KeyPathForBlock(blockID, tenantID))
assert.Equal(t, tt.expectedCache, read)
reader, _, _ = r.Read(ctx, tt.readerName, backend.KeyPathForBlock(blockID, tenantID), tt.shouldCache)
read, _ = ioutil.ReadAll(reader)
assert.Equal(t, len(tt.expectedCache), len(read))

// WRITE
_, w, _ := NewCache(mockR, mockW, NewMockClient())
_ = w.Write(ctx, tt.readerName, backend.KeyPathForBlock(blockID, tenantID), tt.readerRead)
read, _ = r.Read(ctx, tt.readerName, backend.KeyPathForBlock(blockID, tenantID))
assert.Equal(t, tt.expectedCache, read)
_ = w.Write(ctx, tt.readerName, backend.KeyPathForBlock(blockID, tenantID), bytes.NewReader(tt.readerRead), int64(len(tt.readerRead)), tt.shouldCache)
reader, _, _ = r.Read(ctx, tt.readerName, backend.KeyPathForBlock(blockID, tenantID), tt.shouldCache)
read, _ = ioutil.ReadAll(reader)
assert.Equal(t, len(tt.expectedCache), len(read))
})
}
}
Expand Down
18 changes: 5 additions & 13 deletions tempodb/backend/gcs/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"crypto/tls"
"io"
"io/ioutil"
"net/http"
"path"
"strings"
Expand Down Expand Up @@ -59,13 +60,8 @@ func New(cfg *Config) (backend.RawReader, backend.RawWriter, backend.Compactor,
return rw, rw, rw, nil
}

// Write implements backend.Writer
func (rw *readerWriter) Write(ctx context.Context, name string, keypath backend.KeyPath, buffer []byte) error {
return rw.StreamWriter(ctx, name, keypath, bytes.NewBuffer(buffer), int64(len(buffer)))
}

// StreamWriter implements backend.Writer
func (rw *readerWriter) StreamWriter(ctx context.Context, name string, keypath backend.KeyPath, data io.Reader, _ int64) error {
func (rw *readerWriter) Write(ctx context.Context, name string, keypath backend.KeyPath, data io.Reader, _ int64, _ bool) error {
w := rw.writer(ctx, backend.ObjectFileName(keypath, name))
_, err := io.Copy(w, data)
if err != nil {
Expand Down Expand Up @@ -133,21 +129,17 @@ func (rw *readerWriter) List(ctx context.Context, keypath backend.KeyPath) ([]st
}

// Read implements backend.Reader
func (rw *readerWriter) Read(ctx context.Context, name string, keypath backend.KeyPath) ([]byte, error) {
func (rw *readerWriter) Read(ctx context.Context, name string, keypath backend.KeyPath, _ bool) (io.ReadCloser, int64, error) {
span, derivedCtx := opentracing.StartSpanFromContext(ctx, "gcs.Read")
defer span.Finish()

span.SetTag("object", name)

bytes, err := rw.readAll(derivedCtx, backend.ObjectFileName(keypath, name))
b, err := rw.readAll(derivedCtx, backend.ObjectFileName(keypath, name))
if err != nil {
span.SetTag("error", true)
}
return bytes, readError(err)
}

func (rw *readerWriter) StreamReader(ctx context.Context, name string, keypath backend.KeyPath) (io.ReadCloser, int64, error) {
panic("StreamReader is not yet supported for GCS backend")
return ioutil.NopCloser(bytes.NewReader(b)), int64(len(b)), readError(err)
}

// ReadRange implements backend.Reader
Expand Down
7 changes: 4 additions & 3 deletions tempodb/backend/gcs/gcs_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package gcs

import (
"bytes"
"context"
"fmt"
"net/http"
Expand Down Expand Up @@ -56,12 +57,12 @@ func TestHedge(t *testing.T) {

// the first call on each client initiates an extra http request
// clearing that here
_, _ = r.Read(ctx, "object", []string{"test"})
_, _, _ = r.Read(ctx, "object", []string{"test"}, false)
time.Sleep(tc.returnIn)
atomic.StoreInt32(&count, 0)

// calls that should hedge
_, _ = r.Read(ctx, "object", []string{"test"})
_, _, _ = r.Read(ctx, "object", []string{"test"}, false)
time.Sleep(tc.returnIn)
assert.Equal(t, tc.expectedHedgedRequests, atomic.LoadInt32(&count))
atomic.StoreInt32(&count, 0)
Expand All @@ -76,7 +77,7 @@ func TestHedge(t *testing.T) {
assert.Equal(t, int32(1), atomic.LoadInt32(&count))
atomic.StoreInt32(&count, 0)

_ = w.Write(ctx, "object", []string{"test"}, []byte{})
_ = w.Write(ctx, "object", []string{"test"}, bytes.NewReader([]byte{}), 0, false)
assert.Equal(t, int32(1), atomic.LoadInt32(&count))
atomic.StoreInt32(&count, 0)
})
Expand Down
Loading

0 comments on commit a0975ac

Please sign in to comment.