Skip to content

Commit

Permalink
feat: implement Bigtable connection refresh
Browse files Browse the repository at this point in the history
This fixes googleapis#4996.

This works, however it is not in its final form. Currently, background
threads running a `CompletionQueue` are created behind the scenes, outside
of the users' control, in every Bigtable `{Admin,InstanceAdmin,Data}Client`.

The choice of the `{Admin,InstanceAdmin,Data}Client` rather than e.g.
`Table` is consistent with Spanner and PubSub, where counterparts of
those clients are called Connections.

The next steps will include removing the `CompletionQueue` from both the
underlying clients and the user-facing API, and instead providing
`BackgroundThreads` to the ctors. I decided not to do this in smaller
chunks because otherwise it would either be confusing which
`CompletionQueue`s are used, or we would have to duplicate the whole API
to use the `CompletionQueue` provided in the ctors.

More tests will become possible when the `BackgroundThreadsFactory` is
passed in the `ClientOptions`.
  • Loading branch information
dopiera committed Dec 2, 2020
1 parent 83538fe commit deec432
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 51 deletions.
1 change: 0 additions & 1 deletion google/cloud/bigtable/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ add_library(
internal/bulk_mutator.cc
internal/bulk_mutator.h
internal/client_options_defaults.h
internal/common_client.cc
internal/common_client.h
internal/google_bytes_traits.cc
internal/google_bytes_traits.h
Expand Down
1 change: 0 additions & 1 deletion google/cloud/bigtable/bigtable_client.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ bigtable_client_srcs = [
"instance_update_config.cc",
"internal/async_bulk_apply.cc",
"internal/bulk_mutator.cc",
"internal/common_client.cc",
"internal/google_bytes_traits.cc",
"internal/prefix_range_end.cc",
"internal/readrowsparser.cc",
Expand Down
8 changes: 7 additions & 1 deletion google/cloud/bigtable/client_options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ std::shared_ptr<grpc::ChannelCredentials> BigtableDefaultCredentials() {
}
return grpc::GoogleDefaultCredentials();
}
// As learned from experiments, idle gRPC connections enter IDLE state after 4m.
constexpr std::chrono::milliseconds kDefaultRefreshPeriod =
std::chrono::minutes(3);
} // anonymous namespace

