Skip to content

Commit

Permalink
feat(pubsub): implement GUAC for SchemaAdmin (#7436)
Browse files Browse the repository at this point in the history
  • Loading branch information
coryan authored Oct 11, 2021
1 parent 1e29dce commit e38b7c1
Show file tree
Hide file tree
Showing 10 changed files with 357 additions and 20 deletions.
3 changes: 3 additions & 0 deletions google/cloud/pubsub/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ add_library(
internal/publisher_stub.h
internal/rejects_with_ordering_key.cc
internal/rejects_with_ordering_key.h
internal/schema_auth.cc
internal/schema_auth.h
internal/schema_logging.cc
internal/schema_logging.h
internal/schema_metadata.cc
Expand Down Expand Up @@ -245,6 +247,7 @@ function (google_cloud_cpp_pubsub_client_define_tests)
internal/publisher_metadata_test.cc
internal/publisher_round_robin_test.cc
internal/rejects_with_ordering_key_test.cc
internal/schema_auth_test.cc
internal/schema_logging_test.cc
internal/schema_metadata_test.cc
internal/sequential_batch_sink_test.cc
Expand Down
2 changes: 2 additions & 0 deletions google/cloud/pubsub/google_cloud_cpp_pubsub.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ google_cloud_cpp_pubsub_hdrs = [
"internal/publisher_round_robin.h",
"internal/publisher_stub.h",
"internal/rejects_with_ordering_key.h",
"internal/schema_auth.h",
"internal/schema_logging.h",
"internal/schema_metadata.h",
"internal/schema_stub.h",
Expand Down Expand Up @@ -93,6 +94,7 @@ google_cloud_cpp_pubsub_srcs = [
"internal/publisher_round_robin.cc",
"internal/publisher_stub.cc",
"internal/rejects_with_ordering_key.cc",
"internal/schema_auth.cc",
"internal/schema_logging.cc",
"internal/schema_metadata.cc",
"internal/schema_stub.cc",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "google/cloud/pubsub/schema_admin_client.h"
#include "google/cloud/pubsub/testing/random_names.h"
#include "google/cloud/pubsub/testing/test_retry_policies.h"
#include "google/cloud/credentials.h"
#include "google/cloud/internal/getenv.h"
#include "google/cloud/testing_util/integration_test.h"
#include "google/cloud/testing_util/is_proto_equal.h"
Expand All @@ -35,8 +36,13 @@ using ::google::cloud::testing_util::IsProtoEqual;
using ::google::cloud::testing_util::ScopedEnvironment;
using ::google::cloud::testing_util::StatusIs;
using ::testing::Contains;
using ::testing::IsEmpty;
using ::testing::Not;

bool UsingEmulator() {
return google::cloud::internal::GetEnv("PUBSUB_EMULATOR_HOST").has_value();
}

SchemaAdminClient MakeTestSchemaAdminClient() {
return SchemaAdminClient(MakeSchemaAdminConnection(MakeTestOptions()));
}
Expand Down Expand Up @@ -93,6 +99,26 @@ TEST_F(SchemaAdminIntegrationTest, SchemaCRUD) {
EXPECT_THAT(deleted, IsOk());
}

TEST_F(SchemaAdminIntegrationTest, UnifiedCredentials) {
auto project_id =
google::cloud::internal::GetEnv("GOOGLE_CLOUD_PROJECT").value_or("");
ASSERT_THAT(project_id, Not(IsEmpty()));
auto options =
Options{}.set<UnifiedCredentialsOption>(MakeGoogleDefaultCredentials());
if (UsingEmulator()) {
options = Options{}
.set<UnifiedCredentialsOption>(MakeAccessTokenCredentials(
"test-only-invalid", std::chrono::system_clock::now() +
std::chrono::minutes(15)))
.set<internal::UseInsecureChannelOption>(true);
}
auto client =
SchemaAdminClient(MakeSchemaAdminConnection(std::move(options)));
for (auto&& r : client.ListSchemas(project_id)) {
EXPECT_THAT(r, IsOk());
}
}

TEST_F(SchemaAdminIntegrationTest, CreateSchema) {
ScopedEnvironment env("PUBSUB_EMULATOR_HOST", "localhost:1");
auto schema_admin = MakeTestSchemaAdminClient();
Expand Down
75 changes: 75 additions & 0 deletions google/cloud/pubsub/internal/schema_auth.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "google/cloud/pubsub/internal/schema_auth.h"
#include "google/cloud/internal/log_wrapper.h"

namespace google {
namespace cloud {
namespace pubsub_internal {
inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS {

StatusOr<google::pubsub::v1::Schema> SchemaAuth::CreateSchema(
grpc::ClientContext& context,
google::pubsub::v1::CreateSchemaRequest const& request) {
auto status = auth_->ConfigureContext(context);
if (!status.ok()) return status;
return child_->CreateSchema(context, request);
}

StatusOr<google::pubsub::v1::Schema> SchemaAuth::GetSchema(
grpc::ClientContext& context,
google::pubsub::v1::GetSchemaRequest const& request) {
auto status = auth_->ConfigureContext(context);
if (!status.ok()) return status;
return child_->GetSchema(context, request);
}

StatusOr<google::pubsub::v1::ListSchemasResponse> SchemaAuth::ListSchemas(
grpc::ClientContext& context,
google::pubsub::v1::ListSchemasRequest const& request) {
auto status = auth_->ConfigureContext(context);
if (!status.ok()) return status;
return child_->ListSchemas(context, request);
}

Status SchemaAuth::DeleteSchema(
grpc::ClientContext& context,
google::pubsub::v1::DeleteSchemaRequest const& request) {
auto status = auth_->ConfigureContext(context);
if (!status.ok()) return status;
return child_->DeleteSchema(context, request);
}

StatusOr<google::pubsub::v1::ValidateSchemaResponse> SchemaAuth::ValidateSchema(
grpc::ClientContext& context,
google::pubsub::v1::ValidateSchemaRequest const& request) {
auto status = auth_->ConfigureContext(context);
if (!status.ok()) return status;
return child_->ValidateSchema(context, request);
}

StatusOr<google::pubsub::v1::ValidateMessageResponse>
SchemaAuth::ValidateMessage(
grpc::ClientContext& context,
google::pubsub::v1::ValidateMessageRequest const& request) {
auto status = auth_->ConfigureContext(context);
if (!status.ok()) return status;
return child_->ValidateMessage(context, request);
}

} // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS
} // namespace pubsub_internal
} // namespace cloud
} // namespace google
67 changes: 67 additions & 0 deletions google/cloud/pubsub/internal/schema_auth.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_INTERNAL_SCHEMA_AUTH_H
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_INTERNAL_SCHEMA_AUTH_H

#include "google/cloud/pubsub/internal/schema_stub.h"
#include "google/cloud/pubsub/version.h"
#include "google/cloud/internal/unified_grpc_credentials.h"
#include <memory>

namespace google {
namespace cloud {
namespace pubsub_internal {
inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS {

/**
* A Decorator for `SchemaStub` that logs each request and response.
*/
class SchemaAuth : public SchemaStub {
public:
SchemaAuth(
std::shared_ptr<google::cloud::internal::GrpcAuthenticationStrategy> auth,
std::shared_ptr<SchemaStub> child)
: auth_(std::move(auth)), child_(std::move(child)) {}

StatusOr<google::pubsub::v1::Schema> CreateSchema(
grpc::ClientContext& context,
google::pubsub::v1::CreateSchemaRequest const& request) override;
StatusOr<google::pubsub::v1::Schema> GetSchema(
grpc::ClientContext& context,
google::pubsub::v1::GetSchemaRequest const& request) override;
StatusOr<google::pubsub::v1::ListSchemasResponse> ListSchemas(
grpc::ClientContext& context,
google::pubsub::v1::ListSchemasRequest const& request) override;
Status DeleteSchema(
grpc::ClientContext& context,
google::pubsub::v1::DeleteSchemaRequest const& request) override;
StatusOr<google::pubsub::v1::ValidateSchemaResponse> ValidateSchema(
grpc::ClientContext& context,
google::pubsub::v1::ValidateSchemaRequest const& request) override;
StatusOr<google::pubsub::v1::ValidateMessageResponse> ValidateMessage(
grpc::ClientContext& context,
google::pubsub::v1::ValidateMessageRequest const& request) override;

private:
std::shared_ptr<google::cloud::internal::GrpcAuthenticationStrategy> auth_;
std::shared_ptr<SchemaStub> child_;
};

} // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS
} // namespace pubsub_internal
} // namespace cloud
} // namespace google

#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_INTERNAL_SCHEMA_AUTH_H
134 changes: 134 additions & 0 deletions google/cloud/pubsub/internal/schema_auth_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "google/cloud/pubsub/internal/schema_auth.h"
#include "google/cloud/pubsub/testing/mock_schema_stub.h"
#include "google/cloud/testing_util/mock_grpc_authentication_strategy.h"
#include "google/cloud/testing_util/status_matchers.h"
#include "absl/memory/memory.h"
#include <gmock/gmock.h>

namespace google {
namespace cloud {
namespace pubsub_internal {
inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS {
namespace {

using ::google::cloud::testing_util::MakeTypicalMockAuth;
using ::google::cloud::testing_util::StatusIs;
using ::testing::IsNull;
using ::testing::Not;
using ::testing::Return;

TEST(SchemaAuthTest, CreateSchema) {
auto mock = std::make_shared<pubsub_testing::MockSchemaStub>();
EXPECT_CALL(*mock, CreateSchema)
.WillOnce(Return(Status(StatusCode::kPermissionDenied, "uh-oh")));
auto under_test = SchemaAuth(MakeTypicalMockAuth(), mock);
grpc::ClientContext ctx;
google::pubsub::v1::CreateSchemaRequest request;
auto auth_failure = under_test.CreateSchema(ctx, request);
EXPECT_THAT(ctx.credentials(), IsNull());
EXPECT_THAT(auth_failure, StatusIs(StatusCode::kInvalidArgument));

auto auth_success = under_test.CreateSchema(ctx, request);
EXPECT_THAT(ctx.credentials(), Not(IsNull()));
EXPECT_THAT(auth_success, StatusIs(StatusCode::kPermissionDenied));
}

TEST(SchemaAuthTest, GetSchema) {
auto mock = std::make_shared<pubsub_testing::MockSchemaStub>();
EXPECT_CALL(*mock, GetSchema)
.WillOnce(Return(Status(StatusCode::kPermissionDenied, "uh-oh")));
auto under_test = SchemaAuth(MakeTypicalMockAuth(), mock);
grpc::ClientContext ctx;
google::pubsub::v1::GetSchemaRequest request;
auto auth_failure = under_test.GetSchema(ctx, request);
EXPECT_THAT(ctx.credentials(), IsNull());
EXPECT_THAT(auth_failure, StatusIs(StatusCode::kInvalidArgument));

auto auth_success = under_test.GetSchema(ctx, request);
EXPECT_THAT(ctx.credentials(), Not(IsNull()));
EXPECT_THAT(auth_success, StatusIs(StatusCode::kPermissionDenied));
}

TEST(SchemaAuthTest, ListSchemas) {
auto mock = std::make_shared<pubsub_testing::MockSchemaStub>();
EXPECT_CALL(*mock, ListSchemas)
.WillOnce(Return(Status(StatusCode::kPermissionDenied, "uh-oh")));
auto under_test = SchemaAuth(MakeTypicalMockAuth(), mock);
grpc::ClientContext ctx;
google::pubsub::v1::ListSchemasRequest request;
auto auth_failure = under_test.ListSchemas(ctx, request);
EXPECT_THAT(ctx.credentials(), IsNull());
EXPECT_THAT(auth_failure, StatusIs(StatusCode::kInvalidArgument));

auto auth_success = under_test.ListSchemas(ctx, request);
EXPECT_THAT(ctx.credentials(), Not(IsNull()));
EXPECT_THAT(auth_success, StatusIs(StatusCode::kPermissionDenied));
}

TEST(SchemaAuthTest, DeleteSchema) {
auto mock = std::make_shared<pubsub_testing::MockSchemaStub>();
EXPECT_CALL(*mock, DeleteSchema)
.WillOnce(Return(Status(StatusCode::kPermissionDenied, "uh-oh")));
auto under_test = SchemaAuth(MakeTypicalMockAuth(), mock);
grpc::ClientContext ctx;
google::pubsub::v1::DeleteSchemaRequest request;
auto auth_failure = under_test.DeleteSchema(ctx, request);
EXPECT_THAT(ctx.credentials(), IsNull());
EXPECT_THAT(auth_failure, StatusIs(StatusCode::kInvalidArgument));

auto auth_success = under_test.DeleteSchema(ctx, request);
EXPECT_THAT(ctx.credentials(), Not(IsNull()));
EXPECT_THAT(auth_success, StatusIs(StatusCode::kPermissionDenied));
}

TEST(SchemaAuthTest, ValidateSchema) {
auto mock = std::make_shared<pubsub_testing::MockSchemaStub>();
EXPECT_CALL(*mock, ValidateSchema)
.WillOnce(Return(Status(StatusCode::kPermissionDenied, "uh-oh")));
auto under_test = SchemaAuth(MakeTypicalMockAuth(), mock);
grpc::ClientContext ctx;
google::pubsub::v1::ValidateSchemaRequest request;
auto auth_failure = under_test.ValidateSchema(ctx, request);
EXPECT_THAT(ctx.credentials(), IsNull());
EXPECT_THAT(auth_failure, StatusIs(StatusCode::kInvalidArgument));

auto auth_success = under_test.ValidateSchema(ctx, request);
EXPECT_THAT(ctx.credentials(), Not(IsNull()));
EXPECT_THAT(auth_success, StatusIs(StatusCode::kPermissionDenied));
}

TEST(SchemaAuthTest, ValidateMessage) {
auto mock = std::make_shared<pubsub_testing::MockSchemaStub>();
EXPECT_CALL(*mock, ValidateMessage)
.WillOnce(Return(Status(StatusCode::kPermissionDenied, "uh-oh")));
auto under_test = SchemaAuth(MakeTypicalMockAuth(), mock);
grpc::ClientContext ctx;
google::pubsub::v1::ValidateMessageRequest request;
auto auth_failure = under_test.ValidateMessage(ctx, request);
EXPECT_THAT(ctx.credentials(), IsNull());
EXPECT_THAT(auth_failure, StatusIs(StatusCode::kInvalidArgument));

auto auth_success = under_test.ValidateMessage(ctx, request);
EXPECT_THAT(ctx.credentials(), Not(IsNull()));
EXPECT_THAT(auth_success, StatusIs(StatusCode::kPermissionDenied));
}

} // namespace
} // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS
} // namespace pubsub_internal
} // namespace cloud
} // namespace google
7 changes: 3 additions & 4 deletions google/cloud/pubsub/internal/schema_stub.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,10 @@ class DefaultSchemaStub : public SchemaStub {
};
} // namespace

std::shared_ptr<SchemaStub> CreateDefaultSchemaStub(Options const& opts,
int channel_id) {
std::shared_ptr<SchemaStub> CreateDefaultSchemaStub(
std::shared_ptr<grpc::Channel> channel) {
return std::make_shared<DefaultSchemaStub>(
google::pubsub::v1::SchemaService::NewStub(
CreateChannel(opts, channel_id)));
google::pubsub::v1::SchemaService::NewStub(std::move(channel)));
}

} // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS
Expand Down
9 changes: 3 additions & 6 deletions google/cloud/pubsub/internal/schema_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,10 @@ class SchemaStub {
};

/**
* Creates a SchemaStub configured with @p opts and @p channel_id.
*
* @p channel_id should be unique among all stubs in the same Connection pool,
* to ensure they use different underlying connections.
* Creates a SchemaStub with a pre-configured channel.
*/
std::shared_ptr<SchemaStub> CreateDefaultSchemaStub(Options const& opts,
int channel_id);
std::shared_ptr<SchemaStub> CreateDefaultSchemaStub(
std::shared_ptr<grpc::Channel> channel);

} // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS
} // namespace pubsub_internal
Expand Down
Loading

0 comments on commit e38b7c1

Please sign in to comment.