diff --git a/google/cloud/pubsub/CMakeLists.txt b/google/cloud/pubsub/CMakeLists.txt index 42273a441496a..5635d8a21c0e8 100644 --- a/google/cloud/pubsub/CMakeLists.txt +++ b/google/cloud/pubsub/CMakeLists.txt @@ -70,6 +70,8 @@ add_library( internal/publisher_stub.h internal/rejects_with_ordering_key.cc internal/rejects_with_ordering_key.h + internal/schema_auth.cc + internal/schema_auth.h internal/schema_logging.cc internal/schema_logging.h internal/schema_metadata.cc @@ -245,6 +247,7 @@ function (google_cloud_cpp_pubsub_client_define_tests) internal/publisher_metadata_test.cc internal/publisher_round_robin_test.cc internal/rejects_with_ordering_key_test.cc + internal/schema_auth_test.cc internal/schema_logging_test.cc internal/schema_metadata_test.cc internal/sequential_batch_sink_test.cc diff --git a/google/cloud/pubsub/google_cloud_cpp_pubsub.bzl b/google/cloud/pubsub/google_cloud_cpp_pubsub.bzl index d8b6217db77fd..b85996099b2a3 100644 --- a/google/cloud/pubsub/google_cloud_cpp_pubsub.bzl +++ b/google/cloud/pubsub/google_cloud_cpp_pubsub.bzl @@ -35,6 +35,7 @@ google_cloud_cpp_pubsub_hdrs = [ "internal/publisher_round_robin.h", "internal/publisher_stub.h", "internal/rejects_with_ordering_key.h", + "internal/schema_auth.h", "internal/schema_logging.h", "internal/schema_metadata.h", "internal/schema_stub.h", @@ -93,6 +94,7 @@ google_cloud_cpp_pubsub_srcs = [ "internal/publisher_round_robin.cc", "internal/publisher_stub.cc", "internal/rejects_with_ordering_key.cc", + "internal/schema_auth.cc", "internal/schema_logging.cc", "internal/schema_metadata.cc", "internal/schema_stub.cc", diff --git a/google/cloud/pubsub/integration_tests/schema_admin_integration_test.cc b/google/cloud/pubsub/integration_tests/schema_admin_integration_test.cc index 4915b1303285a..99af51a797d7d 100644 --- a/google/cloud/pubsub/integration_tests/schema_admin_integration_test.cc +++ b/google/cloud/pubsub/integration_tests/schema_admin_integration_test.cc @@ -16,6 +16,7 @@ #include "google/cloud/pubsub/schema_admin_client.h" #include "google/cloud/pubsub/testing/random_names.h" #include "google/cloud/pubsub/testing/test_retry_policies.h" +#include "google/cloud/credentials.h" #include "google/cloud/internal/getenv.h" #include "google/cloud/testing_util/integration_test.h" #include "google/cloud/testing_util/is_proto_equal.h" @@ -35,8 +36,13 @@ using ::google::cloud::testing_util::IsProtoEqual; using ::google::cloud::testing_util::ScopedEnvironment; using ::google::cloud::testing_util::StatusIs; using ::testing::Contains; +using ::testing::IsEmpty; using ::testing::Not; +bool UsingEmulator() { + return google::cloud::internal::GetEnv("PUBSUB_EMULATOR_HOST").has_value(); +} + SchemaAdminClient MakeTestSchemaAdminClient() { return SchemaAdminClient(MakeSchemaAdminConnection(MakeTestOptions())); } @@ -93,6 +99,26 @@ TEST_F(SchemaAdminIntegrationTest, SchemaCRUD) { EXPECT_THAT(deleted, IsOk()); } +TEST_F(SchemaAdminIntegrationTest, UnifiedCredentials) { + auto project_id = + google::cloud::internal::GetEnv("GOOGLE_CLOUD_PROJECT").value_or(""); + ASSERT_THAT(project_id, Not(IsEmpty())); + auto options = + Options{}.set(MakeGoogleDefaultCredentials()); + if (UsingEmulator()) { + options = Options{} + .set(MakeAccessTokenCredentials( + "test-only-invalid", std::chrono::system_clock::now() + + std::chrono::minutes(15))) + .set(true); + } + auto client = + SchemaAdminClient(MakeSchemaAdminConnection(std::move(options))); + for (auto&& r : client.ListSchemas(project_id)) { + EXPECT_THAT(r, IsOk()); + } +} + TEST_F(SchemaAdminIntegrationTest, CreateSchema) { ScopedEnvironment env("PUBSUB_EMULATOR_HOST", "localhost:1"); auto schema_admin = MakeTestSchemaAdminClient(); diff --git a/google/cloud/pubsub/internal/schema_auth.cc b/google/cloud/pubsub/internal/schema_auth.cc new file mode 100644 index 0000000000000..7e6e816563dcb --- /dev/null +++ b/google/cloud/pubsub/internal/schema_auth.cc @@ -0,0 +1,75 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "google/cloud/pubsub/internal/schema_auth.h" +#include "google/cloud/internal/log_wrapper.h" + +namespace google { +namespace cloud { +namespace pubsub_internal { +inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS { + +StatusOr SchemaAuth::CreateSchema( + grpc::ClientContext& context, + google::pubsub::v1::CreateSchemaRequest const& request) { + auto status = auth_->ConfigureContext(context); + if (!status.ok()) return status; + return child_->CreateSchema(context, request); +} + +StatusOr SchemaAuth::GetSchema( + grpc::ClientContext& context, + google::pubsub::v1::GetSchemaRequest const& request) { + auto status = auth_->ConfigureContext(context); + if (!status.ok()) return status; + return child_->GetSchema(context, request); +} + +StatusOr SchemaAuth::ListSchemas( + grpc::ClientContext& context, + google::pubsub::v1::ListSchemasRequest const& request) { + auto status = auth_->ConfigureContext(context); + if (!status.ok()) return status; + return child_->ListSchemas(context, request); +} + +Status SchemaAuth::DeleteSchema( + grpc::ClientContext& context, + google::pubsub::v1::DeleteSchemaRequest const& request) { + auto status = auth_->ConfigureContext(context); + if (!status.ok()) return status; + return child_->DeleteSchema(context, request); +} + +StatusOr SchemaAuth::ValidateSchema( + grpc::ClientContext& context, + google::pubsub::v1::ValidateSchemaRequest const& request) { + auto status = auth_->ConfigureContext(context); + if (!status.ok()) return status; + return child_->ValidateSchema(context, request); +} + +StatusOr +SchemaAuth::ValidateMessage( + grpc::ClientContext& context, + google::pubsub::v1::ValidateMessageRequest const& request) { + auto status = auth_->ConfigureContext(context); + if (!status.ok()) return status; + return child_->ValidateMessage(context, request); +} + +} // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS +} // namespace pubsub_internal +} // namespace cloud +} // namespace google diff --git a/google/cloud/pubsub/internal/schema_auth.h b/google/cloud/pubsub/internal/schema_auth.h new file mode 100644 index 0000000000000..7339a93380cc5 --- /dev/null +++ b/google/cloud/pubsub/internal/schema_auth.h @@ -0,0 +1,67 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_INTERNAL_SCHEMA_AUTH_H +#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_INTERNAL_SCHEMA_AUTH_H + +#include "google/cloud/pubsub/internal/schema_stub.h" +#include "google/cloud/pubsub/version.h" +#include "google/cloud/internal/unified_grpc_credentials.h" +#include + +namespace google { +namespace cloud { +namespace pubsub_internal { +inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS { + +/** + * A Decorator for `SchemaStub` that logs each request and response. + */ +class SchemaAuth : public SchemaStub { + public: + SchemaAuth( + std::shared_ptr auth, + std::shared_ptr child) + : auth_(std::move(auth)), child_(std::move(child)) {} + + StatusOr CreateSchema( + grpc::ClientContext& context, + google::pubsub::v1::CreateSchemaRequest const& request) override; + StatusOr GetSchema( + grpc::ClientContext& context, + google::pubsub::v1::GetSchemaRequest const& request) override; + StatusOr ListSchemas( + grpc::ClientContext& context, + google::pubsub::v1::ListSchemasRequest const& request) override; + Status DeleteSchema( + grpc::ClientContext& context, + google::pubsub::v1::DeleteSchemaRequest const& request) override; + StatusOr ValidateSchema( + grpc::ClientContext& context, + google::pubsub::v1::ValidateSchemaRequest const& request) override; + StatusOr ValidateMessage( + grpc::ClientContext& context, + google::pubsub::v1::ValidateMessageRequest const& request) override; + + private: + std::shared_ptr auth_; + std::shared_ptr child_; +}; + +} // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS +} // namespace pubsub_internal +} // namespace cloud +} // namespace google + +#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_INTERNAL_SCHEMA_AUTH_H diff --git a/google/cloud/pubsub/internal/schema_auth_test.cc b/google/cloud/pubsub/internal/schema_auth_test.cc new file mode 100644 index 0000000000000..11cc3291429c5 --- /dev/null +++ b/google/cloud/pubsub/internal/schema_auth_test.cc @@ -0,0 +1,134 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "google/cloud/pubsub/internal/schema_auth.h" +#include "google/cloud/pubsub/testing/mock_schema_stub.h" +#include "google/cloud/testing_util/mock_grpc_authentication_strategy.h" +#include "google/cloud/testing_util/status_matchers.h" +#include "absl/memory/memory.h" +#include + +namespace google { +namespace cloud { +namespace pubsub_internal { +inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS { +namespace { + +using ::google::cloud::testing_util::MakeTypicalMockAuth; +using ::google::cloud::testing_util::StatusIs; +using ::testing::IsNull; +using ::testing::Not; +using ::testing::Return; + +TEST(SchemaAuthTest, CreateSchema) { + auto mock = std::make_shared(); + EXPECT_CALL(*mock, CreateSchema) + .WillOnce(Return(Status(StatusCode::kPermissionDenied, "uh-oh"))); + auto under_test = SchemaAuth(MakeTypicalMockAuth(), mock); + grpc::ClientContext ctx; + google::pubsub::v1::CreateSchemaRequest request; + auto auth_failure = under_test.CreateSchema(ctx, request); + EXPECT_THAT(ctx.credentials(), IsNull()); + EXPECT_THAT(auth_failure, StatusIs(StatusCode::kInvalidArgument)); + + auto auth_success = under_test.CreateSchema(ctx, request); + EXPECT_THAT(ctx.credentials(), Not(IsNull())); + EXPECT_THAT(auth_success, StatusIs(StatusCode::kPermissionDenied)); +} + +TEST(SchemaAuthTest, GetSchema) { + auto mock = std::make_shared(); + EXPECT_CALL(*mock, GetSchema) + .WillOnce(Return(Status(StatusCode::kPermissionDenied, "uh-oh"))); + auto under_test = SchemaAuth(MakeTypicalMockAuth(), mock); + grpc::ClientContext ctx; + google::pubsub::v1::GetSchemaRequest request; + auto auth_failure = under_test.GetSchema(ctx, request); + EXPECT_THAT(ctx.credentials(), IsNull()); + EXPECT_THAT(auth_failure, StatusIs(StatusCode::kInvalidArgument)); + + auto auth_success = under_test.GetSchema(ctx, request); + EXPECT_THAT(ctx.credentials(), Not(IsNull())); + EXPECT_THAT(auth_success, StatusIs(StatusCode::kPermissionDenied)); +} + +TEST(SchemaAuthTest, ListSchemas) { + auto mock = std::make_shared(); + EXPECT_CALL(*mock, ListSchemas) + .WillOnce(Return(Status(StatusCode::kPermissionDenied, "uh-oh"))); + auto under_test = SchemaAuth(MakeTypicalMockAuth(), mock); + grpc::ClientContext ctx; + google::pubsub::v1::ListSchemasRequest request; + auto auth_failure = under_test.ListSchemas(ctx, request); + EXPECT_THAT(ctx.credentials(), IsNull()); + EXPECT_THAT(auth_failure, StatusIs(StatusCode::kInvalidArgument)); + + auto auth_success = under_test.ListSchemas(ctx, request); + EXPECT_THAT(ctx.credentials(), Not(IsNull())); + EXPECT_THAT(auth_success, StatusIs(StatusCode::kPermissionDenied)); +} + +TEST(SchemaAuthTest, DeleteSchema) { + auto mock = std::make_shared(); + EXPECT_CALL(*mock, DeleteSchema) + .WillOnce(Return(Status(StatusCode::kPermissionDenied, "uh-oh"))); + auto under_test = SchemaAuth(MakeTypicalMockAuth(), mock); + grpc::ClientContext ctx; + google::pubsub::v1::DeleteSchemaRequest request; + auto auth_failure = under_test.DeleteSchema(ctx, request); + EXPECT_THAT(ctx.credentials(), IsNull()); + EXPECT_THAT(auth_failure, StatusIs(StatusCode::kInvalidArgument)); + + auto auth_success = under_test.DeleteSchema(ctx, request); + EXPECT_THAT(ctx.credentials(), Not(IsNull())); + EXPECT_THAT(auth_success, StatusIs(StatusCode::kPermissionDenied)); +} + +TEST(SchemaAuthTest, ValidateSchema) { + auto mock = std::make_shared(); + EXPECT_CALL(*mock, ValidateSchema) + .WillOnce(Return(Status(StatusCode::kPermissionDenied, "uh-oh"))); + auto under_test = SchemaAuth(MakeTypicalMockAuth(), mock); + grpc::ClientContext ctx; + google::pubsub::v1::ValidateSchemaRequest request; + auto auth_failure = under_test.ValidateSchema(ctx, request); + EXPECT_THAT(ctx.credentials(), IsNull()); + EXPECT_THAT(auth_failure, StatusIs(StatusCode::kInvalidArgument)); + + auto auth_success = under_test.ValidateSchema(ctx, request); + EXPECT_THAT(ctx.credentials(), Not(IsNull())); + EXPECT_THAT(auth_success, StatusIs(StatusCode::kPermissionDenied)); +} + +TEST(SchemaAuthTest, ValidateMessage) { + auto mock = std::make_shared(); + EXPECT_CALL(*mock, ValidateMessage) + .WillOnce(Return(Status(StatusCode::kPermissionDenied, "uh-oh"))); + auto under_test = SchemaAuth(MakeTypicalMockAuth(), mock); + grpc::ClientContext ctx; + google::pubsub::v1::ValidateMessageRequest request; + auto auth_failure = under_test.ValidateMessage(ctx, request); + EXPECT_THAT(ctx.credentials(), IsNull()); + EXPECT_THAT(auth_failure, StatusIs(StatusCode::kInvalidArgument)); + + auto auth_success = under_test.ValidateMessage(ctx, request); + EXPECT_THAT(ctx.credentials(), Not(IsNull())); + EXPECT_THAT(auth_success, StatusIs(StatusCode::kPermissionDenied)); +} + +} // namespace +} // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS +} // namespace pubsub_internal +} // namespace cloud +} // namespace google diff --git a/google/cloud/pubsub/internal/schema_stub.cc b/google/cloud/pubsub/internal/schema_stub.cc index d28aac7a549c2..8fead3db06b5b 100644 --- a/google/cloud/pubsub/internal/schema_stub.cc +++ b/google/cloud/pubsub/internal/schema_stub.cc @@ -90,11 +90,10 @@ class DefaultSchemaStub : public SchemaStub { }; } // namespace -std::shared_ptr CreateDefaultSchemaStub(Options const& opts, - int channel_id) { +std::shared_ptr CreateDefaultSchemaStub( + std::shared_ptr channel) { return std::make_shared( - google::pubsub::v1::SchemaService::NewStub( - CreateChannel(opts, channel_id))); + google::pubsub::v1::SchemaService::NewStub(std::move(channel))); } } // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS diff --git a/google/cloud/pubsub/internal/schema_stub.h b/google/cloud/pubsub/internal/schema_stub.h index 934c3103a04c1..faa6c63368431 100644 --- a/google/cloud/pubsub/internal/schema_stub.h +++ b/google/cloud/pubsub/internal/schema_stub.h @@ -71,13 +71,10 @@ class SchemaStub { }; /** - * Creates a SchemaStub configured with @p opts and @p channel_id. - * - * @p channel_id should be unique among all stubs in the same Connection pool, - * to ensure they use different underlying connections. + * Creates a SchemaStub with a pre-configured channel. */ -std::shared_ptr CreateDefaultSchemaStub(Options const& opts, - int channel_id); +std::shared_ptr CreateDefaultSchemaStub( + std::shared_ptr channel); } // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS } // namespace pubsub_internal diff --git a/google/cloud/pubsub/pubsub_client_unit_tests.bzl b/google/cloud/pubsub/pubsub_client_unit_tests.bzl index 2fbd49f0b0592..6ebcd99c7e9ec 100644 --- a/google/cloud/pubsub/pubsub_client_unit_tests.bzl +++ b/google/cloud/pubsub/pubsub_client_unit_tests.bzl @@ -28,6 +28,7 @@ pubsub_client_unit_tests = [ "internal/publisher_metadata_test.cc", "internal/publisher_round_robin_test.cc", "internal/rejects_with_ordering_key_test.cc", + "internal/schema_auth_test.cc", "internal/schema_logging_test.cc", "internal/schema_metadata_test.cc", "internal/sequential_batch_sink_test.cc", diff --git a/google/cloud/pubsub/schema_admin_connection.cc b/google/cloud/pubsub/schema_admin_connection.cc index d2c3519d8362e..753573403108c 100644 --- a/google/cloud/pubsub/schema_admin_connection.cc +++ b/google/cloud/pubsub/schema_admin_connection.cc @@ -14,6 +14,7 @@ #include "google/cloud/pubsub/schema_admin_connection.h" #include "google/cloud/pubsub/internal/defaults.h" +#include "google/cloud/pubsub/internal/schema_auth.h" #include "google/cloud/pubsub/internal/schema_logging.h" #include "google/cloud/pubsub/internal/schema_metadata.h" #include "google/cloud/pubsub/internal/schema_stub.h" @@ -35,10 +36,12 @@ using ::google::cloud::internal::RetryLoop; class SchemaAdminConnectionImpl : public pubsub::SchemaAdminConnection { public: explicit SchemaAdminConnectionImpl( + std::unique_ptr background, std::shared_ptr stub, std::unique_ptr retry_policy, std::unique_ptr backoff_policy) - : stub_(std::move(stub)), + : background_(std::move(background)), + stub_(std::move(stub)), retry_policy_(std::move(retry_policy)), backoff_policy_(std::move(backoff_policy)) {} @@ -135,22 +138,41 @@ class SchemaAdminConnectionImpl : public pubsub::SchemaAdminConnection { } private: + std::unique_ptr background_; std::shared_ptr stub_; std::unique_ptr retry_policy_; std::unique_ptr backoff_policy_; }; -} // namespace -std::shared_ptr MakeSchemaAdminConnection( - Options const& opts, std::shared_ptr stub) { - stub = std::make_shared(std::move(stub)); +// Decorates a SchemaAdminStub. This works for both mock and real stubs. +std::shared_ptr DecorateSchemaAdminStub( + Options const& opts, + std::shared_ptr auth, + std::shared_ptr stub) { + if (auth->RequiresConfigureContext()) { + stub = std::make_shared(std::move(auth), + std::move(stub)); + } + stub = std::make_shared(std::move(stub)); if (internal::Contains(opts.get(), "rpc")) { GCP_LOG(INFO) << "Enabled logging for gRPC calls"; - stub = std::make_shared( + stub = std::make_shared( std::move(stub), opts.get()); } + return stub; +} + +} // namespace + +std::shared_ptr MakeSchemaAdminConnection( + Options const& opts, std::shared_ptr stub) { + auto background = internal::MakeBackgroundThreadsFactory(opts)(); + auto auth = google::cloud::internal::CreateAuthenticationStrategy( + google::cloud::MakeInsecureCredentials(), background->cq(), opts); + stub = DecorateSchemaAdminStub(opts, std::move(auth), std::move(stub)); return std::make_shared( - std::move(stub), opts.get()->clone(), + std::move(background), std::move(stub), + opts.get()->clone(), opts.get()->clone()); } @@ -171,9 +193,20 @@ std::shared_ptr MakeSchemaAdminConnection(Options opts) { internal::CheckExpectedOptions(opts, __func__); opts = pubsub_internal::DefaultCommonOptions(std::move(opts)); - auto stub = pubsub_internal::CreateDefaultSchemaStub(opts, /*channel_id=*/0); - return pubsub_internal::MakeSchemaAdminConnection(std::move(opts), - std::move(stub)); + + auto background = internal::MakeBackgroundThreadsFactory(opts)(); + auto auth = google::cloud::internal::CreateAuthenticationStrategy( + background->cq(), opts); + + auto stub = pubsub_internal::CreateDefaultSchemaStub(auth->CreateChannel( + opts.get(), internal::MakeChannelArguments(opts))); + + stub = pubsub_internal::DecorateSchemaAdminStub(opts, std::move(auth), + std::move(stub)); + return std::make_shared( + std::move(background), std::move(stub), + opts.get()->clone(), + opts.get()->clone()); } std::shared_ptr MakeSchemaAdminConnection(