Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cleanup(storage)!: uploads track committed_size #7868

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,50 @@

## v1.36.0 - TBD

### [Storage](https://github.com/googleapis/google-cloud-cpp/blob/main/google/cloud/storage/README.md)

**BREAKING CHANGE:** with this release any use of the
`storage::internal::ResumableUploadResponse` type require changes. Applications
should have little need for this type, outside mocks, so the changes should not
affect production code.

Nevertheless, we apologize for the inconvenience, and while we would have
preferred to avoid breaking changes, it was inevitable to introduce some
breaking changes to fix a data loss bug.

If you are affected by this change, you will need to change your tests following
this guide. Any place where you return a `ResumableUploadResponse` needs to
change from:

```cc
storage::internal::ResumableUploadResource{
/*upload_session_url=*/std::string{"typically-unused"},
/*last_committed_byte=*/std::uint64_t{value},
/*payload=*/absl::nullopt, // or some gcs::ObjectMetadata value
/*upload_state=*/ResumableUploadResponse::kInProgress, // or kDone
/*annotations=*/std::string{"typically-unused"}
}
```

to:

```cc
storage::internal::ResumableUploadResource{
/*upload_session_url=*/std::string{"typically-unused"},
/*upload_state=*/ResumableUploadResponse::kInProgress, // or kDone
/*committed_size=*/value + 1, // or absl::nullopt
/*payload=*/absl::nullopt, // or some gcs::ObjectMetadata value
/*annotations=*/std::string{"typically-unused"}
}
```

That is, you need to re-order the fields **and** increase the `value` to reflect
the number of committed bytes vs. the index in the last committed byte.

Changing the order of the fields was intentional. It will create a build
failure, which is easier to detect and repair than a run-time error in
your tests.

## v1.35.0 - 2022-01

### New GA Libraries
Expand Down
10 changes: 5 additions & 5 deletions google/cloud/storage/client_write_object_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,15 @@ TEST_F(WriteObjectTest, WriteObject) {
EXPECT_CALL(*mock, next_expected_byte()).WillRepeatedly(Return(0));
EXPECT_CALL(*mock, UploadChunk)
.WillRepeatedly(Return(make_status_or(ResumableUploadResponse{
"fake-url", 0, {}, ResumableUploadResponse::kInProgress, {}})));
"fake-url", ResumableUploadResponse::kInProgress, 0, {}, {}})));
EXPECT_CALL(*mock, ResetSession())
.WillOnce(Return(make_status_or(ResumableUploadResponse{
"fake-url", 0, {}, ResumableUploadResponse::kInProgress, {}})));
"fake-url", ResumableUploadResponse::kInProgress, 0, {}, {}})));
EXPECT_CALL(*mock, UploadFinalChunk)
.WillOnce(
Return(StatusOr<ResumableUploadResponse>(TransientError())))
.WillOnce(Return(make_status_or(ResumableUploadResponse{
"fake-url", 0, expected, ResumableUploadResponse::kDone, {}})));
"fake-url", ResumableUploadResponse::kDone, 0, expected, {}})));

return make_status_or(
std::unique_ptr<internal::ResumableUploadSession>(std::move(mock)));
Expand Down Expand Up @@ -227,7 +227,7 @@ TEST_F(WriteObjectTest, UploadStreamResumable) {
bytes_written += internal::TotalBytes(data);
EXPECT_EQ(bytes_written, size);
return make_status_or(ResumableUploadResponse{
"fake-url", 0, expected, ResumableUploadResponse::kDone, {}});
"fake-url", ResumableUploadResponse::kDone, 0, expected, {}});
});

return make_status_or(
Expand Down Expand Up @@ -285,7 +285,7 @@ TEST_F(WriteObjectTest, UploadFile) {
bytes_written += internal::TotalBytes(data);
EXPECT_EQ(bytes_written, size);
return make_status_or(ResumableUploadResponse{
"fake-url", 0, expected, ResumableUploadResponse::kDone, {}});
"fake-url", ResumableUploadResponse::kDone, 0, expected, {}});
});

