[#10611][xCluster] Slow SetupReplication requests when using too many tables

Summary:
Currently, setup_replication times out when run with too many tables. The workaround is to
batch the request into multiple entries of ~10 tables each. While investigating, we found that we
were serially creating a YBClient object per table, each of which took ~5 seconds to start up.
Modified the code to reuse a cached object, and added 2 basic unit tests that run the
setup_replication command with 100 tables and verify a reasonable runtime.
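
In sketch form, the change moves the expensive client construction out of the per-table loop and behind a single reused instance. The names below (ExpensiveClient, SetupTablesSlow/Fast) are illustrative stand-ins, not YugabyteDB APIs:

#include <memory>
#include <string>
#include <vector>

// Stand-in for a client whose constructor performs ~5 s of RPC setup.
struct ExpensiveClient {
  explicit ExpensiveClient(const std::string& master_addrs) { /* slow startup */ }
  void SetupReplicationForTable(const std::string& table_id) { /* cheap per-table work */ }
};

// Before the fix: O(num_tables * 5 s) -- a fresh client constructed per table.
void SetupTablesSlow(const std::string& masters, const std::vector<std::string>& tables) {
  for (const auto& table : tables) {
    ExpensiveClient client(masters);         // pays the startup cost on every iteration
    client.SetupReplicationForTable(table);
  }
}

// After the fix: the startup cost is paid once and the cached client is reused.
void SetupTablesFast(const std::string& masters, const std::vector<std::string>& tables) {
  auto client = std::make_shared<ExpensiveClient>(masters);  // shared, cached instance
  for (const auto& table : tables) {
    client->SetupReplicationForTable(table);
  }
}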

Test Plan:
TwoDCTestParams/TwoDCTest.BootstrapAndSetupLargeTableCount/0 -n 10
TwoDCTestParams/TwoDCTest.SetupUniverseReplicationLargeTableCount/0 -n 10

Reviewers: jhe, rahuldesirazu

Reviewed By: rahuldesirazu

Subscribers: ybase, bogdan

Differential Revision: https://phabricator.dev.yugabyte.com/D15256
nspiegelberg committed Feb 16, 2022
1 parent 912efee commit 718e14b

Showing 6 changed files with 228 additions and 20 deletions.
196 changes: 196 additions & 0 deletions ent/src/yb/integration-tests/twodc-test.cc
@@ -83,6 +83,9 @@ DECLARE_int32(async_replication_idle_delay_ms);
DECLARE_int32(async_replication_max_idle_wait);
DECLARE_int32(external_intent_cleanup_secs);
DECLARE_int32(yb_num_shards_per_tserver);
DECLARE_uint64(TEST_yb_inbound_big_calls_parse_delay_ms);
DECLARE_int64(rpc_throttle_threshold_bytes);
DECLARE_bool(enable_automatic_tablet_splitting);

namespace yb {

@@ -638,6 +641,199 @@ TEST_P(TwoDCTest, SetupUniverseReplicationMultipleTables) {
  Destroy();
}

TEST_P(TwoDCTest, SetupUniverseReplicationLargeTableCount) {
  if (IsSanitizer()) {
    LOG(INFO) << "Skipping slow test";
    return;
  }

  // Set up the two clusters without any tables.
  auto tables = ASSERT_RESULT(SetUpWithParams({}, {}, 1));
  FLAGS_enable_automatic_tablet_splitting = false;

  // Create a large number of tables to test the performance of setup_replication.
  int table_count = 2;
  int amplification[2] = {1, 5};
  MonoDelta setup_latency[2];
  std::string table_prefix = "stress_table_";
  bool passed_test = false;

  for (int retries = 0; retries < 3 && !passed_test; ++retries) {
    for (int a : {0, 1}) {
      std::vector<std::shared_ptr<client::YBTable>> producer_tables;
      for (int i = 0; i < table_count * amplification[a]; i++) {
        std::string cur_table =
            table_prefix + std::to_string(amplification[a]) + "-" + std::to_string(i);
        ASSERT_RESULT(CreateTable(consumer_client(), kNamespaceName, cur_table, 3));
        auto t = ASSERT_RESULT(CreateTable(producer_client(), kNamespaceName, cur_table, 3));
        std::shared_ptr<client::YBTable> producer_table;
        ASSERT_OK(producer_client()->OpenTable(t, &producer_table));
        producer_tables.push_back(producer_table);
      }

      // Add delays to all rpc calls to simulate a live environment and ensure the test is IO bound.
      FLAGS_TEST_yb_inbound_big_calls_parse_delay_ms = 200;
      FLAGS_rpc_throttle_threshold_bytes = 200;

      auto start_time = CoarseMonoClock::Now();

      // Set up universe replication on all tables.
      ASSERT_OK(SetupUniverseReplication(
          producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables));

      // Verify that the universe was set up on the consumer.
      master::GetUniverseReplicationResponsePB resp;
      ASSERT_OK(
          VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, &resp));
      ASSERT_EQ(resp.entry().producer_id(), kUniverseId);
      ASSERT_EQ(resp.entry().tables_size(), producer_tables.size());
      for (uint32_t i = 0; i < producer_tables.size(); i++) {
        ASSERT_EQ(resp.entry().tables(i), producer_tables[i]->id());
      }

      setup_latency[a] = CoarseMonoClock::Now() - start_time;
      LOG(INFO) << "SetupReplication [" << a << "] took: " << setup_latency[a].ToSeconds() << "s";

      // Remove delays for cleanup and the next setup.
      FLAGS_TEST_yb_inbound_big_calls_parse_delay_ms = 0;

      ASSERT_OK(DeleteUniverseReplication(kUniverseId));
    }

    // We increased our table count by 5x, but we shouldn't have a linear latency increase.
    passed_test = (setup_latency[1] < setup_latency[0] * 3);
  }

  ASSERT_TRUE(passed_test);

  Destroy();
}

TEST_P(TwoDCTest, BootstrapAndSetupLargeTableCount) {
  if (IsSanitizer()) {
    LOG(INFO) << "Skipping slow test";
    return;
  }

  // Set up the two clusters without any tables.
  auto tables = ASSERT_RESULT(SetUpWithParams({}, {}, 1));
  FLAGS_enable_automatic_tablet_splitting = false;

  // Create a medium, then a large number of tables to test the performance of our CLI commands.
  int table_count = 2;
  int amplification[2] = {1, 5};
  MonoDelta bootstrap_latency[2];
  MonoDelta setup_latency[2];
  std::string table_prefix = "stress_table_";
  bool passed_test = false;

  for (int retries = 0; retries < 3 && !passed_test; ++retries) {
    for (int a : {0, 1}) {
      std::vector<std::shared_ptr<client::YBTable>> producer_tables;
      for (int i = 0; i < table_count * amplification[a]; i++) {
        std::string cur_table =
            table_prefix + std::to_string(amplification[a]) + "-" + std::to_string(i);
        ASSERT_RESULT(CreateTable(consumer_client(), kNamespaceName, cur_table, 3));
        auto t = ASSERT_RESULT(CreateTable(producer_client(), kNamespaceName, cur_table, 3));
        std::shared_ptr<client::YBTable> producer_table;
        ASSERT_OK(producer_client()->OpenTable(t, &producer_table));
        producer_tables.push_back(producer_table);
      }

      // Add delays to all rpc calls to simulate a live environment and ensure the test is IO bound.
      FLAGS_TEST_yb_inbound_big_calls_parse_delay_ms = 200;
      FLAGS_rpc_throttle_threshold_bytes = 200;

      // Performance test of BootstrapProducer.
      cdc::BootstrapProducerResponsePB boot_resp;
      {
        cdc::BootstrapProducerRequestPB req;

        for (const auto& producer_table : producer_tables) {
          req.add_table_ids(producer_table->id());
        }

        auto start_time = CoarseMonoClock::Now();

        auto producer_cdc_proxy = std::make_unique<cdc::CDCServiceProxy>(
            &producer_client()->proxy_cache(),
            HostPort::FromBoundEndpoint(
                producer_cluster()->mini_tablet_server(0)->bound_rpc_addr()));
        rpc::RpcController rpc;
        ASSERT_OK(producer_cdc_proxy->BootstrapProducer(req, &boot_resp, &rpc));
        ASSERT_FALSE(boot_resp.has_error());
        ASSERT_EQ(boot_resp.cdc_bootstrap_ids().size(), producer_tables.size());

        bootstrap_latency[a] = CoarseMonoClock::Now() - start_time;
        LOG(INFO) << "BootstrapProducer [" << a << "] took: " << bootstrap_latency[a].ToSeconds()
                  << "s";
      }

      // Performance test of SetupReplication, with Bootstrap IDs.
      {
        auto start_time = CoarseMonoClock::Now();

        // Call the SetupUniverseReplication API directly so we can use producer_bootstrap_ids.
        master::SetupUniverseReplicationRequestPB req;
        master::SetupUniverseReplicationResponsePB resp;
        req.set_producer_id(kUniverseId);
        auto master_addrs = producer_cluster()->GetMasterAddresses();
        auto vec = ASSERT_RESULT(HostPort::ParseStrings(master_addrs, 0));
        HostPortsToPBs(vec, req.mutable_producer_master_addresses());
        for (const auto& table : producer_tables) {
          req.add_producer_table_ids(table->id());
        }
        for (const auto& bootstrap_id : boot_resp.cdc_bootstrap_ids()) {
          req.add_producer_bootstrap_ids(bootstrap_id);
        }

        auto master_proxy = std::make_shared<master::MasterReplicationProxy>(
            &consumer_client()->proxy_cache(),
            ASSERT_RESULT(consumer_cluster()->GetLeaderMiniMaster())->bound_rpc_addr());
        ASSERT_OK(WaitFor(
            [&]() -> Result<bool> {
              rpc::RpcController rpc;
              rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));
              if (!master_proxy->SetupUniverseReplication(req, &resp, &rpc).ok()) {
                return false;
              }
              if (resp.has_error()) {
                return false;
              }
              return true;
            },
            MonoDelta::FromSeconds(30), "Setup universe replication"));

        // Verify that the universe was set up on the consumer.
        {
          master::GetUniverseReplicationResponsePB resp;
          ASSERT_OK(
              VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, &resp));
          ASSERT_EQ(resp.entry().producer_id(), kUniverseId);
          ASSERT_EQ(resp.entry().tables_size(), producer_tables.size());
          for (uint32_t i = 0; i < producer_tables.size(); i++) {
            ASSERT_EQ(resp.entry().tables(i), producer_tables[i]->id());
          }
        }

        setup_latency[a] = CoarseMonoClock::Now() - start_time;
        LOG(INFO) << "SetupReplication [" << a << "] took: " << setup_latency[a].ToSeconds()
                  << "s";
      }

      // Remove delays for cleanup and the next setup.
      FLAGS_TEST_yb_inbound_big_calls_parse_delay_ms = 0;

      ASSERT_OK(DeleteUniverseReplication(kUniverseId));
    }
    // We increased our table count by 5x, but we shouldn't have a linear latency increase.
    // ASSERT_LT(bootstrap_latency[1], bootstrap_latency[0] * 5);
    passed_test = (setup_latency[1] < setup_latency[0] * 3);
  }
  ASSERT_TRUE(passed_test);

  Destroy();
}

TEST_P(TwoDCTest, PollWithConsumerRestart) {
  // Avoid long delays with node failures so we can run with more aggressive test timing.
  FLAGS_replication_failure_delay_exponent = 7;  // 2^7 == 128ms
3 changes: 2 additions & 1 deletion ent/src/yb/master/catalog_manager.h
@@ -84,7 +84,8 @@ class CatalogManager : public yb::master::CatalogManager, SnapshotCoordinatorCon

   CHECKED_STATUS InitCDCConsumer(const std::vector<CDCConsumerStreamInfo>& consumer_info,
                                  const std::string& master_addrs,
-                                 const std::string& producer_universe_uuid);
+                                 const std::string& producer_universe_uuid,
+                                 std::shared_ptr<CDCRpcTasks> cdc_rpc_tasks);

   void HandleCreateTabletSnapshotResponse(TabletInfo *tablet, bool error) override;

39 changes: 24 additions & 15 deletions ent/src/yb/master/catalog_manager_ent.cc
@@ -3539,10 +3539,8 @@ void CatalogManager::GetCDCStreamCallback(
     }
     new_entry.set_state(master::SysCDCStreamEntryPB::ACTIVE);

-    auto s = cdc_rpc->client()->UpdateCDCStream(bootstrap_id, new_entry);
-    if (!s.ok()) {
-      LOG(WARNING) << "Unable to update CDC stream options " << s;
-    }
+    WARN_NOT_OK(cdc_rpc->client()->UpdateCDCStream(bootstrap_id, new_entry),
+                "Unable to update CDC stream options");
   });
 }
 }
@@ -3601,18 +3599,27 @@ void CatalogManager::AddCDCStreamToUniverseAndInitConsumer(
     std::vector<HostPort> hp;
     HostPortsFromPBs(l->pb.producer_master_addresses(), &hp);

-    Status s = InitCDCConsumer(consumer_info, HostPort::ToCommaSeparatedString(hp),
-                               l->pb.producer_id());
-    if (!s.ok()) {
-      LOG(ERROR) << "Error registering subscriber: " << s;
+    auto cdc_rpc_tasks_result =
+        universe->GetOrCreateCDCRpcTasks(l->pb.producer_master_addresses());
+    if (!cdc_rpc_tasks_result.ok()) {
+      LOG(WARNING) << "CDC streams won't be created: " << cdc_rpc_tasks_result;
       l.mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::FAILED);
     } else {
-      GStringPiece original_producer_id(universe->id());
-      if (original_producer_id.ends_with(".ALTER")) {
-        // Don't enable ALTER universes, merge them into the main universe instead.
-        merge_alter = true;
+      auto cdc_rpc_tasks = *cdc_rpc_tasks_result;
+      Status s = InitCDCConsumer(
+          consumer_info, HostPort::ToCommaSeparatedString(hp), l->pb.producer_id(),
+          cdc_rpc_tasks);
+      if (!s.ok()) {
+        LOG(ERROR) << "Error registering subscriber: " << s;
+        l.mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::FAILED);
       } else {
-        l.mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::ACTIVE);
+        GStringPiece original_producer_id(universe->id());
+        if (original_producer_id.ends_with(".ALTER")) {
+          // Don't enable ALTER universes, merge them into the main universe instead.
+          merge_alter = true;
+        } else {
+          l.mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::ACTIVE);
+        }
       }
     }
   }
@@ -3737,7 +3744,8 @@ Status CatalogManager::UpdateXClusterProducerOnTabletSplit(
 Status CatalogManager::InitCDCConsumer(
     const std::vector<CDCConsumerStreamInfo>& consumer_info,
     const std::string& master_addrs,
-    const std::string& producer_universe_uuid) {
+    const std::string& producer_universe_uuid,
+    std::shared_ptr<CDCRpcTasks> cdc_rpc_tasks) {

   std::unordered_set<HostPort, HostPortHash> tserver_addrs;
   // Get the tablets in the consumer table.
@@ -3750,11 +3758,12 @@ Status CatalogManager::InitCDCConsumer(
     table_identifer.set_table_id(stream_info.consumer_table_id);
     *(consumer_table_req.mutable_table()) = table_identifer;
     RETURN_NOT_OK(GetTableLocations(&consumer_table_req, &consumer_table_resp));
+
     cdc::StreamEntryPB stream_entry;
     // Get producer tablets and map them to the consumer tablets
     RETURN_NOT_OK(CreateTabletMapping(
         stream_info.producer_table_id, stream_info.consumer_table_id, producer_universe_uuid,
-        master_addrs, consumer_table_resp, &tserver_addrs, &stream_entry));
+        master_addrs, consumer_table_resp, &tserver_addrs, &stream_entry, cdc_rpc_tasks));
     (*producer_entry.mutable_stream_map())[stream_info.stream_id] = std::move(stream_entry);
   }

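The universe->GetOrCreateCDCRpcTasks(...) call in the AddCDCStreamToUniverseAndInitConsumer hunk above is where the caching happens: the expensive CDCRpcTasks handle is built once per universe and reused for every table, instead of once per table. A reduced, hypothetical sketch of such a create-once accessor (the stub types and member names below are assumptions for illustration, not the real UniverseReplicationInfo):

#include <memory>
#include <mutex>
#include <string>

// Stand-in for CDCRpcTasks, whose construction is the expensive (~5 s) step.
struct CDCRpcTasksStub {
  explicit CDCRpcTasksStub(const std::string& master_addrs) { /* slow RPC setup */ }
};

class UniverseInfoStub {
 public:
  // Returns the cached handle, constructing it only on first use or when the
  // producer master addresses change.
  std::shared_ptr<CDCRpcTasksStub> GetOrCreateCDCRpcTasks(const std::string& master_addrs) {
    std::lock_guard<std::mutex> l(mutex_);
    if (!cdc_rpc_tasks_ || cached_master_addrs_ != master_addrs) {
      cdc_rpc_tasks_ = std::make_shared<CDCRpcTasksStub>(master_addrs);
      cached_master_addrs_ = master_addrs;
    }
    return cdc_rpc_tasks_;
  }

 private:
  std::mutex mutex_;  // guards the cached handle
  std::string cached_master_addrs_;
  std::shared_ptr<CDCRpcTasksStub> cdc_rpc_tasks_;
};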
6 changes: 3 additions & 3 deletions ent/src/yb/master/cdc_consumer_registry_service.cc
@@ -48,10 +48,10 @@ Status CreateTabletMapping(
     const std::string& producer_master_addrs,
     const GetTableLocationsResponsePB& consumer_tablets_resp,
     std::unordered_set<HostPort, HostPortHash>* tserver_addrs,
-    cdc::StreamEntryPB* stream_entry) {
+    cdc::StreamEntryPB* stream_entry,
+    std::shared_ptr<CDCRpcTasks> cdc_rpc_tasks) {
+
   // Get the tablets in the producer table.
-  auto cdc_rpc_tasks = VERIFY_RESULT(CDCRpcTasks::CreateWithMasterAddrs(
-      producer_id, producer_master_addrs));
   auto producer_table_locations =
       VERIFY_RESULT(cdc_rpc_tasks->GetTableLocations(producer_table_id));

3 changes: 2 additions & 1 deletion ent/src/yb/master/cdc_consumer_registry_service.h
@@ -49,7 +49,8 @@ Status CreateTabletMapping(
     const std::string& producer_master_addrs,
     const GetTableLocationsResponsePB& consumer_tablets_resp,
     std::unordered_set<HostPort, HostPortHash>* tserver_addrs,
-    cdc::StreamEntryPB* stream_entry);
+    cdc::StreamEntryPB* stream_entry,
+    std::shared_ptr<CDCRpcTasks> cdc_rpc_tasks);

// After split_tablet_ids.source splits, remove its entry and replace it with its children tablets.
Status UpdateTableMappingOnTabletSplit(
1 change: 1 addition & 0 deletions ent/src/yb/master/cdc_rpc_tasks.cc
@@ -39,6 +39,7 @@ namespace master {

 Result<std::shared_ptr<CDCRpcTasks>> CDCRpcTasks::CreateWithMasterAddrs(
     const std::string& universe_id, const std::string& master_addrs) {
+  // NOTE: This is currently an expensive call (5+ sec). Encountered during Task #10611.
   auto cdc_rpc_tasks = std::make_shared<CDCRpcTasks>();
   std::string dir;

