diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1526f444512..24b664fb15f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,7 @@
 ## main / unreleased
 
+* [BUGFIX] Azure Backend - Fix an issue with the append method on the Azure backend. [#736](https://github.com/grafana/tempo/pull/736)
+
 ## v1.0.0-rc.0
 
 * [ENHANCEMENT] Performance: Improve Ingester Record Insertion. [#681](https://github.com/grafana/tempo/pull/681)
diff --git a/tempodb/backend/azure/azure.go b/tempodb/backend/azure/azure.go
index 000c898e960..02a5ab2de5f 100644
--- a/tempodb/backend/azure/azure.go
+++ b/tempodb/backend/azure/azure.go
@@ -4,6 +4,8 @@ import (
 	"bufio"
 	"bytes"
 	"context"
+	"encoding/base64"
+	"encoding/binary"
 	"encoding/json"
 	"io"
 	"strings"
@@ -91,7 +93,7 @@ func (rw *readerWriter) Append(ctx context.Context, name string, blockID uuid.UU
 	} else {
 		a = tracker.(appendTracker)
 
-		_, err := rw.append(ctx, buffer, a.Name)
+		err := rw.append(ctx, buffer, a.Name)
 		if err != nil {
 			return nil, err
 		}
@@ -222,16 +224,45 @@ func (rw *readerWriter) writeAll(ctx context.Context, name string, b []byte) err
 	return nil
 }
 
-func (rw *readerWriter) append(ctx context.Context, src []byte, name string) (string, error) {
-	appendBlobURL := rw.containerURL.NewAppendBlobURL(name)
+func (rw *readerWriter) append(ctx context.Context, src []byte, name string) error {
+	appendBlobURL := rw.containerURL.NewBlockBlobURL(name)
 
-	resp, err := appendBlobURL.AppendBlock(ctx, bytes.NewReader(src), blob.AppendBlobAccessConditions{}, nil)
+	// These helper functions convert a binary block ID to a base-64 string and vice versa
+	// NOTE: The blockID must be <= 64 bytes and ALL blockIDs for the block must be the same length
+	blockIDBinaryToBase64 := func(blockID []byte) string { return base64.StdEncoding.EncodeToString(blockID) }
+	blockIDIntToBase64 := func(blockID int) string {
+		binaryBlockID := (&[64]byte{})[:]
+		binary.LittleEndian.PutUint32(binaryBlockID, uint32(blockID))
+		return blockIDBinaryToBase64(binaryBlockID)
+	}
+
+	l, err := appendBlobURL.GetBlockList(ctx, blob.BlockListAll, blob.LeaseAccessConditions{})
+	if err != nil {
+		return err
+	}
+
+	// generate the next block id
+	id := blockIDIntToBase64(len(l.CommittedBlocks) + 1)
+
+	_, err = appendBlobURL.StageBlock(ctx, id, bytes.NewReader(src), blob.LeaseAccessConditions{}, nil)
 	if err != nil {
-		return "", err
+		return err
 	}
 
-	return resp.RequestID(), nil
+	base64BlockIDs := make([]string, len(l.CommittedBlocks)+1)
+	for i := 0; i < len(l.CommittedBlocks); i++ {
+		base64BlockIDs[i] = l.CommittedBlocks[i].Name
+	}
+
+	base64BlockIDs[len(l.CommittedBlocks)] = id
+
+	// After all the blocks are uploaded, atomically commit them to the blob.
+	_, err = appendBlobURL.CommitBlockList(ctx, base64BlockIDs, blob.BlobHTTPHeaders{}, blob.Metadata{}, blob.BlobAccessConditions{})
+	if err != nil {
+		return err
+	}
+	return nil
 }
 
 func (rw *readerWriter) writer(ctx context.Context, src io.Reader, name string) error {
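
Note (not part of the diff): the change moves the backend from append blobs to block blobs, so each `append` call stages one new block and re-commits the full block list. The block IDs passed to `StageBlock`/`CommitBlockList` must all be the same length, which is why the helpers encode a fixed 64-byte, little-endian counter as base64. A minimal standalone sketch of just that ID scheme is below; `blockIDIntToBase64` mirrors the helper in the diff, while the `main` loop and its output formatting are purely illustrative and not part of the PR.

```go
package main

import (
	"encoding/base64"
	"encoding/binary"
	"fmt"
)

// blockIDIntToBase64 mirrors the helper introduced in the diff: the block
// index is written into a fixed 64-byte buffer (little-endian) and then
// base64-encoded, so every generated ID has the same length.
func blockIDIntToBase64(blockID int) string {
	binaryBlockID := make([]byte, 64)
	binary.LittleEndian.PutUint32(binaryBlockID, uint32(blockID))
	return base64.StdEncoding.EncodeToString(binaryBlockID)
}

func main() {
	// The append path derives the next ID from the number of blocks
	// already committed, e.g. the third append uses index 3.
	for i := 1; i <= 3; i++ {
		id := blockIDIntToBase64(i)
		fmt.Printf("block %d -> %s (len %d)\n", i, id, len(id))
	}
}
```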