diff --git a/google/cloud/storage/internal/connection_impl.cc b/google/cloud/storage/internal/connection_impl.cc index 6fd4c291f451..8bd551c09eac 100644 --- a/google/cloud/storage/internal/connection_impl.cc +++ b/google/cloud/storage/internal/connection_impl.cc @@ -365,36 +365,39 @@ StatusOr StorageConnectionImpl::GetObjectMetadata( google::cloud::internal::CurrentOptions(), request, __func__); } -StatusOr> -StorageConnectionImpl::ReadObjectNotWrapped( - ReadObjectRangeRequest const& request, RetryPolicy& retry_policy, - BackoffPolicy& backoff_policy) { - auto const idempotency = current_idempotency_policy().IsIdempotent(request) - ? Idempotency::kIdempotent - : Idempotency::kNonIdempotent; - return RestRetryLoop( - retry_policy, backoff_policy, idempotency, - [token = MakeIdempotencyToken(), this]( - rest_internal::RestContext& context, Options const& options, - auto const& request) { - context.AddHeader(kIdempotencyTokenHeader, token); - return stub_->ReadObject(context, options, request); - }, - google::cloud::internal::CurrentOptions(), request, __func__); -} - StatusOr> StorageConnectionImpl::ReadObject( ReadObjectRangeRequest const& request) { - auto retry_policy = current_retry_policy(); - auto backoff_policy = current_backoff_policy(); - auto child = ReadObjectNotWrapped(request, *retry_policy, *backoff_policy); - if (!child) { - return child; - } + auto current = google::cloud::internal::SaveCurrentOptions(); + auto self = shared_from_this(); - return std::unique_ptr(new RetryObjectReadSource( - self, request, *std::move(child), std::move(retry_policy), - std::move(backoff_policy))); + auto const* where = __func__; + auto factory = [self = shared_from_this(), current, where]( + ReadObjectRangeRequest const& request, + RetryPolicy& retry_policy, BackoffPolicy& backoff_policy) { + auto const idempotency = + current->get()->IsIdempotent(request) + ? Idempotency::kIdempotent + : Idempotency::kNonIdempotent; + return RestRetryLoop( + retry_policy, backoff_policy, idempotency, + [self, token = self->MakeIdempotencyToken()]( + rest_internal::RestContext& context, Options const& options, + ReadObjectRangeRequest const& request) { + context.AddHeader(kIdempotencyTokenHeader, token); + return self->stub_->ReadObject(context, options, request); + }, + *current, request, where); + }; + + auto retry_policy = current->get()->clone(); + auto backoff_policy = current->get()->clone(); + auto child = factory(request, *retry_policy, *backoff_policy); + if (!child) return child; + + return std::unique_ptr( + std::make_unique( + std::move(factory), std::move(current), request, *std::move(child), + std::move(retry_policy), std::move(backoff_policy))); } StatusOr StorageConnectionImpl::ListObjects( diff --git a/google/cloud/storage/internal/connection_impl.h b/google/cloud/storage/internal/connection_impl.h index 4e48d7489e6f..cc63071f34ec 100644 --- a/google/cloud/storage/internal/connection_impl.h +++ b/google/cloud/storage/internal/connection_impl.h @@ -71,9 +71,6 @@ class StorageConnectionImpl StatusOr GetObjectMetadata( GetObjectMetadataRequest const& request) override; - /// Call ReadObject() but do not wrap the result in a RetryObjectReadSource. - StatusOr> ReadObjectNotWrapped( - ReadObjectRangeRequest const&, RetryPolicy&, BackoffPolicy&); StatusOr> ReadObject( ReadObjectRangeRequest const&) override; diff --git a/google/cloud/storage/internal/connection_impl_object_test.cc b/google/cloud/storage/internal/connection_impl_object_test.cc index 44c8af7ceecd..12e594fded65 100644 --- a/google/cloud/storage/internal/connection_impl_object_test.cc +++ b/google/cloud/storage/internal/connection_impl_object_test.cc @@ -134,7 +134,7 @@ TEST(StorageConnectionImpl, ReadObjectTooManyFailures) { StorageConnectionImpl::Create(std::move(mock), RetryTestOptions()); google::cloud::internal::OptionsSpan span(client->options()); auto response = client->ReadObject(ReadObjectRangeRequest()).status(); - EXPECT_THAT(response, StoppedOnTooManyTransients("ReadObjectNotWrapped")); + EXPECT_THAT(response, StoppedOnTooManyTransients("ReadObject")); EXPECT_THAT(transient.captured_tokens(), RetryLoopUsesSingleToken()); EXPECT_THAT(transient.captured_authority_options(), RetryLoopUsesOptions()); } @@ -148,7 +148,7 @@ TEST(StorageConnectionImpl, ReadObjectPermanentFailure) { StorageConnectionImpl::Create(std::move(mock), RetryTestOptions()); google::cloud::internal::OptionsSpan span(client->options()); auto response = client->ReadObject(ReadObjectRangeRequest()).status(); - EXPECT_THAT(response, StoppedOnPermanentError("ReadObjectNotWrapped")); + EXPECT_THAT(response, StoppedOnPermanentError("ReadObject")); EXPECT_THAT(permanent.captured_tokens(), RetryLoopUsesSingleToken()); EXPECT_THAT(permanent.captured_authority_options(), RetryLoopUsesOptions()); } diff --git a/google/cloud/storage/internal/retry_object_read_source.cc b/google/cloud/storage/internal/retry_object_read_source.cc index fddd8d03bdb2..0726713acbdf 100644 --- a/google/cloud/storage/internal/retry_object_read_source.cc +++ b/google/cloud/storage/internal/retry_object_read_source.cc @@ -25,7 +25,6 @@ namespace storage { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN namespace internal { -using ::google::cloud::internal::CurrentOptions; using ::google::cloud::internal::OptionsSpan; std::uint64_t InitialOffset(OffsetDirection const& offset_direction, @@ -37,11 +36,14 @@ std::uint64_t InitialOffset(OffsetDirection const& offset_direction, } RetryObjectReadSource::RetryObjectReadSource( - std::shared_ptr connection, + ReadSourceFactory factory, + google::cloud::internal::ImmutableOptions options, ReadObjectRangeRequest request, std::unique_ptr child, std::unique_ptr retry_policy, - std::unique_ptr backoff_policy) - : connection_(std::move(connection)), + std::unique_ptr backoff_policy, + std::function backoff) + : factory_(std::move(factory)), + options_(std::move(options)), request_(std::move(request)), child_(std::move(child)), retry_policy_prototype_(std::move(retry_policy)), @@ -49,7 +51,19 @@ RetryObjectReadSource::RetryObjectReadSource( offset_direction_(request_.HasOption() ? kFromEnd : kFromBeginning), current_offset_(InitialOffset(offset_direction_, request_)), - span_options_(CurrentOptions()) {} + backoff_(std::move(backoff)) {} + +RetryObjectReadSource::RetryObjectReadSource( + ReadSourceFactory factory, + google::cloud::internal::ImmutableOptions options, + ReadObjectRangeRequest request, std::unique_ptr child, + std::unique_ptr retry_policy, + std::unique_ptr backoff_policy) + : RetryObjectReadSource( + std::move(factory), std::move(options), std::move(request), + std::move(child), std::move(retry_policy), std::move(backoff_policy), + [](std::chrono::milliseconds d) { std::this_thread::sleep_for(d); }) { +} StatusOr RetryObjectReadSource::Read(char* buf, std::size_t n) { @@ -74,7 +88,7 @@ StatusOr RetryObjectReadSource::Read(char* buf, auto retry_policy = retry_policy_prototype_->clone(); int counter = 0; for (; !result && retry_policy->OnFailure(result.status()); - std::this_thread::sleep_for(backoff_policy->OnCompletion()), + backoff_(backoff_policy->OnCompletion()), result = child_->Read(buf, n)) { // A Read() request failed, most likely that means the connection failed or // stalled. The current child might no longer be usable, so we will try to @@ -134,9 +148,8 @@ Status RetryObjectReadSource::MakeChild(RetryPolicy& retry_policy, return Status{}; }; - OptionsSpan const span(span_options_); - auto child = - connection_->ReadObjectNotWrapped(request_, retry_policy, backoff_policy); + OptionsSpan const span(options_); + auto child = factory_(request_, retry_policy, backoff_policy); if (!child) return std::move(child).status(); if (!is_gunzipped_) return on_success(*std::move(child)); @@ -148,7 +161,7 @@ Status RetryObjectReadSource::MakeChild(RetryPolicy& retry_policy, // Try again, eventually the retry policy will expire and this will fail. if (!retry_policy.OnFailure(child.status())) return std::move(child).status(); - std::this_thread::sleep_for(backoff_policy.OnCompletion()); + backoff_(backoff_policy.OnCompletion()); return MakeChild(retry_policy, backoff_policy); } diff --git a/google/cloud/storage/internal/retry_object_read_source.h b/google/cloud/storage/internal/retry_object_read_source.h index 6f5cebcf15da..e885b3019bf3 100644 --- a/google/cloud/storage/internal/retry_object_read_source.h +++ b/google/cloud/storage/internal/retry_object_read_source.h @@ -15,10 +15,14 @@ #ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_INTERNAL_RETRY_OBJECT_READ_SOURCE_H #define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_INTERNAL_RETRY_OBJECT_READ_SOURCE_H -#include "google/cloud/storage/internal/connection_impl.h" #include "google/cloud/storage/internal/object_read_source.h" +#include "google/cloud/storage/internal/object_requests.h" +#include "google/cloud/storage/retry_policy.h" #include "google/cloud/storage/version.h" +#include "google/cloud/options.h" #include "absl/types/optional.h" +#include +#include #include namespace google { @@ -39,7 +43,19 @@ enum OffsetDirection { kFromBeginning, kFromEnd }; */ class RetryObjectReadSource : public ObjectReadSource { public: - RetryObjectReadSource(std::shared_ptr connection, + using ReadSourceFactory = + std::function>( + ReadObjectRangeRequest const&, RetryPolicy&, BackoffPolicy&)>; + + RetryObjectReadSource(ReadSourceFactory factory, + google::cloud::internal::ImmutableOptions options, + ReadObjectRangeRequest request, + std::unique_ptr child, + std::unique_ptr retry_policy, + std::unique_ptr backoff_policy, + std::function backoff); + RetryObjectReadSource(ReadSourceFactory factory, + google::cloud::internal::ImmutableOptions options, ReadObjectRangeRequest request, std::unique_ptr child, std::unique_ptr retry_policy, @@ -55,7 +71,8 @@ class RetryObjectReadSource : public ObjectReadSource { StatusOr> ReadDiscard( std::unique_ptr child, std::int64_t count) const; - std::shared_ptr connection_; + ReadSourceFactory factory_; + google::cloud::internal::ImmutableOptions options_; ReadObjectRangeRequest request_; std::unique_ptr child_; absl::optional generation_; @@ -64,9 +81,7 @@ class RetryObjectReadSource : public ObjectReadSource { OffsetDirection offset_direction_; std::int64_t current_offset_ = 0; bool is_gunzipped_ = false; - // Capture the options in effect when the stream was created, to reuse as - // new requests are generated. - google::cloud::Options span_options_; + std::function backoff_; }; } // namespace internal