feat: implement Bigtable connection refresh
This fixes googleapis#4996.

This works, however it is not in its final form. Currently, background
threads running a `CompletionQueue` are created behind the scenes, without
users' control, in every Bigtable `{Admin,InstanceAdmin,Data}Client`.

The choice of the `{Admin,InstanceAdmin,Data}Client` rather than e.g.
`Table` is consistent with Spanner and PubSub, where counterparts of
those clients are called Connections.

The next steps will be to remove the `CompletionQueue` from both the
underlying clients and the user-facing API, and to instead provide
`BackgroundThreads` to the ctors. I decided not to do this in chunks
because otherwise it would either be confusing which `CompletionQueue`s
are used, or we would have to duplicate the whole API to use the
`CompletionQueue` provided in the ctors.

More tests will be possible once the `BackgroundThreadsFactory` can be
passed in the `ClientOptions`.
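
For context, here is a minimal sketch of how the new option is meant to be
used; the project and instance ids are placeholders:

    namespace bigtable = google::cloud::bigtable;

    // Refresh each connection at a random moment at most 2 minutes after the
    // previous refresh, well below the ~4 minute server-side idle timeout.
    auto options = bigtable::ClientOptions().set_max_conn_refresh_period(
        std::chrono::minutes(2));
    auto data_client = bigtable::CreateDefaultDataClient(
        "my-project", "my-instance", std::move(options));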
dopiera committed Dec 3, 2020
1 parent 83538fe commit 21cbdeb
Showing 5 changed files with 212 additions and 20 deletions.
8 changes: 7 additions & 1 deletion google/cloud/bigtable/client_options.cc
@@ -27,6 +27,9 @@ std::shared_ptr<grpc::ChannelCredentials> BigtableDefaultCredentials() {
}
return grpc::GoogleDefaultCredentials();
}
// Experiments show that idle gRPC connections enter the IDLE state after 4m.
constexpr std::chrono::milliseconds kDefaultRefreshPeriod =
std::chrono::minutes(3);
} // anonymous namespace

namespace google {
@@ -96,7 +99,10 @@ ClientOptions::ClientOptions(std::shared_ptr<grpc::ChannelCredentials> creds)
admin_endpoint_("bigtableadmin.googleapis.com"),
instance_admin_endpoint_("bigtableadmin.googleapis.com"),
tracing_components_(internal::DefaultTracingComponents()),
tracing_options_(internal::DefaultTracingOptions()) {
tracing_options_(internal::DefaultTracingOptions()),
// Refresh connections before the server has a chance to disconnect them
// due to being idle.
max_conn_refresh_period_(kDefaultRefreshPeriod) {
static std::string const kUserAgentPrefix = UserAgentPrefix();
channel_arguments_.SetUserAgentPrefix(kUserAgentPrefix);
channel_arguments_.SetMaxSendMessageSize(
20 changes: 20 additions & 0 deletions google/cloud/bigtable/client_options.h
@@ -326,6 +326,25 @@ class ClientOptions {
/// Return the options for use when tracing RPCs.
TracingOptions const& tracing_options() const { return tracing_options_; }

/**
* Maximum connection refresh period, as set via `set_max_conn_refresh_period`
*/
std::chrono::milliseconds max_conn_refresh_period() {
return max_conn_refresh_period_;
}

/**
* If set to a positive number, the client will refresh connections at random
* moments spaced no more than this duration apart. This is necessary to avoid
* all connections expiring simultaneously and causing latency spikes.
*/
ClientOptions& set_max_conn_refresh_period(
std::chrono::milliseconds max_conn_refresh_period) {
max_conn_refresh_period_ = max_conn_refresh_period;
return *this;
}

private:
friend struct internal::InstanceAdminTraits;
friend struct ClientOptionsTestTraits;
@@ -348,6 +367,7 @@ class ClientOptions {
std::string instance_admin_endpoint_;
std::set<std::string> tracing_components_;
TracingOptions tracing_options_;
std::chrono::milliseconds max_conn_refresh_period_;
};
} // namespace BIGTABLE_CLIENT_NS
} // namespace bigtable
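
A zero period disables the refresh entirely, which the new `CreateChannel`
code and the `ConnectionRefresh.Disabled` integration test below rely on.
A short sketch:

    // A zero refresh period turns the feature off.
    auto no_refresh = google::cloud::bigtable::ClientOptions()
        .set_max_conn_refresh_period(std::chrono::seconds(0));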
71 changes: 57 additions & 14 deletions google/cloud/bigtable/internal/common_client.cc
@@ -18,21 +18,64 @@ namespace cloud {
namespace bigtable {
inline namespace BIGTABLE_CLIENT_NS {
namespace internal {
namespace {

std::vector<std::shared_ptr<grpc::Channel>> CreateChannelPool(
std::string const& endpoint, bigtable::ClientOptions const& options) {
std::vector<std::shared_ptr<grpc::Channel>> result;
for (std::size_t i = 0; i != options.connection_pool_size(); ++i) {
auto args = options.channel_arguments();
if (!options.connection_pool_name().empty()) {
args.SetString("cbt-c++/connection-pool-name",
options.connection_pool_name());
}
args.SetInt("cbt-c++/connection-pool-id", static_cast<int>(i));
result.push_back(
grpc::CreateCustomChannel(endpoint, options.credentials(), args));
}
return result;
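// Returns a random delay, uniformly distributed over [1ms,
// max_conn_refresh_period], so that the refresh timers of the channels in a
// pool do not all fire at the same time.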
std::chrono::milliseconds RandomizedRefreshDelay(
std::chrono::milliseconds max_conn_refresh_period) {
static google::cloud::internal::DefaultPRNG rng(std::random_device{}());
return std::chrono::milliseconds(
std::uniform_int_distribution<decltype(max_conn_refresh_period)::rep>(
1, max_conn_refresh_period.count())(rng));
}

} // namespace

void ScheduleChannelRefresh(std::shared_ptr<CompletionQueue> const& cq,
std::shared_ptr<grpc::Channel> const& channel,
std::chrono::milliseconds max_conn_refresh_period) {
// The timers will only hold weak pointers to the channel or to the
// completion queue, so if either of them are destroyed, the timer chain
// will simply not continue. Unfortunately, that means that some stray
// timers may remain in the `CompletionQueue` for a while, but this is
// generally unavoidable because there is no way to cancel individual
// timers.
std::weak_ptr<grpc::Channel> weak_channel(channel);
std::weak_ptr<CompletionQueue> weak_cq(cq);
using TimerFuture = future<StatusOr<std::chrono::system_clock::time_point>>;
cq->MakeRelativeTimer(RandomizedRefreshDelay(max_conn_refresh_period))
.then([weak_channel, weak_cq, max_conn_refresh_period](TimerFuture fut) {
if (!fut.get()) {
// Timer cancelled.
return;
}
auto channel = weak_channel.lock();
if (!channel) {
return;
}
auto cq = weak_cq.lock();
if (!cq) {
return;
}
cq->AsyncWaitConnectionReady(channel, std::chrono::system_clock::now() +
kConnectionReadyTimeout)
.then([weak_channel, weak_cq,
max_conn_refresh_period](future<Status> fut) {
auto conn_status = fut.get();
if (!conn_status.ok()) {
GCP_LOG(WARNING)
<< "Failed to refresh connection. Error: " << conn_status;
}
auto channel = weak_channel.lock();
if (!channel) {
return;
}
auto cq = weak_cq.lock();
if (!cq) {
return;
}
ScheduleChannelRefresh(cq, channel, max_conn_refresh_period);
});
});
}

} // namespace internal
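
The weak-pointer timer chain above is worth showing in isolation. Here is a
minimal, self-contained sketch of the same pattern (the `Tick` name and the
1-second period are illustrative only, not part of this change):

    #include "google/cloud/completion_queue.h"
    #include <chrono>
    #include <memory>

    void Tick(std::weak_ptr<google::cloud::CompletionQueue> weak_cq) {
      auto cq = weak_cq.lock();
      if (!cq) return;  // the owner dropped the queue; the chain simply ends
      cq->MakeRelativeTimer(std::chrono::seconds(1))
          .then([weak_cq](google::cloud::future<google::cloud::StatusOr<
                    std::chrono::system_clock::time_point>>
                    fut) {
            if (!fut.get()) return;  // the timer was cancelled
            Tick(weak_cq);           // otherwise reschedule the next tick
          });
    }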
69 changes: 64 additions & 5 deletions google/cloud/bigtable/internal/common_client.h
@@ -17,6 +17,10 @@

#include "google/cloud/bigtable/client_options.h"
#include "google/cloud/bigtable/version.h"
#include "google/cloud/connection_options.h"
#include "google/cloud/internal/random.h"
#include "google/cloud/log.h"
#include "google/cloud/status_or.h"
#include <grpcpp/grpcpp.h>

namespace google {
@@ -25,9 +29,21 @@ namespace bigtable {
inline namespace BIGTABLE_CLIENT_NS {
namespace internal {

/// Create a pool of `grpc::Channel` objects based on the client options.
std::vector<std::shared_ptr<grpc::Channel>> CreateChannelPool(
std::string const& endpoint, bigtable::ClientOptions const& options);
/**
* Time after which we bail out waiting for a connection to become ready.
*
* This number was copied from the Java client and there doesn't seem to be a
* well-founded reason for it to be exactly this. It should not be too large
* since waiting for a connection to become ready is not cancellable.
*/
constexpr std::chrono::seconds kConnectionReadyTimeout(10);

/**
* Schedule a chain of timers to refresh the connection.
*
* The timers hold only weak references to `cq` and `channel`, so the chain
* ends automatically when either of them is destroyed.
*/
void ScheduleChannelRefresh(std::shared_ptr<CompletionQueue> const& cq,
std::shared_ptr<grpc::Channel> const& channel,
std::chrono::milliseconds max_conn_refresh_period);

/**
* Refactor implementation of `bigtable::{Data,Admin,InstanceAdmin}Client`.
@@ -54,7 +70,17 @@ class CommonClient {
//@}

explicit CommonClient(bigtable::ClientOptions options)
: options_(std::move(options)), current_index_(0) {}
: options_(std::move(options)),
current_index_(0),
background_threads_(
google::cloud::internal::DefaultBackgroundThreads(1)),
cq_(std::make_shared<CompletionQueue>(background_threads_->cq())) {}

~CommonClient() {
// Stop the refresh of the channels: dropping them invalidates the timers'
// weak pointers, and `CancelAll()` cancels the timers already scheduled.
channels_.clear();
background_threads_->cq().CancelAll();
}

/**
* Reset the channel and stub.
Expand Down Expand Up @@ -103,7 +129,7 @@ class CommonClient {
// introduce attributes in the implementation of CreateChannelPool() to
// create one socket per element in the pool.
lk.unlock();
auto channels = CreateChannelPool(Traits::Endpoint(options_), options_);
auto channels = CreateChannelPool();
std::vector<StubPtr> tmp;
std::transform(channels.begin(), channels.end(), std::back_inserter(tmp),
[](std::shared_ptr<grpc::Channel> ch) {
@@ -125,6 +151,30 @@
}
}

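/// Create the `idx`-th channel and, unless the refresh period is zero,
/// schedule the chain of timers refreshing it.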
ChannelPtr CreateChannel(std::size_t idx) {
auto args = options_.channel_arguments();
if (!options_.connection_pool_name().empty()) {
args.SetString("cbt-c++/connection-pool-name",
options_.connection_pool_name());
}
args.SetInt("cbt-c++/connection-pool-id", static_cast<int>(idx));
auto res = grpc::CreateCustomChannel(Traits::Endpoint(options_),
options_.credentials(), args);
if (options_.max_conn_refresh_period().count() == 0) {
return res;
}
ScheduleChannelRefresh(cq_, res, options_.max_conn_refresh_period());
return res;
}

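/// Create `connection_pool_size()` channels via `CreateChannel()`.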
std::vector<std::shared_ptr<grpc::Channel>> CreateChannelPool() {
std::vector<std::shared_ptr<grpc::Channel>> result;
for (std::size_t i = 0; i != options_.connection_pool_size(); ++i) {
result.emplace_back(CreateChannel(i));
}
return result;
}

/// Get the current index for round-robin over connections.
std::size_t GetIndex() {
std::size_t current = current_index_++;
@@ -136,10 +186,19 @@
}

std::mutex mu_;
std::size_t num_pending_refreshes_{};
ClientOptions options_;
std::vector<ChannelPtr> channels_;
std::vector<StubPtr> stubs_;
std::size_t current_index_;
std::unique_ptr<BackgroundThreads> background_threads_;
// Timers, which we schedule for refreshes, need to reference the completion
// queue. We cannot make the completion queue's underlying implementation
// become owned solely by the operations scheduled on it (because we risk a
// deadlock). We solve both problems by holding only weak pointers to the
// completion queue in the operations scheduled on it. For that to work, we
// need to hold one instance via a shared pointer.
std::shared_ptr<CompletionQueue> cq_;
};

} // namespace internal
64 changes: 64 additions & 0 deletions google/cloud/bigtable/tests/data_integration_test.cc
@@ -600,6 +600,70 @@ TEST_F(DataIntegrationTest, TableApplyWithLogging) {
google::cloud::LogSink::Instance().RemoveBackend(id);
}

TEST(ConnectionRefresh, Disabled) {
auto client_options = bigtable::ClientOptions().set_max_conn_refresh_period(
std::chrono::seconds(0));
auto data_client = bigtable::CreateDefaultDataClient(
testing::TableTestEnvironment::project_id(),
testing::TableTestEnvironment::instance_id(), client_options);
// There doesn't seem to be a reliable way to check if we're really not
// refreshing the channels, so we take the crude approach of waiting for a
// while and checking that none of a sample of channels entered the READY
// state (entering READY would indicate that a refresh took place).
//
// After the `CompletionQueue` argument is removed from the `Bigtable` API, we
// will have an option to provide a mock `CompletionQueue` to the `DataClient`
// for test purposes and verify that no timers are created, which will be a
// superior way to write this test.
std::this_thread::sleep_for(std::chrono::milliseconds(100));

for (std::size_t i = 0; i < client_options.connection_pool_size(); ++i) {
auto channel = data_client->Channel();
EXPECT_EQ(GRPC_CHANNEL_IDLE, channel->GetState(false));
}
// Make sure things still work.
bigtable::Table table(data_client, testing::TableTestEnvironment::table_id());
std::string const row_key = "row-key-1";
std::vector<Cell> created{{row_key, kFamily4, "c0", 1000, "v1000"},
{row_key, kFamily4, "c1", 2000, "v2000"}};
Apply(table, row_key, created);
// After performing some operations, some of the channels should be in ready
// state.
auto check_if_some_channel_is_ready = [&] {
for (std::size_t i = 0; i < client_options.connection_pool_size(); ++i) {
if (data_client->Channel()->GetState(false) == GRPC_CHANNEL_READY) {
return true;
}
}
return false;
};
EXPECT_TRUE(check_if_some_channel_is_ready());
}

TEST(ConnectionRefresh, Frequent) {
auto data_client = bigtable::CreateDefaultDataClient(
testing::TableTestEnvironment::project_id(),
testing::TableTestEnvironment::instance_id(),
bigtable::ClientOptions().set_max_conn_refresh_period(
std::chrono::milliseconds(100)));

for (;;) {
if (data_client->Channel()->GetState(false) == GRPC_CHANNEL_READY) {
// We've found a channel which changed its state from IDLE to READY,
// which means that our refreshing mechanism works.
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}

// Make sure things still work.
bigtable::Table table(data_client, testing::TableTestEnvironment::table_id());
std::string const row_key = "row-key-1";
std::vector<Cell> created{{row_key, kFamily4, "c0", 1000, "v1000"},
{row_key, kFamily4, "c1", 2000, "v2000"}};
Apply(table, row_key, created);
}

} // namespace
} // namespace BIGTABLE_CLIENT_NS
} // namespace bigtable
