Skip to content

Commit

Permalink
refactor(storage): make some code more testable (#14429)
Browse files Browse the repository at this point in the history
This just changes `storage::internal::RetryObjectReadSource` to make the
class more testable. The backoff function is provided as an argument, so
it can be mocked. Likewise, the code to start new downloads is provided
by a factory function.

Incidentally, I captured the current options using
`internal::ImmutableOptions`, saving a number of copies in the process.
  • Loading branch information
coryan committed Jul 3, 2024
1 parent 8ef423c commit b2bf3cb
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 48 deletions.
57 changes: 30 additions & 27 deletions google/cloud/storage/internal/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -365,36 +365,39 @@ StatusOr<ObjectMetadata> StorageConnectionImpl::GetObjectMetadata(
google::cloud::internal::CurrentOptions(), request, __func__);
}

StatusOr<std::unique_ptr<ObjectReadSource>>
StorageConnectionImpl::ReadObjectNotWrapped(
ReadObjectRangeRequest const& request, RetryPolicy& retry_policy,
BackoffPolicy& backoff_policy) {
auto const idempotency = current_idempotency_policy().IsIdempotent(request)
? Idempotency::kIdempotent
: Idempotency::kNonIdempotent;
return RestRetryLoop(
retry_policy, backoff_policy, idempotency,
[token = MakeIdempotencyToken(), this](
rest_internal::RestContext& context, Options const& options,
auto const& request) {
context.AddHeader(kIdempotencyTokenHeader, token);
return stub_->ReadObject(context, options, request);
},
google::cloud::internal::CurrentOptions(), request, __func__);
}

StatusOr<std::unique_ptr<ObjectReadSource>> StorageConnectionImpl::ReadObject(
ReadObjectRangeRequest const& request) {
auto retry_policy = current_retry_policy();
auto backoff_policy = current_backoff_policy();
auto child = ReadObjectNotWrapped(request, *retry_policy, *backoff_policy);
if (!child) {
return child;
}
auto current = google::cloud::internal::SaveCurrentOptions();

auto self = shared_from_this();
return std::unique_ptr<ObjectReadSource>(new RetryObjectReadSource(
self, request, *std::move(child), std::move(retry_policy),
std::move(backoff_policy)));
auto const* where = __func__;
auto factory = [self = shared_from_this(), current, where](
ReadObjectRangeRequest const& request,
RetryPolicy& retry_policy, BackoffPolicy& backoff_policy) {
auto const idempotency =
current->get<IdempotencyPolicyOption>()->IsIdempotent(request)
? Idempotency::kIdempotent
: Idempotency::kNonIdempotent;
return RestRetryLoop(
retry_policy, backoff_policy, idempotency,
[self, token = self->MakeIdempotencyToken()](
rest_internal::RestContext& context, Options const& options,
ReadObjectRangeRequest const& request) {
context.AddHeader(kIdempotencyTokenHeader, token);
return self->stub_->ReadObject(context, options, request);
},
*current, request, where);
};

auto retry_policy = current->get<RetryPolicyOption>()->clone();
auto backoff_policy = current->get<BackoffPolicyOption>()->clone();
auto child = factory(request, *retry_policy, *backoff_policy);
if (!child) return child;

return std::unique_ptr<ObjectReadSource>(
std::make_unique<RetryObjectReadSource>(
std::move(factory), std::move(current), request, *std::move(child),
std::move(retry_policy), std::move(backoff_policy)));
}

