Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(GCS+gRPC): support timeouts for all requests #7299

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 28 additions & 15 deletions google/cloud/storage/internal/grpc_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,9 @@ std::shared_ptr<StorageStub> CreateStorageStub(CompletionQueue cq,
return stub;
}

std::shared_ptr<GrpcClient> GrpcClient::Create(Options const& opts) {
std::shared_ptr<GrpcClient> GrpcClient::Create(Options opts) {
// Cannot use std::make_shared<> as the constructor is private.
return std::shared_ptr<GrpcClient>(new GrpcClient(opts));
return std::shared_ptr<GrpcClient>(new GrpcClient(std::move(opts)));
}

std::shared_ptr<GrpcClient> GrpcClient::CreateMock(
Expand All @@ -156,27 +156,37 @@ std::shared_ptr<GrpcClient> GrpcClient::CreateMock(
new GrpcClient(std::move(stub), DefaultOptionsGrpc(std::move(opts))));
}

GrpcClient::GrpcClient(Options const& opts)
: backwards_compatibility_options_(
MakeBackwardsCompatibleClientOptions(opts)),
background_(MakeBackgroundThreadsFactory(opts)()),
stub_(CreateStorageStub(background_->cq(), opts)) {}
GrpcClient::GrpcClient(Options opts)
: options_(std::move(opts)),
backwards_compatibility_options_(
MakeBackwardsCompatibleClientOptions(options_)),
background_(MakeBackgroundThreadsFactory(options_)()),
stub_(CreateStorageStub(background_->cq(), options_)) {}

GrpcClient::GrpcClient(std::shared_ptr<StorageStub> stub, Options const& opts)
: backwards_compatibility_options_(
MakeBackwardsCompatibleClientOptions(opts)),
background_(MakeBackgroundThreadsFactory(opts)()),
GrpcClient::GrpcClient(std::shared_ptr<StorageStub> stub, Options opts)
: options_(std::move(opts)),
backwards_compatibility_options_(
MakeBackwardsCompatibleClientOptions(options_)),
background_(MakeBackgroundThreadsFactory(options_)()),
stub_(std::move(stub)) {}

std::unique_ptr<GrpcClient::WriteObjectStream> GrpcClient::CreateUploadWriter(
std::unique_ptr<grpc::ClientContext> context) {
auto const timeout = options_.get<TransferStallTimeoutOption>();
if (timeout.count() != 0) {
context->set_deadline(std::chrono::system_clock::now() + timeout);
}
return stub_->WriteObject(std::move(context));
}

StatusOr<ResumableUploadResponse> GrpcClient::QueryResumableUpload(
QueryResumableUploadRequest const& request) {
grpc::ClientContext context;
ApplyQueryParameters(context, request, "resource");
auto const timeout = options_.get<TransferStallTimeoutOption>();
if (timeout.count() != 0) {
context.set_deadline(std::chrono::system_clock::now() + timeout);
}
auto status = stub_->QueryWriteStatus(context, ToProto(request));
if (!status) return std::move(status).status();

Expand Down Expand Up @@ -341,10 +351,9 @@ StatusOr<std::unique_ptr<ObjectReadSource>> GrpcClient::ReadObject(
}
auto context = absl::make_unique<grpc::ClientContext>();
ApplyQueryParameters(*context, request);
if (backwards_compatibility_options_.download_stall_timeout().count() != 0) {
context->set_deadline(
std::chrono::system_clock::now() +
backwards_compatibility_options_.download_stall_timeout());
auto const timeout = options_.get<TransferStallTimeoutOption>();
if (timeout.count() != 0) {
context->set_deadline(std::chrono::system_clock::now() + timeout);
}
auto proto_request = ToProto(request);
if (!proto_request) return std::move(proto_request).status();
Expand Down Expand Up @@ -392,6 +401,10 @@ GrpcClient::CreateResumableSession(ResumableUploadRequest const& request) {

grpc::ClientContext context;
ApplyQueryParameters(context, request, "resource");
auto const timeout = options_.get<TransferStallTimeoutOption>();
if (timeout.count() != 0) {
context.set_deadline(std::chrono::system_clock::now() + timeout);
}
auto response = stub_->StartResumableWrite(context, *proto_request);
if (!response.ok()) return std::move(response).status();

Expand Down
8 changes: 5 additions & 3 deletions google/cloud/storage/internal/grpc_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ class StorageStub;
class GrpcClient : public RawClient,
public std::enable_shared_from_this<GrpcClient> {
public:
static std::shared_ptr<GrpcClient> Create(Options const& opts);
// Creates a new instance, assumes the options have all default values set.
static std::shared_ptr<GrpcClient> Create(Options opts);

// This is used to create a client from a mocked StorageStub.
static std::shared_ptr<GrpcClient> CreateMock(
Expand Down Expand Up @@ -217,10 +218,11 @@ class GrpcClient : public RawClient,
static std::string ComputeMD5Hash(std::string const& payload);

protected:
explicit GrpcClient(Options const& opts);
explicit GrpcClient(std::shared_ptr<StorageStub> stub, Options const& opts);
explicit GrpcClient(Options opts);
explicit GrpcClient(std::shared_ptr<StorageStub> stub, Options opts);

private:
Options options_;
ClientOptions backwards_compatibility_options_;
std::unique_ptr<google::cloud::BackgroundThreads> background_;
std::shared_ptr<StorageStub> stub_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ GrpcResumableUploadSession::last_response() const {

StatusOr<ResumableUploadResponse> GrpcResumableUploadSession::UploadGeneric(
ConstBufferSequence buffers, bool final_chunk, HashValues const& hashes) {
// TODO(#4216) - set the timeout
auto context = absl::make_unique<grpc::ClientContext>();
ApplyQueryParameters(*context, request_, "resource");
auto writer = client_->CreateUploadWriter(std::move(context));
Expand Down