Skip to content

Commit

Permalink
[#10186] xCluster: Locality aware mapping for tables with different tablet counts in xCluster
Browse files Browse the repository at this point in the history

Summary:
Use the tablet start and end keys to find the best mapping when the tablet counts of the producer and consumer differ. The consumer tablet with the most key-range overlap will be picked. Added start and end keys to ProducerTabletListPB; these are populated at initial CDC stream creation. When producer or consumer tablets split, and we have key ranges in ProducerTabletListPB, they are used to construct the new mapping. If the mapping was created on an older build, in which case keys won't be available, we fall back to the old round-robin behavior.
The old code assumed that an identical tablet count implied identical key ranges. This need not be true, so it has been fixed. Example: producer with the default 4 tablets vs. consumer with the default 3 tablets plus a consumer tablet split.

Test Plan:
Added new xcluster-tablet-split-itest XClusterTabletMapTest
Added new validations in xcluster-tablet-split-itest XClusterTabletSplitITest
Ensure these tests continue to work
client-test ClientTest.TestKeyRangeFiltering
All other xcluster-tablet-split-itest

Reviewers: nicolas, jhe, rahuldesirazu

Reviewed By: jhe, rahuldesirazu

Subscribers: slingam, bogdan

Differential Revision: https://phabricator.dev.yugabyte.com/D17050
  • Loading branch information
hari90 committed May 27, 2022
1 parent b456277 commit a8f3801
Show file tree
Hide file tree
Showing 19 changed files with 687 additions and 268 deletions.
32 changes: 2 additions & 30 deletions ent/src/yb/integration-tests/twodc_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -239,38 +239,10 @@ Status TwoDCTestBase::DeleteUniverseReplication(
return Status::OK();
}

// Returns the total number of producer tablets currently being polled by the
// CDC consumers across every tablet server in the given mini cluster.
// Tablet servers that are not enterprise TabletServers, or that have no CDC
// consumer attached, contribute zero to the total.
size_t TwoDCTestBase::NumProducerTabletsPolled(MiniCluster* cluster) {
  size_t total = 0;
  for (const auto& mini_tserver : cluster->mini_tablet_servers()) {
    // Only enterprise tablet servers can host a CDC consumer.
    auto* tserver = dynamic_cast<tserver::enterprise::TabletServer*>(mini_tserver->server());
    if (!tserver) {
      continue;
    }
    auto* cdc_consumer = tserver->GetCDCConsumer();
    if (!cdc_consumer) {
      continue;
    }
    total += cdc_consumer->TEST_producer_tablets_running().size();
  }
  return total;
}

// Waits until the expected number of producer tablets is being polled by this
// cluster's CDC consumers, returning OK once the count is stable.
// NOTE(review): this span appears to be a diff rendering that fuses the
// removed implementation (the LoggedWaitFor lambda below) with the added
// implementation (the delegation to cdc::CorrectlyPollingAllTablets at the
// end). As written, the first return makes the second unreachable — confirm
// which body is intended before relying on this text as source.
Status TwoDCTestBase::CorrectlyPollingAllTablets(
    MiniCluster* cluster, uint32_t num_producer_tablets) {
  return LoggedWaitFor([this, cluster, num_producer_tablets]() -> Result<bool> {
    // Require the correct count to be observed on several consecutive
    // iterations before declaring success, to avoid passing on a transient
    // match while pollers are still being created or torn down.
    static int i = 0;
    constexpr int kNumIterationsWithCorrectResult = 5;
    auto cur_tablets = NumProducerTabletsPolled(cluster);
    if (cur_tablets == num_producer_tablets) {
    if (i++ == kNumIterationsWithCorrectResult) {
    i = 0;
    return true;
    }
    } else {
    // Any mismatch resets the streak counter.
    i = 0;
    }
    LOG(INFO) << "Tablets being polled: " << cur_tablets;
    return false;
    }, MonoDelta::FromSeconds(kRpcTimeout), "Num producer tablets being polled");
  // Replacement implementation: delegate to the shared cdc helper.
  return cdc::CorrectlyPollingAllTablets(
      cluster, num_producer_tablets, MonoDelta::FromSeconds(kRpcTimeout));
}

Status TwoDCTestBase::WaitForSetupUniverseReplicationCleanUp(string producer_uuid) {
Expand Down
2 changes: 0 additions & 2 deletions ent/src/yb/integration-tests/twodc_test_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,6 @@ class TwoDCTestBase : public YBTest {
Status DeleteUniverseReplication(
const std::string& universe_id, YBClient* client, MiniCluster* cluster);

size_t NumProducerTabletsPolled(MiniCluster* cluster);

Status CorrectlyPollingAllTablets(MiniCluster* cluster, uint32_t num_producer_tablets);

Status WaitForSetupUniverseReplicationCleanUp(string producer_uuid);
Expand Down
4 changes: 4 additions & 0 deletions ent/src/yb/master/catalog_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ class UniverseKeyRegistryPB;
namespace master {
namespace enterprise {

struct KeyRange;

YB_DEFINE_ENUM(CreateObjects, (kOnlyTables)(kOnlyIndexes));

class CatalogManager : public yb::master::CatalogManager, SnapshotCoordinatorContext {
Expand Down Expand Up @@ -283,6 +285,8 @@ class CatalogManager : public yb::master::CatalogManager, SnapshotCoordinatorCon

TabletInfos GetTabletInfos(const std::vector<TabletId>& ids) override;

Result<std::map<std::string, KeyRange>> GetTableKeyRanges(const TableId& table_id);

Result<SysRowEntries> CollectEntries(
const google::protobuf::RepeatedPtrField<TableIdentifierPB>& tables,
CollectFlags flags);
Expand Down
96 changes: 44 additions & 52 deletions ent/src/yb/master/catalog_manager_ent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1924,6 +1924,27 @@ TabletInfos CatalogManager::GetTabletInfos(const std::vector<TabletId>& ids) {
return result;
}

// Builds a map from tablet id to the partition key range ([start, end)) of
// every tablet in the given table.
//
// Returns NotFound / appropriate error if the table does not exist or is
// deleted / not visible to the client.
Result<std::map<std::string, KeyRange>> CatalogManager::GetTableKeyRanges(const TableId& table_id) {
  TableIdentifierPB identifier;
  identifier.set_table_id(table_id);

  auto table = VERIFY_RESULT(FindTable(identifier));
  auto table_lock = table->LockForRead();
  RETURN_NOT_OK(CatalogManagerUtil::CheckIfTableDeletedOrNotVisibleToClient(table_lock));

  std::map<std::string, KeyRange> tablet_ranges;
  for (const auto& tablet : table->GetTablets()) {
    // Each tablet's partition bounds live in its sys-catalog protobuf; take a
    // read lock to get a consistent snapshot of them.
    auto tablet_lock = tablet->LockForRead();
    const auto& partition = tablet_lock->pb.partition();
    auto& range = tablet_ranges[tablet->tablet_id()];
    range.start_key = partition.partition_key_start();
    range.end_key = partition.partition_key_end();
  }

  return tablet_ranges;
}

AsyncTabletSnapshotOpPtr CatalogManager::CreateAsyncTabletSnapshotOp(
const TabletInfoPtr& tablet, const std::string& snapshot_id,
tserver::TabletSnapshotOpRequestPB::Operation operation,
Expand Down Expand Up @@ -4142,11 +4163,11 @@ Status CatalogManager::UpdateXClusterConsumerOnTabletSplit(
// Check if this table is consuming a stream.
XClusterConsumerTableStreamInfoMap stream_infos =
GetXClusterStreamInfoForConsumerTable(consumer_table_id);

if (stream_infos.empty()) {
return Status::OK();
}

auto consumer_tablet_keys = VERIFY_RESULT(GetTableKeyRanges(consumer_table_id));
auto cluster_config = ClusterConfig();
auto l = cluster_config->LockForWrite();
for (const auto& stream_info : stream_infos) {
Expand All @@ -4167,11 +4188,10 @@ Status CatalogManager::UpdateXClusterConsumerOnTabletSplit(
universe_id,
stream_id);
}
DCHECK(stream_entry->consumer_table_id() == consumer_table_id);

RETURN_NOT_OK(UpdateTableMappingOnTabletSplit(stream_entry, split_tablet_ids));

// We also need to mark this stream as no longer being able to perform 1-1 mappings.
stream_entry->set_same_num_producer_consumer_tablets(false);
RETURN_NOT_OK(
UpdateTabletMappingOnConsumerSplit(consumer_tablet_keys, split_tablet_ids, stream_entry));
}

// Also bump the cluster_config_ version so that changes are propagated to tservers.
Expand Down Expand Up @@ -4259,19 +4279,13 @@ Status CatalogManager::InitCDCConsumer(
// Get the tablets in the consumer table.
cdc::ProducerEntryPB producer_entry;
for (const auto& stream_info : consumer_info) {
GetTableLocationsRequestPB consumer_table_req;
consumer_table_req.set_max_returned_locations(std::numeric_limits<int32_t>::max());
GetTableLocationsResponsePB consumer_table_resp;
TableIdentifierPB table_identifer;
table_identifer.set_table_id(stream_info.consumer_table_id);
*(consumer_table_req.mutable_table()) = table_identifer;
RETURN_NOT_OK(GetTableLocations(&consumer_table_req, &consumer_table_resp));
auto consumer_tablet_keys = VERIFY_RESULT(GetTableKeyRanges(stream_info.consumer_table_id));

cdc::StreamEntryPB stream_entry;
// Get producer tablets and map them to the consumer tablets
RETURN_NOT_OK(CreateTabletMapping(
stream_info.producer_table_id, stream_info.consumer_table_id, producer_universe_uuid,
master_addrs, consumer_table_resp, &tserver_addrs, &stream_entry, cdc_rpc_tasks));
RETURN_NOT_OK(InitCDCStream(
stream_info.producer_table_id, stream_info.consumer_table_id, consumer_tablet_keys,
&tserver_addrs, &stream_entry, cdc_rpc_tasks));
(*producer_entry.mutable_stream_map())[stream_info.stream_id] = std::move(stream_entry);
}

Expand Down Expand Up @@ -5113,49 +5127,30 @@ Status CatalogManager::UpdateConsumerOnProducerSplit(
req->producer_id(), req->stream_id());
}

// Find the parent tablet in the tablet mapping.
bool found = false;
auto mutable_map = stream_entry->mutable_consumer_producer_tablet_map();
// Also keep track if we see the split children tablets.
vector<string> split_child_tablet_ids{req->producer_split_tablet_info().new_tablet1_id(),
req->producer_split_tablet_info().new_tablet2_id()};
for (auto& consumer_tablet_to_producer_tablets : *mutable_map) {
auto& producer_tablets_list = consumer_tablet_to_producer_tablets.second;
auto producer_tablets = producer_tablets_list.mutable_tablets();
for (auto tablet = producer_tablets->begin(); tablet < producer_tablets->end(); ++tablet) {
if (*tablet == req->producer_split_tablet_info().tablet_id()) {
// Remove the parent tablet id.
producer_tablets->erase(tablet);
// For now we add the children tablets to the same consumer tablet.
// See github issue #10186 for further improvements.
producer_tablets_list.add_tablets(req->producer_split_tablet_info().new_tablet1_id());
producer_tablets_list.add_tablets(req->producer_split_tablet_info().new_tablet2_id());
// There should only be one copy of each producer tablet per stream, so can exit early.
found = true;
break;
}
// Check if this is one of the child split tablets.
auto it = std::find(split_child_tablet_ids.begin(), split_child_tablet_ids.end(), *tablet);
if (it != split_child_tablet_ids.end()) {
split_child_tablet_ids.erase(it);
}
}
if (found) {
break;
}
}
SplitTabletIds split_tablet_id{
.source = req->producer_split_tablet_info().tablet_id(),
.children = {
req->producer_split_tablet_info().new_tablet1_id(),
req->producer_split_tablet_info().new_tablet2_id()}};

if (!found) {
auto split_key = req->producer_split_tablet_info().split_partition_key();
auto consumer_tablet_keys = VERIFY_RESULT(GetTableKeyRanges(stream_entry->consumer_table_id()));
bool found_source = false, found_all_split_children = false;
RETURN_NOT_OK(UpdateTabletMappingOnProducerSplit(
consumer_tablet_keys, split_tablet_id, split_key, &found_source, &found_all_split_children,
stream_entry));

if (!found_source) {
// Did not find the source tablet, but did find the children - means that we have already
// processed this SPLIT_OP, so for idempotency, we can return OK.
if (split_child_tablet_ids.empty()) {
if (found_all_split_children) {
LOG(INFO) << "Already processed this tablet split: " << req->DebugString();
return Status::OK();
}

// When there are sequential SPLIT_OPs, we may try to reprocess an older SPLIT_OP. However, if
// one or both of those children have also already been split and processed, then we'll end up
// here (!found && !split_child_tablet_ids.empty()).
// here (!found_source && !found_all_split_children).
// This is alright, we can log a warning, and then continue (to not block later records).
LOG(WARNING)
<< "Unable to find matching source tablet " << req->producer_split_tablet_info().tablet_id()
Expand All @@ -5164,9 +5159,6 @@ Status CatalogManager::UpdateConsumerOnProducerSplit(
return Status::OK();
}

// Also make sure that we switch off of 1-1 mapping optimizations.
stream_entry->set_same_num_producer_consumer_tablets(false);

// Also bump the cluster_config_ version so that changes are propagated to tservers (and new
// pollers are created for the new tablets).
l.mutable_data()->pb.set_version(l.mutable_data()->pb.version() + 1);
Expand Down
Loading

0 comments on commit a8f3801

Please sign in to comment.