Skip to content

Commit

Permalink
Revert changes to backend.Reader/Writer, move shouldCache() into cache pkg
Browse files Browse the repository at this point in the history

Signed-off-by: Annanay <annanayagarwal@gmail.com>
  • Loading branch information
annanay25 committed Jul 16, 2021
1 parent c8b8119 commit b81e23b
Show file tree
Hide file tree
Showing 11 changed files with 58 additions and 42 deletions.
4 changes: 2 additions & 2 deletions tempodb/backend/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func New(cfg *Config) (backend.RawReader, backend.RawWriter, backend.Compactor,
}

// Write implements backend.Writer
func (rw *readerWriter) Write(ctx context.Context, name string, keypath backend.KeyPath, data io.Reader, _ int64, _ bool) error {
func (rw *readerWriter) Write(ctx context.Context, name string, keypath backend.KeyPath, data io.Reader, _ int64) error {
return rw.writer(ctx, bufio.NewReader(data), backend.ObjectFileName(keypath, name))
}

Expand Down Expand Up @@ -124,7 +124,7 @@ func (rw *readerWriter) List(ctx context.Context, keypath backend.KeyPath) ([]st
}

// Read implements backend.Reader
func (rw *readerWriter) Read(ctx context.Context, name string, keypath backend.KeyPath, _ bool) (io.ReadCloser, int64, error) {
func (rw *readerWriter) Read(ctx context.Context, name string, keypath backend.KeyPath) (io.ReadCloser, int64, error) {
span, derivedCtx := opentracing.StartSpanFromContext(ctx, "Read")
defer span.Finish()

Expand Down
6 changes: 3 additions & 3 deletions tempodb/backend/azure/azure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@ func TestHedge(t *testing.T) {

// the first call on each client initiates an extra http request
// clearing that here
_, _, _ = r.Read(ctx, "object", backend.KeyPathForBlock(uuid.New(), "tenant"), false)
_, _, _ = r.Read(ctx, "object", backend.KeyPathForBlock(uuid.New(), "tenant"))
time.Sleep(tc.returnIn)
atomic.StoreInt32(&count, 0)

// calls that should hedge
_, _, _ = r.Read(ctx, "object", backend.KeyPathForBlock(uuid.New(), "tenant"), false)
_, _, _ = r.Read(ctx, "object", backend.KeyPathForBlock(uuid.New(), "tenant"))
time.Sleep(tc.returnIn)
assert.Equal(t, tc.expectedHedgedRequests*2, atomic.LoadInt32(&count)) // *2 b/c reads execute a HEAD and GET
atomic.StoreInt32(&count, 0)
Expand All @@ -78,7 +78,7 @@ func TestHedge(t *testing.T) {
assert.Equal(t, int32(1), atomic.LoadInt32(&count))
atomic.StoreInt32(&count, 0)

_ = w.Write(ctx, "object", backend.KeyPathForBlock(uuid.New(), "tenant"), bytes.NewReader(make([]byte, 10)), 10, false)
_ = w.Write(ctx, "object", backend.KeyPathForBlock(uuid.New(), "tenant"), bytes.NewReader(make([]byte, 10)), 10)
assert.Equal(t, int32(1), atomic.LoadInt32(&count))
atomic.StoreInt32(&count, 0)
})
Expand Down
12 changes: 8 additions & 4 deletions tempodb/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ type AppendTracker interface{}

// Writer is a collection of methods to write data to tempodb backends
type Writer interface {
// Write is for writing objects to the backend.
Write(ctx context.Context, name string, blockID uuid.UUID, tenantID string, data io.Reader, size int64, shouldCache bool) error
// Write is for in memory data. It is expected that this data will be cached.
Write(ctx context.Context, name string, blockID uuid.UUID, tenantID string, buffer []byte) error
// StreamWriter is for larger data payloads streamed through an io.Reader. It is expected this will _not_ be cached.
StreamWriter(ctx context.Context, name string, blockID uuid.UUID, tenantID string, data io.Reader, size int64) error
// WriteBlockMeta writes a block meta to its blocks
WriteBlockMeta(ctx context.Context, meta *BlockMeta) error
// Append starts or continues an Append job. Pass nil to AppendTracker to start a job.
Expand All @@ -31,8 +33,10 @@ type Writer interface {

// Reader is a collection of methods to read data from tempodb backends
type Reader interface {
// Read is for reading objects from the backend.
Read(ctx context.Context, name string, blockID uuid.UUID, tenantID string, shouldCache bool) (io.ReadCloser, int64, error)
// Reader is for reading entire objects from the backend. It is expected that there will be an attempt to retrieve this from cache
Read(ctx context.Context, name string, blockID uuid.UUID, tenantID string) ([]byte, error)
// StreamReader is for streaming entire objects from the backend. It is expected this will _not_ be cached.
StreamReader(ctx context.Context, name string, blockID uuid.UUID, tenantID string) (io.ReadCloser, int64, error)
// ReadRange is for reading parts of large objects from the backend. It is expected this will _not_ be cached.
ReadRange(ctx context.Context, name string, blockID uuid.UUID, tenantID string, offset uint64, buffer []byte) error
// Tenants returns a list of all tenants in a backend
Expand Down
18 changes: 11 additions & 7 deletions tempodb/backend/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,23 @@ func (r *readerWriter) List(ctx context.Context, keypath backend.KeyPath) ([]str
}

// Read implements backend.RawReader
func (r *readerWriter) Read(ctx context.Context, name string, keypath backend.KeyPath, shouldCache bool) (io.ReadCloser, int64, error) {
func (r *readerWriter) Read(ctx context.Context, name string, keypath backend.KeyPath) (io.ReadCloser, int64, error) {
var k string
if shouldCache {
if shouldCache(name) {
k = key(keypath, name)
found, vals, _ := r.cache.Fetch(ctx, []string{k})
if len(found) > 0 {
return ioutil.NopCloser(bytes.NewReader(vals[0])), int64(len(vals[0])), nil
}
}

object, size, err := r.nextReader.Read(ctx, name, keypath, false)
object, size, err := r.nextReader.Read(ctx, name, keypath)
if err != nil {
return nil, 0, err
}

b, err := tempo_io.ReadAllWithEstimate(object, size)
if err == nil && shouldCache {
if err == nil && shouldCache(name) {
r.cache.Store(ctx, []string{k}, [][]byte{b})
}

Expand All @@ -70,16 +70,16 @@ func (r *readerWriter) Shutdown() {
}

// Write implements backend.Writer
func (r *readerWriter) Write(ctx context.Context, name string, keypath backend.KeyPath, data io.Reader, size int64, shouldCache bool) error {
func (r *readerWriter) Write(ctx context.Context, name string, keypath backend.KeyPath, data io.Reader, size int64) error {
b, err := tempo_io.ReadAllWithEstimate(data, size)
if err != nil {
return err
}

if shouldCache {
if shouldCache(name) {
r.cache.Store(ctx, []string{key(keypath, name)}, [][]byte{b})
}
return r.nextWriter.Write(ctx, name, keypath, bytes.NewReader(b), int64(len(b)), false)
return r.nextWriter.Write(ctx, name, keypath, bytes.NewReader(b), int64(len(b)))
}

// Append implements backend.Writer
Expand All @@ -95,3 +95,7 @@ func (r *readerWriter) CloseAppend(ctx context.Context, tracker backend.AppendTr
// key builds the cache key for an object: the colon-joined keypath followed by
// the object name, separated by another colon.
func key(keypath backend.KeyPath, name string) string {
	return strings.Join([]string{strings.Join(keypath, ":"), name}, ":")
}

// shouldCache reports whether an object with the given name belongs in cache.
// Block meta, compacted meta, and the block index are excluded so they are
// always fetched fresh from the backend.
func shouldCache(name string) bool {
	switch name {
	case backend.MetaName, backend.CompactedMetaName, backend.BlockIndexName:
		return false
	default:
		return true
	}
}
4 changes: 2 additions & 2 deletions tempodb/backend/gcs/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func New(cfg *Config) (backend.RawReader, backend.RawWriter, backend.Compactor,
}

// StreamWriter implements backend.Writer
func (rw *readerWriter) Write(ctx context.Context, name string, keypath backend.KeyPath, data io.Reader, _ int64, _ bool) error {
func (rw *readerWriter) Write(ctx context.Context, name string, keypath backend.KeyPath, data io.Reader, _ int64) error {
w := rw.writer(ctx, backend.ObjectFileName(keypath, name))
_, err := io.Copy(w, data)
if err != nil {
Expand Down Expand Up @@ -129,7 +129,7 @@ func (rw *readerWriter) List(ctx context.Context, keypath backend.KeyPath) ([]st
}

// Read implements backend.Reader
func (rw *readerWriter) Read(ctx context.Context, name string, keypath backend.KeyPath, _ bool) (io.ReadCloser, int64, error) {
func (rw *readerWriter) Read(ctx context.Context, name string, keypath backend.KeyPath) (io.ReadCloser, int64, error) {
span, derivedCtx := opentracing.StartSpanFromContext(ctx, "gcs.Read")
defer span.Finish()

Expand Down
6 changes: 3 additions & 3 deletions tempodb/backend/gcs/gcs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@ func TestHedge(t *testing.T) {

// the first call on each client initiates an extra http request
// clearing that here
_, _, _ = r.Read(ctx, "object", []string{"test"}, false)
_, _, _ = r.Read(ctx, "object", []string{"test"})
time.Sleep(tc.returnIn)
atomic.StoreInt32(&count, 0)

// calls that should hedge
_, _, _ = r.Read(ctx, "object", []string{"test"}, false)
_, _, _ = r.Read(ctx, "object", []string{"test"})
time.Sleep(tc.returnIn)
assert.Equal(t, tc.expectedHedgedRequests, atomic.LoadInt32(&count))
atomic.StoreInt32(&count, 0)
Expand All @@ -77,7 +77,7 @@ func TestHedge(t *testing.T) {
assert.Equal(t, int32(1), atomic.LoadInt32(&count))
atomic.StoreInt32(&count, 0)

_ = w.Write(ctx, "object", []string{"test"}, bytes.NewReader([]byte{}), 0, false)
_ = w.Write(ctx, "object", []string{"test"}, bytes.NewReader([]byte{}), 0)
assert.Equal(t, int32(1), atomic.LoadInt32(&count))
atomic.StoreInt32(&count, 0)
})
Expand Down
4 changes: 2 additions & 2 deletions tempodb/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func New(cfg *Config) (backend.RawReader, backend.RawWriter, backend.Compactor,
}

// Write implements backend.Writer
func (rw *Backend) Write(ctx context.Context, name string, keypath backend.KeyPath, data io.Reader, _ int64, _ bool) error {
func (rw *Backend) Write(ctx context.Context, name string, keypath backend.KeyPath, data io.Reader, _ int64) error {
blockFolder := rw.rootPath(keypath)
err := os.MkdirAll(blockFolder, os.ModePerm)
if err != nil {
Expand Down Expand Up @@ -116,7 +116,7 @@ func (rw *Backend) List(ctx context.Context, keypath backend.KeyPath) ([]string,
}

// Read implements backend.Reader
func (rw *Backend) Read(ctx context.Context, name string, keypath backend.KeyPath, _ bool) (io.ReadCloser, int64, error) {
func (rw *Backend) Read(ctx context.Context, name string, keypath backend.KeyPath) (io.ReadCloser, int64, error) {
filename := rw.objectFileName(keypath, name)

f, err := os.OpenFile(filename, os.O_RDONLY, 0644)
Expand Down
4 changes: 2 additions & 2 deletions tempodb/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ func TestReadWrite(t *testing.T) {
ctx := context.Background()
for _, id := range tenantIDs {
fakeMeta.TenantID = id
err = w.Write(ctx, objectName, backend.KeyPathForBlock(fakeMeta.BlockID, id), bytes.NewReader(fakeObject), int64(len(fakeObject)), false)
err = w.Write(ctx, objectName, backend.KeyPathForBlock(fakeMeta.BlockID, id), bytes.NewReader(fakeObject), int64(len(fakeObject)))
assert.NoError(t, err, "unexpected error writing")
}

actualObject, size, err := r.Read(ctx, objectName, backend.KeyPathForBlock(blockID, tenantIDs[0]), false)
actualObject, size, err := r.Read(ctx, objectName, backend.KeyPathForBlock(blockID, tenantIDs[0]))
assert.NoError(t, err, "unexpected error reading")
actualObjectBytes, err := io.ReadAllWithEstimate(actualObject, size)
assert.NoError(t, err, "unexpected error reading")
Expand Down
32 changes: 20 additions & 12 deletions tempodb/backend/raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type KeyPath []string
// RawWriter is a collection of methods to write data to tempodb backends
type RawWriter interface {
// Write is for in memory data. It is expected that this data will be cached.
Write(ctx context.Context, name string, keypath KeyPath, data io.Reader, size int64, shouldCache bool) error
Write(ctx context.Context, name string, keypath KeyPath, data io.Reader, size int64) error
// Append starts or continues an Append job. Pass nil to AppendTracker to start a job.
Append(ctx context.Context, name string, keypath KeyPath, tracker AppendTracker, buffer []byte) (AppendTracker, error)
// Closes any resources associated with the AppendTracker
Expand All @@ -38,7 +38,7 @@ type RawReader interface {
// List returns all objects one level beneath the provided keypath
List(ctx context.Context, keypath KeyPath) ([]string, error)
// Read is for streaming entire objects from the backend. It is expected this will _not_ be cached.
Read(ctx context.Context, name string, keyPath KeyPath, shouldCache bool) (io.ReadCloser, int64, error)
Read(ctx context.Context, name string, keyPath KeyPath) (io.ReadCloser, int64, error)
// ReadRange is for reading parts of large objects from the backend. It is expected this will _not_ be cached.
ReadRange(ctx context.Context, name string, keypath KeyPath, offset uint64, buffer []byte) error
// Shutdown must be called when the Reader is finished and cleans up any associated resources.
Expand All @@ -56,8 +56,12 @@ func NewWriter(w RawWriter) Writer {
}
}

func (w *writer) Write(ctx context.Context, name string, blockID uuid.UUID, tenantID string, data io.Reader, size int64, shouldCache bool) error {
return w.w.Write(ctx, name, KeyPathForBlock(blockID, tenantID), data, size, shouldCache)
func (w *writer) Write(ctx context.Context, name string, blockID uuid.UUID, tenantID string, buffer []byte) error {
return w.w.Write(ctx, name, KeyPathForBlock(blockID, tenantID), bytes.NewReader(buffer), int64(len(buffer)))
}

// StreamWriter implements backend.Writer for payloads streamed through an
// io.Reader. Per the Writer interface contract, streamed writes are expected
// NOT to be cached; the data and size are forwarded untouched to the RawWriter.
func (w *writer) StreamWriter(ctx context.Context, name string, blockID uuid.UUID, tenantID string, data io.Reader, size int64) error {
	return w.w.Write(ctx, name, KeyPathForBlock(blockID, tenantID), data, size)
}

func (w *writer) WriteBlockMeta(ctx context.Context, meta *BlockMeta) error {
Expand All @@ -69,7 +73,7 @@ func (w *writer) WriteBlockMeta(ctx context.Context, meta *BlockMeta) error {
return err
}

return w.w.Write(ctx, MetaName, KeyPathForBlock(blockID, tenantID), bytes.NewReader(bMeta), int64(len(bMeta)), false)
return w.w.Write(ctx, MetaName, KeyPathForBlock(blockID, tenantID), bytes.NewReader(bMeta), int64(len(bMeta)))
}

func (w *writer) Append(ctx context.Context, name string, blockID uuid.UUID, tenantID string, tracker AppendTracker, buffer []byte) (AppendTracker, error) {
Expand All @@ -91,8 +95,16 @@ func NewReader(r RawReader) Reader {
}
}

func (r *reader) Read(ctx context.Context, name string, blockID uuid.UUID, tenantID string, shouldCache bool) (io.ReadCloser, int64, error) {
return r.r.Read(ctx, name, KeyPathForBlock(blockID, tenantID), shouldCache)
func (r *reader) Read(ctx context.Context, name string, blockID uuid.UUID, tenantID string) ([]byte, error) {
objReader, size, err := r.r.Read(ctx, name, KeyPathForBlock(blockID, tenantID))
if err != nil {
return nil, err
}
return tempo_io.ReadAllWithEstimate(objReader, size)
}

// StreamReader implements backend.Reader for streaming whole objects without
// caching. It returns the backend's io.ReadCloser and the object size; the
// caller is responsible for closing the returned reader.
func (r *reader) StreamReader(ctx context.Context, name string, blockID uuid.UUID, tenantID string) (io.ReadCloser, int64, error) {
	return r.r.Read(ctx, name, KeyPathForBlock(blockID, tenantID))
}

func (r *reader) ReadRange(ctx context.Context, name string, blockID uuid.UUID, tenantID string, offset uint64, buffer []byte) error {
Expand Down Expand Up @@ -126,7 +138,7 @@ func (r *reader) Blocks(ctx context.Context, tenantID string) ([]uuid.UUID, erro
}

func (r *reader) BlockMeta(ctx context.Context, blockID uuid.UUID, tenantID string) (*BlockMeta, error) {
reader, size, err := r.r.Read(ctx, MetaName, KeyPathForBlock(blockID, tenantID), false)
reader, size, err := r.r.Read(ctx, MetaName, KeyPathForBlock(blockID, tenantID))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -175,7 +187,3 @@ func CompactedMetaFileName(blockID uuid.UUID, tenantID string) string {
func RootPath(blockID uuid.UUID, tenantID string) string {
return path.Join(tenantID, blockID.String())
}

func ShouldCache(name string) bool {
return name != MetaName && name != CompactedMetaName && name != BlockIndexName
}
4 changes: 2 additions & 2 deletions tempodb/backend/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func New(cfg *Config) (backend.RawReader, backend.RawWriter, backend.Compactor,
}

// Write implements backend.Writer
func (rw *readerWriter) Write(ctx context.Context, name string, keypath backend.KeyPath, data io.Reader, size int64, _ bool) error {
func (rw *readerWriter) Write(ctx context.Context, name string, keypath backend.KeyPath, data io.Reader, size int64) error {
objName := backend.ObjectFileName(keypath, name)

info, err := rw.core.Client.PutObject(
Expand Down Expand Up @@ -224,7 +224,7 @@ func (rw *readerWriter) List(ctx context.Context, keypath backend.KeyPath) ([]st
}

// Read implements backend.Reader
func (rw *readerWriter) Read(ctx context.Context, name string, keypath backend.KeyPath, _ bool) (io.ReadCloser, int64, error) {
func (rw *readerWriter) Read(ctx context.Context, name string, keypath backend.KeyPath) (io.ReadCloser, int64, error) {
span, derivedCtx := opentracing.StartSpanFromContext(ctx, "Read")
defer span.Finish()

Expand Down
6 changes: 3 additions & 3 deletions tempodb/backend/s3/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ func TestHedge(t *testing.T) {

// the first call on each client initiates an extra http request
// clearing that here
_, _, _ = r.Read(ctx, "object", backend.KeyPath{"test"}, false)
_, _, _ = r.Read(ctx, "object", backend.KeyPath{"test"})
time.Sleep(tc.returnIn)
atomic.StoreInt32(&count, 0)

// calls that should hedge
_, _, _ = r.Read(ctx, "object", backend.KeyPath{"test"}, false)
_, _, _ = r.Read(ctx, "object", backend.KeyPath{"test"})
time.Sleep(tc.returnIn)
assert.Equal(t, tc.expectedHedgedRequests, atomic.LoadInt32(&count))
atomic.StoreInt32(&count, 0)
Expand All @@ -80,7 +80,7 @@ func TestHedge(t *testing.T) {
assert.Equal(t, int32(1), atomic.LoadInt32(&count))
atomic.StoreInt32(&count, 0)

_ = w.Write(ctx, "object", backend.KeyPath{"test"}, bytes.NewReader([]byte{}), 0, false)
_ = w.Write(ctx, "object", backend.KeyPath{"test"}, bytes.NewReader([]byte{}), 0)
assert.Equal(t, int32(1), atomic.LoadInt32(&count))
atomic.StoreInt32(&count, 0)
})
Expand Down

0 comments on commit b81e23b

Please sign in to comment.