From a8f3801cba43e49bd0a98675a69f553b0edd96df Mon Sep 17 00:00:00 2001 From: Hari Krishna Sunder Date: Fri, 27 May 2022 08:09:23 -0700 Subject: [PATCH] [#10186] xCluster: Locality aware mapping for tables with different tablet counts in xCluster Summary: Use the tablet start and end keys to find best mapping when count of tablets between producer and consumer is different. Consumer tablet with the most key range overlap will be picked. Added start and end key to ProducerTabletListPB. This is populated at initial CDC stream create. When producer or consumer tablets split, and we have key ranges in ProducerTabletListPB then it is used to construct the new mapping. If mapping was created on older build in which case keys wont be available, we will fall back to old round robin behavior. Old code assumed same tablet count implied same key ranges. This need not be true, so has been fixed. ex: producer default 4 tablets, consumer default 3 tablets + consumer tablet split. Test Plan: Added new xcluster-tablet-split-itest XClusterTabletMapTest Added new validations in xcluster-tablet-split-itest XClusterTabletSplitITest Ensure these tests continue to work client-test ClientTest.TestKeyRangeFiltering All other xcluster-tablet-split-itest Reviewers: nicolas, jhe, rahuldesirazu Reviewed By: jhe, rahuldesirazu Subscribers: slingam, bogdan Differential Revision: https://phabricator.dev.yugabyte.com/D17050 --- .../yb/integration-tests/twodc_test_base.cc | 32 +- .../yb/integration-tests/twodc_test_base.h | 2 - ent/src/yb/master/catalog_manager.h | 4 + ent/src/yb/master/catalog_manager_ent.cc | 96 +++-- .../master/cdc_consumer_registry_service.cc | 356 +++++++++++++----- .../yb/master/cdc_consumer_registry_service.h | 32 +- ent/src/yb/tserver/cdc_consumer.cc | 10 +- ent/src/yb/tserver/cdc_consumer.h | 5 +- ent/src/yb/tserver/twodc_output_client.cc | 33 +- src/yb/cdc/cdc_consumer.proto | 6 +- src/yb/client/client_utils.cc | 12 +- src/yb/common/partition.cc | 32 ++ src/yb/common/partition.h | 10 + src/yb/integration-tests/cdc_test_util.cc | 37 ++ src/yb/integration-tests/cdc_test_util.h | 2 + src/yb/integration-tests/mini_cluster.h | 1 - .../xcluster-tablet-split-itest.cc | 187 ++++++++- src/yb/master/catalog_manager.cc | 72 ++-- src/yb/master/catalog_manager_util.h | 26 ++ 19 files changed, 687 insertions(+), 268 deletions(-) diff --git a/ent/src/yb/integration-tests/twodc_test_base.cc b/ent/src/yb/integration-tests/twodc_test_base.cc index 6d7f545b9a02..1494e8fefa48 100644 --- a/ent/src/yb/integration-tests/twodc_test_base.cc +++ b/ent/src/yb/integration-tests/twodc_test_base.cc @@ -239,38 +239,10 @@ Status TwoDCTestBase::DeleteUniverseReplication( return Status::OK(); } -size_t TwoDCTestBase::NumProducerTabletsPolled(MiniCluster* cluster) { - size_t size = 0; - for (const auto& mini_tserver : cluster->mini_tablet_servers()) { - size_t new_size = 0; - auto* tserver = dynamic_cast(mini_tserver->server()); - CDCConsumer* cdc_consumer; - if (tserver && (cdc_consumer = tserver->GetCDCConsumer())) { - auto tablets_running = cdc_consumer->TEST_producer_tablets_running(); - new_size = tablets_running.size(); - } - size += new_size; - } - return size; -} - Status TwoDCTestBase::CorrectlyPollingAllTablets( MiniCluster* cluster, uint32_t num_producer_tablets) { - return LoggedWaitFor([this, cluster, num_producer_tablets]() -> Result { - static int i = 0; - constexpr int kNumIterationsWithCorrectResult = 5; - auto cur_tablets = NumProducerTabletsPolled(cluster); - if (cur_tablets == num_producer_tablets) { - if (i++ == kNumIterationsWithCorrectResult) { - i = 0; - return true; - } - } else { - i = 0; - } - LOG(INFO) << "Tablets being polled: " << cur_tablets; - return false; - }, MonoDelta::FromSeconds(kRpcTimeout), "Num producer tablets being polled"); + return cdc::CorrectlyPollingAllTablets( + cluster, num_producer_tablets, MonoDelta::FromSeconds(kRpcTimeout)); } Status TwoDCTestBase::WaitForSetupUniverseReplicationCleanUp(string producer_uuid) { diff --git a/ent/src/yb/integration-tests/twodc_test_base.h b/ent/src/yb/integration-tests/twodc_test_base.h index 8b87996a8903..bcfc12c960fe 100644 --- a/ent/src/yb/integration-tests/twodc_test_base.h +++ b/ent/src/yb/integration-tests/twodc_test_base.h @@ -117,8 +117,6 @@ class TwoDCTestBase : public YBTest { Status DeleteUniverseReplication( const std::string& universe_id, YBClient* client, MiniCluster* cluster); - size_t NumProducerTabletsPolled(MiniCluster* cluster); - Status CorrectlyPollingAllTablets(MiniCluster* cluster, uint32_t num_producer_tablets); Status WaitForSetupUniverseReplicationCleanUp(string producer_uuid); diff --git a/ent/src/yb/master/catalog_manager.h b/ent/src/yb/master/catalog_manager.h index 6d99dee2c647..3be1d74a4d0d 100644 --- a/ent/src/yb/master/catalog_manager.h +++ b/ent/src/yb/master/catalog_manager.h @@ -25,6 +25,8 @@ class UniverseKeyRegistryPB; namespace master { namespace enterprise { +struct KeyRange; + YB_DEFINE_ENUM(CreateObjects, (kOnlyTables)(kOnlyIndexes)); class CatalogManager : public yb::master::CatalogManager, SnapshotCoordinatorContext { @@ -283,6 +285,8 @@ class CatalogManager : public yb::master::CatalogManager, SnapshotCoordinatorCon TabletInfos GetTabletInfos(const std::vector& ids) override; + Result> GetTableKeyRanges(const TableId& table_id); + Result CollectEntries( const google::protobuf::RepeatedPtrField& tables, CollectFlags flags); diff --git a/ent/src/yb/master/catalog_manager_ent.cc b/ent/src/yb/master/catalog_manager_ent.cc index cd22ca013495..7b13e00b89d4 100644 --- a/ent/src/yb/master/catalog_manager_ent.cc +++ b/ent/src/yb/master/catalog_manager_ent.cc @@ -1924,6 +1924,27 @@ TabletInfos CatalogManager::GetTabletInfos(const std::vector& ids) { return result; } +Result> CatalogManager::GetTableKeyRanges(const TableId& table_id) { + TableIdentifierPB table_identifier; + table_identifier.set_table_id(table_id); + + auto table = VERIFY_RESULT(FindTable(table_identifier)); + auto l = table->LockForRead(); + RETURN_NOT_OK(CatalogManagerUtil::CheckIfTableDeletedOrNotVisibleToClient(l)); + + auto tablets = table->GetTablets(); + + std::map result; + for (const scoped_refptr& tablet : tablets) { + auto tablet_lock = tablet->LockForRead(); + const auto& partition = tablet_lock->pb.partition(); + result[tablet->tablet_id()].start_key = partition.partition_key_start(); + result[tablet->tablet_id()].end_key = partition.partition_key_end(); + } + + return result; +} + AsyncTabletSnapshotOpPtr CatalogManager::CreateAsyncTabletSnapshotOp( const TabletInfoPtr& tablet, const std::string& snapshot_id, tserver::TabletSnapshotOpRequestPB::Operation operation, @@ -4142,11 +4163,11 @@ Status CatalogManager::UpdateXClusterConsumerOnTabletSplit( // Check if this table is consuming a stream. XClusterConsumerTableStreamInfoMap stream_infos = GetXClusterStreamInfoForConsumerTable(consumer_table_id); - if (stream_infos.empty()) { return Status::OK(); } + auto consumer_tablet_keys = VERIFY_RESULT(GetTableKeyRanges(consumer_table_id)); auto cluster_config = ClusterConfig(); auto l = cluster_config->LockForWrite(); for (const auto& stream_info : stream_infos) { @@ -4167,11 +4188,10 @@ Status CatalogManager::UpdateXClusterConsumerOnTabletSplit( universe_id, stream_id); } + DCHECK(stream_entry->consumer_table_id() == consumer_table_id); - RETURN_NOT_OK(UpdateTableMappingOnTabletSplit(stream_entry, split_tablet_ids)); - - // We also need to mark this stream as no longer being able to perform 1-1 mappings. - stream_entry->set_same_num_producer_consumer_tablets(false); + RETURN_NOT_OK( + UpdateTabletMappingOnConsumerSplit(consumer_tablet_keys, split_tablet_ids, stream_entry)); } // Also bump the cluster_config_ version so that changes are propagated to tservers. @@ -4259,19 +4279,13 @@ Status CatalogManager::InitCDCConsumer( // Get the tablets in the consumer table. cdc::ProducerEntryPB producer_entry; for (const auto& stream_info : consumer_info) { - GetTableLocationsRequestPB consumer_table_req; - consumer_table_req.set_max_returned_locations(std::numeric_limits::max()); - GetTableLocationsResponsePB consumer_table_resp; - TableIdentifierPB table_identifer; - table_identifer.set_table_id(stream_info.consumer_table_id); - *(consumer_table_req.mutable_table()) = table_identifer; - RETURN_NOT_OK(GetTableLocations(&consumer_table_req, &consumer_table_resp)); + auto consumer_tablet_keys = VERIFY_RESULT(GetTableKeyRanges(stream_info.consumer_table_id)); cdc::StreamEntryPB stream_entry; // Get producer tablets and map them to the consumer tablets - RETURN_NOT_OK(CreateTabletMapping( - stream_info.producer_table_id, stream_info.consumer_table_id, producer_universe_uuid, - master_addrs, consumer_table_resp, &tserver_addrs, &stream_entry, cdc_rpc_tasks)); + RETURN_NOT_OK(InitCDCStream( + stream_info.producer_table_id, stream_info.consumer_table_id, consumer_tablet_keys, + &tserver_addrs, &stream_entry, cdc_rpc_tasks)); (*producer_entry.mutable_stream_map())[stream_info.stream_id] = std::move(stream_entry); } @@ -5113,49 +5127,30 @@ Status CatalogManager::UpdateConsumerOnProducerSplit( req->producer_id(), req->stream_id()); } - // Find the parent tablet in the tablet mapping. - bool found = false; - auto mutable_map = stream_entry->mutable_consumer_producer_tablet_map(); - // Also keep track if we see the split children tablets. - vector split_child_tablet_ids{req->producer_split_tablet_info().new_tablet1_id(), - req->producer_split_tablet_info().new_tablet2_id()}; - for (auto& consumer_tablet_to_producer_tablets : *mutable_map) { - auto& producer_tablets_list = consumer_tablet_to_producer_tablets.second; - auto producer_tablets = producer_tablets_list.mutable_tablets(); - for (auto tablet = producer_tablets->begin(); tablet < producer_tablets->end(); ++tablet) { - if (*tablet == req->producer_split_tablet_info().tablet_id()) { - // Remove the parent tablet id. - producer_tablets->erase(tablet); - // For now we add the children tablets to the same consumer tablet. - // See github issue #10186 for further improvements. - producer_tablets_list.add_tablets(req->producer_split_tablet_info().new_tablet1_id()); - producer_tablets_list.add_tablets(req->producer_split_tablet_info().new_tablet2_id()); - // There should only be one copy of each producer tablet per stream, so can exit early. - found = true; - break; - } - // Check if this is one of the child split tablets. - auto it = std::find(split_child_tablet_ids.begin(), split_child_tablet_ids.end(), *tablet); - if (it != split_child_tablet_ids.end()) { - split_child_tablet_ids.erase(it); - } - } - if (found) { - break; - } - } + SplitTabletIds split_tablet_id{ + .source = req->producer_split_tablet_info().tablet_id(), + .children = { + req->producer_split_tablet_info().new_tablet1_id(), + req->producer_split_tablet_info().new_tablet2_id()}}; - if (!found) { + auto split_key = req->producer_split_tablet_info().split_partition_key(); + auto consumer_tablet_keys = VERIFY_RESULT(GetTableKeyRanges(stream_entry->consumer_table_id())); + bool found_source = false, found_all_split_children = false; + RETURN_NOT_OK(UpdateTabletMappingOnProducerSplit( + consumer_tablet_keys, split_tablet_id, split_key, &found_source, &found_all_split_children, + stream_entry)); + + if (!found_source) { // Did not find the source tablet, but did find the children - means that we have already // processed this SPLIT_OP, so for idempotency, we can return OK. - if (split_child_tablet_ids.empty()) { + if (found_all_split_children) { LOG(INFO) << "Already processed this tablet split: " << req->DebugString(); return Status::OK(); } // When there are sequential SPLIT_OPs, we may try to reprocess an older SPLIT_OP. However, if // one or both of those children have also already been split and processed, then we'll end up - // here (!found && !split_child_tablet_ids.empty()). + // here (!found_source && !found_all_split_childs). // This is alright, we can log a warning, and then continue (to not block later records). LOG(WARNING) << "Unable to find matching source tablet " << req->producer_split_tablet_info().tablet_id() @@ -5164,9 +5159,6 @@ Status CatalogManager::UpdateConsumerOnProducerSplit( return Status::OK(); } - // Also make sure that we switch off of 1-1 mapping optimizations. - stream_entry->set_same_num_producer_consumer_tablets(false); - // Also bump the cluster_config_ version so that changes are propagated to tservers (and new // pollers are created for the new tablets). l.mutable_data()->pb.set_version(l.mutable_data()->pb.version() + 1); diff --git a/ent/src/yb/master/cdc_consumer_registry_service.cc b/ent/src/yb/master/cdc_consumer_registry_service.cc index 68fe5f70e349..17adf921a4b5 100644 --- a/ent/src/yb/master/cdc_consumer_registry_service.cc +++ b/ent/src/yb/master/cdc_consumer_registry_service.cc @@ -23,6 +23,7 @@ #include "yb/client/client.h" #include "yb/client/yb_table_name.h" #include "yb/common/wire_protocol.h" +#include "yb/common/partition.h" #include "yb/util/random_util.h" #include "yb/util/result.h" @@ -33,92 +34,211 @@ namespace master { namespace enterprise { std::map GetPartitionStartKeyConsumerTabletMapping( - const GetTableLocationsResponsePB& consumer_tablets_resp) { + const std::map& consumer_tablet_keys) { std::map partitions_map; - for (const auto& tablet_location : consumer_tablets_resp.tablet_locations()) { - partitions_map[tablet_location.partition().partition_key_start()] = tablet_location.tablet_id(); + for (const auto& tablets : consumer_tablet_keys) { + partitions_map[tablets.second.start_key] = tablets.first; } return partitions_map; } -Status CreateTabletMapping( +std::map GetTabletKeys( + const google::protobuf::RepeatedPtrField& tablet_locations) { + std::map tablet_keys; + for (const auto& tablets : tablet_locations) { + tablet_keys[tablets.tablet_id()].start_key = tablets.partition().partition_key_start(); + tablet_keys[tablets.tablet_id()].end_key = tablets.partition().partition_key_end(); + } + return tablet_keys; +} + +bool GetProducerTabletKeys( + const ::google::protobuf::Map& mutable_map, + std::map* tablet_keys) { + for (const auto& mapping : mutable_map) { + auto tablet_info = mapping.second; + if (tablet_info.tablets_size() != tablet_info.start_key_size() || + tablet_info.tablets_size() != tablet_info.end_key_size()) { + // Looks like the mapping was created without key info. We need to fallback to old round robin + // based mapping. + DCHECK(tablet_info.start_key_size() == 0 && tablet_info.end_key_size() == 0); + return false; + } + + for (int i = 0; i < tablet_info.tablets_size(); i++) { + const auto& producer_tablet = tablet_info.tablets(i); + (*tablet_keys)[producer_tablet].start_key = tablet_info.start_key(i); + (*tablet_keys)[producer_tablet].end_key = tablet_info.end_key(i); + } + } + return true; +} + +void PopulateTsAddresses( + const TabletLocationsPB& producer, std::unordered_set* tserver_addrs) { + // For external CDC Consumers, populate the list of TServers they can connect to as proxies. + for (const auto& replica : producer.replicas()) { + // Use the public IP addresses since we're cross-universe + for (const auto& addr : replica.ts_info().broadcast_addresses()) { + tserver_addrs->insert(HostPortFromPB(addr)); + } + // Rarely a viable setup for production replication, but used in testing... + if (replica.ts_info().broadcast_addresses_size() == 0) { + LOG(WARNING) << "No public broadcast addresses found for " + << replica.ts_info().permanent_uuid() << ". Using private addresses instead."; + for (const auto& addr : replica.ts_info().private_rpc_addresses()) { + tserver_addrs->insert(HostPortFromPB(addr)); + } + } + } +} + +// We can optimize if we have the same tablet count in the producer and consumer table by +// mapping key ranges to each other. Due to tablet splitting it may be possible that the keys dont +// match even when the count of range is the same. Return true if a mapping was possible. +bool TryCreateOptimizedTabletMapping( + const std::map& producer_tablet_keys, + const std::map& consumer_partitions_map, + google::protobuf::Map<::std::string, ::yb::cdc::ProducerTabletListPB>* mutable_map) { + if (consumer_partitions_map.size() != producer_tablet_keys.size()) { + return false; + } + + for (const auto& producer : producer_tablet_keys) { + auto producer_key_range = producer.second; + const auto& it = consumer_partitions_map.find(producer_key_range.start_key); + if (it == consumer_partitions_map.end()) { + mutable_map->clear(); + return false; + } + + (*mutable_map)[it->second].add_tablets(producer.first); + (*mutable_map)[it->second].add_start_key(producer_key_range.start_key); + (*mutable_map)[it->second].add_end_key(producer_key_range.end_key); + } + + return true; +} + +Status ComputeTabletMapping( + const std::map& producer_tablet_keys, + const std::map& consumer_tablet_keys, + cdc::StreamEntryPB* stream_entry) { + stream_entry->set_local_tserver_optimized(false); + auto mutable_map = stream_entry->mutable_consumer_producer_tablet_map(); + mutable_map->clear(); + auto consumer_partitions_map = GetPartitionStartKeyConsumerTabletMapping(consumer_tablet_keys); + if (TryCreateOptimizedTabletMapping(producer_tablet_keys, consumer_partitions_map, mutable_map)) { + stream_entry->set_local_tserver_optimized(true); + return Status::OK(); + } + + // Map tablets based on max key range overlap. + for (const auto& producer : producer_tablet_keys) { + const auto& producer_tablet_id = producer.first; + auto producer_key_range = producer.second; + auto producer_key_range_size = PartitionSchema::GetPartitionRangeSize( + producer_key_range.start_key, producer_key_range.end_key); + std::string consumer_tablet_id; + uint32_t max_overlap = 0; + + for (const auto& consumer_tablet : consumer_tablet_keys) { + auto consumer_key_range = consumer_tablet.second; + auto overlap = PartitionSchema::GetOverlap( + producer_key_range.start_key, producer_key_range.end_key, consumer_key_range.start_key, + consumer_key_range.end_key); + if (overlap > max_overlap) { + consumer_tablet_id = consumer_tablet.first; + max_overlap = overlap; + if (overlap >= producer_key_range_size / 2) { + // We have majority overlap. Break as we cannot do better than this. + break; + } + } + } + + if (consumer_tablet_id.empty()) { + auto s = STATUS_SUBSTITUTE( + IllegalState, + "Could not find any consumer tablets with overlapping key range for producer tablet $0, " + "partition_key_start: $1 and partition_key_end: $2", + producer_tablet_id, + Slice(producer_key_range.start_key).ToDebugHexString(), + Slice(producer_key_range.end_key).ToDebugHexString()); + DLOG(FATAL) << s; + return s; + } + + (*mutable_map)[consumer_tablet_id].add_tablets(producer_tablet_id); + (*mutable_map)[consumer_tablet_id].add_start_key(producer_key_range.start_key); + (*mutable_map)[consumer_tablet_id].add_end_key(producer_key_range.end_key); + } + + return Status::OK(); +} + +Status InitCDCStream( const std::string& producer_table_id, const std::string& consumer_table_id, - const std::string& producer_id, - const std::string& producer_master_addrs, - const GetTableLocationsResponsePB& consumer_tablets_resp, + const std::map& consumer_tablet_keys, std::unordered_set* tserver_addrs, cdc::StreamEntryPB* stream_entry, - std::shared_ptr cdc_rpc_tasks) { - + std::shared_ptr + cdc_rpc_tasks) { // Get the tablets in the producer table. auto producer_table_locations = VERIFY_RESULT(cdc_rpc_tasks->GetTableLocations(producer_table_id)); - auto consumer_tablets_size = consumer_tablets_resp.tablet_locations_size(); - auto partitions_map = GetPartitionStartKeyConsumerTabletMapping(consumer_tablets_resp); stream_entry->set_consumer_table_id(consumer_table_id); stream_entry->set_producer_table_id(producer_table_id); - auto* mutable_map = stream_entry->mutable_consumer_producer_tablet_map(); - bool same_tablet_count = consumer_tablets_size == producer_table_locations.size(); - LOG(INFO) << Format("For producer table id $0 and consumer table id $1, same num tablets: $2", - producer_table_id, consumer_table_id, same_tablet_count); - stream_entry->set_same_num_producer_consumer_tablets(same_tablet_count); + + auto producer_tablet_keys = GetTabletKeys(producer_table_locations); + RETURN_NOT_OK(ComputeTabletMapping(producer_tablet_keys, consumer_tablet_keys, stream_entry)); + + LOG(INFO) << Format( + "For producer table id $0 and consumer table id $1, same num tablets: $2", producer_table_id, + consumer_table_id, stream_entry->local_tserver_optimized()); + // Create the mapping between consumer and producer tablets. for (int i = 0; i < producer_table_locations.size(); i++) { - const auto& producer = producer_table_locations.Get(i).tablet_id(); - std::string consumer; - if (same_tablet_count) { - // We can optimize if we have the same tablet count in the producer and consumer table by - // mapping key ranges to each other. - const auto& it = - partitions_map.find(producer_table_locations.Get(i).partition().partition_key_start()); - if (it == partitions_map.end()) { - return STATUS_SUBSTITUTE( - IllegalState, "When producer and consumer tablet counts are the same, could not find " - "matching keyrange for tablet $0", producer); - } - consumer = it->second; - } else { - consumer = consumer_tablets_resp.tablet_locations(i % consumer_tablets_size).tablet_id(); - } + const auto& producer = producer_table_locations.Get(i); + PopulateTsAddresses(producer, tserver_addrs); + } - cdc::ProducerTabletListPB producer_tablets; - auto it = mutable_map->find(consumer); - if (it != mutable_map->end()) { - producer_tablets = it->second; - } - *producer_tablets.add_tablets() = producer; - (*mutable_map)[consumer] = producer_tablets; + return Status::OK(); +} - // For external CDC Consumers, populate the list of TServers they can connect to as proxies. - for (const auto& replica : producer_table_locations.Get(i).replicas()) { - // Use the public IP addresses since we're cross-universe - for (const auto& addr : replica.ts_info().broadcast_addresses()) { - tserver_addrs->insert(HostPortFromPB(addr)); - } - // Rarely a viable setup for production replication, but used in testing... - if (replica.ts_info().broadcast_addresses_size() == 0) { - LOG(WARNING) << "No public broadcast addresses found for " - << replica.ts_info().permanent_uuid() - << ". Using private addresses instead."; - for (const auto& addr : replica.ts_info().private_rpc_addresses()) { - tserver_addrs->insert(HostPortFromPB(addr)); - } - } - } +Result TryComputeOverlapBasedMapping( + const std::map& consumer_tablet_keys, cdc::StreamEntryPB* stream_entry) { + std::map producer_tablet_keys; + + // See if we stored the producer tablet keys. + if (!GetProducerTabletKeys(stream_entry->consumer_producer_tablet_map(), &producer_tablet_keys)) { + return false; } - return Status::OK(); + + RETURN_NOT_OK(ComputeTabletMapping(producer_tablet_keys, consumer_tablet_keys, stream_entry)); + return true; } -Status UpdateTableMappingOnTabletSplit( - cdc::StreamEntryPB* stream_entry, - const SplitTabletIds& split_tablet_ids) { +Status UpdateTabletMappingOnConsumerSplit( + const std::map& consumer_tablet_keys, + const SplitTabletIds& split_tablet_ids, cdc::StreamEntryPB* stream_entry) { auto* mutable_map = stream_entry->mutable_consumer_producer_tablet_map(); + // We should have more consumer tablets than before. Request has to be idempotent so equality is + // allowed. + DCHECK_GE(consumer_tablet_keys.size(), mutable_map->size()); + + // Try to perform a overlap based mapping. + if (VERIFY_RESULT(TryComputeOverlapBasedMapping(consumer_tablet_keys, stream_entry))) { + return Status::OK(); + } + + // Looks like we dont have producer key ranges. Just distribute the producer tablets between both + // children. auto producer_tablets = (*mutable_map)[split_tablet_ids.source]; + DCHECK(producer_tablets.start_key_size() == 0 && producer_tablets.end_key_size() == 0); mutable_map->erase(split_tablet_ids.source); - // TODO introduce a better mapping of tablets to improve locality (GH #10186). - // For now we just distribute the producer tablets between both children. for (int i = 0; i < producer_tablets.tablets().size(); ++i) { if (i % 2) { *(*mutable_map)[split_tablet_ids.children.first].add_tablets() = producer_tablets.tablets(i); @@ -126,39 +246,101 @@ Status UpdateTableMappingOnTabletSplit( *(*mutable_map)[split_tablet_ids.children.second].add_tablets() = producer_tablets.tablets(i); } } + + stream_entry->set_local_tserver_optimized(false); return Status::OK(); } -Result> TEST_GetConsumerProducerTableMap( - const std::string& producer_master_addrs, - const ListTablesResponsePB& resp) { +Status UpdateTabletMappingOnProducerSplit( + const std::map& consumer_tablet_keys, + const SplitTabletIds& split_tablet_ids, + const string& split_key, + bool* found_source, + bool* found_all_split_childs, + cdc::StreamEntryPB* stream_entry) { + // Find the parent tablet in the tablet mapping. + *found_source = false; + *found_all_split_childs = false; + auto mutable_map = stream_entry->mutable_consumer_producer_tablet_map(); + // Also keep track if we see the split children tablets. + vector split_child_tablet_ids{ + split_tablet_ids.children.first, split_tablet_ids.children.second}; + for (auto& consumer_tablet_to_producer_tablets : *mutable_map) { + auto& producer_tablet_infos = consumer_tablet_to_producer_tablets.second; + bool has_key_range = + producer_tablet_infos.start_key_size() == producer_tablet_infos.tablets_size() && + producer_tablet_infos.end_key_size() == producer_tablet_infos.tablets_size(); + auto producer_tablets = producer_tablet_infos.mutable_tablets(); + for (int i = 0; i < producer_tablets->size(); i++) { + auto& tablet = producer_tablets->Get(i); + if (tablet == split_tablet_ids.source) { + // Remove the parent tablet id. + producer_tablets->DeleteSubrange(i, 1); + // For now we add the children tablets to the same consumer tablet. + // ReComputeTabletMapping will optimize this. + producer_tablet_infos.add_tablets(split_tablet_ids.children.first); + producer_tablet_infos.add_tablets(split_tablet_ids.children.second); - auto cdc_rpc_tasks = VERIFY_RESULT(CDCRpcTasks::CreateWithMasterAddrs( - "" /* producer_id */, producer_master_addrs)); - auto producer_tables = VERIFY_RESULT(cdc_rpc_tasks->ListTables()); + if (has_key_range) { + auto old_start_key = producer_tablet_infos.start_key(i); + auto old_end_key = producer_tablet_infos.end_key(i); - std::unordered_map consumer_tables_map; - for (const auto& table_info : resp.tables()) { - const auto& table_name_str = Format("$0:$1", table_info.namespace_().name(), table_info.name()); - consumer_tables_map[table_name_str] = table_info.id(); - } + RETURN_NOT_OK_PREPEND( + PartitionSchema::IsValidHashPartitionRange(old_start_key, split_key), + Format("Producer tablet $0 does not contain split key", split_tablet_ids.source)); + RETURN_NOT_OK_PREPEND( + PartitionSchema::IsValidHashPartitionRange(split_key, old_end_key), + Format("Producer tablet $0 does not contain split key", split_tablet_ids.source)); + + // Remove old keys and add the new ones. + producer_tablet_infos.mutable_start_key()->DeleteSubrange(i, 1); + producer_tablet_infos.mutable_end_key()->DeleteSubrange(i, 1); + producer_tablet_infos.add_start_key(old_start_key); + producer_tablet_infos.add_end_key(split_key); + producer_tablet_infos.add_start_key(split_key); + producer_tablet_infos.add_end_key(old_end_key); + } else { + DCHECK( + producer_tablet_infos.start_key_size() == 0 && + producer_tablet_infos.end_key_size() == 0); + } + // There should only be one copy of each producer tablet per stream, so can exit + // early. + *found_source = true; + break; + } - std::vector consumer_producer_list; - for (const auto& table : producer_tables) { - // TODO(Rahul): Fix this for YSQL workload testing. - if (!master::IsSystemNamespace(table.second.namespace_name())) { - const auto& table_name_str = - Format("$0:$1", table.second.namespace_name(), table.second.table_name()); - CDCConsumerStreamInfo stream_info; - stream_info.stream_id = RandomHumanReadableString(16); - stream_info.producer_table_id = table.first; - stream_info.consumer_table_id = consumer_tables_map[table_name_str]; - consumer_producer_list.push_back(std::move(stream_info)); + // Check if this is one of the child split tablets. + auto it = std::find(split_child_tablet_ids.begin(), split_child_tablet_ids.end(), tablet); + if (it != split_child_tablet_ids.end()) { + split_child_tablet_ids.erase(it); + } + } + if (*found_source) { + break; } } - return consumer_producer_list; + + if (!*found_source) { + // Did not find the source tablet - means that we have already processed this SPLIT_OP, so for + // idempotent, we can return OK. + *found_all_split_childs = split_child_tablet_ids.empty(); + return Status::OK(); + } + + // Try to perform a better overlap based mapping. + if (VERIFY_RESULT(TryComputeOverlapBasedMapping(consumer_tablet_keys, stream_entry))) { + return Status::OK(); + } + + // Stream was created without producer tablet key info. We will leave the children on the same + // consumer. + // Also make sure that we switch off of 1-1 mapping optimizations. + stream_entry->set_local_tserver_optimized(false); + + return Status::OK(); } -} // namespace enterprise -} // namespace master -} // namespace yb +} // namespace enterprise +} // namespace master +} // namespace yb diff --git a/ent/src/yb/master/cdc_consumer_registry_service.h b/ent/src/yb/master/cdc_consumer_registry_service.h index 68a1938cbda3..d344d5a71a82 100644 --- a/ent/src/yb/master/cdc_consumer_registry_service.h +++ b/ent/src/yb/master/cdc_consumer_registry_service.h @@ -36,31 +36,37 @@ class GetTableLocationsResponsePB; namespace enterprise { +struct KeyRange { + std::string start_key; + std::string end_key; +}; + struct CDCConsumerStreamInfo { std::string stream_id; std::string consumer_table_id; std::string producer_table_id; }; -Status CreateTabletMapping( +Status InitCDCStream( const std::string& producer_table_id, const std::string& consumer_table_id, - const std::string& producer_id, - const std::string& producer_master_addrs, - const GetTableLocationsResponsePB& consumer_tablets_resp, + const std::map& consumer_tablet_keys, std::unordered_set* tserver_addrs, cdc::StreamEntryPB* stream_entry, std::shared_ptr cdc_rpc_tasks); -// After split_tablet_ids.source splits, remove its entry and replace it with its children tablets. -Status UpdateTableMappingOnTabletSplit( - cdc::StreamEntryPB* stream_entry, - const SplitTabletIds& split_tablet_ids); - -Result> TEST_GetConsumerProducerTableMap( - const std::string& producer_master_addrs, - const ListTablesResponsePB& resp); - +Status UpdateTabletMappingOnConsumerSplit( + const std::map& consumer_tablet_keys, + const SplitTabletIds& split_tablet_ids, + cdc::StreamEntryPB* stream_entry); + +Status UpdateTabletMappingOnProducerSplit( + const std::map& consumer_tablet_keys, + const SplitTabletIds& split_tablet_ids, + const string& split_key, + bool* found_source, + bool* found_all_split_childs, + cdc::StreamEntryPB* stream_entry); } // namespace enterprise } // namespace master } // namespace yb diff --git a/ent/src/yb/tserver/cdc_consumer.cc b/ent/src/yb/tserver/cdc_consumer.cc index c173d8731052..816746182535 100644 --- a/ent/src/yb/tserver/cdc_consumer.cc +++ b/ent/src/yb/tserver/cdc_consumer.cc @@ -207,7 +207,7 @@ void CDCConsumer::UpdateInMemoryState(const cdc::ConsumerRegistryPB* consumer_re LOG_WITH_PREFIX(INFO) << "Updating CDC consumer registry: " << consumer_registry->DebugString(); - streams_with_same_num_producer_consumer_tablets_.clear(); + streams_with_local_tserver_optimization_.clear(); for (const auto& producer_map : DCHECK_NOTNULL(consumer_registry)->producer_map()) { const auto& producer_entry_pb = producer_map.second; if (producer_entry_pb.disable_stream()) { @@ -228,10 +228,10 @@ void CDCConsumer::UpdateInMemoryState(const cdc::ConsumerRegistryPB* consumer_re // recreate the set of CDCPollers for (const auto& stream_entry : producer_entry_pb.stream_map()) { const auto& stream_entry_pb = stream_entry.second; - if (stream_entry_pb.same_num_producer_consumer_tablets()) { + if (stream_entry_pb.local_tserver_optimized()) { LOG_WITH_PREFIX(INFO) << Format("Stream $0 will use local tserver optimization", stream_entry.first); - streams_with_same_num_producer_consumer_tablets_.insert(stream_entry.first); + streams_with_local_tserver_optimization_.insert(stream_entry.first); } for (const auto& tablet_entry : stream_entry_pb.consumer_producer_tablet_map()) { const auto& consumer_tablet_id = tablet_entry.first; @@ -326,8 +326,8 @@ void CDCConsumer::TriggerPollForNewTablets() { // now create the poller bool use_local_tserver = - streams_with_same_num_producer_consumer_tablets_.find(entry.first.stream_id) != - streams_with_same_num_producer_consumer_tablets_.end(); + streams_with_local_tserver_optimization_.find(entry.first.stream_id) != + streams_with_local_tserver_optimization_.end(); auto cdc_poller = std::make_shared( entry.first, entry.second, std::bind(&CDCConsumer::ShouldContinuePolling, this, entry.first, entry.second), diff --git a/ent/src/yb/tserver/cdc_consumer.h b/ent/src/yb/tserver/cdc_consumer.h index e56d1136197f..25999dedad9a 100644 --- a/ent/src/yb/tserver/cdc_consumer.h +++ b/ent/src/yb/tserver/cdc_consumer.h @@ -14,6 +14,7 @@ #ifndef ENT_SRC_YB_TSERVER_CDC_CONSUMER_H #define ENT_SRC_YB_TSERVER_CDC_CONSUMER_H +#include #include #include @@ -132,8 +133,8 @@ class CDCConsumer { cdc::ProducerTabletInfo::Hash> producer_consumer_tablet_map_from_master_ GUARDED_BY(master_data_mutex_); - std::unordered_set streams_with_same_num_producer_consumer_tablets_ - GUARDED_BY(master_data_mutex_); + std::unordered_set streams_with_local_tserver_optimization_ + GUARDED_BY(master_data_mutex_); scoped_refptr run_trigger_poll_thread_; diff --git a/ent/src/yb/tserver/twodc_output_client.cc b/ent/src/yb/tserver/twodc_output_client.cc index fba05e8516f6..58dbb1286070 100644 --- a/ent/src/yb/tserver/twodc_output_client.cc +++ b/ent/src/yb/tserver/twodc_output_client.cc @@ -92,14 +92,12 @@ class TwoDCOutputClient : public cdc::CDCOutputClient { Status ProcessChangesStartingFromIndex(int start); Status ProcessRecordForTablet( - const int record_idx, const Result& tablet); + const cdc::CDCRecordPB& record, const Result& tablet); - Status ProcessRecordForLocalTablet(const int record_idx); + Status ProcessRecordForLocalTablet(const cdc::CDCRecordPB& record); Status ProcessRecordForTabletRange( - const int record_idx, - const std::string partition_key_start, - const std::string partition_key_end, + const cdc::CDCRecordPB& record, const Result>& tablets); Result ProcessSplitOp(const cdc::CDCRecordPB& record); @@ -236,18 +234,16 @@ Status TwoDCOutputClient::ProcessChangesStartingFromIndex(int start) { } if (UseLocalTserver()) { - RETURN_NOT_OK(ProcessRecordForLocalTablet(i)); + RETURN_NOT_OK(ProcessRecordForLocalTablet(record)); } else { if (record.operation() == cdc::CDCRecordPB::APPLY) { - RETURN_NOT_OK(ProcessRecordForTabletRange( - i, record.partition().partition_key_start(), - record.partition().partition_key_end(), all_tablets_result_)); + RETURN_NOT_OK(ProcessRecordForTabletRange(record, all_tablets_result_)); } else { auto partition_hash_key = PartitionSchema::EncodeMultiColumnHashValue( VERIFY_RESULT(CheckedStoInt(record.key(0).key()))); auto tablet_result = local_client_->client->LookupTabletByKeyFuture( table_, partition_hash_key, CoarseMonoClock::now() + timeout_ms_).get(); - RETURN_NOT_OK(ProcessRecordForTablet(i, tablet_result)); + RETURN_NOT_OK(ProcessRecordForTablet(record, tablet_result)); } } processed_write_record = true; @@ -289,21 +285,18 @@ Status TwoDCOutputClient::ProcessRecord(const std::vector& tablet_i } Status TwoDCOutputClient::ProcessRecordForTablet( - const int record_idx, - const Result& tablet) { + const cdc::CDCRecordPB& record, const Result& tablet) { RETURN_NOT_OK(tablet); - return ProcessRecord({tablet->get()->tablet_id()}, twodc_resp_copy_.records(record_idx)); + return ProcessRecord({tablet->get()->tablet_id()}, record); } Status TwoDCOutputClient::ProcessRecordForTabletRange( - const int record_idx, - const std::string partition_key_start, - const std::string partition_key_end, + const cdc::CDCRecordPB& record, const Result>& tablets) { RETURN_NOT_OK(tablets); auto filtered_tablets_result = client::FilterTabletsByHashPartitionKeyRange( - *tablets, partition_key_start, partition_key_end); + *tablets, record.partition().partition_key_start(), record.partition().partition_key_end()); RETURN_NOT_OK(filtered_tablets_result); auto filtered_tablets = *filtered_tablets_result; @@ -316,11 +309,11 @@ Status TwoDCOutputClient::ProcessRecordForTabletRange( [&](const auto& tablet_ptr) { return tablet_ptr->tablet_id(); }); - return ProcessRecord(tablet_ids, twodc_resp_copy_.records(record_idx)); + return ProcessRecord(tablet_ids, record); } -Status TwoDCOutputClient::ProcessRecordForLocalTablet(const int record_idx) { - return ProcessRecord({consumer_tablet_info_.tablet_id}, twodc_resp_copy_.records(record_idx)); +Status TwoDCOutputClient::ProcessRecordForLocalTablet(const cdc::CDCRecordPB& record) { + return ProcessRecord({consumer_tablet_info_.tablet_id}, record); } Result TwoDCOutputClient::ProcessSplitOp(const cdc::CDCRecordPB& record) { diff --git a/src/yb/cdc/cdc_consumer.proto b/src/yb/cdc/cdc_consumer.proto index cc8e45f290db..c6abb62826c1 100644 --- a/src/yb/cdc/cdc_consumer.proto +++ b/src/yb/cdc/cdc_consumer.proto @@ -38,8 +38,10 @@ option java_package = "org.yb.cdc"; import "yb/common/common_net.proto"; message ProducerTabletListPB { - // List of tablet ids for a given producer table. + // List of tablet ids, start and end keys for a given producer table. repeated string tablets = 1; + repeated bytes start_key = 2; + repeated bytes end_key = 3; } message StreamEntryPB { @@ -47,7 +49,7 @@ message StreamEntryPB { map consumer_producer_tablet_map = 1; string consumer_table_id = 2; string producer_table_id = 3; - bool same_num_producer_consumer_tablets = 4; + bool local_tserver_optimized = 4; } message ProducerEntryPB { diff --git a/src/yb/client/client_utils.cc b/src/yb/client/client_utils.cc index 461511d7c58b..7b0ad1519ad9 100644 --- a/src/yb/client/client_utils.cc +++ b/src/yb/client/client_utils.cc @@ -80,14 +80,10 @@ Result> FilterTabletsByHashPartitionKeyRa partition_key_end)); std::vector filtered_results; for (const auto& remote_tablet : all_tablets) { - auto tablet_partition_start = remote_tablet->partition().partition_key_start(); - auto tablet_partition_end = remote_tablet->partition().partition_key_end(); - // Is this tablet at the start - bool start_condition = partition_key_start.empty() || tablet_partition_end.empty() || - tablet_partition_end > partition_key_start; - bool end_condition = partition_key_end.empty() || tablet_partition_start < partition_key_end; - - if (start_condition && end_condition) { + if (PartitionSchema::GetOverlap( + remote_tablet->partition().partition_key_start(), + remote_tablet->partition().partition_key_end(), partition_key_start, + partition_key_end)) { filtered_results.push_back(remote_tablet); } } diff --git a/src/yb/common/partition.cc b/src/yb/common/partition.cc index a63d756f4f5d..01085722bb49 100644 --- a/src/yb/common/partition.cc +++ b/src/yb/common/partition.cc @@ -475,6 +475,38 @@ bool PartitionSchema::IsValidHashPartitionKeyBound(const string& partition_key) return partition_key.empty() || partition_key.size() == kPartitionKeySize; } +uint32_t PartitionSchema::GetOverlap( + const std::string& key_start, + const std::string& key_end, + const std::string& other_key_start, + const std::string& other_key_end) { + uint16_t first_start_val = + key_start.empty() ? 0 : PartitionSchema::DecodeMultiColumnHashValue(key_start); + uint16_t second_start_val = + other_key_start.empty() ? 0 : PartitionSchema::DecodeMultiColumnHashValue(other_key_start); + uint16_t first_end_val = key_end.empty() + ? std::numeric_limits::max() + : PartitionSchema::DecodeMultiColumnHashValue(key_end) - 1; + uint16_t second_end_val = other_key_end.empty() + ? std::numeric_limits::max() + : PartitionSchema::DecodeMultiColumnHashValue(other_key_end) - 1; + + // Use uint32 as max Overlap is uint16_t max + 1 + uint32_t start_key = max(first_start_val, second_start_val); + uint32_t end_key = min(first_end_val, second_end_val); + + if (end_key >= start_key) { + return end_key - start_key + 1; + } + + return 0; +} + +uint32_t PartitionSchema::GetPartitionRangeSize( + const std::string& key_start, const std::string& key_end) { + return GetOverlap(key_start, key_end, key_start, key_end); +} + Status PartitionSchema::CreateRangePartitions(std::vector* partitions) const { // Create the start range keys. // NOTE: When converting FromPB to partition schema, we already error-check, so we don't need diff --git a/src/yb/common/partition.h b/src/yb/common/partition.h index 57fbdd2318f7..936b79bcdb7e 100644 --- a/src/yb/common/partition.h +++ b/src/yb/common/partition.h @@ -252,6 +252,16 @@ class PartitionSchema { static bool IsValidHashPartitionKeyBound(const std::string& partition_key); + // Get the overlap between two key ranges. + static uint32_t GetOverlap( + const std::string& key_start, + const std::string& key_end, + const std::string& other_key_start, + const std::string& other_key_end); + + // Get the Partition range size. + static uint32_t GetPartitionRangeSize(const std::string& key_start, const std::string& key_end); + template static void ProcessHashKeyEntry(const T* value_pb, std::string* out) { if (value_pb) { diff --git a/src/yb/integration-tests/cdc_test_util.cc b/src/yb/integration-tests/cdc_test_util.cc index f6e59bfeb895..d6c549b2accb 100644 --- a/src/yb/integration-tests/cdc_test_util.cc +++ b/src/yb/integration-tests/cdc_test_util.cc @@ -22,6 +22,7 @@ #include "yb/tablet/tablet_metadata.h" #include "yb/tablet/tablet_peer.h" +#include "yb/tserver/cdc_consumer.h" #include "yb/tserver/mini_tablet_server.h" #include "yb/tserver/tablet_server.h" #include "yb/tserver/ts_tablet_manager.h" @@ -98,5 +99,41 @@ void VerifyWalRetentionTime(MiniCluster* cluster, ASSERT_GT(ntablets_checked, 0); } +size_t NumProducerTabletsPolled(MiniCluster* cluster) { + size_t size = 0; + for (const auto& mini_tserver : cluster->mini_tablet_servers()) { + size_t new_size = 0; + auto* tserver = dynamic_cast(mini_tserver->server()); + tserver::enterprise::CDCConsumer* cdc_consumer; + if (tserver && (cdc_consumer = tserver->GetCDCConsumer())) { + auto tablets_running = cdc_consumer->TEST_producer_tablets_running(); + new_size = tablets_running.size(); + } + size += new_size; + } + return size; +} + +Status CorrectlyPollingAllTablets( + MiniCluster* cluster, uint32_t num_producer_tablets, MonoDelta timeout) { + return LoggedWaitFor( + [&]() -> Result { + static int i = 0; + constexpr int kNumIterationsWithCorrectResult = 5; + auto cur_tablets = NumProducerTabletsPolled(cluster); + if (cur_tablets == num_producer_tablets) { + if (i++ == kNumIterationsWithCorrectResult) { + i = 0; + return true; + } + } else { + i = 0; + } + LOG(INFO) << "Tablets being polled: " << cur_tablets; + return false; + }, + timeout, "Num producer tablets being polled"); +} + } // namespace cdc } // namespace yb diff --git a/src/yb/integration-tests/cdc_test_util.h b/src/yb/integration-tests/cdc_test_util.h index 5a2c82ecdb17..43b71a1f2ba6 100644 --- a/src/yb/integration-tests/cdc_test_util.h +++ b/src/yb/integration-tests/cdc_test_util.h @@ -37,6 +37,8 @@ void VerifyWalRetentionTime(yb::MiniCluster* cluster, const std::string& table_name_start, uint32_t expected_wal_retention_secs); +Status CorrectlyPollingAllTablets( + MiniCluster* cluster, uint32_t num_producer_tablets, MonoDelta timeout); } // namespace cdc } // namespace yb diff --git a/src/yb/integration-tests/mini_cluster.h b/src/yb/integration-tests/mini_cluster.h index 8fe8113aaed1..3e99606cd03f 100644 --- a/src/yb/integration-tests/mini_cluster.h +++ b/src/yb/integration-tests/mini_cluster.h @@ -364,7 +364,6 @@ void SetCompactFlushRateLimitBytesPerSec(MiniCluster* cluster, size_t bytes_per_ Status WaitAllReplicasSynchronizedWithLeader( MiniCluster* cluster, CoarseTimePoint deadline); - } // namespace yb #endif /* YB_INTEGRATION_TESTS_MINI_CLUSTER_H_ */ diff --git a/src/yb/integration-tests/xcluster-tablet-split-itest.cc b/src/yb/integration-tests/xcluster-tablet-split-itest.cc index bd13b9498ca3..2955a5ca9c0a 100644 --- a/src/yb/integration-tests/xcluster-tablet-split-itest.cc +++ b/src/yb/integration-tests/xcluster-tablet-split-itest.cc @@ -11,15 +11,18 @@ // under the License. // +#include #include "yb/cdc/cdc_service.proxy.h" #include "yb/client/client_fwd.h" #include "yb/client/session.h" #include "yb/client/table.h" #include "yb/client/yb_table_name.h" +#include "yb/common/partition.h" #include "yb/integration-tests/cdc_test_util.h" #include "yb/integration-tests/tablet-split-itest-base.h" +#include "yb/master/catalog_entity_info.h" #include "yb/master/master_client.pb.h" #include "yb/master/master_ddl.proxy.h" #include "yb/master/master_defaults.h" @@ -46,6 +49,10 @@ DECLARE_int64(tablet_force_split_threshold_bytes); DECLARE_int64(db_write_buffer_size); namespace yb { +using client::kv_table_test::Partitioning; +using master::GetTableLocationsRequestPB; +using master::GetTableLocationsResponsePB; +using master::TableIdentifierPB; class CdcTabletSplitITest : public TabletSplitITest { public: @@ -90,7 +97,7 @@ class CdcTabletSplitITest : public TabletSplitITest { // Create an identical table on the new cluster. client::kv_table_test::CreateTable( client::Transactional(GetIsolationLevel() != IsolationLevel::NON_TRANSACTIONAL), - 1, // num_tablets + NumTablets(), // num_tablets cluster_client.get(), table); return cluster; @@ -167,16 +174,28 @@ class XClusterTabletSplitITest : public CdcTabletSplitITest { consumer_cluster_ = ASSERT_RESULT(CreateNewUniverseAndTable("consumer", &consumer_table_)); consumer_client_ = ASSERT_RESULT(consumer_cluster_->CreateClient()); + SetupReplication(); + } + + void SetupReplication() { + SwitchToProducer(); ASSERT_OK(tools::RunAdminToolCommand( consumer_cluster_->GetMasterAddresses(), "setup_universe_replication", kProducerClusterId, cluster_->GetMasterAddresses(), table_->id())); } + void DeleteReplication() { + SwitchToProducer(); + ASSERT_OK(tools::RunAdminToolCommand( + consumer_cluster_->GetMasterAddresses(), "delete_universe_replication", + kProducerClusterId)); + } + protected: void DoBeforeTearDown() override { + DeleteReplication(); + SwitchToConsumer(); - ASSERT_OK(tools::RunAdminToolCommand( - cluster_->GetMasterAddresses(), "delete_universe_replication", kProducerClusterId)); // Since delete_universe_replication is async, wait until consumers are empty before shutdown. // TODO: remove this once #12068 is fixed. @@ -270,6 +289,86 @@ class XClusterTabletSplitITest : public CdcTabletSplitITest { return WaitForTabletSplitCompletion(expected_non_split_tablets, expected_split_tablets); } + auto GetTabletLocations() { + GetTableLocationsRequestPB consumer_table_req; + consumer_table_req.set_max_returned_locations(std::numeric_limits::max()); + GetTableLocationsResponsePB consumer_table_resp; + TableIdentifierPB table_identifer; + table_identifer.set_table_id(table_->id()); + *(consumer_table_req.mutable_table()) = table_identifer; + + auto catalog_mgr = EXPECT_RESULT(catalog_manager()); + EXPECT_OK(catalog_mgr->GetTableLocations(&consumer_table_req, &consumer_table_resp)); + return consumer_table_resp.tablet_locations(); + } + + auto GetConsumerMap() { + master::SysClusterConfigEntryPB cluster_info; + auto& cm = EXPECT_RESULT(cluster_->GetLeaderMiniMaster())->catalog_manager(); + EXPECT_OK(cm.GetClusterConfig(&cluster_info)); + auto producer_map = cluster_info.mutable_consumer_registry()->mutable_producer_map(); + auto it = producer_map->find(kProducerClusterId); + EXPECT_NE(it, producer_map->end()); + EXPECT_EQ(it->second.stream_map().size(), 1); + return it->second.stream_map().begin()->second.consumer_producer_tablet_map(); + } + + void ValidateOverlap() { + const auto timeout = MonoDelta::FromSeconds(60 * kTimeMultiplier); + SwitchToProducer(); + auto producer_tablet_locations = GetTabletLocations(); + int producer_tablet_count = producer_tablet_locations.size(); + + SwitchToConsumer(); + ASSERT_OK(cdc::CorrectlyPollingAllTablets(cluster_.get(), producer_tablet_count, timeout)); + auto consumer_tablet_locations = GetTabletLocations(); + int consumer_tablet_count = consumer_tablet_locations.size(); + + auto tablet_map = GetConsumerMap(); + LOG(INFO) << "Consumer Map: \n"; + for (const auto& elem : tablet_map) { + std::vector start_keys, end_keys; + std::transform( + elem.second.start_key().begin(), elem.second.start_key().end(), + std::back_inserter(start_keys), + [](std::string s) -> string { return Slice(s).ToDebugHexString(); }); + std::transform( + elem.second.end_key().begin(), elem.second.end_key().end(), std::back_inserter(end_keys), + [](std::string s) -> string { return Slice(s).ToDebugHexString(); }); + + LOG(INFO) << elem.first << ", [" << boost::algorithm::join(elem.second.tablets(), ",") + << "], [" << boost::algorithm::join(start_keys, ",") << "], [" + << boost::algorithm::join(end_keys, ",") << "]\n"; + } + ASSERT_LE(tablet_map.size(), min(producer_tablet_count, consumer_tablet_count)); + + int producer_tablets = 0; + for (auto& mapping : tablet_map) { + auto consumer_tablet = std::find_if( + consumer_tablet_locations.begin(), consumer_tablet_locations.end(), + [&](const auto& tablet) { return tablet.tablet_id() == mapping.first; }); + ASSERT_NE(consumer_tablet, consumer_tablet_locations.end()); + + for (auto& mapped_producer_tablet : mapping.second.tablets()) { + producer_tablets++; + auto producer_tablet = std::find_if( + producer_tablet_locations.begin(), producer_tablet_locations.end(), + [&](const auto& tablet) { return tablet.tablet_id() == mapped_producer_tablet; }); + ASSERT_NE(producer_tablet, producer_tablet_locations.end()); + + ASSERT_GT( + PartitionSchema::GetOverlap( + consumer_tablet->partition().partition_key_start(), + consumer_tablet->partition().partition_key_end(), + producer_tablet->partition().partition_key_start(), + producer_tablet->partition().partition_key_end()), + 0); + } + } + + ASSERT_EQ(producer_tablets, producer_tablet_count); + } + // Only one set of these is valid at any time. // The other cluster is accessible via cluster_ / client_ / table_. std::unique_ptr consumer_cluster_; @@ -283,6 +382,67 @@ class XClusterTabletSplitITest : public CdcTabletSplitITest { const string kProducerClusterId = "producer"; }; +class xClusterTabletMapTest : public XClusterTabletSplitITest, + public testing::WithParamInterface { + public: + void SetUp() override {} + + void RunSetUp(int producer_tablet_count, int consumer_tablet_count) { + FLAGS_cdc_state_table_num_tablets = 1; + TabletSplitITest::SetUp(); + ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_validate_all_tablet_candidates) = false; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_tablet_split_of_xcluster_replicated_tables) = true; + + SetNumTablets(producer_tablet_count); + Schema schema; + BuildSchema(GetParam(), &schema); + schema.mutable_table_properties()->SetTransactional( + GetIsolationLevel() != IsolationLevel::NON_TRANSACTIONAL); + ASSERT_OK(client::kv_table_test::CreateTable(schema, NumTablets(), client_.get(), &table_)); + + SetNumTablets(consumer_tablet_count); + // Also create the consumer cluster. + // First create the new cluster. + MiniClusterOptions opts; + opts.num_tablet_servers = 3; + opts.cluster_id = "consumer"; + consumer_cluster_ = std::make_unique(opts); + ASSERT_OK(consumer_cluster_->Start()); + ASSERT_OK(consumer_cluster_->WaitForTabletServerCount(3)); + consumer_client_ = ASSERT_RESULT(consumer_cluster_->CreateClient()); + + // Create an identical table on the new cluster. + ASSERT_OK(client::kv_table_test::CreateTable( + schema, + NumTablets(), // num_tablets + consumer_client_.get(), + &consumer_table_)); + + SetupReplication(); + ASSERT_RESULT(WriteRowsAndFlush(kDefaultNumRows, 1)); + } +}; + +TEST_P(xClusterTabletMapTest, SingleTableCountMapTest) { + RunSetUp(1, 1); + ValidateOverlap(); +} + +TEST_P(xClusterTabletMapTest, SameTableCountMapTest) { + RunSetUp(4, 4); + ValidateOverlap(); +} + +TEST_P(xClusterTabletMapTest, MoreProducerTablets) { + RunSetUp(8, 2); + ValidateOverlap(); +} + +TEST_P(xClusterTabletMapTest, MoreConsumerTablets) { + RunSetUp(3, 8); + ValidateOverlap(); +} + TEST_F(XClusterTabletSplitITest, SplittingWithXClusterReplicationOnConsumer) { // Perform a split on the consumer side and ensure replication still works. @@ -298,6 +458,8 @@ TEST_F(XClusterTabletSplitITest, SplittingWithXClusterReplicationOnConsumer) { // Perform a split on the CONSUMER cluster. ASSERT_OK(SplitTabletAndValidate(split_hash_code, kDefaultNumRows)); + ValidateOverlap(); + SwitchToProducer(); // Write another set of rows, and make sure the new poller picks up on the changes. @@ -323,6 +485,8 @@ TEST_F(XClusterTabletSplitITest, SplittingWithXClusterReplicationOnProducer) { ASSERT_RESULT(WriteRows(kDefaultNumRows, kDefaultNumRows + 1)); ASSERT_OK(CheckForNumRowsOnConsumer(2 * kDefaultNumRows)); + + ValidateOverlap(); } TEST_F(XClusterTabletSplitITest, MultipleSplitsDuringPausedReplication) { @@ -379,6 +543,8 @@ TEST_F(XClusterTabletSplitITest, MultipleSplitsInSequence) { // Write some more rows and check that everything is replicated correctly. ASSERT_RESULT(WriteRows(kDefaultNumRows, kDefaultNumRows + 1)); ASSERT_OK(CheckForNumRowsOnConsumer(2 * kDefaultNumRows)); + + ValidateOverlap(); } TEST_F(XClusterTabletSplitITest, SplittingOnProducerAndConsumer) { @@ -434,6 +600,8 @@ TEST_F(XClusterTabletSplitITest, SplittingOnProducerAndConsumer) { size_t num_rows = ASSERT_RESULT(SelectRowsCount(producer_session, table_)); ASSERT_OK(CheckForNumRowsOnConsumer(num_rows)); + + ValidateOverlap(); } TEST_F(XClusterTabletSplitITest, ConsumerClusterFailureWhenProcessingSplitOp) { @@ -708,4 +876,17 @@ TEST_F(NotSupportedTabletSplitITest, SplittingWithXClusterReplicationOnConsumer) producer_cluster->Shutdown(); } +namespace { +template +std::string TestParamToString(const testing::TestParamInfo& param_info) { + return ToString(param_info.param); +} +} // namespace + +INSTANTIATE_TEST_CASE_P( + xClusterTabletMapTestITest, + xClusterTabletMapTest, + ::testing::ValuesIn(client::kv_table_test::kPartitioningArray), + TestParamToString); + } // namespace yb diff --git a/src/yb/master/catalog_manager.cc b/src/yb/master/catalog_manager.cc index 12b4ae110bfb..a2a6d48480c8 100644 --- a/src/yb/master/catalog_manager.cc +++ b/src/yb/master/catalog_manager.cc @@ -634,31 +634,6 @@ class IndexInfoBuilder { IndexInfoPB& index_info_; }; -template -Status CheckIfTableDeletedOrNotVisibleToClient(const Lock& lock) { - // This covers both in progress and fully deleted objects. - if (lock->started_deleting()) { - return STATUS_EC_FORMAT( - NotFound, MasterError(MasterErrorPB::OBJECT_NOT_FOUND), - "The object '$0.$1' does not exist", lock->namespace_id(), lock->name()); - } - if (!lock->visible_to_client()) { - return STATUS_EC_FORMAT( - ServiceUnavailable, MasterError(MasterErrorPB::OBJECT_NOT_FOUND), - "The object '$0.$1' is not running", lock->namespace_id(), lock->name()); - } - return Status::OK(); -} - -template -Status CheckIfTableDeletedOrNotVisibleToClient(const Lock& lock, RespClass* resp) { - auto status = CheckIfTableDeletedOrNotVisibleToClient(lock); - if (!status.ok()) { - return SetupError(resp->mutable_error(), status); - } - return Status::OK(); -} - #define VERIFY_NAMESPACE_FOUND(expr, resp) RESULT_CHECKER_HELPER( \ expr, \ if (!__result.ok()) { \ @@ -2381,7 +2356,7 @@ Status CatalogManager::AddIndexInfoToTable(const scoped_refptr& index << yb::ToString(index_info); TRACE("Locking indexed table"); auto l = DCHECK_NOTNULL(indexed_table)->LockForWrite(); - RETURN_NOT_OK(CheckIfTableDeletedOrNotVisibleToClient(l, resp)); + RETURN_NOT_OK(CatalogManagerUtil::CheckIfTableDeletedOrNotVisibleToClient(l, resp)); // Make sure that the index appears to not have been added to the table until the tservers apply // the alter and respond back. @@ -2728,18 +2703,27 @@ Status CatalogManager::DoSplitTablet( std::array new_tablets_partition = VERIFY_RESULT( CreateNewTabletsPartition(*source_tablet_info, split_partition_key)); - std::array new_tablet_ids; + std::array new_tablet_ids_sorted; for (int i = 0; i < kNumSplitParts; ++i) { - if (i < source_tablet_lock->pb.split_tablet_ids_size()) { - // Post-split tablet `i` has been already registered. - new_tablet_ids[i] = source_tablet_lock->pb.split_tablet_ids(i); - } else { + TabletId child_tablet_id; + for (const auto& split_tablet_id : source_tablet_lock->pb.split_tablet_ids()) { + const auto child_tablet = VERIFY_RESULT(GetTabletInfo(split_tablet_id)); + const auto child_partition = child_tablet->LockForRead()->pb.partition(); + if (child_partition.partition_key_start() == new_tablets_partition[i].partition_key_start()) { + child_tablet_id = split_tablet_id; + break; + } + } + + if (child_tablet_id.empty()) { auto new_tablet_info = VERIFY_RESULT(RegisterNewTabletForSplit( source_tablet_info.get(), new_tablets_partition[i], &source_table_lock, &source_tablet_lock)); - new_tablet_ids[i] = new_tablet_info->id(); + child_tablet_id = new_tablet_info->id(); } + + new_tablet_ids_sorted[i] = child_tablet_id; } source_tablet_lock.Commit(); source_table_lock.Commit(); @@ -2748,7 +2732,7 @@ Status CatalogManager::DoSplitTablet( // TODO(tsplit): what if source tablet will be deleted before or during TS leader is processing // split? Add unit-test. RETURN_NOT_OK(SendSplitTabletRequest( - source_tablet_info, new_tablet_ids, split_encoded_key, split_partition_key)); + source_tablet_info, new_tablet_ids_sorted, split_encoded_key, split_partition_key)); return Status::OK(); } @@ -3329,7 +3313,8 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req, } TRACE("Locking indexed table"); - RETURN_NOT_OK(CheckIfTableDeletedOrNotVisibleToClient(indexed_table->LockForRead(), resp)); + RETURN_NOT_OK(CatalogManagerUtil::CheckIfTableDeletedOrNotVisibleToClient( + indexed_table->LockForRead(), resp)); } // Determine if this table should be colocated with its database. @@ -4211,7 +4196,7 @@ Status CatalogManager::GetGlobalTransactionStatusTablets( auto global_txn_table = VERIFY_RESULT(GetGlobalTransactionStatusTable()); auto l = global_txn_table->LockForRead(); - RETURN_NOT_OK(CheckIfTableDeletedOrNotVisibleToClient(l, resp)); + RETURN_NOT_OK(CatalogManagerUtil::CheckIfTableDeletedOrNotVisibleToClient(l, resp)); for (const auto& tablet : global_txn_table->GetTablets()) { TabletLocationsPB locs_pb; @@ -4375,7 +4360,7 @@ Status CatalogManager::CreateMetricsSnapshotsTableIfNeeded(rpc::RpcContext *rpc) Result CatalogManager::IsCreateTableDone(const TableInfoPtr& table) { TRACE("Locking table"); auto l = table->LockForRead(); - RETURN_NOT_OK(CheckIfTableDeletedOrNotVisibleToClient(l)); + RETURN_NOT_OK(CatalogManagerUtil::CheckIfTableDeletedOrNotVisibleToClient(l)); const auto& pb = l->pb; // 2. Verify if the create is in-progress. @@ -4845,7 +4830,7 @@ Status CatalogManager::TruncateTable(const TableId& table_id, TRACE(Substitute("Locking object with id $0", table_id)); auto l = table->LockForRead(); - RETURN_NOT_OK(CheckIfTableDeletedOrNotVisibleToClient(l, resp)); + RETURN_NOT_OK(CatalogManagerUtil::CheckIfTableDeletedOrNotVisibleToClient(l, resp)); // Truncate on a colocated table should not hit master because it should be handled by a write // DML that creates a table-level tombstone. @@ -4937,7 +4922,8 @@ Status CatalogManager::IsTruncateTableDone(const IsTruncateTableDoneRequestPB* r } TRACE("Locking table"); - RETURN_NOT_OK(CheckIfTableDeletedOrNotVisibleToClient(table->LockForRead(), resp)); + RETURN_NOT_OK( + CatalogManagerUtil::CheckIfTableDeletedOrNotVisibleToClient(table->LockForRead(), resp)); resp->set_done(!table->HasTasks(MonitoredTask::Type::ASYNC_TRUNCATE_TABLET)); return Status::OK(); @@ -5750,7 +5736,7 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req, TRACE("Locking table"); auto l = table->LockForWrite(); - RETURN_NOT_OK(CheckIfTableDeletedOrNotVisibleToClient(l, resp)); + RETURN_NOT_OK(CatalogManagerUtil::CheckIfTableDeletedOrNotVisibleToClient(l, resp)); bool has_changes = false; auto& table_pb = l.mutable_data()->pb; @@ -5931,7 +5917,7 @@ Status CatalogManager::IsAlterTableDone(const IsAlterTableDoneRequestPB* req, TRACE("Locking table"); auto l = table->LockForRead(); - RETURN_NOT_OK(CheckIfTableDeletedOrNotVisibleToClient(l, resp)); + RETURN_NOT_OK(CatalogManagerUtil::CheckIfTableDeletedOrNotVisibleToClient(l, resp)); // 2. Verify if the alter is in-progress. TRACE("Verify if there is an alter operation in progress for $0", table->ToString()); @@ -6025,7 +6011,7 @@ Status CatalogManager::GetTableSchemaInternal(const GetTableSchemaRequestPB* req TRACE("Locking table"); auto l = table->LockForRead(); - RETURN_NOT_OK(CheckIfTableDeletedOrNotVisibleToClient(l, resp)); + RETURN_NOT_OK(CatalogManagerUtil::CheckIfTableDeletedOrNotVisibleToClient(l, resp)); if (l->pb.has_fully_applied_schema()) { // An AlterTable is in progress; fully_applied_schema is the last @@ -6156,7 +6142,7 @@ Status CatalogManager::GetColocatedTabletSchema(const GetColocatedTabletSchemaRe { TRACE("Locking table"); auto l = parent_colocated_table->LockForRead(); - RETURN_NOT_OK(CheckIfTableDeletedOrNotVisibleToClient(l, resp)); + RETURN_NOT_OK(CatalogManagerUtil::CheckIfTableDeletedOrNotVisibleToClient(l, resp)); } if (!parent_colocated_table->colocated() || !parent_colocated_table->IsColocatedDbParentTable()) { @@ -10176,7 +10162,7 @@ Status CatalogManager::GetTableLocations( } auto l = table->LockForRead(); - RETURN_NOT_OK(CheckIfTableDeletedOrNotVisibleToClient(l, resp)); + RETURN_NOT_OK(CatalogManagerUtil::CheckIfTableDeletedOrNotVisibleToClient(l, resp)); vector> tablets; table->GetTabletsInRange(req, &tablets); diff --git a/src/yb/master/catalog_manager_util.h b/src/yb/master/catalog_manager_util.h index 6fbc17a5de56..139210bfe52c 100644 --- a/src/yb/master/catalog_manager_util.h +++ b/src/yb/master/catalog_manager_util.h @@ -19,6 +19,7 @@ #include "yb/consensus/consensus_fwd.h" #include "yb/master/catalog_entity_info.h" +#include "yb/master/master_error.h" #include "yb/master/master_fwd.h" #include "yb/master/ts_descriptor.h" @@ -145,6 +146,31 @@ class CatalogManagerUtil { return sequences_data_table_filter_; } + template + static Status CheckIfTableDeletedOrNotVisibleToClient(const Lock& lock) { + // This covers both in progress and fully deleted objects. + if (lock->started_deleting()) { + return STATUS_EC_FORMAT( + NotFound, MasterError(MasterErrorPB::OBJECT_NOT_FOUND), + "The object '$0.$1' does not exist", lock->namespace_id(), lock->name()); + } + if (!lock->visible_to_client()) { + return STATUS_EC_FORMAT( + ServiceUnavailable, MasterError(MasterErrorPB::OBJECT_NOT_FOUND), + "The object '$0.$1' is not running", lock->namespace_id(), lock->name()); + } + return Status::OK(); + } + + template + static Status CheckIfTableDeletedOrNotVisibleToClient(const Lock& lock, RespClass* resp) { + auto status = CheckIfTableDeletedOrNotVisibleToClient(lock); + if (!status.ok()) { + return SetupError(resp->mutable_error(), status); + } + return Status::OK(); + } + private: CatalogManagerUtil();