From 00fc96c41729be265e65e4b00884684ac2982aa0 Mon Sep 17 00:00:00 2001 From: Carlos O'Ryan Date: Mon, 11 Oct 2021 22:30:23 +0000 Subject: [PATCH 1/3] feat(common): GUAC for async read-write streams --- google/cloud/CMakeLists.txt | 2 + google/cloud/google_cloud_cpp_grpc_utils.bzl | 1 + ...google_cloud_cpp_grpc_utils_unit_tests.bzl | 1 + .../internal/async_read_write_stream_auth.h | 103 ++++++++++++++++++ .../async_read_write_stream_auth_test.cc | 93 ++++++++++++++++ .../cloud/pubsub/internal/subscriber_auth.cc | 21 ++-- .../pubsub/internal/subscriber_auth_test.cc | 4 +- 7 files changed, 216 insertions(+), 9 deletions(-) create mode 100644 google/cloud/internal/async_read_write_stream_auth.h create mode 100644 google/cloud/internal/async_read_write_stream_auth_test.cc diff --git a/google/cloud/CMakeLists.txt b/google/cloud/CMakeLists.txt index 1c9d7da144968..a17773cee7b2d 100644 --- a/google/cloud/CMakeLists.txt +++ b/google/cloud/CMakeLists.txt @@ -402,6 +402,7 @@ if (GOOGLE_CLOUD_CPP_ENABLE_GRPC) internal/async_polling_loop.cc internal/async_polling_loop.h internal/async_read_stream_impl.h + internal/async_read_write_stream_auth.h internal/async_read_write_stream_impl.h internal/async_retry_loop.h internal/async_retry_unary_rpc.h @@ -541,6 +542,7 @@ if (GOOGLE_CLOUD_CPP_ENABLE_GRPC) internal/async_connection_ready_test.cc internal/async_long_running_operation_test.cc internal/async_polling_loop_test.cc + internal/async_read_write_stream_auth_test.cc internal/async_read_write_stream_impl_test.cc internal/async_retry_loop_test.cc internal/async_retry_unary_rpc_test.cc diff --git a/google/cloud/google_cloud_cpp_grpc_utils.bzl b/google/cloud/google_cloud_cpp_grpc_utils.bzl index a24898e0591a5..2e87e7ded5fe5 100644 --- a/google/cloud/google_cloud_cpp_grpc_utils.bzl +++ b/google/cloud/google_cloud_cpp_grpc_utils.bzl @@ -32,6 +32,7 @@ google_cloud_cpp_grpc_utils_hdrs = [ "internal/async_long_running_operation.h", "internal/async_polling_loop.h", "internal/async_read_stream_impl.h", + "internal/async_read_write_stream_auth.h", "internal/async_read_write_stream_impl.h", "internal/async_retry_loop.h", "internal/async_retry_unary_rpc.h", diff --git a/google/cloud/google_cloud_cpp_grpc_utils_unit_tests.bzl b/google/cloud/google_cloud_cpp_grpc_utils_unit_tests.bzl index f8834dd60d80b..90d0f46d270e1 100644 --- a/google/cloud/google_cloud_cpp_grpc_utils_unit_tests.bzl +++ b/google/cloud/google_cloud_cpp_grpc_utils_unit_tests.bzl @@ -24,6 +24,7 @@ google_cloud_cpp_grpc_utils_unit_tests = [ "internal/async_connection_ready_test.cc", "internal/async_long_running_operation_test.cc", "internal/async_polling_loop_test.cc", + "internal/async_read_write_stream_auth_test.cc", "internal/async_read_write_stream_impl_test.cc", "internal/async_retry_loop_test.cc", "internal/async_retry_unary_rpc_test.cc", diff --git a/google/cloud/internal/async_read_write_stream_auth.h b/google/cloud/internal/async_read_write_stream_auth.h new file mode 100644 index 0000000000000..0cba25bac7d9e --- /dev/null +++ b/google/cloud/internal/async_read_write_stream_auth.h @@ -0,0 +1,103 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_INTERNAL_ASYNC_READ_WRITE_STREAM_AUTH_H +#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_INTERNAL_ASYNC_READ_WRITE_STREAM_AUTH_H + +#include "google/cloud/internal/async_read_write_stream_impl.h" +#include "google/cloud/internal/unified_grpc_credentials.h" +#include "google/cloud/version.h" +#include +#include + +namespace google { +namespace cloud { +inline namespace GOOGLE_CLOUD_CPP_NS { +namespace internal { + +template +class AsyncStreamingReadWriteRpcAuth + : public AsyncStreamingReadWriteRpc { + public: + using StreamFactory = std::function< + std::unique_ptr>( + std::unique_ptr)>; + + AsyncStreamingReadWriteRpcAuth( + std::unique_ptr context, + std::shared_ptr auth, StreamFactory factory) + : context_(std::move(context)), + auth_(std::move(auth)), + factory_(std::move(factory)) {} + + void Cancel() override { + if (context_) return context_->TryCancel(); + if (stream_) return stream_->Cancel(); + } + + future Start() override { + using Result = StatusOr>; + + return auth_->AsyncConfigureContext(std::move(context_)) + .then([this](future f) mutable { + auto context = f.get(); + if (!context) { + stream_ = absl::make_unique< + AsyncStreamingReadWriteRpcError>( + std::move(context).status()); + return make_ready_future(false); + } + stream_ = factory_(*std::move(context)); + return stream_->Start(); + }); + } + + future> Read() override { + if (!stream_) return make_ready_future(absl::optional{}); + return stream_->Read(); + } + + future Write(Request const& request, + grpc::WriteOptions options) override { + if (!stream_) return make_ready_future(false); + return stream_->Write(request, std::move(options)); + } + + future WritesDone() override { + if (!stream_) return make_ready_future(false); + return stream_->WritesDone(); + } + + future Finish() override { + if (!stream_) { + return make_ready_future( + Status(StatusCode::kInvalidArgument, + "uninitialized GrpcReadWriteStreamAuth<>")); + } + return stream_->Finish(); + } + + private: + std::unique_ptr context_; + std::shared_ptr auth_; + StreamFactory factory_; + std::unique_ptr> stream_; +}; + +} // namespace internal +} // namespace GOOGLE_CLOUD_CPP_NS +} // namespace cloud +} // namespace google + +#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_INTERNAL_ASYNC_READ_WRITE_STREAM_AUTH_H diff --git a/google/cloud/internal/async_read_write_stream_auth_test.cc b/google/cloud/internal/async_read_write_stream_auth_test.cc new file mode 100644 index 0000000000000..08c3e17aabcb6 --- /dev/null +++ b/google/cloud/internal/async_read_write_stream_auth_test.cc @@ -0,0 +1,93 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "google/cloud/internal/async_read_write_stream_auth.h" +#include "google/cloud/completion_queue.h" +#include "google/cloud/testing_util/mock_grpc_authentication_strategy.h" +#include "google/cloud/testing_util/status_matchers.h" +#include "absl/memory/memory.h" +#include +#include +#include + +namespace google { +namespace cloud { +inline namespace GOOGLE_CLOUD_CPP_NS { +namespace internal { +namespace { + +using ::google::cloud::testing_util::IsOk; +using ::google::cloud::testing_util::MockAuthenticationStrategy; + +struct FakeRequest { + std::string key; +}; + +struct FakeResponse { + std::string key; + std::string value; +}; + +using BaseStream = AsyncStreamingReadWriteRpc; +using AuthStream = AsyncStreamingReadWriteRpcAuth; + +class MockStream : public BaseStream { + public: + MOCK_METHOD(void, Cancel, (), (override)); + MOCK_METHOD(future, Start, (), (override)); + MOCK_METHOD(future>, Read, (), (override)); + MOCK_METHOD(future, Write, (FakeRequest const&, grpc::WriteOptions), + (override)); + MOCK_METHOD(future, WritesDone, (), (override)); + MOCK_METHOD(future, Finish, (), (override)); +}; + +TEST(AsyncStreamReadWriteAuth, Start) { + auto factory = [](std::unique_ptr) { + auto mock = absl::make_unique(); + EXPECT_CALL(*mock, Start).WillOnce([] { return make_ready_future(true); }); + EXPECT_CALL(*mock, Write).WillOnce([] { return make_ready_future(true); }); + EXPECT_CALL(*mock, Read).WillOnce([] { + return make_ready_future(absl::make_optional(FakeResponse{"k0", "v0"})); + }); + EXPECT_CALL(*mock, WritesDone).WillOnce([] { + return make_ready_future(true); + }); + EXPECT_CALL(*mock, Finish).WillOnce([] { + return make_ready_future(Status{}); + }); + return std::unique_ptr(std::move(mock)); + }; + auto strategy = std::make_shared(); + EXPECT_CALL(*strategy, AsyncConfigureContext) + .WillOnce([](std::unique_ptr context) { + return make_ready_future(make_status_or(std::move(context))); + }); + auto uut = std::make_shared( + absl::make_unique(), strategy, factory); + EXPECT_TRUE(uut->Start().get()); + EXPECT_TRUE(uut->Write(FakeRequest{"k"}, grpc::WriteOptions()).get()); + auto response = uut->Read().get(); + ASSERT_TRUE(response.has_value()); + EXPECT_EQ(response->key, "k0"); + EXPECT_EQ(response->value, "v0"); + EXPECT_TRUE(uut->WritesDone().get()); + EXPECT_THAT(uut->Finish().get(), IsOk()); +} + +} // namespace +} // namespace internal +} // namespace GOOGLE_CLOUD_CPP_NS +} // namespace cloud +} // namespace google diff --git a/google/cloud/pubsub/internal/subscriber_auth.cc b/google/cloud/pubsub/internal/subscriber_auth.cc index 9cd4e6a23b675..8a031b456e8cc 100644 --- a/google/cloud/pubsub/internal/subscriber_auth.cc +++ b/google/cloud/pubsub/internal/subscriber_auth.cc @@ -13,6 +13,7 @@ // limitations under the License. #include "google/cloud/pubsub/internal/subscriber_auth.h" +#include "google/cloud/internal/async_read_write_stream_auth.h" namespace google { namespace cloud { @@ -73,14 +74,18 @@ SubscriberAuth::AsyncStreamingPull( google::cloud::CompletionQueue& cq, std::unique_ptr context, google::pubsub::v1::StreamingPullRequest const& request) { - using ErrorStream = - ::google::cloud::internal::AsyncStreamingReadWriteRpcError< - google::pubsub::v1::StreamingPullRequest, - google::pubsub::v1::StreamingPullResponse>; - - auto status = auth_->ConfigureContext(*context); - if (!status.ok()) return absl::make_unique(std::move(status)); - return child_->AsyncStreamingPull(cq, std::move(context), request); + using StreamAuth = google::cloud::internal::AsyncStreamingReadWriteRpcAuth< + google::pubsub::v1::StreamingPullRequest, + google::pubsub::v1::StreamingPullResponse>; + + auto child = child_; + auto call = [child, cq, + request](std::unique_ptr ctx) mutable { + return child->AsyncStreamingPull(cq, std::move(ctx), request); + }; + auto factory = StreamAuth::StreamFactory(std::move(call)); + return absl::make_unique(std::move(context), auth_, + std::move(factory)); } future SubscriberAuth::AsyncAcknowledge( diff --git a/google/cloud/pubsub/internal/subscriber_auth_test.cc b/google/cloud/pubsub/internal/subscriber_auth_test.cc index fe73a6bd567d2..953dc26f02feb 100644 --- a/google/cloud/pubsub/internal/subscriber_auth_test.cc +++ b/google/cloud/pubsub/internal/subscriber_auth_test.cc @@ -140,16 +140,18 @@ TEST(SubscriberAuthTest, AsyncStreamingPull) { return absl::make_unique( Status(StatusCode::kPermissionDenied, "uh-oh")); }); - auto under_test = SubscriberAuth(MakeTypicalMockAuth(), mock); + auto under_test = SubscriberAuth(MakeTypicalAsyncMockAuth(), mock); google::cloud::CompletionQueue cq; google::pubsub::v1::StreamingPullRequest request; auto auth_failure = under_test.AsyncStreamingPull( cq, absl::make_unique(), request); + ASSERT_FALSE(auth_failure->Start().get()); EXPECT_THAT(auth_failure->Finish().get(), StatusIs(StatusCode::kInvalidArgument)); auto auth_success = under_test.AsyncStreamingPull( cq, absl::make_unique(), request); + ASSERT_FALSE(auth_success->Start().get()); EXPECT_THAT(auth_success->Finish().get(), StatusIs(StatusCode::kPermissionDenied)); } From e7f7859e3f30ec3879e5581f6f1f374c0cfadd1f Mon Sep 17 00:00:00 2001 From: Carlos O'Ryan Date: Mon, 11 Oct 2021 23:10:27 +0000 Subject: [PATCH 2/3] Fix smelly code with a 'this' pointer --- .../internal/async_read_write_stream_auth.h | 34 ++++++++++++------- ...treaming_subscription_batch_source_test.cc | 2 +- .../cloud/pubsub/internal/subscriber_auth.cc | 6 ++-- .../cloud/pubsub/internal/subscriber_auth.h | 2 +- .../pubsub/internal/subscriber_logging.cc | 8 ++--- .../pubsub/internal/subscriber_logging.h | 6 ++-- .../pubsub/internal/subscriber_metadata.cc | 2 +- .../pubsub/internal/subscriber_metadata.h | 2 +- .../pubsub/internal/subscriber_round_robin.cc | 2 +- .../pubsub/internal/subscriber_round_robin.h | 2 +- .../cloud/pubsub/internal/subscriber_stub.cc | 2 +- .../cloud/pubsub/internal/subscriber_stub.h | 2 +- .../pubsub/testing/mock_subscriber_stub.h | 2 +- 13 files changed, 40 insertions(+), 32 deletions(-) diff --git a/google/cloud/internal/async_read_write_stream_auth.h b/google/cloud/internal/async_read_write_stream_auth.h index 0cba25bac7d9e..b9aeb6c842a47 100644 --- a/google/cloud/internal/async_read_write_stream_auth.h +++ b/google/cloud/internal/async_read_write_stream_auth.h @@ -28,10 +28,12 @@ namespace internal { template class AsyncStreamingReadWriteRpcAuth - : public AsyncStreamingReadWriteRpc { + : public AsyncStreamingReadWriteRpc, + public std::enable_shared_from_this< + AsyncStreamingReadWriteRpcAuth> { public: using StreamFactory = std::function< - std::unique_ptr>( + std::shared_ptr>( std::unique_ptr)>; AsyncStreamingReadWriteRpcAuth( @@ -49,17 +51,12 @@ class AsyncStreamingReadWriteRpcAuth future Start() override { using Result = StatusOr>; + auto weak = + std::weak_ptr(this->shared_from_this()); return auth_->AsyncConfigureContext(std::move(context_)) - .then([this](future f) mutable { - auto context = f.get(); - if (!context) { - stream_ = absl::make_unique< - AsyncStreamingReadWriteRpcError>( - std::move(context).status()); - return make_ready_future(false); - } - stream_ = factory_(*std::move(context)); - return stream_->Start(); + .then([weak](future f) mutable { + if (auto self = weak.lock()) return self->OnStart(f.get()); + return make_ready_future(false); }); } @@ -89,10 +86,21 @@ class AsyncStreamingReadWriteRpcAuth } private: + future OnStart(StatusOr> context) { + if (!context) { + stream_ = + absl::make_unique>( + std::move(context).status()); + return make_ready_future(false); + } + stream_ = factory_(*std::move(context)); + return stream_->Start(); + } + std::unique_ptr context_; std::shared_ptr auth_; StreamFactory factory_; - std::unique_ptr> stream_; + std::shared_ptr> stream_; }; } // namespace internal diff --git a/google/cloud/pubsub/internal/streaming_subscription_batch_source_test.cc b/google/cloud/pubsub/internal/streaming_subscription_batch_source_test.cc index 15333d4b21c00..daf82cab659ad 100644 --- a/google/cloud/pubsub/internal/streaming_subscription_batch_source_test.cc +++ b/google/cloud/pubsub/internal/streaming_subscription_batch_source_test.cc @@ -325,7 +325,7 @@ TEST(StreamingSubscriptionBatchSourceTest, StartUnexpected) { .WillRepeatedly([](google::cloud::CompletionQueue&, std::unique_ptr, google::pubsub::v1::StreamingPullRequest const&) { - return std::unique_ptr{}; + return std::shared_ptr{}; }); auto shutdown = std::make_shared(); diff --git a/google/cloud/pubsub/internal/subscriber_auth.cc b/google/cloud/pubsub/internal/subscriber_auth.cc index 8a031b456e8cc..abf6f997376a7 100644 --- a/google/cloud/pubsub/internal/subscriber_auth.cc +++ b/google/cloud/pubsub/internal/subscriber_auth.cc @@ -69,7 +69,7 @@ Status SubscriberAuth::ModifyPushConfig( return child_->ModifyPushConfig(context, request); } -std::unique_ptr +std::shared_ptr SubscriberAuth::AsyncStreamingPull( google::cloud::CompletionQueue& cq, std::unique_ptr context, @@ -84,8 +84,8 @@ SubscriberAuth::AsyncStreamingPull( return child->AsyncStreamingPull(cq, std::move(ctx), request); }; auto factory = StreamAuth::StreamFactory(std::move(call)); - return absl::make_unique(std::move(context), auth_, - std::move(factory)); + return std::make_shared(std::move(context), auth_, + std::move(factory)); } future SubscriberAuth::AsyncAcknowledge( diff --git a/google/cloud/pubsub/internal/subscriber_auth.h b/google/cloud/pubsub/internal/subscriber_auth.h index 78265bc8d7a66..ddc0094f744b1 100644 --- a/google/cloud/pubsub/internal/subscriber_auth.h +++ b/google/cloud/pubsub/internal/subscriber_auth.h @@ -56,7 +56,7 @@ class SubscriberAuth : public SubscriberStub { grpc::ClientContext& context, google::pubsub::v1::ModifyPushConfigRequest const& request) override; - std::unique_ptr AsyncStreamingPull( + std::shared_ptr AsyncStreamingPull( google::cloud::CompletionQueue& cq, std::unique_ptr context, google::pubsub::v1::StreamingPullRequest const& request) override; diff --git a/google/cloud/pubsub/internal/subscriber_logging.cc b/google/cloud/pubsub/internal/subscriber_logging.cc index 321e6a176bcfd..5df9bf8cfe542 100644 --- a/google/cloud/pubsub/internal/subscriber_logging.cc +++ b/google/cloud/pubsub/internal/subscriber_logging.cc @@ -92,7 +92,7 @@ Status SubscriberLogging::ModifyPushConfig( context, request, __func__, tracing_options_); } -std::unique_ptr +std::shared_ptr SubscriberLogging::AsyncStreamingPull( google::cloud::CompletionQueue& cq, std::unique_ptr context, @@ -103,8 +103,8 @@ SubscriberLogging::AsyncStreamingPull( << " << request=" << DebugString(request, tracing_options_); auto stream = child_->AsyncStreamingPull(cq, std::move(context), request); if (!trace_streams_) return stream; - return absl::make_unique( - std::move(stream), tracing_options_, request_id); + return std::make_shared(std::move(stream), + tracing_options_, request_id); } future SubscriberLogging::AsyncAcknowledge( @@ -201,7 +201,7 @@ StatusOr SubscriberLogging::Seek( } LoggingAsyncPullStream::LoggingAsyncPullStream( - std::unique_ptr child, + std::shared_ptr child, TracingOptions tracing_options, std::string request_id) : child_(std::move(child)), tracing_options_(std::move(tracing_options)), diff --git a/google/cloud/pubsub/internal/subscriber_logging.h b/google/cloud/pubsub/internal/subscriber_logging.h index 106faf4acc6f6..d8e856e7106e9 100644 --- a/google/cloud/pubsub/internal/subscriber_logging.h +++ b/google/cloud/pubsub/internal/subscriber_logging.h @@ -58,7 +58,7 @@ class SubscriberLogging : public SubscriberStub { grpc::ClientContext& context, google::pubsub::v1::ModifyPushConfigRequest const& request) override; - std::unique_ptr AsyncStreamingPull( + std::shared_ptr AsyncStreamingPull( google::cloud::CompletionQueue& cq, std::unique_ptr context, google::pubsub::v1::StreamingPullRequest const& request) override; @@ -105,7 +105,7 @@ class SubscriberLogging : public SubscriberStub { class LoggingAsyncPullStream : public SubscriberStub::AsyncPullStream { public: - LoggingAsyncPullStream(std::unique_ptr child, + LoggingAsyncPullStream(std::shared_ptr child, TracingOptions tracing_options, std::string request_id); @@ -119,7 +119,7 @@ class LoggingAsyncPullStream : public SubscriberStub::AsyncPullStream { future Finish() override; private: - std::unique_ptr child_; + std::shared_ptr child_; TracingOptions tracing_options_; std::string request_id_; }; diff --git a/google/cloud/pubsub/internal/subscriber_metadata.cc b/google/cloud/pubsub/internal/subscriber_metadata.cc index 7c335d4a4464f..936878522949f 100644 --- a/google/cloud/pubsub/internal/subscriber_metadata.cc +++ b/google/cloud/pubsub/internal/subscriber_metadata.cc @@ -70,7 +70,7 @@ Status SubscriberMetadata::ModifyPushConfig( return child_->ModifyPushConfig(context, request); } -std::unique_ptr +std::shared_ptr SubscriberMetadata::AsyncStreamingPull( google::cloud::CompletionQueue& cq, std::unique_ptr context, diff --git a/google/cloud/pubsub/internal/subscriber_metadata.h b/google/cloud/pubsub/internal/subscriber_metadata.h index 2a510598b9f63..1a48a0beb96dd 100644 --- a/google/cloud/pubsub/internal/subscriber_metadata.h +++ b/google/cloud/pubsub/internal/subscriber_metadata.h @@ -54,7 +54,7 @@ class SubscriberMetadata : public SubscriberStub { grpc::ClientContext& context, google::pubsub::v1::ModifyPushConfigRequest const& request) override; - std::unique_ptr AsyncStreamingPull( + std::shared_ptr AsyncStreamingPull( google::cloud::CompletionQueue& cq, std::unique_ptr context, google::pubsub::v1::StreamingPullRequest const& request) override; diff --git a/google/cloud/pubsub/internal/subscriber_round_robin.cc b/google/cloud/pubsub/internal/subscriber_round_robin.cc index 2ecab08f786ff..eaf760f40ad3c 100644 --- a/google/cloud/pubsub/internal/subscriber_round_robin.cc +++ b/google/cloud/pubsub/internal/subscriber_round_robin.cc @@ -59,7 +59,7 @@ Status SubscriberRoundRobin::ModifyPushConfig( return Child()->ModifyPushConfig(context, request); } -std::unique_ptr +std::shared_ptr SubscriberRoundRobin::AsyncStreamingPull( google::cloud::CompletionQueue& cq, std::unique_ptr context, diff --git a/google/cloud/pubsub/internal/subscriber_round_robin.h b/google/cloud/pubsub/internal/subscriber_round_robin.h index 055eb918de1c0..32f7bdf03bd5f 100644 --- a/google/cloud/pubsub/internal/subscriber_round_robin.h +++ b/google/cloud/pubsub/internal/subscriber_round_robin.h @@ -56,7 +56,7 @@ class SubscriberRoundRobin : public SubscriberStub { grpc::ClientContext& context, google::pubsub::v1::ModifyPushConfigRequest const& request) override; - std::unique_ptr AsyncStreamingPull( + std::shared_ptr AsyncStreamingPull( google::cloud::CompletionQueue& cq, std::unique_ptr context, google::pubsub::v1::StreamingPullRequest const& request) override; diff --git a/google/cloud/pubsub/internal/subscriber_stub.cc b/google/cloud/pubsub/internal/subscriber_stub.cc index ede9b102a6843..1e256c65af40e 100644 --- a/google/cloud/pubsub/internal/subscriber_stub.cc +++ b/google/cloud/pubsub/internal/subscriber_stub.cc @@ -84,7 +84,7 @@ class DefaultSubscriberStub : public SubscriberStub { return {}; } - std::unique_ptr AsyncStreamingPull( + std::shared_ptr AsyncStreamingPull( google::cloud::CompletionQueue& cq, std::unique_ptr context, google::pubsub::v1::StreamingPullRequest const&) override { diff --git a/google/cloud/pubsub/internal/subscriber_stub.h b/google/cloud/pubsub/internal/subscriber_stub.h index 2a76cb0b3e53e..0c34085bbe1b2 100644 --- a/google/cloud/pubsub/internal/subscriber_stub.h +++ b/google/cloud/pubsub/internal/subscriber_stub.h @@ -76,7 +76,7 @@ class SubscriberStub { google::pubsub::v1::StreamingPullResponse>; /// Start a bi-directional stream to read messages and send ack/nacks. - virtual std::unique_ptr AsyncStreamingPull( + virtual std::shared_ptr AsyncStreamingPull( google::cloud::CompletionQueue&, std::unique_ptr, google::pubsub::v1::StreamingPullRequest const& request) = 0; diff --git a/google/cloud/pubsub/testing/mock_subscriber_stub.h b/google/cloud/pubsub/testing/mock_subscriber_stub.h index 1dfffeef93698..786ac6cf24d6d 100644 --- a/google/cloud/pubsub/testing/mock_subscriber_stub.h +++ b/google/cloud/pubsub/testing/mock_subscriber_stub.h @@ -61,7 +61,7 @@ class MockSubscriberStub : public pubsub_internal::SubscriberStub { google::pubsub::v1::ModifyPushConfigRequest const& request), (override)); - MOCK_METHOD(std::unique_ptr, + MOCK_METHOD(std::shared_ptr, AsyncStreamingPull, (google::cloud::CompletionQueue&, std::unique_ptr, From 4eacd41a42db80207bee36e60a2b0df1a4dea86d Mon Sep 17 00:00:00 2001 From: Carlos O'Ryan Date: Mon, 11 Oct 2021 23:11:19 +0000 Subject: [PATCH 3/3] Fix build problems with (I think) older gtest --- google/cloud/internal/async_read_write_stream_auth_test.cc | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/google/cloud/internal/async_read_write_stream_auth_test.cc b/google/cloud/internal/async_read_write_stream_auth_test.cc index 08c3e17aabcb6..fbc97fc7a34ab 100644 --- a/google/cloud/internal/async_read_write_stream_auth_test.cc +++ b/google/cloud/internal/async_read_write_stream_auth_test.cc @@ -57,7 +57,10 @@ TEST(AsyncStreamReadWriteAuth, Start) { auto factory = [](std::unique_ptr) { auto mock = absl::make_unique(); EXPECT_CALL(*mock, Start).WillOnce([] { return make_ready_future(true); }); - EXPECT_CALL(*mock, Write).WillOnce([] { return make_ready_future(true); }); + EXPECT_CALL(*mock, Write) + .WillOnce([](FakeRequest const&, grpc::WriteOptions) { + return make_ready_future(true); + }); EXPECT_CALL(*mock, Read).WillOnce([] { return make_ready_future(absl::make_optional(FakeResponse{"k0", "v0"})); });