Skip to content

Commit

Permalink
impl(bigtable): duplicate AsyncRowSampler (#9221)
Browse files Browse the repository at this point in the history
  • Loading branch information
dbolduc authored Jun 8, 2022
1 parent 1b3b597 commit 0ed8eac
Show file tree
Hide file tree
Showing 8 changed files with 671 additions and 10 deletions.
6 changes: 3 additions & 3 deletions google/cloud/bigtable/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,6 @@ add_library(
internal/async_bulk_apply.h
internal/async_retry_op.h
internal/async_row_reader.h
internal/async_row_sampler.cc
internal/async_row_sampler.h
internal/async_streaming_read.h
internal/bigtable_auth_decorator.cc
internal/bigtable_auth_decorator.h
Expand Down Expand Up @@ -186,6 +184,8 @@ add_library(
internal/defaults.h
internal/google_bytes_traits.cc
internal/google_bytes_traits.h
internal/legacy_async_row_sampler.cc
internal/legacy_async_row_sampler.h
internal/legacy_row_reader.cc
internal/legacy_row_reader.h
internal/logging_data_client.cc
Expand Down Expand Up @@ -330,7 +330,6 @@ if (BUILD_TESTING)
internal/admin_client_params_test.cc
internal/async_bulk_apply_test.cc
internal/async_row_reader_test.cc
internal/async_row_sampler_test.cc
internal/async_streaming_read_test.cc
internal/bigtable_channel_refresh_test.cc
internal/bigtable_round_robin_test.cc
Expand All @@ -342,6 +341,7 @@ if (BUILD_TESTING)
internal/default_row_reader_test.cc
internal/defaults_test.cc
internal/google_bytes_traits_test.cc
internal/legacy_async_row_sampler_test.cc
internal/legacy_bulk_mutator_test.cc
internal/legacy_row_reader_test.cc
internal/logging_data_client_test.cc
Expand Down
2 changes: 1 addition & 1 deletion google/cloud/bigtable/bigtable_client_unit_tests.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ bigtable_client_unit_tests = [
"internal/admin_client_params_test.cc",
"internal/async_bulk_apply_test.cc",
"internal/async_row_reader_test.cc",
"internal/async_row_sampler_test.cc",
"internal/async_streaming_read_test.cc",
"internal/bigtable_channel_refresh_test.cc",
"internal/bigtable_round_robin_test.cc",
Expand All @@ -51,6 +50,7 @@ bigtable_client_unit_tests = [
"internal/default_row_reader_test.cc",
"internal/defaults_test.cc",
"internal/google_bytes_traits_test.cc",
"internal/legacy_async_row_sampler_test.cc",
"internal/legacy_bulk_mutator_test.cc",
"internal/legacy_row_reader_test.cc",
"internal/logging_data_client_test.cc",
Expand Down
4 changes: 2 additions & 2 deletions google/cloud/bigtable/data_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
class Table;
namespace internal {
class AsyncRetryBulkApply;
class AsyncRowSampler;
class LegacyAsyncRowSampler;
class BulkMutator;
class LoggingDataClient;
} // namespace internal
Expand Down Expand Up @@ -114,7 +114,7 @@ class DataClient {
protected:
friend class Table;
friend class internal::AsyncRetryBulkApply;
friend class internal::AsyncRowSampler;
friend class internal::LegacyAsyncRowSampler;
friend class internal::BulkMutator;
friend class bigtable_internal::LegacyRowReader;
template <typename RowFunctor, typename FinishFunctor>
Expand Down
4 changes: 2 additions & 2 deletions google/cloud/bigtable/google_cloud_cpp_bigtable.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ google_cloud_cpp_bigtable_hdrs = [
"internal/async_bulk_apply.h",
"internal/async_retry_op.h",
"internal/async_row_reader.h",
"internal/async_row_sampler.h",
"internal/async_streaming_read.h",
"internal/bigtable_auth_decorator.h",
"internal/bigtable_channel_refresh.h",
Expand All @@ -84,6 +83,7 @@ google_cloud_cpp_bigtable_hdrs = [
"internal/default_row_reader.h",
"internal/defaults.h",
"internal/google_bytes_traits.h",
"internal/legacy_async_row_sampler.h",
"internal/legacy_row_reader.h",
"internal/logging_data_client.h",
"internal/prefix_range_end.h",
Expand Down Expand Up @@ -152,7 +152,6 @@ google_cloud_cpp_bigtable_srcs = [
"instance_update_config.cc",
"internal/admin_client_params.cc",
"internal/async_bulk_apply.cc",
"internal/async_row_sampler.cc",
"internal/bigtable_auth_decorator.cc",
"internal/bigtable_channel_refresh.cc",
"internal/bigtable_logging_decorator.cc",
Expand All @@ -168,6 +167,7 @@ google_cloud_cpp_bigtable_srcs = [
"internal/default_row_reader.cc",
"internal/defaults.cc",
"internal/google_bytes_traits.cc",
"internal/legacy_async_row_sampler.cc",
"internal/legacy_row_reader.cc",
"internal/logging_data_client.cc",
"internal/prefix_range_end.cc",
Expand Down
124 changes: 124 additions & 0 deletions google/cloud/bigtable/internal/legacy_async_row_sampler.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "google/cloud/bigtable/internal/legacy_async_row_sampler.h"
#include "google/cloud/grpc_error_delegate.h"
#include "absl/memory/memory.h"
#include <grpcpp/client_context.h>
#include <grpcpp/completion_queue.h>
#include <chrono>

namespace google {
namespace cloud {
namespace bigtable {
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
namespace internal {

namespace btproto = ::google::bigtable::v2;

future<StatusOr<std::vector<RowKeySample>>> LegacyAsyncRowSampler::Create(
CompletionQueue cq, std::shared_ptr<DataClient> client,
std::unique_ptr<RPCRetryPolicy> rpc_retry_policy,
std::unique_ptr<RPCBackoffPolicy> rpc_backoff_policy,
MetadataUpdatePolicy metadata_update_policy, std::string app_profile_id,
std::string table_name) {
std::shared_ptr<LegacyAsyncRowSampler> sampler(new LegacyAsyncRowSampler(
std::move(cq), std::move(client), std::move(rpc_retry_policy),
std::move(rpc_backoff_policy), std::move(metadata_update_policy),
std::move(app_profile_id), std::move(table_name)));
sampler->StartIteration();
return sampler->promise_.get_future();
}

LegacyAsyncRowSampler::LegacyAsyncRowSampler(
CompletionQueue cq, std::shared_ptr<DataClient> client,
std::unique_ptr<RPCRetryPolicy> rpc_retry_policy,
std::unique_ptr<RPCBackoffPolicy> rpc_backoff_policy,
MetadataUpdatePolicy metadata_update_policy, std::string app_profile_id,
std::string table_name)
: cq_(std::move(cq)),
client_(std::move(client)),
rpc_retry_policy_(std::move(rpc_retry_policy)),
rpc_backoff_policy_(std::move(rpc_backoff_policy)),
metadata_update_policy_(std::move(metadata_update_policy)),
app_profile_id_(std::move(app_profile_id)),
table_name_(std::move(table_name)),
promise_([this] { stream_cancelled_ = true; }) {}

void LegacyAsyncRowSampler::StartIteration() {
btproto::SampleRowKeysRequest request;
request.set_app_profile_id(app_profile_id_);
request.set_table_name(table_name_);

auto context = absl::make_unique<grpc::ClientContext>();
rpc_retry_policy_->Setup(*context);
rpc_backoff_policy_->Setup(*context);
metadata_update_policy_.Setup(*context);

auto client = client_;
auto self = this->shared_from_this();
cq_.MakeStreamingReadRpc(
[client](grpc::ClientContext* context,
btproto::SampleRowKeysRequest const& request,
grpc::CompletionQueue* cq) {
return client->PrepareAsyncSampleRowKeys(context, request, cq);
},
request, std::move(context),
[self](btproto::SampleRowKeysResponse response) {
return self->OnRead(std::move(response));
},
[self](Status const& status) { self->OnFinish(status); });
}

future<bool> LegacyAsyncRowSampler::OnRead(
btproto::SampleRowKeysResponse response) {
if (stream_cancelled_) return make_ready_future(false);

RowKeySample row_sample;
row_sample.offset_bytes = response.offset_bytes();
row_sample.row_key = std::move(*response.mutable_row_key());
samples_.emplace_back(std::move(row_sample));
return make_ready_future(true);
}

void LegacyAsyncRowSampler::OnFinish(Status const& status) {
if (status.ok()) {
promise_.set_value(std::move(samples_));
return;
}
if (!rpc_retry_policy_->OnFailure(status)) {
promise_.set_value(std::move(status));
return;
}

using TimerFuture = future<StatusOr<std::chrono::system_clock::time_point>>;

samples_.clear();
auto self = this->shared_from_this();
auto delay = rpc_backoff_policy_->OnCompletion(std::move(status));
cq_.MakeRelativeTimer(delay).then([self](TimerFuture result) {
if (result.get()) {
self->StartIteration();
} else {
self->promise_.set_value(
Status(StatusCode::kCancelled, "call cancelled"));
}
});
}

} // namespace internal
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace bigtable
} // namespace cloud
} // namespace google
83 changes: 83 additions & 0 deletions google/cloud/bigtable/internal/legacy_async_row_sampler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_LEGACY_ASYNC_ROW_SAMPLER_H
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_LEGACY_ASYNC_ROW_SAMPLER_H

#include "google/cloud/bigtable/completion_queue.h"
#include "google/cloud/bigtable/data_client.h"
#include "google/cloud/bigtable/metadata_update_policy.h"
#include "google/cloud/bigtable/row_key_sample.h"
#include "google/cloud/bigtable/rpc_backoff_policy.h"
#include "google/cloud/bigtable/rpc_retry_policy.h"
#include "google/cloud/bigtable/version.h"
#include "google/cloud/future_generic.h"
#include "google/cloud/status.h"
#include "google/cloud/status_or.h"
#include <google/bigtable/v2/bigtable.pb.h>
#include <memory>
#include <string>
#include <vector>

namespace google {
namespace cloud {
namespace bigtable {
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
namespace internal {

/**
* Objects of this class represent the state of receiving row keys via
* AsyncSampleRows.
*/
class LegacyAsyncRowSampler
: public std::enable_shared_from_this<LegacyAsyncRowSampler> {
public:
static future<StatusOr<std::vector<RowKeySample>>> Create(
CompletionQueue cq, std::shared_ptr<DataClient> client,
std::unique_ptr<RPCRetryPolicy> rpc_retry_policy,
std::unique_ptr<RPCBackoffPolicy> rpc_backoff_policy,
MetadataUpdatePolicy metadata_update_policy, std::string app_profile_id,
std::string table_name);

private:
LegacyAsyncRowSampler(CompletionQueue cq, std::shared_ptr<DataClient> client,
std::unique_ptr<RPCRetryPolicy> rpc_retry_policy,
std::unique_ptr<RPCBackoffPolicy> rpc_backoff_policy,
MetadataUpdatePolicy metadata_update_policy,
std::string app_profile_id, std::string table_name);

void StartIteration();
future<bool> OnRead(google::bigtable::v2::SampleRowKeysResponse response);
void OnFinish(Status const& status);

CompletionQueue cq_;
std::shared_ptr<DataClient> client_;
std::unique_ptr<RPCRetryPolicy> rpc_retry_policy_;
std::unique_ptr<RPCBackoffPolicy> rpc_backoff_policy_;
MetadataUpdatePolicy metadata_update_policy_;
std::string app_profile_id_;
std::string table_name_;

bool stream_cancelled_ = false;
std::vector<RowKeySample> samples_;
promise<StatusOr<std::vector<RowKeySample>>> promise_;
};

} // namespace internal
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace bigtable
} // namespace cloud
} // namespace google

#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_LEGACY_ASYNC_ROW_SAMPLER_H
Loading

0 comments on commit 0ed8eac

Please sign in to comment.