From 5386f75d05b36ebd1692ce1ae9ea5179c1933cc0 Mon Sep 17 00:00:00 2001 From: Marek Dopiera Date: Wed, 25 Nov 2020 14:53:12 +0000 Subject: [PATCH] feat: implement Bigtable connection refresh This fixes #4996. This works, however it is not in its final form. Currently, background threads running `CompletionQueue` are created underhandedly, without users' control in every Bigtable `{Admin,InstanceAdmin,Data}Client`. The choice of the `{Admin,InstanceAdmin,Data}Client` rather than e.g. `Table` is consistent with Spanner and PubSub, where counterparts of those clients are called Connections. The next steps will contain removing the `CompletionQueue` from both the underlying clients and the user-facing API and instead providing `BackgroundThreads` to the ctors. I didn't decide to do this in chunks because otherwise it would either be confusing, which CompletionQueues are used or we'd have to duplicate the whole API to use the CompletionQueue provided in the ctors. More tests will be possible when `BackgroundThreadsFactory` will be passed in the `ClientOptions` --- google/cloud/bigtable/client_options.cc | 8 +- google/cloud/bigtable/client_options.h | 20 ++++ .../cloud/bigtable/internal/common_client.cc | 64 ++++++++++--- .../cloud/bigtable/internal/common_client.h | 93 ++++++++++++++++++- .../bigtable/tests/data_integration_test.cc | 66 +++++++++++++ 5 files changed, 231 insertions(+), 20 deletions(-) diff --git a/google/cloud/bigtable/client_options.cc b/google/cloud/bigtable/client_options.cc index 4ec7096b02c05..7f1af131b79eb 100644 --- a/google/cloud/bigtable/client_options.cc +++ b/google/cloud/bigtable/client_options.cc @@ -27,6 +27,9 @@ std::shared_ptr BigtableDefaultCredentials() { } return grpc::GoogleDefaultCredentials(); } +// As learned from experiments, idle gRPC connections enter IDLE state after 4m. +std::chrono::milliseconds constexpr kDefaultRefreshPeriod = + std::chrono::minutes(3); } // anonymous namespace namespace google { @@ -96,7 +99,10 @@ ClientOptions::ClientOptions(std::shared_ptr creds) admin_endpoint_("bigtableadmin.googleapis.com"), instance_admin_endpoint_("bigtableadmin.googleapis.com"), tracing_components_(internal::DefaultTracingComponents()), - tracing_options_(internal::DefaultTracingOptions()) { + tracing_options_(internal::DefaultTracingOptions()), + // Refresh connections before the server has a chance to disconnect them + // due to being idle. + max_conn_refresh_period_(kDefaultRefreshPeriod) { static std::string const kUserAgentPrefix = UserAgentPrefix(); channel_arguments_.SetUserAgentPrefix(kUserAgentPrefix); channel_arguments_.SetMaxSendMessageSize( diff --git a/google/cloud/bigtable/client_options.h b/google/cloud/bigtable/client_options.h index e991b783d9d1a..d17dc91a2b146 100644 --- a/google/cloud/bigtable/client_options.h +++ b/google/cloud/bigtable/client_options.h @@ -326,6 +326,25 @@ class ClientOptions { /// Return the options for use when tracing RPCs. TracingOptions const& tracing_options() const { return tracing_options_; } + /** + * Maximum connection refresh period, as set via `set_max_conn_refresh_period` + */ + std::chrono::milliseconds max_conn_refresh_period() { + return max_conn_refresh_period_; + } + + /** + * If set to a positive number, the client will refresh connections at random + * moments not more apart from each other than this duration. This is + * necessary to avoid all connections simultaneously expiring and causing + * latency spikes. + */ + ClientOptions& set_max_conn_refresh_period( + std::chrono::milliseconds max_conn_refresh_period) { + max_conn_refresh_period_ = max_conn_refresh_period; + return *this; + } + private: friend struct internal::InstanceAdminTraits; friend struct ClientOptionsTestTraits; @@ -348,6 +367,7 @@ class ClientOptions { std::string instance_admin_endpoint_; std::set tracing_components_; TracingOptions tracing_options_; + std::chrono::milliseconds max_conn_refresh_period_; }; } // namespace BIGTABLE_CLIENT_NS } // namespace bigtable diff --git a/google/cloud/bigtable/internal/common_client.cc b/google/cloud/bigtable/internal/common_client.cc index 898d1e7ade19c..4369047b7b484 100644 --- a/google/cloud/bigtable/internal/common_client.cc +++ b/google/cloud/bigtable/internal/common_client.cc @@ -19,20 +19,56 @@ namespace bigtable { inline namespace BIGTABLE_CLIENT_NS { namespace internal { -std::vector> CreateChannelPool( - std::string const& endpoint, bigtable::ClientOptions const& options) { - std::vector> result; - for (std::size_t i = 0; i != options.connection_pool_size(); ++i) { - auto args = options.channel_arguments(); - if (!options.connection_pool_name().empty()) { - args.SetString("cbt-c++/connection-pool-name", - options.connection_pool_name()); - } - args.SetInt("cbt-c++/connection-pool-id", static_cast(i)); - result.push_back( - grpc::CreateCustomChannel(endpoint, options.credentials(), args)); - } - return result; +ConnectionRefreshState::ConnectionRefreshState( + std::chrono::milliseconds max_conn_refresh_period) + : max_conn_refresh_period_(max_conn_refresh_period), + rng_(std::random_device{}()) {} + +std::chrono::milliseconds ConnectionRefreshState::RandomizedRefreshDelay() { + std::lock_guard lk(mu_); + return std::chrono::milliseconds( + std::uniform_int_distribution( + 1, max_conn_refresh_period_.count())(rng_)); +} + +void ScheduleChannelRefresh( + std::shared_ptr const& cq, + std::shared_ptr const& state, + std::shared_ptr const& channel) { + // The timers will only hold weak pointers to the channel or to the + // completion queue, so if either of them are destroyed, the timer chain + // will simply not continue. Unfortunately, that means that some stray + // timers may remain in the `CompletionQueue` for a while, but this is + // generally unavoidable because there is no way to cancel individual + // timers. + std::weak_ptr weak_channel(channel); + std::weak_ptr weak_cq(cq); + using TimerFuture = future>; + cq->MakeRelativeTimer(state->RandomizedRefreshDelay()) + .then([weak_channel, weak_cq, state](TimerFuture fut) { + if (!fut.get()) { + // Timer cancelled. + return; + } + auto channel = weak_channel.lock(); + if (!channel) return; + auto cq = weak_cq.lock(); + if (!cq) return; + cq->AsyncWaitConnectionReady(channel, std::chrono::system_clock::now() + + kConnectionReadyTimeout) + .then([weak_channel, weak_cq, state](future fut) { + auto conn_status = fut.get(); + if (!conn_status.ok()) { + GCP_LOG(WARNING) + << "Failed to refresh connection. Error: " << conn_status; + } + auto channel = weak_channel.lock(); + if (!channel) return; + auto cq = weak_cq.lock(); + if (!cq) return; + ScheduleChannelRefresh(cq, state, channel); + }); + }); } } // namespace internal diff --git a/google/cloud/bigtable/internal/common_client.h b/google/cloud/bigtable/internal/common_client.h index 4e1b9544dbce4..f2e7c8247d414 100644 --- a/google/cloud/bigtable/internal/common_client.h +++ b/google/cloud/bigtable/internal/common_client.h @@ -17,6 +17,10 @@ #include "google/cloud/bigtable/client_options.h" #include "google/cloud/bigtable/version.h" +#include "google/cloud/connection_options.h" +#include "google/cloud/internal/random.h" +#include "google/cloud/log.h" +#include "google/cloud/status_or.h" #include namespace google { @@ -25,9 +29,40 @@ namespace bigtable { inline namespace BIGTABLE_CLIENT_NS { namespace internal { -/// Create a pool of `grpc::Channel` objects based on the client options. -std::vector> CreateChannelPool( - std::string const& endpoint, bigtable::ClientOptions const& options); +/** + * Time after which we bail out waiting for a connection to become ready. + * + * This number was copied from the Java client and there doesn't seem to be a + * well-founded reason for it to be exactly this. It should not bee too large + * since waiting for a connection to become ready is not cancellable. + */ +std::chrono::seconds constexpr kConnectionReadyTimeout(10); + +/** + * State required by timers scheduled by `CommonClient`. + * + * The scheduled timers might outlive `CommonClient`. They need some shared, + * persistent state. Objects of this class implement it. + */ +class ConnectionRefreshState { + public: + explicit ConnectionRefreshState( + std::chrono::milliseconds max_conn_refresh_period); + std::chrono::milliseconds RandomizedRefreshDelay(); + + private: + std::mutex mu_; + std::chrono::milliseconds max_conn_refresh_period_; + google::cloud::internal::DefaultPRNG rng_; +}; + +/** + * Schedule a chain of timers to refresh the connection. + */ +void ScheduleChannelRefresh( + std::shared_ptr const& cq, + std::shared_ptr const& state, + std::shared_ptr const& channel); /** * Refactor implementation of `bigtable::{Data,Admin,InstanceAdmin}Client`. @@ -54,7 +89,21 @@ class CommonClient { //@} explicit CommonClient(bigtable::ClientOptions options) - : options_(std::move(options)), current_index_(0) {} + : options_(std::move(options)), + current_index_(0), + background_threads_( + google::cloud::internal::DefaultBackgroundThreads(1)), + cq_(std::make_shared(background_threads_->cq())), + refresh_state_(std::make_shared( + options_.max_conn_refresh_period())) {} + + ~CommonClient() { + // This will stop the refresh of the channels. + channels_.clear(); + // TODO(2567): remove this call when the user will have to provide their own + // `CompletionQueues` + background_threads_->cq().CancelAll(); + } /** * Reset the channel and stub. @@ -103,7 +152,7 @@ class CommonClient { // introduce attributes in the implementation of CreateChannelPool() to // create one socket per element in the pool. lk.unlock(); - auto channels = CreateChannelPool(Traits::Endpoint(options_), options_); + auto channels = CreateChannelPool(); std::vector tmp; std::transform(channels.begin(), channels.end(), std::back_inserter(tmp), [](std::shared_ptr ch) { @@ -125,6 +174,30 @@ class CommonClient { } } + ChannelPtr CreateChannel(std::size_t idx) { + auto args = options_.channel_arguments(); + if (!options_.connection_pool_name().empty()) { + args.SetString("cbt-c++/connection-pool-name", + options_.connection_pool_name()); + } + args.SetInt("cbt-c++/connection-pool-id", static_cast(idx)); + auto res = grpc::CreateCustomChannel(Traits::Endpoint(options_), + options_.credentials(), args); + if (options_.max_conn_refresh_period().count() == 0) { + return res; + } + ScheduleChannelRefresh(cq_, refresh_state_, res); + return res; + } + + std::vector> CreateChannelPool() { + std::vector> result; + for (std::size_t i = 0; i != options_.connection_pool_size(); ++i) { + result.emplace_back(CreateChannel(i)); + } + return result; + } + /// Get the current index for round-robin over connections. std::size_t GetIndex() { std::size_t current = current_index_++; @@ -136,10 +209,20 @@ class CommonClient { } std::mutex mu_; + std::size_t num_pending_refreshes_{}; ClientOptions options_; std::vector channels_; std::vector stubs_; std::size_t current_index_; + std::unique_ptr background_threads_; + // Timers, which we schedule for refreshes, need to reference the completion + // queue. We cannot make the completion queue's underlying implementation + // become owned solely by the operations scheduled on it (because we risk a + // deadlock). We solve both problems by holding only weak pointers to the + // completion queue in the operations scheduled on it. In order to do it, we + // need to hold one instance by a shared pointer. + std::shared_ptr cq_; + std::shared_ptr refresh_state_; }; } // namespace internal diff --git a/google/cloud/bigtable/tests/data_integration_test.cc b/google/cloud/bigtable/tests/data_integration_test.cc index 931bf3fcedabe..fd6f92a7682a6 100644 --- a/google/cloud/bigtable/tests/data_integration_test.cc +++ b/google/cloud/bigtable/tests/data_integration_test.cc @@ -600,6 +600,72 @@ TEST_F(DataIntegrationTest, TableApplyWithLogging) { google::cloud::LogSink::Instance().RemoveBackend(id); } +TEST(ConnectionRefresh, Disabled) { + auto client_options = bigtable::ClientOptions().set_max_conn_refresh_period( + std::chrono::seconds(0)); + auto data_client = bigtable::CreateDefaultDataClient( + testing::TableTestEnvironment::project_id(), + testing::TableTestEnvironment::instance_id(), client_options); + // In general, it is hard to show that something has *not* happened, at best + // we can show that its side-effects have not happened. In this case we want + // to show that the channels have not been refreshed. A side-effect of + // refreshing a channel is that it enters the `READY` state, so we check that + // this has not taken place and take that as evidence that no refresh has + // taken place. + // + // After the `CompletionQueue` argument is removed from the `Bigtable` API, we + // will have an option to provide a mock `CompletionQueue` to the `DataClient` + // for test purposes and verify that no timers are created, which will be a + // superior way to write this test. + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + for (std::size_t i = 0; i < client_options.connection_pool_size(); ++i) { + auto channel = data_client->Channel(); + EXPECT_EQ(GRPC_CHANNEL_IDLE, channel->GetState(false)); + } + // Make sure things still work. + bigtable::Table table(data_client, testing::TableTestEnvironment::table_id()); + std::string const row_key = "row-key-1"; + std::vector created{{row_key, kFamily4, "c0", 1000, "v1000"}, + {row_key, kFamily4, "c1", 2000, "v2000"}}; + Apply(table, row_key, created); + // After performing some operations, some of the channels should be in ready + // state. + auto check_if_some_channel_is_ready = [&] { + for (std::size_t i = 0; i < client_options.connection_pool_size(); ++i) { + if (data_client->Channel()->GetState(false) == GRPC_CHANNEL_READY) { + return true; + } + } + return false; + }; + EXPECT_TRUE(check_if_some_channel_is_ready()); +} + +TEST(ConnectionRefresh, Frequent) { + auto data_client = bigtable::CreateDefaultDataClient( + testing::TableTestEnvironment::project_id(), + testing::TableTestEnvironment::instance_id(), + bigtable::ClientOptions().set_max_conn_refresh_period( + std::chrono::milliseconds(100))); + + for (;;) { + if (data_client->Channel()->GetState(false) == GRPC_CHANNEL_READY) { + // We've found a channel which changed its state from IDLE to READY, + // which means that our refreshing mechanism works. + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + // Make sure things still work. + bigtable::Table table(data_client, testing::TableTestEnvironment::table_id()); + std::string const row_key = "row-key-1"; + std::vector created{{row_key, kFamily4, "c0", 1000, "v1000"}, + {row_key, kFamily4, "c1", 2000, "v2000"}}; + Apply(table, row_key, created); +} + } // namespace } // namespace BIGTABLE_CLIENT_NS } // namespace bigtable