StatusOr<ListObjectsResponse> StorageConnectionImpl::ListObjects(
Expand Down
3 changes: 0 additions & 3 deletions google/cloud/storage/internal/connection_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,6 @@ class StorageConnectionImpl
StatusOr<ObjectMetadata> GetObjectMetadata(
GetObjectMetadataRequest const& request) override;

/// Call ReadObject() but do not wrap the result in a RetryObjectReadSource.
StatusOr<std::unique_ptr<ObjectReadSource>> ReadObjectNotWrapped(
ReadObjectRangeRequest const&, RetryPolicy&, BackoffPolicy&);
StatusOr<std::unique_ptr<ObjectReadSource>> ReadObject(
ReadObjectRangeRequest const&) override;

Expand Down
4 changes: 2 additions & 2 deletions google/cloud/storage/internal/connection_impl_object_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ TEST(StorageConnectionImpl, ReadObjectTooManyFailures) {
StorageConnectionImpl::Create(std::move(mock), RetryTestOptions());
google::cloud::internal::OptionsSpan span(client->options());
auto response = client->ReadObject(ReadObjectRangeRequest()).status();
EXPECT_THAT(response, StoppedOnTooManyTransients("ReadObjectNotWrapped"));
EXPECT_THAT(response, StoppedOnTooManyTransients("ReadObject"));
EXPECT_THAT(transient.captured_tokens(), RetryLoopUsesSingleToken());
EXPECT_THAT(transient.captured_authority_options(), RetryLoopUsesOptions());
}
Expand All @@ -148,7 +148,7 @@ TEST(StorageConnectionImpl, ReadObjectPermanentFailure) {
StorageConnectionImpl::Create(std::move(mock), RetryTestOptions());
google::cloud::internal::OptionsSpan span(client->options());
auto response = client->ReadObject(ReadObjectRangeRequest()).status();
EXPECT_THAT(response, StoppedOnPermanentError("ReadObjectNotWrapped"));
EXPECT_THAT(response, StoppedOnPermanentError("ReadObject"));
EXPECT_THAT(permanent.captured_tokens(), RetryLoopUsesSingleToken());
EXPECT_THAT(permanent.captured_authority_options(), RetryLoopUsesOptions());
}
Expand Down
33 changes: 23 additions & 10 deletions google/cloud/storage/internal/retry_object_read_source.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ namespace storage {
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
namespace internal {

using ::google::cloud::internal::CurrentOptions;
using ::google::cloud::internal::OptionsSpan;

std::uint64_t InitialOffset(OffsetDirection const& offset_direction,
Expand All @@ -37,19 +36,34 @@ std::uint64_t InitialOffset(OffsetDirection const& offset_direction,
}

RetryObjectReadSource::RetryObjectReadSource(
std::shared_ptr<StorageConnectionImpl> connection,
ReadSourceFactory factory,
google::cloud::internal::ImmutableOptions options,
ReadObjectRangeRequest request, std::unique_ptr<ObjectReadSource> child,
std::unique_ptr<RetryPolicy> retry_policy,
std::unique_ptr<BackoffPolicy> backoff_policy)
: connection_(std::move(connection)),
std::unique_ptr<BackoffPolicy> backoff_policy,
std::function<void(std::chrono::milliseconds)> backoff)
: factory_(std::move(factory)),
options_(std::move(options)),
request_(std::move(request)),
child_(std::move(child)),
retry_policy_prototype_(std::move(retry_policy)),
backoff_policy_prototype_(std::move(backoff_policy)),
offset_direction_(request_.HasOption<ReadLast>() ? kFromEnd
: kFromBeginning),
current_offset_(InitialOffset(offset_direction_, request_)),
span_options_(CurrentOptions()) {}
backoff_(std::move(backoff)) {}

RetryObjectReadSource::RetryObjectReadSource(
ReadSourceFactory factory,
google::cloud::internal::ImmutableOptions options,
ReadObjectRangeRequest request, std::unique_ptr<ObjectReadSource> child,
std::unique_ptr<RetryPolicy> retry_policy,
std::unique_ptr<BackoffPolicy> backoff_policy)
: RetryObjectReadSource(
std::move(factory), std::move(options), std::move(request),
std::move(child), std::move(retry_policy), std::move(backoff_policy),
[](std::chrono::milliseconds d) { std::this_thread::sleep_for(d); }) {
}

StatusOr<ReadSourceResult> RetryObjectReadSource::Read(char* buf,
std::size_t n) {
Expand All @@ -74,7 +88,7 @@ StatusOr<ReadSourceResult> RetryObjectReadSource::Read(char* buf,
auto retry_policy = retry_policy_prototype_->clone();
int counter = 0;
for (; !result && retry_policy->OnFailure(result.status());
std::this_thread::sleep_for(backoff_policy->OnCompletion()),
backoff_(backoff_policy->OnCompletion()),
result = child_->Read(buf, n)) {
// A Read() request failed, most likely that means the connection failed or
// stalled. The current child might no longer be usable, so we will try to
Expand Down Expand Up @@ -134,9 +148,8 @@ Status RetryObjectReadSource::MakeChild(RetryPolicy& retry_policy,
return Status{};
};

OptionsSpan const span(span_options_);
auto child =
connection_->ReadObjectNotWrapped(request_, retry_policy, backoff_policy);
OptionsSpan const span(options_);
auto child = factory_(request_, retry_policy, backoff_policy);
if (!child) return std::move(child).status();
if (!is_gunzipped_) return on_success(*std::move(child));

Expand All @@ -148,7 +161,7 @@ Status RetryObjectReadSource::MakeChild(RetryPolicy& retry_policy,

// Try again, eventually the retry policy will expire and this will fail.
if (!retry_policy.OnFailure(child.status())) return std::move(child).status();
std::this_thread::sleep_for(backoff_policy.OnCompletion());
backoff_(backoff_policy.OnCompletion());

return MakeChild(retry_policy, backoff_policy);
}
Expand Down
27 changes: 21 additions & 6 deletions google/cloud/storage/internal/retry_object_read_source.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,14 @@
#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_INTERNAL_RETRY_OBJECT_READ_SOURCE_H
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_INTERNAL_RETRY_OBJECT_READ_SOURCE_H

#include "google/cloud/storage/internal/connection_impl.h"
#include "google/cloud/storage/internal/object_read_source.h"
#include "google/cloud/storage/internal/object_requests.h"
#include "google/cloud/storage/retry_policy.h"
#include "google/cloud/storage/version.h"
#include "google/cloud/options.h"
#include "absl/types/optional.h"
#include <chrono>
#include <functional>
#include <memory>

namespace google {
Expand All @@ -39,7 +43,19 @@ enum OffsetDirection { kFromBeginning, kFromEnd };
*/
class RetryObjectReadSource : public ObjectReadSource {
public:
RetryObjectReadSource(std::shared_ptr<StorageConnectionImpl> connection,
using ReadSourceFactory =
std::function<StatusOr<std::unique_ptr<ObjectReadSource>>(
ReadObjectRangeRequest const&, RetryPolicy&, BackoffPolicy&)>;

RetryObjectReadSource(ReadSourceFactory factory,
google::cloud::internal::ImmutableOptions options,
ReadObjectRangeRequest request,
std::unique_ptr<ObjectReadSource> child,
std::unique_ptr<RetryPolicy> retry_policy,
std::unique_ptr<BackoffPolicy> backoff_policy,
std::function<void(std::chrono::milliseconds)> backoff);
RetryObjectReadSource(ReadSourceFactory factory,
google::cloud::internal::ImmutableOptions options,
ReadObjectRangeRequest request,
std::unique_ptr<ObjectReadSource> child,
std::unique_ptr<RetryPolicy> retry_policy,
Expand All @@ -55,7 +71,8 @@ class RetryObjectReadSource : public ObjectReadSource {
StatusOr<std::unique_ptr<ObjectReadSource>> ReadDiscard(
std::unique_ptr<ObjectReadSource> child, std::int64_t count) const;

std::shared_ptr<StorageConnectionImpl> connection_;
ReadSourceFactory factory_;
google::cloud::internal::ImmutableOptions options_;
ReadObjectRangeRequest request_;
std::unique_ptr<ObjectReadSource> child_;
absl::optional<std::int64_t> generation_;
Expand All @@ -64,9 +81,7 @@ class RetryObjectReadSource : public ObjectReadSource {
OffsetDirection offset_direction_;
std::int64_t current_offset_ = 0;
bool is_gunzipped_ = false;
// Capture the options in effect when the stream was created, to reuse as
// new requests are generated.
google::cloud::Options span_options_;
std::function<void(std::chrono::milliseconds)> backoff_;
};

} // namespace internal
Expand Down

0 comments on commit b2bf3cb

Please sign in to comment.