Skip to content

Commit

Permalink
feat(GCS+gRPC): support timeouts for all requests (#7299)
Browse files Browse the repository at this point in the history
This change makes the gRPC plugin support timeouts for all requests,
just like we do in REST.  The REST timeouts are more clever, based on
progress vs. completion, I am planning to create a separate bug to
improve GCS+gRPC and support progress-based timeouts too.
  • Loading branch information
coryan authored Sep 10, 2021
1 parent c5cd4bc commit 7ee1d91
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 19 deletions.
43 changes: 28 additions & 15 deletions google/cloud/storage/internal/grpc_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,9 @@ std::shared_ptr<StorageStub> CreateStorageStub(CompletionQueue cq,
return stub;
}

std::shared_ptr<GrpcClient> GrpcClient::Create(Options const& opts) {
std::shared_ptr<GrpcClient> GrpcClient::Create(Options opts) {
// Cannot use std::make_shared<> as the constructor is private.
return std::shared_ptr<GrpcClient>(new GrpcClient(opts));
return std::shared_ptr<GrpcClient>(new GrpcClient(std::move(opts)));
}

std::shared_ptr<GrpcClient> GrpcClient::CreateMock(
Expand All @@ -156,27 +156,37 @@ std::shared_ptr<GrpcClient> GrpcClient::CreateMock(
new GrpcClient(std::move(stub), DefaultOptionsGrpc(std::move(opts))));
}

GrpcClient::GrpcClient(Options const& opts)
: backwards_compatibility_options_(
MakeBackwardsCompatibleClientOptions(opts)),
background_(MakeBackgroundThreadsFactory(opts)()),
stub_(CreateStorageStub(background_->cq(), opts)) {}
GrpcClient::GrpcClient(Options opts)
: options_(std::move(opts)),
backwards_compatibility_options_(
MakeBackwardsCompatibleClientOptions(options_)),
background_(MakeBackgroundThreadsFactory(options_)()),
stub_(CreateStorageStub(background_->cq(), options_)) {}

GrpcClient::GrpcClient(std::shared_ptr<StorageStub> stub, Options const& opts)
: backwards_compatibility_options_(
MakeBackwardsCompatibleClientOptions(opts)),
background_(MakeBackgroundThreadsFactory(opts)()),
GrpcClient::GrpcClient(std::shared_ptr<StorageStub> stub, Options opts)
: options_(std::move(opts)),
backwards_compatibility_options_(
MakeBackwardsCompatibleClientOptions(options_)),
background_(MakeBackgroundThreadsFactory(options_)()),
stub_(std::move(stub)) {}

std::unique_ptr<GrpcClient::WriteObjectStream> GrpcClient::CreateUploadWriter(
std::unique_ptr<grpc::ClientContext> context) {
auto const timeout = options_.get<TransferStallTimeoutOption>();
if (timeout.count() != 0) {
context->set_deadline(std::chrono::system_clock::now() + timeout);
}
return stub_->WriteObject(std::move(context));
}

StatusOr<ResumableUploadResponse> GrpcClient::QueryResumableUpload(
QueryResumableUploadRequest const& request) {
grpc::ClientContext context;
ApplyQueryParameters(context, request, "resource");
auto const timeout = options_.get<TransferStallTimeoutOption>();
if (timeout.count() != 0) {
context.set_deadline(std::chrono::system_clock::now() + timeout);
}
auto status = stub_->QueryWriteStatus(context, ToProto(request));
if (!status) return std::move(status).status();

Expand Down Expand Up @@ -341,10 +351,9 @@ StatusOr<std::unique_ptr<ObjectReadSource>> GrpcClient::ReadObject(
}
auto context = absl::make_unique<grpc::ClientContext>();
ApplyQueryParameters(*context, request);
if (backwards_compatibility_options_.download_stall_timeout().count() != 0) {
context->set_deadline(
std::chrono::system_clock::now() +
backwards_compatibility_options_.download_stall_timeout());
auto const timeout = options_.get<TransferStallTimeoutOption>();
if (timeout.count() != 0) {
context->set_deadline(std::chrono::system_clock::now() + timeout);
}
auto proto_request = ToProto(request);
if (!proto_request) return std::move(proto_request).status();
Expand Down Expand Up @@ -392,6 +401,10 @@ GrpcClient::CreateResumableSession(ResumableUploadRequest const& request) {

grpc::ClientContext context;
ApplyQueryParameters(context, request, "resource");
auto const timeout = options_.get<TransferStallTimeoutOption>();
if (timeout.count() != 0) {
context.set_deadline(std::chrono::system_clock::now() + timeout);
}
auto response = stub_->StartResumableWrite(context, *proto_request);
if (!response.ok()) return std::move(response).status();

Expand Down
8 changes: 5 additions & 3 deletions google/cloud/storage/internal/grpc_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ class StorageStub;
class GrpcClient : public RawClient,
public std::enable_shared_from_this<GrpcClient> {
public:
static std::shared_ptr<GrpcClient> Create(Options const& opts);
// Creates a new instance, assumes the options have all default values set.
static std::shared_ptr<GrpcClient> Create(Options opts);

// This is used to create a client from a mocked StorageStub.
static std::shared_ptr<GrpcClient> CreateMock(
Expand Down Expand Up @@ -217,10 +218,11 @@ class GrpcClient : public RawClient,
static std::string ComputeMD5Hash(std::string const& payload);

protected:
explicit GrpcClient(Options const& opts);
explicit GrpcClient(std::shared_ptr<StorageStub> stub, Options const& opts);
explicit GrpcClient(Options opts);
explicit GrpcClient(std::shared_ptr<StorageStub> stub, Options opts);

private:
Options options_;
ClientOptions backwards_compatibility_options_;
std::unique_ptr<google::cloud::BackgroundThreads> background_;
std::shared_ptr<StorageStub> stub_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ GrpcResumableUploadSession::last_response() const {

StatusOr<ResumableUploadResponse> GrpcResumableUploadSession::UploadGeneric(
ConstBufferSequence buffers, bool final_chunk, HashValues const& hashes) {
// TODO(#4216) - set the timeout
auto context = absl::make_unique<grpc::ClientContext>();
ApplyQueryParameters(*context, request_, "resource");
auto writer = client_->CreateUploadWriter(std::move(context));
Expand Down

0 comments on commit 7ee1d91

Please sign in to comment.