diff --git a/pkg/io/read.go b/pkg/io/read.go index ae4caeb34fc..6b17a176d11 100644 --- a/pkg/io/read.go +++ b/pkg/io/read.go @@ -1,11 +1,13 @@ package io -import "io" +import ( + "io" +) // ReadAllWithEstimate is a fork of https://go.googlesource.com/go/+/go1.16.3/src/io/io.go#626 // with a starting buffer size. if none is provided it uses the existing default of 512 func ReadAllWithEstimate(r io.Reader, estimatedBytes int64) ([]byte, error) { - if estimatedBytes == 0 { + if estimatedBytes <= 0 { estimatedBytes = 512 } diff --git a/tempodb/backend/azure/azure.go b/tempodb/backend/azure/azure.go index d53af5983a3..9fe18912f87 100644 --- a/tempodb/backend/azure/azure.go +++ b/tempodb/backend/azure/azure.go @@ -7,6 +7,7 @@ import ( "encoding/base64" "encoding/binary" "io" + "io/ioutil" "path" "strings" @@ -58,12 +59,7 @@ func New(cfg *Config) (backend.RawReader, backend.RawWriter, backend.Compactor, } // Write implements backend.Writer -func (rw *readerWriter) Write(ctx context.Context, name string, keypath backend.KeyPath, buffer []byte) error { - return rw.StreamWriter(ctx, name, keypath, bytes.NewBuffer(buffer), int64(len(buffer))) -} - -// StreamWriter implements backend.Writer -func (rw *readerWriter) StreamWriter(ctx context.Context, name string, keypath backend.KeyPath, data io.Reader, _ int64) error { +func (rw *readerWriter) Write(ctx context.Context, name string, keypath backend.KeyPath, data io.Reader, _ int64, _ bool) error { return rw.writer(ctx, bufio.NewReader(data), backend.ObjectFileName(keypath, name)) } @@ -128,21 +124,17 @@ func (rw *readerWriter) List(ctx context.Context, keypath backend.KeyPath) ([]st } // Read implements backend.Reader -func (rw *readerWriter) Read(ctx context.Context, name string, keypath backend.KeyPath) ([]byte, error) { +func (rw *readerWriter) Read(ctx context.Context, name string, keypath backend.KeyPath, _ bool) (io.ReadCloser, int64, error) { span, derivedCtx := opentracing.StartSpanFromContext(ctx, "Read") defer span.Finish() object := backend.ObjectFileName(keypath, name) - bytes, err := rw.readAll(derivedCtx, object) + b, err := rw.readAll(derivedCtx, object) if err != nil { - return nil, readError(err) + return nil, 0, readError(err) } - return bytes, nil -} - -func (rw *readerWriter) StreamReader(ctx context.Context, name string, keypath backend.KeyPath) (io.ReadCloser, int64, error) { - panic("StreamReader is not yet supported for Azure backend") + return ioutil.NopCloser(bytes.NewReader(b)), int64(len(b)), nil } // ReadRange implements backend.Reader diff --git a/tempodb/backend/azure/azure_test.go b/tempodb/backend/azure/azure_test.go index b873e110879..0f7fe7c1805 100644 --- a/tempodb/backend/azure/azure_test.go +++ b/tempodb/backend/azure/azure_test.go @@ -1,6 +1,7 @@ package azure import ( + "bytes" "context" "net/http" "net/http/httptest" @@ -56,12 +57,12 @@ func TestHedge(t *testing.T) { // the first call on each client initiates an extra http request // clearing that here - _, _ = r.Read(ctx, "object", backend.KeyPathForBlock(uuid.New(), "tenant")) + _, _, _ = r.Read(ctx, "object", backend.KeyPathForBlock(uuid.New(), "tenant"), false) time.Sleep(tc.returnIn) atomic.StoreInt32(&count, 0) // calls that should hedge - _, _ = r.Read(ctx, "object", backend.KeyPathForBlock(uuid.New(), "tenant")) + _, _, _ = r.Read(ctx, "object", backend.KeyPathForBlock(uuid.New(), "tenant"), false) time.Sleep(tc.returnIn) assert.Equal(t, tc.expectedHedgedRequests*2, atomic.LoadInt32(&count)) // *2 b/c reads execute a HEAD and GET atomic.StoreInt32(&count, 0) @@ -77,7 +78,7 @@ func TestHedge(t *testing.T) { assert.Equal(t, int32(1), atomic.LoadInt32(&count)) atomic.StoreInt32(&count, 0) - _ = w.Write(ctx, "object", backend.KeyPathForBlock(uuid.New(), "tenant"), make([]byte, 10)) + _ = w.Write(ctx, "object", backend.KeyPathForBlock(uuid.New(), "tenant"), bytes.NewReader(make([]byte, 10)), 10, false) assert.Equal(t, int32(1), atomic.LoadInt32(&count)) atomic.StoreInt32(&count, 0) }) diff --git a/tempodb/backend/backend.go b/tempodb/backend/backend.go index 605d6d3082e..53c30cb905e 100644 --- a/tempodb/backend/backend.go +++ b/tempodb/backend/backend.go @@ -19,8 +19,8 @@ type AppendTracker interface{} // Writer is a collection of methods to write data to tempodb backends type Writer interface { - // Write is for in memory data. It is expected that this data will be cached. - Write(ctx context.Context, name string, blockID uuid.UUID, tenantID string, buffer []byte) error + // Write is for in memory data. shouldCache specifies whether or not caching should be attempted. + Write(ctx context.Context, name string, blockID uuid.UUID, tenantID string, buffer []byte, shouldCache bool) error // StreamWriter is for larger data payloads streamed through an io.Reader. It is expected this will _not_ be cached. StreamWriter(ctx context.Context, name string, blockID uuid.UUID, tenantID string, data io.Reader, size int64) error // WriteBlockMeta writes a block meta to its blocks @@ -33,8 +33,8 @@ type Writer interface { // Reader is a collection of methods to read data from tempodb backends type Reader interface { - // Reader is for reading entire objects from the backend. It is expected that there will be an attempt to retrieve this from cache - Read(ctx context.Context, name string, blockID uuid.UUID, tenantID string) ([]byte, error) + // Reader is for reading entire objects from the backend. There will be an attempt to retrieve this from cache if shouldCache is true. + Read(ctx context.Context, name string, blockID uuid.UUID, tenantID string, shouldCache bool) ([]byte, error) // StreamReader is for streaming entire objects from the backend. It is expected this will _not_ be cached. StreamReader(ctx context.Context, name string, blockID uuid.UUID, tenantID string) (io.ReadCloser, int64, error) // ReadRange is for reading parts of large objects from the backend. It is expected this will _not_ be cached. diff --git a/tempodb/backend/cache/cache.go b/tempodb/backend/cache/cache.go index 7a7f8fa6c65..1e0d4f71ce1 100644 --- a/tempodb/backend/cache/cache.go +++ b/tempodb/backend/cache/cache.go @@ -1,11 +1,15 @@ package cache import ( + "bytes" "context" "io" + "io/ioutil" "strings" cortex_cache "github.com/cortexproject/cortex/pkg/chunk/cache" + + tempo_io "github.com/grafana/tempo/pkg/io" "github.com/grafana/tempo/tempodb/backend" ) @@ -25,57 +29,57 @@ func NewCache(nextReader backend.RawReader, nextWriter backend.RawWriter, cache return rw, rw, nil } -// List implements backend.Reader +// List implements backend.RawReader func (r *readerWriter) List(ctx context.Context, keypath backend.KeyPath) ([]string, error) { return r.nextReader.List(ctx, keypath) } -// Read implements backend.Reader -func (r *readerWriter) Read(ctx context.Context, name string, keypath backend.KeyPath) ([]byte, error) { +// Read implements backend.RawReader +func (r *readerWriter) Read(ctx context.Context, name string, keypath backend.KeyPath, shouldCache bool) (io.ReadCloser, int64, error) { var k string - if shouldCache(name) { + if shouldCache { k = key(keypath, name) found, vals, _ := r.cache.Fetch(ctx, []string{k}) if len(found) > 0 { - return vals[0], nil + return ioutil.NopCloser(bytes.NewReader(vals[0])), int64(len(vals[0])), nil } } - val, err := r.nextReader.Read(ctx, name, keypath) - if err == nil && shouldCache(name) { - r.cache.Store(ctx, []string{k}, [][]byte{val}) + object, size, err := r.nextReader.Read(ctx, name, keypath, false) + if err != nil { + return nil, 0, err } - return val, err -} + b, err := tempo_io.ReadAllWithEstimate(object, size) + if err == nil && shouldCache { + r.cache.Store(ctx, []string{k}, [][]byte{b}) + } -func (r *readerWriter) StreamReader(ctx context.Context, name string, keypath backend.KeyPath) (io.ReadCloser, int64, error) { - panic("StreamReader is not yet supported for cache") + return ioutil.NopCloser(bytes.NewReader(b)), size, err } -// ReadRange implements backend.Reader +// ReadRange implements backend.RawReader func (r *readerWriter) ReadRange(ctx context.Context, name string, keypath backend.KeyPath, offset uint64, buffer []byte) error { return r.nextReader.ReadRange(ctx, name, keypath, offset, buffer) } -// Shutdown implements backend.Reader +// Shutdown implements backend.RawReader func (r *readerWriter) Shutdown() { r.nextReader.Shutdown() r.cache.Stop() } // Write implements backend.Writer -func (r *readerWriter) Write(ctx context.Context, name string, keypath backend.KeyPath, buffer []byte) error { - if shouldCache(name) { - r.cache.Store(ctx, []string{key(keypath, name)}, [][]byte{buffer}) +func (r *readerWriter) Write(ctx context.Context, name string, keypath backend.KeyPath, data io.Reader, size int64, shouldCache bool) error { + b, err := tempo_io.ReadAllWithEstimate(data, size) + if err != nil { + return err } - return r.nextWriter.Write(ctx, name, keypath, buffer) -} - -// Write implements backend.Writer -func (r *readerWriter) StreamWriter(ctx context.Context, name string, keypath backend.KeyPath, data io.Reader, size int64) error { - return r.nextWriter.StreamWriter(ctx, name, keypath, data, size) + if shouldCache { + r.cache.Store(ctx, []string{key(keypath, name)}, [][]byte{b}) + } + return r.nextWriter.Write(ctx, name, keypath, bytes.NewReader(b), int64(len(b)), false) } // Append implements backend.Writer @@ -91,7 +95,3 @@ func (r *readerWriter) CloseAppend(ctx context.Context, tracker backend.AppendTr func key(keypath backend.KeyPath, name string) string { return strings.Join(keypath, ":") + ":" + name } - -func shouldCache(name string) bool { - return name != backend.MetaName && name != backend.CompactedMetaName && name != backend.BlockIndexName -} diff --git a/tempodb/backend/cache/cache_test.go b/tempodb/backend/cache/cache_test.go index 3eaa3578d09..12e552e7449 100644 --- a/tempodb/backend/cache/cache_test.go +++ b/tempodb/backend/cache/cache_test.go @@ -1,7 +1,9 @@ package cache import ( + "bytes" "context" + "io/ioutil" "testing" cortex_cache "github.com/cortexproject/cortex/pkg/chunk/cache" @@ -46,31 +48,22 @@ func TestReadWrite(t *testing.T) { name string readerRead []byte readerName string + shouldCache bool expectedRead []byte expectedCache []byte }{ { - name: "read", - readerName: "test-thing", + name: "should cache", + readerName: "foo", readerRead: []byte{0x02}, + shouldCache: true, expectedRead: []byte{0x02}, expectedCache: []byte{0x02}, }, { - name: "block meta", - readerName: "meta.json", - readerRead: []byte{0x02}, - expectedRead: []byte{0x02}, - }, - { - name: "compacted block meta", - readerName: "meta.compacted.json", - readerRead: []byte{0x02}, - expectedRead: []byte{0x02}, - }, - { - name: "block index", - readerName: "blockindex.json.gz", + name: "should not cache", + readerName: "bar", + shouldCache: false, readerRead: []byte{0x02}, expectedRead: []byte{0x02}, }, @@ -87,20 +80,23 @@ func TestReadWrite(t *testing.T) { r, _, _ := NewCache(mockR, mockW, NewMockClient()) ctx := context.Background() - read, _ := r.Read(ctx, tt.readerName, backend.KeyPathForBlock(blockID, tenantID)) + reader, _, _ := r.Read(ctx, tt.readerName, backend.KeyPathForBlock(blockID, tenantID), tt.shouldCache) + read, _ := ioutil.ReadAll(reader) assert.Equal(t, tt.expectedRead, read) // clear reader and re-request mockR.R = nil - read, _ = r.Read(ctx, tt.readerName, backend.KeyPathForBlock(blockID, tenantID)) - assert.Equal(t, tt.expectedCache, read) + reader, _, _ = r.Read(ctx, tt.readerName, backend.KeyPathForBlock(blockID, tenantID), tt.shouldCache) + read, _ = ioutil.ReadAll(reader) + assert.Equal(t, len(tt.expectedCache), len(read)) // WRITE _, w, _ := NewCache(mockR, mockW, NewMockClient()) - _ = w.Write(ctx, tt.readerName, backend.KeyPathForBlock(blockID, tenantID), tt.readerRead) - read, _ = r.Read(ctx, tt.readerName, backend.KeyPathForBlock(blockID, tenantID)) - assert.Equal(t, tt.expectedCache, read) + _ = w.Write(ctx, tt.readerName, backend.KeyPathForBlock(blockID, tenantID), bytes.NewReader(tt.readerRead), int64(len(tt.readerRead)), tt.shouldCache) + reader, _, _ = r.Read(ctx, tt.readerName, backend.KeyPathForBlock(blockID, tenantID), tt.shouldCache) + read, _ = ioutil.ReadAll(reader) + assert.Equal(t, len(tt.expectedCache), len(read)) }) } } diff --git a/tempodb/backend/gcs/gcs.go b/tempodb/backend/gcs/gcs.go index cebaa8d3bf7..2327809cd24 100644 --- a/tempodb/backend/gcs/gcs.go +++ b/tempodb/backend/gcs/gcs.go @@ -5,6 +5,7 @@ import ( "context" "crypto/tls" "io" + "io/ioutil" "net/http" "path" "strings" @@ -59,13 +60,8 @@ func New(cfg *Config) (backend.RawReader, backend.RawWriter, backend.Compactor, return rw, rw, rw, nil } -// Write implements backend.Writer -func (rw *readerWriter) Write(ctx context.Context, name string, keypath backend.KeyPath, buffer []byte) error { - return rw.StreamWriter(ctx, name, keypath, bytes.NewBuffer(buffer), int64(len(buffer))) -} - // StreamWriter implements backend.Writer -func (rw *readerWriter) StreamWriter(ctx context.Context, name string, keypath backend.KeyPath, data io.Reader, _ int64) error { +func (rw *readerWriter) Write(ctx context.Context, name string, keypath backend.KeyPath, data io.Reader, _ int64, _ bool) error { w := rw.writer(ctx, backend.ObjectFileName(keypath, name)) _, err := io.Copy(w, data) if err != nil { @@ -133,21 +129,17 @@ func (rw *readerWriter) List(ctx context.Context, keypath backend.KeyPath) ([]st } // Read implements backend.Reader -func (rw *readerWriter) Read(ctx context.Context, name string, keypath backend.KeyPath) ([]byte, error) { +func (rw *readerWriter) Read(ctx context.Context, name string, keypath backend.KeyPath, _ bool) (io.ReadCloser, int64, error) { span, derivedCtx := opentracing.StartSpanFromContext(ctx, "gcs.Read") defer span.Finish() span.SetTag("object", name) - bytes, err := rw.readAll(derivedCtx, backend.ObjectFileName(keypath, name)) + b, err := rw.readAll(derivedCtx, backend.ObjectFileName(keypath, name)) if err != nil { span.SetTag("error", true) } - return bytes, readError(err) -} - -func (rw *readerWriter) StreamReader(ctx context.Context, name string, keypath backend.KeyPath) (io.ReadCloser, int64, error) { - panic("StreamReader is not yet supported for GCS backend") + return ioutil.NopCloser(bytes.NewReader(b)), int64(len(b)), readError(err) } // ReadRange implements backend.Reader diff --git a/tempodb/backend/gcs/gcs_test.go b/tempodb/backend/gcs/gcs_test.go index 50c6b47b035..f1943318843 100644 --- a/tempodb/backend/gcs/gcs_test.go +++ b/tempodb/backend/gcs/gcs_test.go @@ -1,6 +1,7 @@ package gcs import ( + "bytes" "context" "fmt" "net/http" @@ -56,12 +57,12 @@ func TestHedge(t *testing.T) { // the first call on each client initiates an extra http request // clearing that here - _, _ = r.Read(ctx, "object", []string{"test"}) + _, _, _ = r.Read(ctx, "object", []string{"test"}, false) time.Sleep(tc.returnIn) atomic.StoreInt32(&count, 0) // calls that should hedge - _, _ = r.Read(ctx, "object", []string{"test"}) + _, _, _ = r.Read(ctx, "object", []string{"test"}, false) time.Sleep(tc.returnIn) assert.Equal(t, tc.expectedHedgedRequests, atomic.LoadInt32(&count)) atomic.StoreInt32(&count, 0) @@ -76,7 +77,7 @@ func TestHedge(t *testing.T) { assert.Equal(t, int32(1), atomic.LoadInt32(&count)) atomic.StoreInt32(&count, 0) - _ = w.Write(ctx, "object", []string{"test"}, []byte{}) + _ = w.Write(ctx, "object", []string{"test"}, bytes.NewReader([]byte{}), 0, false) assert.Equal(t, int32(1), atomic.LoadInt32(&count)) atomic.StoreInt32(&count, 0) }) diff --git a/tempodb/backend/local/local.go b/tempodb/backend/local/local.go index 10f0f96f2bb..cda39b45645 100644 --- a/tempodb/backend/local/local.go +++ b/tempodb/backend/local/local.go @@ -1,7 +1,6 @@ package local import ( - "bytes" "context" "io" "io/ioutil" @@ -39,12 +38,7 @@ func New(cfg *Config) (backend.RawReader, backend.RawWriter, backend.Compactor, } // Write implements backend.Writer -func (rw *Backend) Write(ctx context.Context, name string, keypath backend.KeyPath, buffer []byte) error { - return rw.StreamWriter(ctx, name, keypath, bytes.NewBuffer(buffer), int64(len(buffer))) -} - -// StreamWriter implements backend.Writer -func (rw *Backend) StreamWriter(ctx context.Context, name string, keypath backend.KeyPath, data io.Reader, _ int64) error { +func (rw *Backend) Write(ctx context.Context, name string, keypath backend.KeyPath, data io.Reader, _ int64, _ bool) error { blockFolder := rw.rootPath(keypath) err := os.MkdirAll(blockFolder, os.ModePerm) if err != nil { @@ -122,14 +116,20 @@ func (rw *Backend) List(ctx context.Context, keypath backend.KeyPath) ([]string, } // Read implements backend.Reader -func (rw *Backend) Read(ctx context.Context, name string, keypath backend.KeyPath) ([]byte, error) { +func (rw *Backend) Read(ctx context.Context, name string, keypath backend.KeyPath, _ bool) (io.ReadCloser, int64, error) { filename := rw.objectFileName(keypath, name) - bytes, err := ioutil.ReadFile(filename) + + f, err := os.OpenFile(filename, os.O_RDONLY, 0644) if err != nil { - return nil, readError(err) + return nil, -1, readError(err) } - return bytes, nil + stat, err := f.Stat() + if err != nil { + return nil, -1, err + } + + return f, stat.Size(), err } // ReadRange implements backend.Reader @@ -150,22 +150,6 @@ func (rw *Backend) ReadRange(ctx context.Context, name string, keypath backend.K return nil } -func (rw *Backend) StreamReader(ctx context.Context, name string, keypath backend.KeyPath) (io.ReadCloser, int64, error) { - filename := rw.objectFileName(keypath, name) - - f, err := os.OpenFile(filename, os.O_RDONLY, 0644) - if err != nil { - return nil, -1, readError(err) - } - - stat, err := f.Stat() - if err != nil { - return nil, -1, err - } - - return f, stat.Size(), err -} - // Shutdown implements backend.Reader func (rw *Backend) Shutdown() { diff --git a/tempodb/backend/local/local_test.go b/tempodb/backend/local/local_test.go index 5d05d1982e8..3c85a8cc5f9 100644 --- a/tempodb/backend/local/local_test.go +++ b/tempodb/backend/local/local_test.go @@ -9,6 +9,8 @@ import ( "os" "testing" + "github.com/grafana/tempo/pkg/io" + "github.com/google/uuid" "github.com/stretchr/testify/assert" @@ -16,7 +18,6 @@ import ( ) const objectName = "test" -const objectReaderName = "test-reader" func TestReadWrite(t *testing.T) { tempDir, err := ioutil.TempDir("/tmp", "") @@ -51,18 +52,18 @@ func TestReadWrite(t *testing.T) { ctx := context.Background() for _, id := range tenantIDs { fakeMeta.TenantID = id - err = w.Write(ctx, objectName, backend.KeyPathForBlock(fakeMeta.BlockID, id), fakeObject) + err = w.Write(ctx, objectName, backend.KeyPathForBlock(fakeMeta.BlockID, id), bytes.NewReader(fakeObject), int64(len(fakeObject)), false) assert.NoError(t, err, "unexpected error writing") - err = w.StreamWriter(ctx, objectReaderName, backend.KeyPathForBlock(fakeMeta.BlockID, id), bytes.NewBuffer(fakeObject), int64(len(fakeObject))) - assert.NoError(t, err, "unexpected error writing reader") } - actualObject, err := r.Read(ctx, objectName, backend.KeyPathForBlock(blockID, tenantIDs[0])) + actualObject, size, err := r.Read(ctx, objectName, backend.KeyPathForBlock(blockID, tenantIDs[0]), false) + assert.NoError(t, err, "unexpected error reading") + actualObjectBytes, err := io.ReadAllWithEstimate(actualObject, size) assert.NoError(t, err, "unexpected error reading") - assert.Equal(t, fakeObject, actualObject) + assert.Equal(t, fakeObject, actualObjectBytes) actualReadRange := make([]byte, 5) - err = r.ReadRange(ctx, objectReaderName, backend.KeyPathForBlock(blockID, tenantIDs[0]), 5, actualReadRange) + err = r.ReadRange(ctx, objectName, backend.KeyPathForBlock(blockID, tenantIDs[0]), 5, actualReadRange) assert.NoError(t, err, "unexpected error range") assert.Equal(t, fakeObject[5:10], actualReadRange) diff --git a/tempodb/backend/mocks.go b/tempodb/backend/mocks.go index 343d58bf9fa..7302384fa20 100644 --- a/tempodb/backend/mocks.go +++ b/tempodb/backend/mocks.go @@ -1,10 +1,13 @@ package backend import ( + "bytes" "context" "io" "io/ioutil" + tempo_io "github.com/grafana/tempo/pkg/io" + "github.com/google/uuid" ) @@ -18,7 +21,7 @@ type MockRawReader struct { ListFn func(ctx context.Context, keypath KeyPath) ([]string, error) R []byte // read Range []byte // ReadRange - ReadFn func(ctx context.Context, name string, keypath KeyPath) ([]byte, error) + ReadFn func(ctx context.Context, name string, keypath KeyPath, shouldCache bool) (io.ReadCloser, int64, error) } func (m *MockRawReader) List(ctx context.Context, keypath KeyPath) ([]string, error) { @@ -28,15 +31,12 @@ func (m *MockRawReader) List(ctx context.Context, keypath KeyPath) ([]string, er return m.L, nil } -func (m *MockRawReader) Read(ctx context.Context, name string, keypath KeyPath) ([]byte, error) { +func (m *MockRawReader) Read(ctx context.Context, name string, keypath KeyPath, shouldCache bool) (io.ReadCloser, int64, error) { if m.ReadFn != nil { - return m.ReadFn(ctx, name, keypath) + return m.ReadFn(ctx, name, keypath, shouldCache) } - return m.R, nil -} -func (m *MockRawReader) StreamReader(ctx context.Context, name string, keypath KeyPath) (io.ReadCloser, int64, error) { - panic("StreamReader is not yet supported for mock reader") + return ioutil.NopCloser(bytes.NewReader(m.R)), int64(len(m.R)), nil } func (m *MockRawReader) ReadRange(ctx context.Context, name string, keypath KeyPath, offset uint64, buffer []byte) error { copy(buffer, m.Range) @@ -48,18 +48,14 @@ func (m *MockRawReader) Shutdown() {} // MockRawWriter type MockRawWriter struct { writeBuffer []byte - writeStreamBuffer []byte appendBuffer []byte closeAppendCalled bool } -func (m *MockRawWriter) Write(ctx context.Context, name string, keypath KeyPath, buffer []byte) error { - m.writeBuffer = buffer - return nil -} -func (m *MockRawWriter) StreamWriter(ctx context.Context, name string, keypath KeyPath, data io.Reader, size int64) error { - m.writeStreamBuffer, _ = ioutil.ReadAll(data) - return nil +func (m *MockRawWriter) Write(ctx context.Context, name string, keypath KeyPath, data io.Reader, size int64, shouldCache bool) error { + var err error + m.writeBuffer, err = tempo_io.ReadAllWithEstimate(data, size) + return err } func (m *MockRawWriter) Append(ctx context.Context, name string, keypath KeyPath, tracker AppendTracker, buffer []byte) (AppendTracker, error) { m.appendBuffer = buffer @@ -116,7 +112,7 @@ func (m *MockReader) BlockMeta(ctx context.Context, blockID uuid.UUID, tenantID return m.M, nil } -func (m *MockReader) Read(ctx context.Context, name string, blockID uuid.UUID, tenantID string) ([]byte, error) { +func (m *MockReader) Read(ctx context.Context, name string, blockID uuid.UUID, tenantID string, shouldCache bool) ([]byte, error) { if m.ReadFn != nil { return m.ReadFn(name, blockID, tenantID) } diff --git a/tempodb/backend/raw.go b/tempodb/backend/raw.go index 338c7d6a09d..23f62a6a30d 100644 --- a/tempodb/backend/raw.go +++ b/tempodb/backend/raw.go @@ -1,6 +1,7 @@ package backend import ( + "bytes" "context" "encoding/json" "fmt" @@ -8,6 +9,8 @@ import ( "path" "github.com/google/uuid" + + tempo_io "github.com/grafana/tempo/pkg/io" ) const ( @@ -21,10 +24,8 @@ type KeyPath []string // RawWriter is a collection of methods to write data to tempodb backends type RawWriter interface { - // Write is for in memory data. It is expected that this data will be cached. - Write(ctx context.Context, name string, keypath KeyPath, buffer []byte) error - // StreamWriter is for larger data payloads streamed through an io.Reader. It is expected this will _not_ be cached. - StreamWriter(ctx context.Context, name string, keypath KeyPath, data io.Reader, size int64) error + // Write is for in memory data. shouldCache specifies whether or not caching should be attempted. + Write(ctx context.Context, name string, keypath KeyPath, data io.Reader, size int64, shouldCache bool) error // Append starts or continues an Append job. Pass nil to AppendTracker to start a job. Append(ctx context.Context, name string, keypath KeyPath, tracker AppendTracker, buffer []byte) (AppendTracker, error) // Closes any resources associated with the AppendTracker @@ -33,14 +34,12 @@ type RawWriter interface { // RawReader is a collection of methods to read data from tempodb backends type RawReader interface { - // Read is for reading entire objects from the backend. It is expected that there will be an attempt to retrieve this from cache - Read(ctx context.Context, name string, keypath KeyPath) ([]byte, error) - // StreamReader is for streaming entire objects from the backend. It is expected this will _not_ be cached. - StreamReader(ctx context.Context, name string, keypath KeyPath) (io.ReadCloser, int64, error) - // ReadRange is for reading parts of large objects from the backend. It is expected this will _not_ be cached. - ReadRange(ctx context.Context, name string, keypath KeyPath, offset uint64, buffer []byte) error // List returns all objects one level beneath the provided keypath List(ctx context.Context, keypath KeyPath) ([]string, error) + // Read is for streaming entire objects from the backend. There will be an attempt to retrieve this from cache if shouldCache is true. + Read(ctx context.Context, name string, keyPath KeyPath, shouldCache bool) (io.ReadCloser, int64, error) + // ReadRange is for reading parts of large objects from the backend. It is expected this will _not_ be cached. + ReadRange(ctx context.Context, name string, keypath KeyPath, offset uint64, buffer []byte) error // Shutdown must be called when the Reader is finished and cleans up any associated resources. Shutdown() } @@ -56,12 +55,12 @@ func NewWriter(w RawWriter) Writer { } } -func (w *writer) Write(ctx context.Context, name string, blockID uuid.UUID, tenantID string, buffer []byte) error { - return w.w.Write(ctx, name, KeyPathForBlock(blockID, tenantID), buffer) +func (w *writer) Write(ctx context.Context, name string, blockID uuid.UUID, tenantID string, buffer []byte, shouldCache bool) error { + return w.w.Write(ctx, name, KeyPathForBlock(blockID, tenantID), bytes.NewReader(buffer), int64(len(buffer)), shouldCache) } func (w *writer) StreamWriter(ctx context.Context, name string, blockID uuid.UUID, tenantID string, data io.Reader, size int64) error { - return w.w.StreamWriter(ctx, name, KeyPathForBlock(blockID, tenantID), data, size) + return w.w.Write(ctx, name, KeyPathForBlock(blockID, tenantID), data, size, false) } func (w *writer) WriteBlockMeta(ctx context.Context, meta *BlockMeta) error { @@ -73,7 +72,7 @@ func (w *writer) WriteBlockMeta(ctx context.Context, meta *BlockMeta) error { return err } - return w.w.Write(ctx, MetaName, KeyPathForBlock(blockID, tenantID), bMeta) + return w.w.Write(ctx, MetaName, KeyPathForBlock(blockID, tenantID), bytes.NewReader(bMeta), int64(len(bMeta)), false) } func (w *writer) Append(ctx context.Context, name string, blockID uuid.UUID, tenantID string, tracker AppendTracker, buffer []byte) (AppendTracker, error) { @@ -95,12 +94,17 @@ func NewReader(r RawReader) Reader { } } -func (r *reader) Read(ctx context.Context, name string, blockID uuid.UUID, tenantID string) ([]byte, error) { - return r.r.Read(ctx, name, KeyPathForBlock(blockID, tenantID)) +func (r *reader) Read(ctx context.Context, name string, blockID uuid.UUID, tenantID string, shouldCache bool) ([]byte, error) { + objReader, size, err := r.r.Read(ctx, name, KeyPathForBlock(blockID, tenantID), shouldCache) + if err != nil { + return nil, err + } + defer objReader.Close() + return tempo_io.ReadAllWithEstimate(objReader, size) } func (r *reader) StreamReader(ctx context.Context, name string, blockID uuid.UUID, tenantID string) (io.ReadCloser, int64, error) { - return r.r.StreamReader(ctx, name, KeyPathForBlock(blockID, tenantID)) + return r.r.Read(ctx, name, KeyPathForBlock(blockID, tenantID), false) } func (r *reader) ReadRange(ctx context.Context, name string, blockID uuid.UUID, tenantID string, offset uint64, buffer []byte) error { @@ -134,7 +138,13 @@ func (r *reader) Blocks(ctx context.Context, tenantID string) ([]uuid.UUID, erro } func (r *reader) BlockMeta(ctx context.Context, blockID uuid.UUID, tenantID string) (*BlockMeta, error) { - bytes, err := r.r.Read(ctx, MetaName, KeyPathForBlock(blockID, tenantID)) + reader, size, err := r.r.Read(ctx, MetaName, KeyPathForBlock(blockID, tenantID), false) + if err != nil { + return nil, err + } + defer reader.Close() + + bytes, err := tempo_io.ReadAllWithEstimate(reader, size) if err != nil { return nil, err } diff --git a/tempodb/backend/raw_test.go b/tempodb/backend/raw_test.go index b28bb905779..e609d52e957 100644 --- a/tempodb/backend/raw_test.go +++ b/tempodb/backend/raw_test.go @@ -1,7 +1,6 @@ package backend import ( - "bytes" "context" "encoding/json" "testing" @@ -17,14 +16,10 @@ func TestWriter(t *testing.T) { expected := []byte{0x01, 0x02, 0x03, 0x04} - err := w.Write(ctx, "test", uuid.New(), "test", expected) + err := w.Write(ctx, "test", uuid.New(), "test", expected, false) assert.NoError(t, err) assert.Equal(t, expected, m.writeBuffer) - err = w.StreamWriter(ctx, "test", uuid.New(), "test", bytes.NewReader(expected), 0) - assert.NoError(t, err) - assert.Equal(t, expected, m.writeStreamBuffer) - _, err = w.Append(ctx, "test", uuid.New(), "test", nil, expected) assert.NoError(t, err) assert.Equal(t, expected, m.appendBuffer) @@ -47,7 +42,7 @@ func TestReader(t *testing.T) { expected := []byte{0x01, 0x02, 0x03, 0x04} m.R = expected - actual, err := r.Read(ctx, "test", uuid.New(), "test") + actual, err := r.Read(ctx, "test", uuid.New(), "test", false) assert.NoError(t, err) assert.Equal(t, expected, actual) diff --git a/tempodb/backend/readerat.go b/tempodb/backend/readerat.go index 23c7eebc72e..89ab37a4660 100644 --- a/tempodb/backend/readerat.go +++ b/tempodb/backend/readerat.go @@ -19,17 +19,19 @@ type ContextReader interface { // backendReader is a shim that allows a backend.Reader to be used as a ContextReader type backendReader struct { - meta *BlockMeta - name string - r Reader + meta *BlockMeta + name string + r Reader + shouldCache bool } // NewContextReader creates a ReaderAt for the given BlockMeta -func NewContextReader(meta *BlockMeta, name string, r Reader) ContextReader { +func NewContextReader(meta *BlockMeta, name string, r Reader, shouldCache bool) ContextReader { return &backendReader{ - meta: meta, - name: name, - r: r, + meta: meta, + name: name, + r: r, + shouldCache: shouldCache, } } @@ -41,7 +43,7 @@ func (b *backendReader) ReadAt(ctx context.Context, p []byte, off int64) (int, e // ReadAll implements ContextReader func (b *backendReader) ReadAll(ctx context.Context) ([]byte, error) { - return b.r.Read(ctx, b.name, b.meta.BlockID, b.meta.TenantID) + return b.r.Read(ctx, b.name, b.meta.BlockID, b.meta.TenantID, b.shouldCache) } // Reader implements ContextReader diff --git a/tempodb/backend/s3/s3.go b/tempodb/backend/s3/s3.go index b54e4d296d7..01e7a76d7d5 100644 --- a/tempodb/backend/s3/s3.go +++ b/tempodb/backend/s3/s3.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "io" + "io/ioutil" "net/http" "path" "strings" @@ -96,12 +97,7 @@ func New(cfg *Config) (backend.RawReader, backend.RawWriter, backend.Compactor, } // Write implements backend.Writer -func (rw *readerWriter) Write(ctx context.Context, name string, keypath backend.KeyPath, buffer []byte) error { - return rw.StreamWriter(ctx, name, keypath, bytes.NewBuffer(buffer), int64(len(buffer))) -} - -// StreamWriter implements backend.Writer -func (rw *readerWriter) StreamWriter(ctx context.Context, name string, keypath backend.KeyPath, data io.Reader, size int64) error { +func (rw *readerWriter) Write(ctx context.Context, name string, keypath backend.KeyPath, data io.Reader, size int64, _ bool) error { objName := backend.ObjectFileName(keypath, name) info, err := rw.core.Client.PutObject( @@ -228,20 +224,16 @@ func (rw *readerWriter) List(ctx context.Context, keypath backend.KeyPath) ([]st } // Read implements backend.Reader -func (rw *readerWriter) Read(ctx context.Context, name string, keypath backend.KeyPath) ([]byte, error) { +func (rw *readerWriter) Read(ctx context.Context, name string, keypath backend.KeyPath, _ bool) (io.ReadCloser, int64, error) { span, derivedCtx := opentracing.StartSpanFromContext(ctx, "Read") defer span.Finish() - bytes, err := rw.readAll(derivedCtx, backend.ObjectFileName(keypath, name)) + b, err := rw.readAll(derivedCtx, backend.ObjectFileName(keypath, name)) if err != nil { - return nil, readError(err) + return nil, 0, readError(err) } - return bytes, err -} - -func (rw *readerWriter) StreamReader(ctx context.Context, name string, keypath backend.KeyPath) (io.ReadCloser, int64, error) { - panic("StreamReader is not yet supported for S3 backend") + return ioutil.NopCloser(bytes.NewReader(b)), int64(len(b)), err } // ReadRange implements backend.Reader diff --git a/tempodb/backend/s3/s3_test.go b/tempodb/backend/s3/s3_test.go index 48eefc8d1c1..eb1e7f2ee58 100644 --- a/tempodb/backend/s3/s3_test.go +++ b/tempodb/backend/s3/s3_test.go @@ -1,6 +1,7 @@ package s3 import ( + "bytes" "context" "fmt" "net/http" @@ -59,12 +60,12 @@ func TestHedge(t *testing.T) { // the first call on each client initiates an extra http request // clearing that here - _, _ = r.Read(ctx, "object", backend.KeyPath{"test"}) + _, _, _ = r.Read(ctx, "object", backend.KeyPath{"test"}, false) time.Sleep(tc.returnIn) atomic.StoreInt32(&count, 0) // calls that should hedge - _, _ = r.Read(ctx, "object", backend.KeyPath{"test"}) + _, _, _ = r.Read(ctx, "object", backend.KeyPath{"test"}, false) time.Sleep(tc.returnIn) assert.Equal(t, tc.expectedHedgedRequests, atomic.LoadInt32(&count)) atomic.StoreInt32(&count, 0) @@ -79,7 +80,7 @@ func TestHedge(t *testing.T) { assert.Equal(t, int32(1), atomic.LoadInt32(&count)) atomic.StoreInt32(&count, 0) - _ = w.Write(ctx, "object", backend.KeyPath{"test"}, []byte{}) + _ = w.Write(ctx, "object", backend.KeyPath{"test"}, bytes.NewReader([]byte{}), 0, false) assert.Equal(t, int32(1), atomic.LoadInt32(&count)) atomic.StoreInt32(&count, 0) }) diff --git a/tempodb/encoding/backend_block.go b/tempodb/encoding/backend_block.go index e8f9a17491b..1c66f7e447a 100644 --- a/tempodb/encoding/backend_block.go +++ b/tempodb/encoding/backend_block.go @@ -52,7 +52,8 @@ func (b *BackendBlock) Find(ctx context.Context, id common.ID) ([]byte, error) { blockID := b.meta.BlockID tenantID := b.meta.TenantID - bloomBytes, err := b.reader.Read(ctx, bloomName(shardKey), blockID, tenantID) + nameBloom := bloomName(shardKey) + bloomBytes, err := b.reader.Read(ctx, nameBloom, blockID, tenantID, true) if err != nil { return nil, fmt.Errorf("error retrieving bloom (%s, %s): %w", b.meta.TenantID, b.meta.BlockID, err) } @@ -67,13 +68,13 @@ func (b *BackendBlock) Find(ctx context.Context, id common.ID) ([]byte, error) { return nil, nil } - indexReaderAt := backend.NewContextReader(b.meta, nameIndex, b.reader) + indexReaderAt := backend.NewContextReader(b.meta, nameIndex, b.reader, false) indexReader, err := b.encoding.NewIndexReader(indexReaderAt, int(b.meta.IndexPageSize), int(b.meta.TotalRecords)) if err != nil { return nil, fmt.Errorf("error building index reader (%s, %s): %w", b.meta.TenantID, b.meta.BlockID, err) } - ra := backend.NewContextReader(b.meta, nameObjects, b.reader) + ra := backend.NewContextReader(b.meta, nameObjects, b.reader, false) dataReader, err := b.encoding.NewDataReader(ra, b.meta.Encoding) if err != nil { return nil, fmt.Errorf("error building page reader (%s, %s): %w", b.meta.TenantID, b.meta.BlockID, err) @@ -94,7 +95,7 @@ func (b *BackendBlock) Find(ctx context.Context, id common.ID) ([]byte, error) { // Iterator returns an Iterator that iterates over the objects in the block from the backend func (b *BackendBlock) Iterator(chunkSizeBytes uint32) (Iterator, error) { // read index - ra := backend.NewContextReader(b.meta, nameObjects, b.reader) + ra := backend.NewContextReader(b.meta, nameObjects, b.reader, false) dataReader, err := b.encoding.NewDataReader(ra, b.meta.Encoding) if err != nil { return nil, fmt.Errorf("failed to create dataReader (%s, %s): %w", b.meta.TenantID, b.meta.BlockID, err) @@ -109,7 +110,7 @@ func (b *BackendBlock) Iterator(chunkSizeBytes uint32) (Iterator, error) { } func (b *BackendBlock) NewIndexReader() (common.IndexReader, error) { - indexReaderAt := backend.NewContextReader(b.meta, nameIndex, b.reader) + indexReaderAt := backend.NewContextReader(b.meta, nameIndex, b.reader, false) reader, err := b.encoding.NewIndexReader(indexReaderAt, int(b.meta.IndexPageSize), int(b.meta.TotalRecords)) if err != nil { return nil, fmt.Errorf("failed to create index reader (%s, %s): %w", b.meta.TenantID, b.meta.BlockID, err) diff --git a/tempodb/encoding/block.go b/tempodb/encoding/block.go index 7a9c557dfc5..8979bf82558 100644 --- a/tempodb/encoding/block.go +++ b/tempodb/encoding/block.go @@ -32,14 +32,15 @@ func writeBlockMeta(ctx context.Context, w backend.Writer, meta *backend.BlockMe } // index - err = w.Write(ctx, nameIndex, meta.BlockID, meta.TenantID, indexBytes) + err = w.Write(ctx, nameIndex, meta.BlockID, meta.TenantID, indexBytes, false) if err != nil { return fmt.Errorf("unexpected error writing index %w", err) } // bloom for i, bloom := range blooms { - err := w.Write(ctx, bloomName(i), meta.BlockID, meta.TenantID, bloom) + nameBloom := bloomName(i) + err := w.Write(ctx, nameBloom, meta.BlockID, meta.TenantID, bloom, true) if err != nil { return fmt.Errorf("unexpected error writing bloom-%d %w", i, err) } diff --git a/tempodb/wal/local_block.go b/tempodb/wal/local_block.go index e81a4191d91..a312ebc3295 100644 --- a/tempodb/wal/local_block.go +++ b/tempodb/wal/local_block.go @@ -33,7 +33,7 @@ func NewLocalBlock(ctx context.Context, existingBlock *encoding.BackendBlock, l writer: backend.NewWriter(l), } - flushedBytes, err := c.reader.Read(ctx, nameFlushed, c.BlockMeta().BlockID, c.BlockMeta().TenantID) + flushedBytes, err := c.reader.Read(ctx, nameFlushed, c.BlockMeta().BlockID, c.BlockMeta().TenantID, false) if err == nil { flushedTime := time.Time{} err = flushedTime.UnmarshalText(flushedBytes) @@ -66,7 +66,7 @@ func (c *LocalBlock) SetFlushed(ctx context.Context) error { return errors.Wrap(err, "error marshalling flush time to text") } - err = c.writer.Write(ctx, nameFlushed, c.BlockMeta().BlockID, c.BlockMeta().TenantID, flushedBytes) + err = c.writer.Write(ctx, nameFlushed, c.BlockMeta().BlockID, c.BlockMeta().TenantID, flushedBytes, false) if err != nil { return errors.Wrap(err, "error writing ingester block flushed file") }