return make_status_or(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,16 +89,16 @@ TEST(StorageMockingSamples, MockWriteObject) {
EXPECT_CALL(*mock_result, UploadChunk)
.WillRepeatedly(Return(google::cloud::make_status_or(
ResumableUploadResponse{"fake-url",
0,
{},
ResumableUploadResponse::kInProgress,
/*committed_size=*/absl::nullopt,
/*object_metadata=*/absl::nullopt,
{}})));
EXPECT_CALL(*mock_result, UploadFinalChunk)
.WillRepeatedly(Return(google::cloud::make_status_or(
ResumableUploadResponse{"fake-url",
0,
expected_metadata,
ResumableUploadResponse::kDone,
/*committed_size=*/absl::nullopt,
/*object_metadata=*/expected_metadata,
{}})));

std::unique_ptr<gcs::internal::ResumableUploadSession> result(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,9 @@ void CurlResumableUploadSession::Update(
// value. In this case we update the next expected byte using the chunk
// size, as we know the upload was successful.
next_expected_ += chunk_size;
} else if (result->last_committed_byte != 0) {
next_expected_ = result->last_committed_byte + 1;
} else {
// Nothing has been committed on the server side yet, keep resending.
next_expected_ = 0;
next_expected_ = result->committed_size.value_or(0);
}
if (session_id_.empty() && !result->upload_session_url.empty()) {
session_id_ = result->upload_session_url;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ MATCHER_P(MatchesPayload, value, "Checks whether payload matches a value") {

TEST(CurlResumableUploadSessionTest, Simple) {
auto mock = MockCurlClient::Create();
std::string test_url = "http://invalid.example.com/not-used-in-mock";
std::string test_url = "https://invalid.example.com/not-used-in-mock";
ResumableUploadRequest request("test-bucket", "test-object");
request.set_multiple_options(CustomHeader("x-test-custom", "custom-value"),
Fields("fields"), QuotaUser("quota-user"),
Expand Down Expand Up @@ -83,34 +83,34 @@ TEST(CurlResumableUploadSessionTest, Simple) {
EXPECT_EQ(0, request.source_size());
EXPECT_EQ(0, request.range_begin());
return make_status_or(ResumableUploadResponse{
"", size - 1, {}, ResumableUploadResponse::kInProgress, {}});
"", ResumableUploadResponse::kInProgress, size, {}, {}});
})
.WillOnce([&](UploadChunkRequest const& request) {
EXPECT_EQ(test_url, request.upload_session_url());
EXPECT_THAT(request.payload(), MatchesPayload(payload));
EXPECT_EQ(2 * size, request.source_size());
EXPECT_EQ(size, request.range_begin());
return make_status_or(ResumableUploadResponse{
"", 2 * size - 1, {}, ResumableUploadResponse::kDone, {}});
"", ResumableUploadResponse::kDone, 2 * size, {}, {}});
});

auto upload = session.UploadChunk({{payload}});
EXPECT_STATUS_OK(upload);
EXPECT_EQ(size - 1, upload->last_committed_byte);
EXPECT_EQ(size, upload->committed_size.value_or(0));
EXPECT_EQ(size, session.next_expected_byte());
EXPECT_FALSE(session.done());

upload = session.UploadFinalChunk({{payload}}, 2 * size, {});
EXPECT_STATUS_OK(upload);
EXPECT_EQ(2 * size - 1, upload->last_committed_byte);
EXPECT_EQ(2 * size, upload->committed_size.value_or(0));
EXPECT_EQ(2 * size, session.next_expected_byte());
EXPECT_TRUE(session.done());
}

TEST(CurlResumableUploadSessionTest, Reset) {
auto mock = MockCurlClient::Create();
std::string url1 = "http://invalid.example.com/not-used-in-mock-1";
std::string url2 = "http://invalid.example.com/not-used-in-mock-2";
std::string url1 = "https://invalid.example.com/not-used-in-mock-1";
std::string url2 = "https://invalid.example.com/not-used-in-mock-2";
ResumableUploadRequest request("test-bucket", "test-object");
request.set_multiple_options(CustomHeader("x-test-custom", "custom-value"),
Fields("fields"), QuotaUser("quota-user"),
Expand All @@ -125,14 +125,14 @@ TEST(CurlResumableUploadSessionTest, Reset) {
EXPECT_CALL(*mock, UploadChunk)
.WillOnce([&](UploadChunkRequest const&) {
return make_status_or(ResumableUploadResponse{
"", size - 1, {}, ResumableUploadResponse::kInProgress, {}});
"", ResumableUploadResponse::kInProgress, size, {}, {}});
})
.WillOnce([&](UploadChunkRequest const&) {
return StatusOr<ResumableUploadResponse>(
AsStatus(HttpResponse{308, "uh oh", {}}));
});
const ResumableUploadResponse resume_response{
url2, 2 * size - 1, {}, ResumableUploadResponse::kInProgress, {}};
url2, ResumableUploadResponse::kInProgress, 2 * size, {}, {}};
EXPECT_CALL(*mock, QueryResumableUpload)
.WillOnce([&](QueryResumableUploadRequest const& request) {
EXPECT_EQ(request.GetOption<CustomHeader>().value_or(""),
Expand Down Expand Up @@ -167,8 +167,8 @@ TEST(CurlResumableUploadSessionTest, Reset) {

TEST(CurlResumableUploadSessionTest, SessionUpdatedInChunkUpload) {
auto mock = MockCurlClient::Create();
std::string url1 = "http://invalid.example.com/not-used-in-mock-1";
std::string url2 = "http://invalid.example.com/not-used-in-mock-2";
std::string url1 = "https://invalid.example.com/not-used-in-mock-1";
std::string url2 = "https://invalid.example.com/not-used-in-mock-2";
ResumableUploadRequest request("test-bucket", "test-object");
CurlResumableUploadSession session(mock, request, url1);

Expand All @@ -179,11 +179,11 @@ TEST(CurlResumableUploadSessionTest, SessionUpdatedInChunkUpload) {
EXPECT_CALL(*mock, UploadChunk)
.WillOnce([&](UploadChunkRequest const&) {
return make_status_or(ResumableUploadResponse{
"", size - 1, {}, ResumableUploadResponse::kInProgress, {}});
"", ResumableUploadResponse::kInProgress, size, {}, {}});
})
.WillOnce([&](UploadChunkRequest const&) {
return make_status_or(ResumableUploadResponse{
url2, 2 * size - 1, {}, ResumableUploadResponse::kInProgress, {}});
url2, ResumableUploadResponse::kInProgress, 2 * size, {}, {}});
});

auto upload = session.UploadChunk({{payload}});
Expand All @@ -198,7 +198,7 @@ TEST(CurlResumableUploadSessionTest, SessionUpdatedInChunkUpload) {

TEST(CurlResumableUploadSessionTest, Empty) {
auto mock = MockCurlClient::Create();
std::string test_url = "http://invalid.example.com/not-used-in-mock";
std::string test_url = "https://invalid.example.com/not-used-in-mock";
ResumableUploadRequest request("test-bucket", "test-object");
CurlResumableUploadSession session(mock, request, test_url);

Expand All @@ -214,12 +214,12 @@ TEST(CurlResumableUploadSessionTest, Empty) {
EXPECT_EQ(0, request.source_size());
EXPECT_EQ(0, request.range_begin());
return make_status_or(ResumableUploadResponse{
"", size, {}, ResumableUploadResponse::kDone, {}});
"", ResumableUploadResponse::kDone, size, {}, {}});
});

auto upload = session.UploadFinalChunk({{payload}}, size, {});
EXPECT_STATUS_OK(upload);
EXPECT_EQ(size, upload->last_committed_byte);
EXPECT_EQ(size, upload->committed_size.value_or(0));
EXPECT_EQ(size, session.next_expected_byte());
EXPECT_TRUE(session.done());
}
Expand Down
7 changes: 2 additions & 5 deletions google/cloud/storage/internal/grpc_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -188,12 +188,9 @@ StatusOr<ResumableUploadResponse> GrpcClient::QueryResumableUpload(

ResumableUploadResponse response;
response.upload_state = ResumableUploadResponse::kInProgress;
// TODO(#6880) - cleanup the committed_byte vs. size thing
if (status->has_persisted_size() && status->persisted_size()) {
response.last_committed_byte =
if (status->has_persisted_size()) {
response.committed_size =
static_cast<std::uint64_t>(status->persisted_size());
} else {
response.last_committed_byte = 0;
}
if (status->has_resource()) {
response.payload =
Expand Down
8 changes: 2 additions & 6 deletions google/cloud/storage/internal/grpc_object_request_parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -256,12 +256,8 @@ ResumableUploadResponse GrpcObjectRequestParser::FromProto(
google::storage::v2::WriteObjectResponse const& p, Options const& options) {
ResumableUploadResponse response;
response.upload_state = ResumableUploadResponse::kInProgress;
if (p.has_persisted_size() && p.persisted_size() > 0) {
// TODO(#6880) - cleanup the committed_byte vs. size thing
response.last_committed_byte =
static_cast<std::uint64_t>(p.persisted_size()) - 1;
} else {
response.last_committed_byte = 0;
if (p.has_persisted_size()) {
response.committed_size = static_cast<std::uint64_t>(p.persisted_size());
}
if (p.has_resource()) {
response.payload =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ StatusOr<ResumableUploadResponse> GrpcResumableUploadSession::ResetSession() {
if (!last_response_) return last_response_;

done_ = (last_response_->upload_state == ResumableUploadResponse::kDone);
next_expected_ = last_response_->last_committed_byte;
next_expected_ = last_response_->committed_size.value_or(0);
return last_response_;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,14 +126,14 @@ TEST(GrpcResumableUploadSessionTest, Simple) {
});

auto upload = session.UploadChunk({{payload}});
EXPECT_STATUS_OK(upload);
EXPECT_EQ(size - 1, upload->last_committed_byte);
ASSERT_STATUS_OK(upload);
EXPECT_EQ(size, upload->committed_size.value_or(0));
EXPECT_EQ(size, session.next_expected_byte());
EXPECT_FALSE(session.done());

upload = session.UploadFinalChunk({{payload}}, 2 * size, hashes);
EXPECT_STATUS_OK(upload);
EXPECT_EQ(2 * size - 1, upload->last_committed_byte);
EXPECT_EQ(2 * size, upload->committed_size.value_or(0));
EXPECT_EQ(2 * size, session.next_expected_byte());
EXPECT_TRUE(session.done());
}
Expand Down Expand Up @@ -179,13 +179,13 @@ TEST(GrpcResumableUploadSessionTest, SingleStreamForLargeChunks) {

auto upload = session.UploadChunk({{payload}});
EXPECT_STATUS_OK(upload);
EXPECT_EQ(size - 1, upload->last_committed_byte);
EXPECT_EQ(size, upload->committed_size.value_or(0));
EXPECT_EQ(size, session.next_expected_byte());
EXPECT_FALSE(session.done());

upload = session.UploadFinalChunk({{payload}}, 2 * size, HashValues{});
EXPECT_STATUS_OK(upload);
EXPECT_EQ(2 * size - 1, upload->last_committed_byte);
EXPECT_EQ(2 * size, upload->committed_size.value_or(0));
EXPECT_EQ(2 * size, session.next_expected_byte());
EXPECT_TRUE(session.done());
}
Expand Down Expand Up @@ -235,7 +235,7 @@ TEST(GrpcResumableUploadSessionTest, Reset) {
.WillOnce([&](QueryResumableUploadRequest const& request) {
EXPECT_EQ("test-upload-id", request.upload_session_url());
return make_status_or(ResumableUploadResponse{
{}, size, {}, ResumableUploadResponse::kInProgress, {}});
{}, ResumableUploadResponse::kInProgress, size, {}, {}});
});

auto upload = session.UploadChunk({{payload}});
Expand Down Expand Up @@ -303,7 +303,7 @@ TEST(GrpcResumableUploadSessionTest, ResumeFromEmpty) {
});

ResumableUploadResponse const resume_response{
{}, 0, {}, ResumableUploadResponse::kInProgress, {}};
{}, ResumableUploadResponse::kInProgress, 0, {}, {}};
EXPECT_CALL(*mock, QueryResumableUpload)
.WillOnce([&](QueryResumableUploadRequest const& request) {
EXPECT_EQ("test-upload-id", request.upload_session_url());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ TEST_F(LoggingResumableUploadSessionTest, LastResponseOk) {
auto mock = absl::make_unique<testing::MockResumableUploadSession>();

const StatusOr<ResumableUploadResponse> last_response(ResumableUploadResponse{
"upload url", 1, {}, ResumableUploadResponse::kInProgress, {}});
"upload url", ResumableUploadResponse::kInProgress, 1, {}, {}});
EXPECT_CALL(*mock, last_response()).WillOnce(ReturnRef(last_response));

LoggingResumableUploadSession session(std::move(mock));
Expand Down
2 changes: 1 addition & 1 deletion google/cloud/storage/internal/object_write_streambuf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ ObjectWriteStreambuf::ObjectWriteStreambuf(
hash_validator_(std::move(hash_validator)),
auto_finalize_(auto_finalize),
last_response_(ResumableUploadResponse{
{}, 0, {}, ResumableUploadResponse::kInProgress, {}}) {
{}, ResumableUploadResponse::kInProgress, 0, absl::nullopt, {}}) {
current_ios_buffer_.resize(max_buffer_size_);
auto* pbeg = current_ios_buffer_.data();
auto* pend = pbeg + current_ios_buffer_.size();
Expand Down
Loading