namespace google {
Expand Down Expand Up @@ -96,7 +99,10 @@ ClientOptions::ClientOptions(std::shared_ptr<grpc::ChannelCredentials> creds)
admin_endpoint_("bigtableadmin.googleapis.com"),
instance_admin_endpoint_("bigtableadmin.googleapis.com"),
tracing_components_(internal::DefaultTracingComponents()),
tracing_options_(internal::DefaultTracingOptions()) {
tracing_options_(internal::DefaultTracingOptions()),
// Refresh connections before the server has a chance to disconnect them
// due to being idle.
max_conn_refresh_period_(kDefaultRefreshPeriod) {
static std::string const kUserAgentPrefix = UserAgentPrefix();
channel_arguments_.SetUserAgentPrefix(kUserAgentPrefix);
channel_arguments_.SetMaxSendMessageSize(
Expand Down
20 changes: 20 additions & 0 deletions google/cloud/bigtable/client_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,25 @@ class ClientOptions {
/// Return the options for use when tracing RPCs.
TracingOptions const& tracing_options() const { return tracing_options_; }

/**
* Maximum connection refresh period, as set via `set_max_conn_refresh_period`
*/
std::chrono::milliseconds max_conn_refresh_period() {
return max_conn_refresh_period_;
}

/**
 * Configure the maximum connection refresh period.
 *
 * If set to a positive value, the client refreshes each connection at a
 * random moment, no later than this duration after the previous refresh.
 * The randomization prevents all connections from expiring (and being
 * refreshed) simultaneously, which would cause latency spikes.
 */
ClientOptions& set_max_conn_refresh_period(std::chrono::milliseconds period) {
  max_conn_refresh_period_ = period;
  return *this;
}

private:
friend struct internal::InstanceAdminTraits;
friend struct ClientOptionsTestTraits;
Expand All @@ -348,6 +367,7 @@ class ClientOptions {
std::string instance_admin_endpoint_;
std::set<std::string> tracing_components_;
TracingOptions tracing_options_;
std::chrono::milliseconds max_conn_refresh_period_;
};
} // namespace BIGTABLE_CLIENT_NS
} // namespace bigtable
Expand Down
42 changes: 0 additions & 42 deletions google/cloud/bigtable/internal/common_client.cc

This file was deleted.

111 changes: 105 additions & 6 deletions google/cloud/bigtable/internal/common_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

#include "google/cloud/bigtable/client_options.h"
#include "google/cloud/bigtable/version.h"
#include "google/cloud/connection_options.h"
#include "google/cloud/internal/random.h"
#include "google/cloud/log.h"
#include "google/cloud/status_or.h"
#include <grpcpp/grpcpp.h>

namespace google {
Expand All @@ -25,10 +29,6 @@ namespace bigtable {
inline namespace BIGTABLE_CLIENT_NS {
namespace internal {

/// Create a pool of `grpc::Channel` objects based on the client options.
std::vector<std::shared_ptr<grpc::Channel>> CreateChannelPool(
std::string const& endpoint, bigtable::ClientOptions const& options);

/**
* Refactor implementation of `bigtable::{Data,Admin,InstanceAdmin}Client`.
*
Expand All @@ -54,7 +54,10 @@ class CommonClient {
//@}

/// Construct a client and start the background thread used to refresh its
/// gRPC channels.
explicit CommonClient(bigtable::ClientOptions options)
    : options_(std::move(options)),
      current_index_(0),
      background_threads_(
          google::cloud::internal::DefaultBackgroundThreads(1)) {}

/**
* Reset the channel and stub.
Expand Down Expand Up @@ -86,6 +89,15 @@ class CommonClient {

ClientOptions& Options() { return options_; }

/// Tear down the refresh timers before destroying any state they touch.
~CommonClient() {
  std::unique_lock<std::mutex> lk(mu_);
  // Prevent any further refresh timers from being scheduled.
  stop_refreshes_ = true;
  // Make sure all timers finish before we start destroying structures which
  // the timers potentially touch.
  background_threads_->cq().CancelAll();
  // Releases `lk` while waiting, so in-flight timer callbacks can acquire
  // `mu_` and decrement the pending-refresh counter.
  WaitForNoRefreshes(lk);
}

private:
/// Make sure the connections exit, and create them if needed.
void CheckConnections(std::unique_lock<std::mutex>& lk) {
Expand All @@ -103,7 +115,7 @@ class CommonClient {
// introduce attributes in the implementation of CreateChannelPool() to
// create one socket per element in the pool.
lk.unlock();
auto channels = CreateChannelPool(Traits::Endpoint(options_), options_);
auto channels = CreateChannelPool();
std::vector<StubPtr> tmp;
std::transform(channels.begin(), channels.end(), std::back_inserter(tmp),
[](std::shared_ptr<grpc::Channel> ch) {
Expand All @@ -125,6 +137,88 @@ class CommonClient {
}
}

/// Pick a uniformly random delay in (0, max_conn_refresh_period] so that
/// the channels do not all refresh at the same moment.
std::chrono::milliseconds RandomizedRefreshDelay() {
  using Rep = std::chrono::milliseconds::rep;
  auto const max_ms = options_.max_conn_refresh_period().count();
  std::uniform_int_distribution<Rep> dist(1, max_ms);
  return std::chrono::milliseconds(dist(rng_));
}

/// Account for a newly scheduled refresh timer.
void IncNumRefreshesPending() {
  std::lock_guard<std::mutex> guard(mu_);
  num_pending_refreshes_ += 1;
}

/// Account for a finished (or cancelled) refresh timer; wake up a destructor
/// waiting in `WaitForNoRefreshes()` once none remain.
void DecNumRefreshesPending() {
  std::lock_guard<std::mutex> guard(mu_);
  --num_pending_refreshes_;
  if (num_pending_refreshes_ == 0) {
    no_more_refreshes_cond_.notify_all();
  }
}

/// Block (releasing `lk` while waiting) until no refresh timers remain.
void WaitForNoRefreshes(std::unique_lock<std::mutex>& lk) {
  while (num_pending_refreshes_ != 0) {
    no_more_refreshes_cond_.wait(lk);
  }
}

/**
 * Register a randomized refresh timer for the channel at `idx`.
 *
 * On timer expiration the channel is asynchronously "warmed up" via
 * `AsyncWaitConnectionReady()` and the next refresh is scheduled, so every
 * channel keeps refreshing until `stop_refreshes_` is set by the destructor.
 */
void ScheduleChannelRefresh(std::size_t idx) {
  IncNumRefreshesPending();
  std::unique_lock<std::mutex> lk(mu_);
  if (stop_refreshes_) {
    // Undo the increment above; otherwise `WaitForNoRefreshes()` in the
    // destructor would wait forever for a refresh that will never run.
    lk.unlock();
    DecNumRefreshesPending();
    return;
  }
  auto timer_fut =
      background_threads_->cq().MakeRelativeTimer(RandomizedRefreshDelay());
  lk.unlock();

  timer_fut.then(
      [this,
       idx](future<StatusOr<std::chrono::system_clock::time_point>> fut) {
        if (!fut.get()) {
          // Timer cancelled (presumably at shutdown); do not reschedule.
          DecNumRefreshesPending();
          return;
        }
        // NOTE(review): this indexes `channels_[idx]`; it assumes the channel
        // was stored in `channels_` before the first timer fires — confirm.
        background_threads_->cq()
            .AsyncWaitConnectionReady(
                channels_[idx],
                std::chrono::system_clock::now() + std::chrono::seconds(10))
            .then([this, idx](future<Status> fut) {
              auto conn_status = fut.get();
              if (!conn_status.ok()) {
                GCP_LOG(WARNING) << "Failed to refresh connection to "
                                 << Traits::Endpoint(options_)
                                 << ". Error: " << conn_status;
              }
              // Keep the channel warm and account for this finished refresh.
              ScheduleChannelRefresh(idx);
              DecNumRefreshesPending();
            });
      });
}

/**
 * Create the channel with pool index `idx` and, unless refreshes are
 * disabled, schedule its periodic refresh.
 */
ChannelPtr CreateChannel(std::size_t idx) {
  auto args = options_.channel_arguments();
  if (!options_.connection_pool_name().empty()) {
    args.SetString("cbt-c++/connection-pool-name",
                   options_.connection_pool_name());
  }
  // Distinct per-channel argument — presumably so gRPC creates one socket
  // per pool element instead of sharing a subchannel; TODO confirm.
  args.SetInt("cbt-c++/connection-pool-id", static_cast<int>(idx));
  auto res = grpc::CreateCustomChannel(Traits::Endpoint(options_),
                                       options_.credentials(), args);
  // A refresh period of zero disables refreshes altogether.
  if (options_.max_conn_refresh_period().count() == 0) {
    return res;
  }
  // NOTE(review): this schedules a timer whose callback will eventually read
  // `channels_[idx]`, but the channel is only stored in `channels_` by the
  // caller after `CreateChannelPool()` returns — confirm the first timer
  // cannot fire before that happens.
  ScheduleChannelRefresh(idx);
  return res;
}

/// Build one channel per configured connection-pool slot.
std::vector<std::shared_ptr<grpc::Channel>> CreateChannelPool() {
  auto const pool_size = options_.connection_pool_size();
  std::vector<std::shared_ptr<grpc::Channel>> pool;
  pool.reserve(pool_size);
  for (std::size_t i = 0; i < pool_size; ++i) {
    pool.push_back(CreateChannel(i));
  }
  return pool;
}

/// Get the current index for round-robin over connections.
std::size_t GetIndex() {
std::size_t current = current_index_++;
Expand All @@ -136,10 +230,15 @@ class CommonClient {
}

std::mutex mu_;
std::size_t num_pending_refreshes_{};
bool stop_refreshes_{};
std::condition_variable no_more_refreshes_cond_;
ClientOptions options_;
std::random_device rng_;
std::vector<ChannelPtr> channels_;
std::vector<StubPtr> stubs_;
std::size_t current_index_;
std::unique_ptr<BackgroundThreads> background_threads_;
};

} // namespace internal
Expand Down
48 changes: 48 additions & 0 deletions google/cloud/bigtable/tests/data_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,54 @@ TEST_F(DataIntegrationTest, TableApplyWithLogging) {
google::cloud::LogSink::Instance().RemoveBackend(id);
}

// Verify that setting `max_conn_refresh_period` to zero disables the
// background connection refreshes entirely.
TEST(ConnectionRefresh, Disabled) {
  auto data_client = bigtable::CreateDefaultDataClient(
      testing::TableTestEnvironment::project_id(),
      testing::TableTestEnvironment::instance_id(),
      bigtable::ClientOptions().set_max_conn_refresh_period(
          std::chrono::seconds(0)));
  // Give any (incorrectly started) refresh timers a chance to fire; if we
  // were in fact actively warming up connections, the checks below would
  // likely fail.
  std::this_thread::sleep_for(std::chrono::milliseconds(100));

  for (std::size_t i = 0; i < 10U; ++i) {
    auto channel = data_client->Channel();
    // Not a proof, but if none of the sampled channels has been actively
    // warmed up, it is likely that no refreshing is taking place.
    EXPECT_EQ(GRPC_CHANNEL_IDLE, channel->GetState(false));
  }
  // Make sure the client still works with refreshes disabled.
  bigtable::Table table(data_client, testing::TableTestEnvironment::table_id());
  std::string const row_key = "row-key-1";
  std::vector<Cell> created{{row_key, kFamily4, "c0", 1000, "v1000"},
                            {row_key, kFamily4, "c1", 2000, "v2000"}};
  Apply(table, row_key, created);
}

// Verify that a short `max_conn_refresh_period` causes channels to be
// actively warmed up (IDLE -> READY) without any user-initiated traffic.
TEST(ConnectionRefresh, Frequent) {
  auto data_client = bigtable::CreateDefaultDataClient(
      testing::TableTestEnvironment::project_id(),
      testing::TableTestEnvironment::instance_id(),
      bigtable::ClientOptions().set_max_conn_refresh_period(
          std::chrono::milliseconds(100)));

  // Poll with a deadline, so a broken refresh mechanism fails the test
  // instead of hanging the whole suite in an unbounded loop.
  auto const deadline =
      std::chrono::steady_clock::now() + std::chrono::minutes(5);
  while (data_client->Channel()->GetState(false) != GRPC_CHANNEL_READY) {
    // We haven't yet found a channel which changed its state from IDLE to
    // READY, which would prove that our refreshing mechanism works.
    ASSERT_TRUE(std::chrono::steady_clock::now() < deadline)
        << "Timed out waiting for a refreshed channel to become READY.";
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
  }

  // Make sure things still work.
  bigtable::Table table(data_client, testing::TableTestEnvironment::table_id());
  std::string const row_key = "row-key-1";
  std::vector<Cell> created{{row_key, kFamily4, "c0", 1000, "v1000"},
                            {row_key, kFamily4, "c1", 2000, "v2000"}};
  Apply(table, row_key, created);
}

} // namespace
} // namespace BIGTABLE_CLIENT_NS
} // namespace bigtable
Expand Down

0 comments on commit deec432

Please sign in to comment.