feat: implement Bigtable connection refresh (#5550)
This fixes #4996.

This works; however, it is not in its final form. Currently, background
threads running a `CompletionQueue` are created behind the scenes, outside
of the users' control, in every Bigtable `{Admin,InstanceAdmin,Data}Client`.

The choice of `{Admin,InstanceAdmin,Data}Client` rather than e.g. `Table`
is consistent with Spanner and Pub/Sub, where the counterparts of these
clients are called Connections.

The next steps will include removing the `CompletionQueue` from both the
underlying clients and the user-facing API, and instead providing
`BackgroundThreads` in the ctors. I decided not to do this in chunks
because it would either be confusing which `CompletionQueue`s are used, or
we would have to duplicate the whole API to use the `CompletionQueue`
provided in the ctors.

More tests will become possible once a `BackgroundThreadsFactory` can be
passed in the `ClientOptions`.
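
For illustration, a minimal usage sketch of the new option (not part of this
commit's diff; the project, instance, and table names are placeholders):

#include "google/cloud/bigtable/client_options.h"
#include "google/cloud/bigtable/data_client.h"
#include "google/cloud/bigtable/table.h"
#include <chrono>

int main() {
  namespace cbt = google::cloud::bigtable;
  // Refresh each connection at a random moment in every 30-second window.
  auto options = cbt::ClientOptions().set_max_conn_refresh_period(
      std::chrono::seconds(30));
  auto data_client = cbt::CreateDefaultDataClient("my-project", "my-instance",
                                                  std::move(options));
  cbt::Table table(std::move(data_client), "my-table");

  // A period of zero disables the background refresh entirely.
  auto no_refresh = cbt::ClientOptions().set_max_conn_refresh_period(
      std::chrono::seconds(0));
  (void)no_refresh;
}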
dopiera authored Dec 9, 2020
1 parent a138330 commit 4759120
Showing 5 changed files with 231 additions and 20 deletions.
8 changes: 7 additions & 1 deletion google/cloud/bigtable/client_options.cc
@@ -27,6 +27,9 @@ std::shared_ptr<grpc::ChannelCredentials> BigtableDefaultCredentials() {
}
return grpc::GoogleDefaultCredentials();
}
// As learned from experiments, gRPC connections enter the IDLE state after 4m of inactivity.
std::chrono::milliseconds constexpr kDefaultRefreshPeriod =
std::chrono::minutes(3);
} // anonymous namespace

namespace google {
@@ -96,7 +99,10 @@ ClientOptions::ClientOptions(std::shared_ptr<grpc::ChannelCredentials> creds)
admin_endpoint_("bigtableadmin.googleapis.com"),
instance_admin_endpoint_("bigtableadmin.googleapis.com"),
tracing_components_(internal::DefaultTracingComponents()),
tracing_options_(internal::DefaultTracingOptions()) {
tracing_options_(internal::DefaultTracingOptions()),
// Refresh connections before the server has a chance to disconnect them
// due to being idle.
max_conn_refresh_period_(kDefaultRefreshPeriod) {
static std::string const kUserAgentPrefix = UserAgentPrefix();
channel_arguments_.SetUserAgentPrefix(kUserAgentPrefix);
channel_arguments_.SetMaxSendMessageSize(
20 changes: 20 additions & 0 deletions google/cloud/bigtable/client_options.h
@@ -326,6 +326,25 @@ class ClientOptions {
/// Return the options for use when tracing RPCs.
TracingOptions const& tracing_options() const { return tracing_options_; }

/**
* Maximum connection refresh period, as set via `set_max_conn_refresh_period`
*/
std::chrono::milliseconds max_conn_refresh_period() {
return max_conn_refresh_period_;
}

/**
* If set to a positive duration, the client will refresh connections at
* random moments, spaced by at most this duration. This is necessary to
* avoid all connections expiring at the same time and causing latency
* spikes.
*/
ClientOptions& set_max_conn_refresh_period(
std::chrono::milliseconds max_conn_refresh_period) {
max_conn_refresh_period_ = max_conn_refresh_period;
return *this;
}

private:
friend struct internal::InstanceAdminTraits;
friend struct ClientOptionsTestTraits;
@@ -348,6 +367,7 @@
std::string instance_admin_endpoint_;
std::set<std::string> tracing_components_;
TracingOptions tracing_options_;
std::chrono::milliseconds max_conn_refresh_period_;
};
} // namespace BIGTABLE_CLIENT_NS
} // namespace bigtable
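A quick usage note for the accessors above (a sketch, not from this commit;
it assumes only the `ClientOptions` API shown here plus the pre-existing
`set_connection_pool_size` setter):

#include "google/cloud/bigtable/client_options.h"
#include <cassert>
#include <chrono>

int main() {
  google::cloud::bigtable::ClientOptions options;
  // The default refresh period must stay below gRPC's ~4m idle timeout;
  // this commit sets it to 3 minutes.
  assert(options.max_conn_refresh_period() == std::chrono::minutes(3));

  // The setter returns `ClientOptions&`, so it chains with other setters.
  options.set_max_conn_refresh_period(std::chrono::minutes(1))
      .set_connection_pool_size(4);
  assert(options.max_conn_refresh_period() == std::chrono::minutes(1));
}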
64 changes: 50 additions & 14 deletions google/cloud/bigtable/internal/common_client.cc
Expand Up @@ -19,20 +19,56 @@ namespace bigtable {
inline namespace BIGTABLE_CLIENT_NS {
namespace internal {

std::vector<std::shared_ptr<grpc::Channel>> CreateChannelPool(
std::string const& endpoint, bigtable::ClientOptions const& options) {
std::vector<std::shared_ptr<grpc::Channel>> result;
for (std::size_t i = 0; i != options.connection_pool_size(); ++i) {
auto args = options.channel_arguments();
if (!options.connection_pool_name().empty()) {
args.SetString("cbt-c++/connection-pool-name",
options.connection_pool_name());
}
args.SetInt("cbt-c++/connection-pool-id", static_cast<int>(i));
result.push_back(
grpc::CreateCustomChannel(endpoint, options.credentials(), args));
}
return result;
ConnectionRefreshState::ConnectionRefreshState(
std::chrono::milliseconds max_conn_refresh_period)
: max_conn_refresh_period_(max_conn_refresh_period),
rng_(std::random_device{}()) {}

std::chrono::milliseconds ConnectionRefreshState::RandomizedRefreshDelay() {
std::lock_guard<std::mutex> lk(mu_);
return std::chrono::milliseconds(
std::uniform_int_distribution<decltype(max_conn_refresh_period_)::rep>(
1, max_conn_refresh_period_.count())(rng_));
}

void ScheduleChannelRefresh(
std::shared_ptr<CompletionQueue> const& cq,
std::shared_ptr<ConnectionRefreshState> const& state,
std::shared_ptr<grpc::Channel> const& channel) {
// The timers will only hold weak pointers to the channel or to the
// completion queue, so if either of them is destroyed, the timer chain
// will simply not continue. Unfortunately, that means that some stray
// timers may remain in the `CompletionQueue` for a while, but this is
// generally unavoidable because there is no way to cancel individual
// timers.
std::weak_ptr<grpc::Channel> weak_channel(channel);
std::weak_ptr<CompletionQueue> weak_cq(cq);
using TimerFuture = future<StatusOr<std::chrono::system_clock::time_point>>;
cq->MakeRelativeTimer(state->RandomizedRefreshDelay())
.then([weak_channel, weak_cq, state](TimerFuture fut) {
if (!fut.get()) {
// Timer cancelled.
return;
}
auto channel = weak_channel.lock();
if (!channel) return;
auto cq = weak_cq.lock();
if (!cq) return;
cq->AsyncWaitConnectionReady(channel, std::chrono::system_clock::now() +
kConnectionReadyTimeout)
.then([weak_channel, weak_cq, state](future<Status> fut) {
auto conn_status = fut.get();
if (!conn_status.ok()) {
GCP_LOG(WARNING)
<< "Failed to refresh connection. Error: " << conn_status;
}
auto channel = weak_channel.lock();
if (!channel) return;
auto cq = weak_cq.lock();
if (!cq) return;
ScheduleChannelRefresh(cq, state, channel);
});
});
}

} // namespace internal
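The uniform jitter in `RandomizedRefreshDelay` is what keeps pooled channels
from refreshing in lockstep. The same idea as a standalone sketch (plain C++,
not the library code; the 3-minute maximum mirrors the new default):

#include <chrono>
#include <iostream>
#include <random>

// Pick a delay uniformly in [1ms, max] so that channels created together
// still spread their refreshes across the whole window.
std::chrono::milliseconds RandomizedRefreshDelay(
    std::chrono::milliseconds max, std::mt19937& rng) {
  return std::chrono::milliseconds(
      std::uniform_int_distribution<std::chrono::milliseconds::rep>(
          1, max.count())(rng));
}

int main() {
  std::mt19937 rng(std::random_device{}());
  std::chrono::milliseconds const max = std::chrono::minutes(3);
  for (int i = 0; i != 4; ++i) {
    std::cout << RandomizedRefreshDelay(max, rng).count() << "ms\n";
  }
}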
93 changes: 88 additions & 5 deletions google/cloud/bigtable/internal/common_client.h
@@ -17,6 +17,10 @@

#include "google/cloud/bigtable/client_options.h"
#include "google/cloud/bigtable/version.h"
#include "google/cloud/connection_options.h"
#include "google/cloud/internal/random.h"
#include "google/cloud/log.h"
#include "google/cloud/status_or.h"
#include <grpcpp/grpcpp.h>

namespace google {
@@ -25,9 +29,40 @@ namespace bigtable {
inline namespace BIGTABLE_CLIENT_NS {
namespace internal {

/// Create a pool of `grpc::Channel` objects based on the client options.
std::vector<std::shared_ptr<grpc::Channel>> CreateChannelPool(
std::string const& endpoint, bigtable::ClientOptions const& options);
/**
* Time after which we bail out waiting for a connection to become ready.
*
* This number was copied from the Java client and there doesn't seem to be a
* well-founded reason for it to be exactly this. It should not be too large
* since waiting for a connection to become ready is not cancellable.
*/
std::chrono::seconds constexpr kConnectionReadyTimeout(10);

/**
* State required by timers scheduled by `CommonClient`.
*
* The scheduled timers might outlive `CommonClient`. They need some shared,
* persistent state. Objects of this class implement it.
*/
class ConnectionRefreshState {
public:
explicit ConnectionRefreshState(
std::chrono::milliseconds max_conn_refresh_period);
std::chrono::milliseconds RandomizedRefreshDelay();

private:
std::mutex mu_;
std::chrono::milliseconds max_conn_refresh_period_;
google::cloud::internal::DefaultPRNG rng_;
};

/**
* Schedule a chain of timers to refresh the connection.
*/
void ScheduleChannelRefresh(
std::shared_ptr<CompletionQueue> const& cq,
std::shared_ptr<ConnectionRefreshState> const& state,
std::shared_ptr<grpc::Channel> const& channel);

/**
* Refactor implementation of `bigtable::{Data,Admin,InstanceAdmin}Client`.
@@ -54,7 +89,21 @@ class CommonClient {
//@}

explicit CommonClient(bigtable::ClientOptions options)
: options_(std::move(options)), current_index_(0) {}
: options_(std::move(options)),
current_index_(0),
background_threads_(
google::cloud::internal::DefaultBackgroundThreads(1)),
cq_(std::make_shared<CompletionQueue>(background_threads_->cq())),
refresh_state_(std::make_shared<ConnectionRefreshState>(
options_.max_conn_refresh_period())) {}

~CommonClient() {
// This will stop the refresh of the channels.
channels_.clear();
// TODO(2567): remove this call once users are required to provide their own
// `CompletionQueue`s
background_threads_->cq().CancelAll();
}

/**
* Reset the channel and stub.
@@ -103,7 +152,7 @@
// introduce attributes in the implementation of CreateChannelPool() to
// create one socket per element in the pool.
lk.unlock();
auto channels = CreateChannelPool(Traits::Endpoint(options_), options_);
auto channels = CreateChannelPool();
std::vector<StubPtr> tmp;
std::transform(channels.begin(), channels.end(), std::back_inserter(tmp),
[](std::shared_ptr<grpc::Channel> ch) {
@@ -125,6 +174,30 @@
}
}

ChannelPtr CreateChannel(std::size_t idx) {
auto args = options_.channel_arguments();
if (!options_.connection_pool_name().empty()) {
args.SetString("cbt-c++/connection-pool-name",
options_.connection_pool_name());
}
args.SetInt("cbt-c++/connection-pool-id", static_cast<int>(idx));
auto res = grpc::CreateCustomChannel(Traits::Endpoint(options_),
options_.credentials(), args);
if (options_.max_conn_refresh_period().count() == 0) {
return res;
}
ScheduleChannelRefresh(cq_, refresh_state_, res);
return res;
}

std::vector<std::shared_ptr<grpc::Channel>> CreateChannelPool() {
std::vector<std::shared_ptr<grpc::Channel>> result;
for (std::size_t i = 0; i != options_.connection_pool_size(); ++i) {
result.emplace_back(CreateChannel(i));
}
return result;
}

/// Get the current index for round-robin over connections.
std::size_t GetIndex() {
std::size_t current = current_index_++;
@@ -136,10 +209,20 @@
}

std::mutex mu_;
std::size_t num_pending_refreshes_{};
ClientOptions options_;
std::vector<ChannelPtr> channels_;
std::vector<StubPtr> stubs_;
std::size_t current_index_;
std::unique_ptr<BackgroundThreads> background_threads_;
// Timers, which we schedule for refreshes, need to reference the completion
// queue, but the queue's underlying implementation must not be owned solely
// by the operations scheduled on it (that would risk a deadlock). We solve
// both problems by holding only weak pointers to the completion queue in the
// operations scheduled on it, which in turn requires holding the one strong
// reference here, via a shared pointer.
std::shared_ptr<CompletionQueue> cq_;
std::shared_ptr<ConnectionRefreshState> refresh_state_;
};

} // namespace internal
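To make that ownership comment concrete, here is a minimal standalone sketch
(plain C++, not the library's `CompletionQueue`) of a refresh chain that holds
only weak pointers, so dropping the last strong reference, as `~CommonClient()`
does by clearing `channels_`, ends the chain:

#include <functional>
#include <iostream>
#include <memory>
#include <vector>

struct Channel { int id; };

// Stand-in for a completion queue: collects callbacks and runs them later.
std::vector<std::function<void()>> pending;

void ScheduleRefresh(std::shared_ptr<Channel> const& channel) {
  std::weak_ptr<Channel> weak(channel);  // do not extend the channel's lifetime
  pending.push_back([weak] {
    auto channel = weak.lock();
    if (!channel) return;  // the owner dropped the channel; the chain ends
    std::cout << "refresh channel " << channel->id << "\n";
    ScheduleRefresh(channel);  // otherwise, schedule the next refresh
  });
}

int main() {
  auto channel = std::make_shared<Channel>(Channel{0});
  ScheduleRefresh(channel);
  auto run_pending = [] {
    auto batch = std::move(pending);
    pending.clear();
    for (auto& callback : batch) callback();
  };
  run_pending();   // prints "refresh channel 0" and schedules a successor
  channel.reset(); // like ~CommonClient() clearing channels_
  run_pending();   // the weak_ptr fails to lock; the chain stops silently
}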
66 changes: 66 additions & 0 deletions google/cloud/bigtable/tests/data_integration_test.cc
@@ -600,6 +600,72 @@ TEST_F(DataIntegrationTest, TableApplyWithLogging) {
google::cloud::LogSink::Instance().RemoveBackend(id);
}

TEST(ConnectionRefresh, Disabled) {
auto client_options = bigtable::ClientOptions().set_max_conn_refresh_period(
std::chrono::seconds(0));
auto data_client = bigtable::CreateDefaultDataClient(
testing::TableTestEnvironment::project_id(),
testing::TableTestEnvironment::instance_id(), client_options);
// In general, it is hard to show that something has *not* happened; at best
// we can show that its side-effects have not happened. In this case we want
// to show that the channels have not been refreshed. A side-effect of
// refreshing a channel is that it enters the `READY` state, so we check that
// this has not taken place and take that as evidence that no refresh has
// taken place.
//
// After the `CompletionQueue` argument is removed from the `Bigtable` API, we
// will have an option to provide a mock `CompletionQueue` to the `DataClient`
// for test purposes and verify that no timers are created, which will be a
// superior way to write this test.
std::this_thread::sleep_for(std::chrono::milliseconds(100));

for (std::size_t i = 0; i < client_options.connection_pool_size(); ++i) {
auto channel = data_client->Channel();
EXPECT_EQ(GRPC_CHANNEL_IDLE, channel->GetState(false));
}
// Make sure things still work.
bigtable::Table table(data_client, testing::TableTestEnvironment::table_id());
std::string const row_key = "row-key-1";
std::vector<Cell> created{{row_key, kFamily4, "c0", 1000, "v1000"},
{row_key, kFamily4, "c1", 2000, "v2000"}};
Apply(table, row_key, created);
// After performing some operations, some of the channels should be in ready
// state.
auto check_if_some_channel_is_ready = [&] {
for (std::size_t i = 0; i < client_options.connection_pool_size(); ++i) {
if (data_client->Channel()->GetState(false) == GRPC_CHANNEL_READY) {
return true;
}
}
return false;
};
EXPECT_TRUE(check_if_some_channel_is_ready());
}

TEST(ConnectionRefresh, Frequent) {
auto data_client = bigtable::CreateDefaultDataClient(
testing::TableTestEnvironment::project_id(),
testing::TableTestEnvironment::instance_id(),
bigtable::ClientOptions().set_max_conn_refresh_period(
std::chrono::milliseconds(100)));

for (;;) {
if (data_client->Channel()->GetState(false) == GRPC_CHANNEL_READY) {
// We've found a channel which changed its state from IDLE to READY,
// which means that our refreshing mechanism works.
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}

// Make sure things still work.
bigtable::Table table(data_client, testing::TableTestEnvironment::table_id());
std::string const row_key = "row-key-1";
std::vector<Cell> created{{row_key, kFamily4, "c0", 1000, "v1000"},
{row_key, kFamily4, "c1", 2000, "v2000"}};
Apply(table, row_key, created);
}

} // namespace
} // namespace BIGTABLE_CLIENT_NS
} // namespace bigtable