
Commit

Add workaround for Bazel committed_size check when uploading compressed blobs (#1493)
bduffany authored Jan 28, 2022
1 parent e721cf2 commit 1cf409d
Showing 2 changed files with 97 additions and 19 deletions.
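
Background, paraphrasing the commit message and bazelbuild/bazel#14654: Bazel 5.0.0 verifies that the committed_size returned in the server's WriteResponse equals the number of bytes the client actually sent on the wire. For zstd-compressed uploads that is the compressed length, which the server cannot derive from the digest when it short-circuits a write of an already-stored blob. A minimal sketch of the client-side check, with illustrative names (this is not Bazel's actual code):

package example

import (
	"fmt"

	bspb "google.golang.org/genproto/googleapis/bytestream"
)

// verifyCommittedSize sketches the check Bazel 5.0.0 performs after a
// ByteStream Write: committed_size must equal the bytes the client actually
// put on the wire, which for a compressed upload is the compressed length
// rather than the digest's size_bytes.
func verifyCommittedSize(resp *bspb.WriteResponse, bytesSent int64) error {
	if resp.GetCommittedSize() != bytesSent {
		return fmt.Errorf("committed size %d did not match bytes sent %d",
			resp.GetCommittedSize(), bytesSent)
	}
	return nil
}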
server/remote_cache/byte_stream_server/byte_stream_server.go (70 changes: 55 additions & 15 deletions)
@@ -180,7 +180,6 @@ type writeState struct {
 	d                  *repb.Digest
 	activeResourceName string
 	offset             int64
-	alreadyExists      bool
 }
 
 func checkInitialPreconditions(req *bspb.WriteRequest) error {
@@ -239,7 +238,9 @@ func (s *ByteStreamServer) initStreamState(ctx context.Context, req *bspb.WriteRequest)
 	if err != nil {
 		return nil, err
 	}
-	ws.alreadyExists = exists
+	if exists {
+		return nil, status.AlreadyExistsError("Already exists")
+	}
 
 	ws.checksum = NewChecksum()
 	ws.writer = ws.checksum
@@ -322,16 +323,12 @@ func (s *ByteStreamServer) Write(stream bspb.ByteStream_WriteServer) error {
 
 			// If the API key is read-only, pretend the object already exists.
 			if !canWrite {
-				r, err := digest.ParseUploadResourceName(req.ResourceName)
-				if err != nil {
-					return err
-				}
-				return stream.SendAndClose(&bspb.WriteResponse{
-					CommittedSize: r.GetDigest().GetSizeBytes(),
-				})
+				return s.handleAlreadyExists(ctx, stream, req)
 			}
 
 			streamState, err = s.initStreamState(ctx, req)
+			if status.IsAlreadyExistsError(err) {
+				return s.handleAlreadyExists(ctx, stream, req)
+			}
 			if err != nil {
 				return err
 			}
@@ -340,11 +337,6 @@ func (s *ByteStreamServer) Write(stream bspb.ByteStream_WriteServer) error {
 					log.Error(err.Error())
 				}
 			}()
-			if streamState.alreadyExists {
-				return stream.SendAndClose(&bspb.WriteResponse{
-					CommittedSize: streamState.d.GetSizeBytes(),
-				})
-			}
 			ht := hit_tracker.NewHitTracker(ctx, s.env, false)
 			uploadTracker := ht.TrackUpload(streamState.d)
 			defer uploadTracker.Close()
@@ -402,6 +394,54 @@ func (s *ByteStreamServer) QueryWriteStatus(ctx context.Context, req *bspb.QueryWriteStatusRequest)
 	}, nil
 }
 
+func (s *ByteStreamServer) handleAlreadyExists(ctx context.Context, stream bspb.ByteStream_WriteServer, firstRequest *bspb.WriteRequest) error {
+	r, err := digest.ParseUploadResourceName(firstRequest.ResourceName)
+	if err != nil {
+		return err
+	}
+	// Bazel 5.0.0 effectively requires that the committed_size match the total
+	// length of the *uploaded* payload. For compressed payloads, the uploaded
+	// payload length is not yet known, because it depends on the compression
+	// parameters used by the client. So we need to read the whole stream from the
+	// client in the compressed case.
+	//
+	// See https://github.com/bazelbuild/bazel/issues/14654 for context.
+	committedSize := r.GetDigest().GetSizeBytes()
+	if r.GetCompressor() != repb.Compressor_IDENTITY {
+		remainingSize := int64(0)
+		if !firstRequest.FinishWrite {
+			// In the case where we read the full stream in order to determine its
+			// size, count it as an upload.
+			ht := hit_tracker.NewHitTracker(ctx, s.env, false)
+			uploadTracker := ht.TrackUpload(r.GetDigest())
+			defer uploadTracker.Close()
+
+			remainingSize, err = s.recvAll(stream)
+			if err != nil {
+				return err
+			}
+		}
+		committedSize = int64(len(firstRequest.Data)) + remainingSize
+	}
+	return stream.SendAndClose(&bspb.WriteResponse{CommittedSize: committedSize})
+}
+
+// recvAll receives the remaining write requests from the client, and returns
+// the total (compressed) size of the uploaded bytes.
+func (s *ByteStreamServer) recvAll(stream bspb.ByteStream_WriteServer) (int64, error) {
+	size := int64(0)
+	for {
+		req, err := stream.Recv()
+		if err == io.EOF {
+			return size, nil
+		}
+		if err != nil {
+			return size, err
+		}
+		size += int64(len(req.Data))
+	}
+}
+
 type Checksum struct {
 	hash         hash.Hash
 	bytesWritten int64
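
To make the mismatch concrete, here is a standalone sketch showing that the compressed length depends on the client's compression parameters and cannot be predicted from the digest's size_bytes. It assumes the klauspost/compress zstd package; the compression.CompressZstd helper used in the test below is assumed to wrap similar functionality.

package main

import (
	"bytes"
	"fmt"

	"github.com/klauspost/compress/zstd"
)

func main() {
	// A highly compressible 10,000-byte blob, similar in spirit to the
	// compressibleBlobOfSize test helper below.
	blob := bytes.Repeat([]byte("buildbuddy"), 1000)

	enc, err := zstd.NewWriter(nil)
	if err != nil {
		panic(err)
	}
	compressed := enc.EncodeAll(blob, nil)

	// size_bytes in the digest describes the uncompressed blob; the
	// committed_size check counts the compressed bytes actually sent,
	// which the server cannot know without reading the whole stream.
	fmt.Printf("uncompressed=%d compressed=%d\n", len(blob), len(compressed))
}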
server/remote_cache/byte_stream_server/byte_stream_server_test.go (46 changes: 42 additions & 4 deletions)
@@ -296,7 +296,11 @@ func TestRPCWriteCompressedReadUncompressed(t *testing.T) {
 	clientConn := runByteStreamServer(ctx, te, t)
 	bsClient := bspb.NewByteStreamClient(clientConn)
 
-	for _, blobSize := range []int{1, 1e2, 1e4, 1e6, 8e6} {
+	// Note: Some larger blob sizes are included here so that we have a better
+	// chance of exercising the scenario where the gRPC client sends a burst of
+	// write requests at once without waiting to see if the server sends back
+	// a response (this is a common scenario in client-streaming uploads).
+	for _, blobSize := range []int{1, 1e2, 1e4, 1e6, 8e6, 16e6} {
 		blob := compressibleBlobOfSize(blobSize)
 
 		compressedBlob := compression.CompressZstd(nil, blob)
@@ -336,6 +340,25 @@
 			downloadBuf = append(downloadBuf, res.Data...)
 		}
 		require.Equal(t, blob, downloadBuf)
+
+		// Now try uploading a duplicate. The duplicate upload should not fail,
+		// and we should still be able to read the blob.
+		mustUploadChunked(t, ctx, bsClient, uploadResourceName, compressedBlob)
+
+		downloadBuf = []byte{}
+		downloadStream, err = bsClient.Read(ctx, &bspb.ReadRequest{
+			ResourceName: downloadResourceName,
+		})
+		require.NoError(t, err)
+		for {
+			res, err := downloadStream.Recv()
+			if err == io.EOF {
+				break
+			}
+			require.NoError(t, err)
+			downloadBuf = append(downloadBuf, res.Data...)
+		}
+		require.Equal(t, blob, downloadBuf)
 	}
 }
 
@@ -413,14 +436,29 @@ func mustUploadChunked(t *testing.T, ctx context.Context, bsClient bspb.ByteStreamClient
 			Data:        remaining[:chunkSize],
 			FinishWrite: chunkSize == len(remaining),
 		})
-		require.NoError(t, err)
+		if err != io.EOF {
+			require.NoError(t, err)
+		}
 		remaining = remaining[chunkSize:]
+		if err == io.EOF {
+			// Server sent back a WriteResponse, which we will receive in the
+			// following CloseAndRecv call. Note that this response may have been sent
+			// in response to a WriteRequest sent in a previous loop iteration, since
+			// the gRPC client does not wait for the server to process each request
+			// before sending subsequent requests.
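
A note on the io.EOF handling above: in gRPC client streaming, when the server ends the RPC early (here, because the blob already exists), a pending Send fails with io.EOF, and the real WriteResponse plus the final status come from CloseAndRecv. A minimal sketch of that pattern, using a hypothetical helper that is not part of this change:

package example

import (
	"io"

	bspb "google.golang.org/genproto/googleapis/bytestream"
)

// sendChunk sends one WriteRequest on an open Write stream. If the server has
// already finished the RPC (for example, because the blob already exists),
// Send returns io.EOF; the real WriteResponse, possibly sent in reply to an
// earlier request, is then obtained from CloseAndRecv along with the final
// RPC status.
func sendChunk(stream bspb.ByteStream_WriteClient, req *bspb.WriteRequest) (*bspb.WriteResponse, error) {
	err := stream.Send(req)
	if err == io.EOF {
		return stream.CloseAndRecv()
	}
	if err != nil {
		return nil, err
	}
	return nil, nil // no response yet; keep streaming
}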