diff --git a/src/yb/client/client-test.cc b/src/yb/client/client-test.cc index 356544062760..ea73d927c65c 100644 --- a/src/yb/client/client-test.cc +++ b/src/yb/client/client-test.cc @@ -2560,7 +2560,7 @@ class CompactionClientTest : public ClientTest { ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_level0_file_num_compaction_trigger) = -1; ClientTest::SetUp(); time_before_compaction_ = - ASSERT_RESULT(cluster_->GetLeaderMiniMaster())->master()->clock()->Now(); + ASSERT_RESULT(cluster_->GetLeaderMiniMaster())->Now(); } protected: diff --git a/src/yb/client/clone_namespace-test.cc b/src/yb/client/clone_namespace-test.cc index 7b3e95999b0e..3dc319e55b11 100644 --- a/src/yb/client/clone_namespace-test.cc +++ b/src/yb/client/clone_namespace-test.cc @@ -17,8 +17,6 @@ #include "yb/common/wire_protocol.h" -#include "yb/master/master.h" - #include "yb/util/backoff_waiter.h" DECLARE_bool(enable_db_clone); @@ -88,10 +86,10 @@ TEST_F(CloneNamespaceTest, Clone) { // Write two sets of rows. ASSERT_NO_FATALS(WriteData(WriteOpType::INSERT, 0 /* transaction */)); auto row_count1 = CountTableRows(table_); - auto ht1 = cluster_->mini_master()->master()->clock()->Now(); + auto ht1 = cluster_->mini_master()->Now(); ASSERT_NO_FATALS(WriteData(WriteOpType::INSERT, 1) /* transaction */); auto row_count2 = CountTableRows(table_); - auto ht2 = cluster_->mini_master()->master()->clock()->Now(); + auto ht2 = cluster_->mini_master()->Now(); ASSERT_OK(CloneAndWait( kTableName.namespace_name(), YQLDatabase::YQL_DATABASE_CQL, ht1, kTargetNamespaceName)); @@ -114,7 +112,7 @@ TEST_F(CloneNamespaceTest, Clone) { TEST_F(CloneNamespaceTest, CloneWithNoSchedule) { // Write one row. ASSERT_NO_FATALS(WriteData(WriteOpType::INSERT, 0 /* transaction */)); - auto ht = cluster_->mini_master()->master()->clock()->Now(); + auto ht = cluster_->mini_master()->Now(); auto status = CloneAndWait( kTableName.namespace_name(), YQLDatabase::YQL_DATABASE_CQL, ht, kTargetNamespaceName); @@ -129,7 +127,7 @@ TEST_F(CloneNamespaceTest, CloneAfterDrop) { ASSERT_OK(snapshot_util_->WaitScheduleSnapshot(schedule_id)); ASSERT_NO_FATALS(WriteData(WriteOpType::INSERT, 0 /* transaction */)); - auto ht = cluster_->mini_master()->master()->clock()->Now(); + auto ht = cluster_->mini_master()->Now(); ASSERT_OK(client_->DeleteTable(kTableName)); @@ -147,7 +145,7 @@ TEST_F(CloneNamespaceTest, DropClonedNamespace) { snapshot_util_->CreateSchedule(table_, kTableName.namespace_type(), kTableName.namespace_name())); ASSERT_OK(snapshot_util_->WaitScheduleSnapshot(schedule_id)); - auto ht = cluster_->mini_master()->master()->clock()->Now(); + auto ht = cluster_->mini_master()->Now(); ASSERT_OK(CloneAndWait( kTableName.namespace_name(), YQLDatabase::YQL_DATABASE_CQL, ht, kTargetNamespaceName)); @@ -171,7 +169,7 @@ TEST_F(CloneNamespaceTest, CloneFromOldestSnapshot) { auto first_snapshot = ASSERT_RESULT(snapshot_util_->WaitScheduleSnapshot(schedule_id)); ASSERT_NO_FATALS(WriteData(WriteOpType::INSERT, 0 /* transaction */)); - auto ht = cluster_->mini_master()->master()->clock()->Now(); + auto ht = cluster_->mini_master()->Now(); // Wait for the first snapshot to be deleted and check that we can clone to a time between the // first and the second snapshot's hybrid times. diff --git a/src/yb/client/snapshot_schedule-test.cc b/src/yb/client/snapshot_schedule-test.cc index 9bba828c6ec7..b2a59827efd1 100644 --- a/src/yb/client/snapshot_schedule-test.cc +++ b/src/yb/client/snapshot_schedule-test.cc @@ -28,7 +28,6 @@ #include "yb/common/colocated_util.h" #include "yb/common/wire_protocol.h" -#include "yb/master/master.h" #include "yb/master/master_backup.pb.h" #include "yb/master/master_types.pb.h" #include "yb/master/mini_master.h" @@ -241,7 +240,7 @@ TEST_F(SnapshotScheduleTest, Index) { ASSERT_OK(snapshot_util_->WaitScheduleSnapshot(schedule_id)); CreateIndex(Transactional::kTrue, 1, false); - auto hybrid_time = cluster_->mini_master(0)->master()->clock()->Now(); + auto hybrid_time = cluster_->mini_master(0)->Now(); constexpr int kTransaction = 0; constexpr auto op_type = WriteOpType::INSERT; @@ -307,7 +306,7 @@ TEST_F(SnapshotScheduleTest, RestoreSchema) { auto schedule_id = ASSERT_RESULT( snapshot_util_->CreateSchedule(table_, kTableName.namespace_type(), kTableName.namespace_name())); - auto hybrid_time = cluster_->mini_master(0)->master()->clock()->Now(); + auto hybrid_time = cluster_->mini_master(0)->Now(); auto old_schema = table_.schema(); auto alterer = client_->NewTableAlterer(table_.name()); auto* column = alterer->AddColumn("new_column"); @@ -342,9 +341,9 @@ TEST_F(SnapshotScheduleTest, RemoveNewTablets) { auto schedule_id = ASSERT_RESULT(snapshot_util_->CreateSchedule( table_, kTableName.namespace_type(), kTableName.namespace_name(), WaitSnapshot::kTrue, kInterval, kRetention)); - auto before_index_ht = cluster_->mini_master(0)->master()->clock()->Now(); + auto before_index_ht = cluster_->mini_master(0)->Now(); CreateIndex(Transactional::kTrue, 1, false); - auto after_time_ht = cluster_->mini_master(0)->master()->clock()->Now(); + auto after_time_ht = cluster_->mini_master(0)->Now(); ASSERT_OK(snapshot_util_->WaitScheduleSnapshot(schedule_id, after_time_ht)); auto snapshot_id = ASSERT_RESULT(snapshot_util_->PickSuitableSnapshot(schedule_id, before_index_ht)); @@ -395,7 +394,7 @@ TEST_F(SnapshotScheduleTest, DeletedNamespace) { ASSERT_TRUE(schedule[0].options().filter().tables().tables(0).namespace_().has_id()); // Restore should not fatal. - auto hybrid_time = cluster_->mini_master(0)->master()->clock()->Now(); + auto hybrid_time = cluster_->mini_master(0)->Now(); ASSERT_OK(snapshot_util_->WaitScheduleSnapshot(schedule_id, hybrid_time)); auto snapshot_id = ASSERT_RESULT(snapshot_util_->PickSuitableSnapshot( schedule_id, hybrid_time)); @@ -416,12 +415,12 @@ TEST_F(SnapshotScheduleTest, MasterHistoryRetentionNoSchedule) { // Since FLAGS_timestamp_syscatalog_history_retention_interval_sec is 120, // history retention should be t-120 where t is the current time obtained by // GetRetentionDirective() call. - auto& sys_catalog = cluster_->mini_master(0)->master()->sys_catalog(); + auto& sys_catalog = cluster_->mini_master(0)->sys_catalog(); auto tablet = ASSERT_RESULT(sys_catalog.tablet_peer()->shared_tablet_safe()); auto directive = tablet->RetentionPolicy()->GetRetentionDirective().history_cutoff; // current_time-120 should be >= t-120 since current_time >= t. // We bound this error by 1s * kTimeMultiplier. - HybridTime expect = cluster_->mini_master(0)->master()->clock()->Now().AddSeconds( + HybridTime expect = cluster_->mini_master(0)->Now().AddSeconds( -FLAGS_timestamp_syscatalog_history_retention_interval_sec); ASSERT_GE(expect, directive.primary_cutoff_ht); ASSERT_LE(expect, directive.primary_cutoff_ht.AddSeconds(1 * kTimeMultiplier)); @@ -443,7 +442,7 @@ TEST_F(SnapshotScheduleTest, MasterHistoryRetentionNoSchedule) { directive = tablet->RetentionPolicy()->GetRetentionDirective().history_cutoff; // current_time-120 should be >= t-120 since current_time >= t. // We bound this error by 1s * kTimeMultiplier. - expect = cluster_->mini_master(0)->master()->clock()->Now().AddSeconds( + expect = cluster_->mini_master(0)->Now().AddSeconds( -FLAGS_timestamp_history_retention_interval_sec); ASSERT_GE(expect, directive.primary_cutoff_ht); ASSERT_LE(expect, directive.primary_cutoff_ht.AddSeconds(1 * kTimeMultiplier)); @@ -471,7 +470,7 @@ TEST_F(SnapshotScheduleTest, MasterHistoryRetentionWithSchedule) { // last_snapshot_time for all the tables except docdb metadata table // for which it should be t-kRetention where t is the current time // obtained by AllowedHistoryCutoffProvider(). - auto& sys_catalog = cluster_->mini_master(0)->master()->sys_catalog(); + auto& sys_catalog = cluster_->mini_master(0)->sys_catalog(); auto tablet = ASSERT_RESULT(sys_catalog.tablet_peer()->shared_tablet_safe()); // Because the snapshot interval is quite high (10s), at some point // the returned history retention should become equal to last snapshot time. @@ -480,7 +479,7 @@ TEST_F(SnapshotScheduleTest, MasterHistoryRetentionWithSchedule) { // GetRetentionDirective() very frequently, at some point it should catch up. ASSERT_OK(WaitFor([&tablet, this, schedule_id, kRetention]() -> Result { auto directive = tablet->RetentionPolicy()->GetRetentionDirective().history_cutoff; - auto expect = cluster_->mini_master(0)->master()->clock()->Now().AddDelta(-kRetention); + auto expect = cluster_->mini_master(0)->Now().AddDelta(-kRetention); auto schedules = VERIFY_RESULT(snapshot_util_->ListSchedules(schedule_id)); RSTATUS_DCHECK_EQ(schedules.size(), 1, Corruption, "There should be only one schedule"); HybridTime most_recent = HybridTime::kMin; diff --git a/src/yb/integration-tests/auto_flags-itest.cc b/src/yb/integration-tests/auto_flags-itest.cc index 8e4555fc5743..e4d739c0ea0e 100644 --- a/src/yb/integration-tests/auto_flags-itest.cc +++ b/src/yb/integration-tests/auto_flags-itest.cc @@ -681,15 +681,15 @@ TEST_F(AutoFlagsMiniClusterTest, AddTserverBeforeApplyDelay) { ANNOTATE_UNPROTECTED_WRITE(FLAGS_auto_flags_apply_delay_ms) = kApplyDelayMs; ASSERT_OK(RunSetUp()); - auto leader_master = ASSERT_RESULT(cluster_->GetLeaderMiniMaster())->master(); - auto now_ht = [&leader_master]() { return leader_master->clock()->Now(); }; + auto leader_master = ASSERT_RESULT(cluster_->GetLeaderMiniMaster()); + auto now_ht = [&leader_master]() { return leader_master->Now(); }; uint32 expected_config_version = 0; CHECK_OK(ValidateConfigOnAllProcesses(expected_config_version)); AutoFlagsConfigPB config; ASSERT_NO_FATALS( config = PromoteFlagsAndValidate( - ++expected_config_version, leader_master, AutoFlagClass::kExternal)); + ++expected_config_version, leader_master->master(), AutoFlagClass::kExternal)); ASSERT_TRUE(config.has_config_apply_time()); HybridTime config_apply_ht; diff --git a/src/yb/integration-tests/xcluster/xcluster_ddl_replication_test_base.cc b/src/yb/integration-tests/xcluster/xcluster_ddl_replication_test_base.cc index 7a6192bacb2a..ac40ad94ed96 100644 --- a/src/yb/integration-tests/xcluster/xcluster_ddl_replication_test_base.cc +++ b/src/yb/integration-tests/xcluster/xcluster_ddl_replication_test_base.cc @@ -20,10 +20,8 @@ #include "yb/common/common_types.pb.h" #include "yb/integration-tests/xcluster/xcluster_test_base.h" #include "yb/integration-tests/xcluster/xcluster_ysql_test_base.h" -#include "yb/master/master.h" #include "yb/master/mini_master.h" #include "yb/tserver/mini_tablet_server.h" -#include "yb/tserver/tablet_server.h" #include "yb/util/backoff_waiter.h" DECLARE_bool(enable_xcluster_api_v2); @@ -180,10 +178,9 @@ void XClusterDDLReplicationTestBase::InsertRowsIntoProducerTableAndVerifyConsume } Status XClusterDDLReplicationTestBase::WaitForSafeTimeToAdvanceToNowWithoutDDLQueue() { - auto producer_master = VERIFY_RESULT(producer_cluster()->GetLeaderMiniMaster())->master(); - HybridTime now = producer_master->clock()->Now(); + HybridTime now = VERIFY_RESULT(producer_cluster()->GetLeaderMiniMaster())->Now(); for (auto ts : producer_cluster()->mini_tablet_servers()) { - now.MakeAtLeast(ts->server()->clock()->Now()); + now.MakeAtLeast(ts->Now()); } auto namespace_id = VERIFY_RESULT(GetNamespaceId(consumer_client())); diff --git a/src/yb/integration-tests/xcluster/xcluster_test_base.cc b/src/yb/integration-tests/xcluster/xcluster_test_base.cc index 22606dac4124..902b43c95f64 100644 --- a/src/yb/integration-tests/xcluster/xcluster_test_base.cc +++ b/src/yb/integration-tests/xcluster/xcluster_test_base.cc @@ -31,7 +31,6 @@ #include "yb/integration-tests/cdc_test_util.h" #include "yb/integration-tests/mini_cluster.h" #include "yb/master/catalog_manager.h" -#include "yb/master/master.h" #include "yb/master/master_ddl.pb.h" #include "yb/master/master_ddl.proxy.h" #include "yb/master/master_defaults.h" @@ -916,10 +915,9 @@ Status XClusterTestBase::WaitForSafeTimeToAdvanceToNow(std::vectorGetLeaderMiniMaster())->master(); - HybridTime now = producer_master->clock()->Now(); + HybridTime now = VERIFY_RESULT(producer_cluster()->GetLeaderMiniMaster())->Now(); for (auto ts : producer_cluster()->mini_tablet_servers()) { - now.MakeAtLeast(ts->server()->clock()->Now()); + now.MakeAtLeast(ts->Now()); } for (const auto& name : namespace_names) { diff --git a/src/yb/master/catalog_entity_info.cc b/src/yb/master/catalog_entity_info.cc index 9bcc254ad37f..aea1a899e23a 100644 --- a/src/yb/master/catalog_entity_info.cc +++ b/src/yb/master/catalog_entity_info.cc @@ -1182,6 +1182,10 @@ bool PersistentTableInfo::is_index() const { return !indexed_table_id().empty(); } +bool PersistentTableInfo::is_vector_index() const { + return pb.index_info().has_vector_idx_options(); +} + const std::string& PersistentTableInfo::indexed_table_id() const { static const std::string kEmptyString; return pb.has_index_info() diff --git a/src/yb/master/catalog_entity_info.h b/src/yb/master/catalog_entity_info.h index 10bb6a86c05b..4d898131702a 100644 --- a/src/yb/master/catalog_entity_info.h +++ b/src/yb/master/catalog_entity_info.h @@ -115,8 +115,8 @@ struct ExternalUDTypeSnapshotData { typedef std::unordered_map UDTypeMap; struct TableDescription { - scoped_refptr namespace_info; - scoped_refptr table_info; + NamespaceInfoPtr namespace_info; + TableInfoPtr table_info; TabletInfos tablet_infos; }; @@ -452,9 +452,10 @@ struct PersistentTableInfo : public Persistent { return pb.schema(); } - const std::string& indexed_table_id() const; + const TableId& indexed_table_id() const; bool is_index() const; + bool is_vector_index() const; SchemaPB* mutable_schema() { return pb.mutable_schema(); diff --git a/src/yb/master/catalog_manager.cc b/src/yb/master/catalog_manager.cc index 787182f0f381..4b2876bdda2f 100644 --- a/src/yb/master/catalog_manager.cc +++ b/src/yb/master/catalog_manager.cc @@ -12255,7 +12255,12 @@ Status CatalogManager::CollectTable( return Status::OK(); } if (flags.Test(CollectFlag::kIncludeParentColocatedTable) && lock->pb.colocated()) { - if (!IsColocationParentTableId(table_description.table_info->id())) { + bool add_parent = !IsColocationParentTableId(table_description.table_info->id()); + if (add_parent && lock->is_vector_index()) { + auto indexed_table = VERIFY_RESULT(FindTableById(lock->indexed_table_id())); + add_parent = indexed_table->LockForRead()->pb.colocated(); + } + if (add_parent) { // If a table is colocated, add its parent colocated table as well. TableId parent_table_id = VERIFY_RESULT( GetParentTableIdForColocatedTable(table_description.table_info)); @@ -12665,31 +12670,28 @@ Result CatalogManager::GetParentTableIdForColocatedTable( } Result CatalogManager::GetParentTableIdForColocatedTableUnlocked( - const scoped_refptr& table) { + const TableInfoPtr& table) { DCHECK(table->colocated()); DCHECK(!IsColocationParentTableId(table->id())); - auto ns_info = VERIFY_RESULT(FindNamespaceByIdUnlocked(table->namespace_id())); - TableId parent_table_id; + auto table_lock = table->LockForRead(); + auto ns_info = VERIFY_RESULT(FindNamespaceByIdUnlocked(table_lock->namespace_id())); auto tablegroup = tablegroup_manager_->FindByTable(table->id()); if (ns_info->colocated()) { // Two types of colocated database: (1) pre-Colocation GA (2) Colocation GA // Colocated databases created before Colocation GA don't use tablegroup // to manage its colocated tables. - if (tablegroup) - parent_table_id = GetColocationParentTableId(tablegroup->id()); - else - parent_table_id = GetColocatedDbParentTableId(table->namespace_id()); - } else { - RSTATUS_DCHECK(tablegroup != nullptr, - Corruption, - Format("Not able to find the tablegroup for a colocated table $0 whose database " - "is not colocated.", table->id())); - parent_table_id = GetTablegroupParentTableId(tablegroup->id()); + if (tablegroup) { + return GetColocationParentTableId(tablegroup->id()); + } + return GetColocatedDbParentTableId(table_lock->namespace_id()); } - - return parent_table_id; + RSTATUS_DCHECK(tablegroup != nullptr, + Corruption, + Format("Not able to find the tablegroup for a colocated table $0 whose database " + "is not colocated.", table->id())); + return GetTablegroupParentTableId(tablegroup->id()); } Result> CatalogManager::GetConsumerRegistry() { diff --git a/src/yb/master/mini_master.cc b/src/yb/master/mini_master.cc index f63884ab1a1e..a2e94289d816 100644 --- a/src/yb/master/mini_master.cc +++ b/src/yb/master/mini_master.cc @@ -306,5 +306,9 @@ FsManager& MiniMaster::fs_manager() const { return *master_->fs_manager(); } +HybridTime MiniMaster::Now() const { + return master_->clock()->Now(); +} + } // namespace master } // namespace yb diff --git a/src/yb/master/mini_master.h b/src/yb/master/mini_master.h index 53fc472dd1ce..5b8948c82a8d 100644 --- a/src/yb/master/mini_master.h +++ b/src/yb/master/mini_master.h @@ -34,6 +34,8 @@ #include #include +#include "yb/common/hybrid_time.h" + #include "yb/gutil/macros.h" #include "yb/gutil/port.h" @@ -119,6 +121,8 @@ class MiniMaster { std::string ToString() const; + HybridTime Now() const; + private: Status StartDistributedMasterOnPorts(uint16_t rpc_port, uint16_t web_port, const std::vector& peer_ports); diff --git a/src/yb/qlexpr/index.cc b/src/yb/qlexpr/index.cc index b71840a48ce0..c2e6439b831f 100644 --- a/src/yb/qlexpr/index.cc +++ b/src/yb/qlexpr/index.cc @@ -148,7 +148,7 @@ void IndexInfo::ToPB(IndexInfoPB* pb) const { pb->mutable_where_predicate_spec()->CopyFrom(*where_predicate_spec_); } - if (is_vector_idx()) { + if (is_vector_index()) { *pb->mutable_vector_idx_options() = vector_idx_options(); } } @@ -282,7 +282,7 @@ size_t IndexInfo::DynamicMemoryUsage() const { return size; } -bool IndexInfo::is_vector_idx() const { +bool IndexInfo::is_vector_index() const { return vector_idx_options_.has_value(); } diff --git a/src/yb/qlexpr/index.h b/src/yb/qlexpr/index.h index 0e3ae73f798c..b7ad4383d606 100644 --- a/src/yb/qlexpr/index.h +++ b/src/yb/qlexpr/index.h @@ -137,7 +137,7 @@ class IndexInfo { size_t DynamicMemoryUsage() const; - bool is_vector_idx() const; + bool is_vector_index() const; const PgVectorIdxOptionsPB& vector_idx_options() const { return *vector_idx_options_; diff --git a/src/yb/tablet/tablet_metadata.cc b/src/yb/tablet/tablet_metadata.cc index bd103f2b78bb..4a4b11b7a4a9 100644 --- a/src/yb/tablet/tablet_metadata.cc +++ b/src/yb/tablet/tablet_metadata.cc @@ -134,7 +134,7 @@ std::vector GetVectorIndexedColumns(const qlexpr::IndexMap& index_map) result.reserve(1); // It is expected to have only 1 vector-indexed column. for (const auto& it : index_map) { const auto& index = it.second; - if (!index.is_vector_idx()) { + if (!index.is_vector_index()) { continue; } const auto& vector_options = index.vector_idx_options(); @@ -269,7 +269,7 @@ TableInfo::TableInfo(const TableInfo& other, SchemaVersion min_schema_version) TableInfo::~TableInfo() = default; void TableInfo::CompleteInit() { - if (index_info && index_info->is_vector_idx()) { + if (index_info && index_info->is_vector_index()) { doc_read_context->vector_idx_options = index_info->vector_idx_options(); // TODO(vector-index) could be removed if PG uses DataType::VECTOR for a column. @@ -470,7 +470,7 @@ bool TableInfo::TEST_Equals(const TableInfo& lhs, const TableInfo& rhs) { } bool TableInfo::NeedVectorIndex() const { - return index_info && index_info->is_vector_idx() && + return index_info && index_info->is_vector_index() && index_info->vector_idx_options().idx_type() != PgVectorIndexType::DUMMY; } @@ -546,18 +546,34 @@ Status KvStoreInfo::RestoreMissingValuesAndMergeTableSchemaPackings( const KvStoreInfoPB& snapshot_kvstoreinfo, const TableId& primary_table_id, bool colocated, dockv::OverwriteSchemaPacking overwrite) { if (!colocated) { - SCHECK( - snapshot_kvstoreinfo.tables_size() == 1 && tables.size() == 1, Corruption, - Format( - "Unexpected table counts during schema merge. Snapshot tables and restored tables " - "should both be non-colocated (singular). Snapshot table count: $0, restored table " - "count: $1", - snapshot_kvstoreinfo.tables_size(), tables.size())); - auto schema = tables.begin()->second->doc_read_context->mutable_schema(); + RSTATUS_DCHECK_GE( + snapshot_kvstoreinfo.tables_size(), 1, Corruption, "Unexpected snapshot tables count"); + const TableInfoPB* primary_table_info = nullptr; + for (const auto& table_info : snapshot_kvstoreinfo.tables()) { + if (table_info.index_info().has_vector_idx_options()) { + continue; + } + RSTATUS_DCHECK( + !primary_table_info, Corruption, + "Only vector indexes could be colocated to the non colocated table"); + primary_table_info = &table_info; + } + RSTATUS_DCHECK(primary_table_info != nullptr, Corruption, "Primary table info is missing"); + RSTATUS_DCHECK_GE(tables.size(), 1ULL, Corruption, "Unexpected restored tables count"); + for (const auto& [table_id, table_info] : tables) { + if (table_id == primary_table_id) { + RETURN_NOT_OK(table_info->MergeSchemaPackings(*primary_table_info, overwrite)); + continue; + } + RSTATUS_DCHECK( + table_info->index_info && table_info->index_info->is_vector_index(), Corruption, + "Only vector indexes could be colocated to the non colocated table"); + } if (overwrite) { - schema->UpdateMissingValuesFrom(snapshot_kvstoreinfo.tables(0).schema().columns()); + auto schema = tables.begin()->second->doc_read_context->mutable_schema(); + schema->UpdateMissingValuesFrom(primary_table_info->schema().columns()); } - return tables.begin()->second->MergeSchemaPackings(snapshot_kvstoreinfo.tables(0), overwrite); + return Status::OK(); } for (const auto& snapshot_table_pb : snapshot_kvstoreinfo.tables()) { diff --git a/src/yb/tserver/mini_tablet_server.cc b/src/yb/tserver/mini_tablet_server.cc index eefc6c18297d..f8f3bfdf7df9 100644 --- a/src/yb/tserver/mini_tablet_server.cc +++ b/src/yb/tserver/mini_tablet_server.cc @@ -367,5 +367,10 @@ const MemTrackerPtr& MiniTabletServer::mem_tracker() const { return server_->mem_tracker(); } +HybridTime MiniTabletServer::Now() const { + CHECK(started_); + return server_->clock()->Now(); +} + } // namespace tserver } // namespace yb diff --git a/src/yb/tserver/mini_tablet_server.h b/src/yb/tserver/mini_tablet_server.h index bc62737f8c96..511699b06a33 100644 --- a/src/yb/tserver/mini_tablet_server.h +++ b/src/yb/tserver/mini_tablet_server.h @@ -156,6 +156,7 @@ class MiniTabletServer { FsManager& fs_manager() const; MetricEntity& metric_entity() const; const MemTrackerPtr& mem_tracker() const; + HybridTime Now() const; private: bool started_; diff --git a/src/yb/tserver/tablet_server-test.cc b/src/yb/tserver/tablet_server-test.cc index c2afb22ecfe2..2d456519cb7d 100644 --- a/src/yb/tserver/tablet_server-test.cc +++ b/src/yb/tserver/tablet_server-test.cc @@ -151,7 +151,7 @@ TEST_F(TabletServerTest, TestServerClock) { RpcController controller; ASSERT_OK(generic_proxy_->ServerClock(req, &resp, &controller)); - ASSERT_GT(mini_server_->server()->clock()->Now().ToUint64(), resp.hybrid_time()); + ASSERT_GT(mini_server_->Now().ToUint64(), resp.hybrid_time()); } TEST_F(TabletServerTest, TestSetFlagsAndCheckWebPages) { @@ -412,14 +412,14 @@ TEST_F(TabletServerTest, TestInsert) { } // get the clock's current hybrid_time - HybridTime now_before = mini_server_->server()->clock()->Now(); + HybridTime now_before = mini_server_->Now(); rows_inserted = nullptr; ASSERT_OK(ShutdownAndRebuildTablet()); VerifyRows(schema_, { KeyValue(1, 1), KeyValue(2, 1), KeyValue(1234, 5678) }); // get the clock's hybrid_time after replay - HybridTime now_after = mini_server_->server()->clock()->Now(); + HybridTime now_after = mini_server_->Now(); // make sure 'now_after' is greater than or equal to 'now_before' ASSERT_GE(now_after.value(), now_before.value()); @@ -435,7 +435,7 @@ TEST_F(TabletServerTest, TestExternalConsistencyModes_ClientPropagated) { // Advance current to some time in the future. we do 5 secs to make // sure this hybrid_time will still be in the future when it reaches the // server. - HybridTime current = mini_server_->server()->clock()->Now().AddMicroseconds(5000000); + HybridTime current = mini_server_->Now().AddMicroseconds(5000000); AddTestRowInsert(1234, 5678, "hello world via RPC", &req); @@ -561,7 +561,7 @@ TEST_F(TabletServerTest, TestInsertAndMutate) { } // get the clock's current hybrid_time - HybridTime now_before = mini_server_->server()->clock()->Now(); + HybridTime now_before = mini_server_->Now(); ASSERT_NO_FATALS(WARN_NOT_OK(ShutdownAndRebuildTablet(), "Shutdown failed: ")); VerifyRows(schema_, @@ -569,7 +569,7 @@ TEST_F(TabletServerTest, TestInsertAndMutate) { KeyValue(1234, 2) }); // get the clock's hybrid_time after replay - HybridTime now_after = mini_server_->server()->clock()->Now(); + HybridTime now_after = mini_server_->Now(); // make sure 'now_after' is greater that or equal to 'now_before' ASSERT_GE(now_after.value(), now_before.value()); diff --git a/src/yb/yql/pgwrapper/pg_packed_row-test.cc b/src/yb/yql/pgwrapper/pg_packed_row-test.cc index 80fbdef4a1f8..acc956be410b 100644 --- a/src/yb/yql/pgwrapper/pg_packed_row-test.cc +++ b/src/yb/yql/pgwrapper/pg_packed_row-test.cc @@ -20,7 +20,6 @@ #include "yb/integration-tests/packed_row_test_base.h" -#include "yb/master/master.h" #include "yb/master/mini_master.h" #include "yb/rocksdb/db/db_impl.h" @@ -1044,7 +1043,7 @@ TEST_P(PgPackedRowTest, RestorePITRSnapshotAfterOldSchemaGC) { ASSERT_OK(conn.ExecuteFormat("DROP TABLE $0", kTableName)); - auto hybrid_time = cluster_->mini_master(0)->master()->clock()->Now(); + auto hybrid_time = cluster_->mini_master(0)->Now(); ASSERT_OK(snapshot_util_->WaitScheduleSnapshot(schedule_id, hybrid_time)); ASSERT_OK(cluster_->CompactTablets()); diff --git a/src/yb/yql/pgwrapper/pg_vector_index-test.cc b/src/yb/yql/pgwrapper/pg_vector_index-test.cc index 8d93a5353981..8a0deacc4e4c 100644 --- a/src/yb/yql/pgwrapper/pg_vector_index-test.cc +++ b/src/yb/yql/pgwrapper/pg_vector_index-test.cc @@ -11,6 +11,8 @@ // under the License. // +#include "yb/client/snapshot_test_util.h" + #include "yb/consensus/consensus.h" #include "yb/consensus/log.h" @@ -47,6 +49,10 @@ class PgVectorIndexTest : public PgMiniTestBase, public testing::WithParamInterf return GetParam(); } + std::string DbName() { + return IsColocated() ? "colocated_db" : "yugabyte"; + } + Result Connect() const override { return IsColocated() ? ConnectToDB("colocated_db") : PgMiniTestBase::Connect(); } @@ -81,8 +87,9 @@ class PgVectorIndexTest : public PgMiniTestBase, public testing::WithParamInterf Result MakeIndexAndFill(int num_rows, int num_tablets = 0); Status InsertRows(PGConn& conn, int start_row, int end_row); - Status VerifyRead(PGConn& conn, int limit, bool add_filter); - void VerifyRows(PGConn& conn, bool add_filter, const std::vector& expected); + void VerifyRead(PGConn& conn, int limit, bool add_filter); + void VerifyRows( + PGConn& conn, bool add_filter, const std::vector& expected, int limit = -1); void TestSimple(); void TestManyRows(bool add_filter); @@ -176,11 +183,11 @@ Result PgVectorIndexTest::MakeIndexAndFill(int num_rows, int num_tablets } void PgVectorIndexTest::VerifyRows( - PGConn& conn, bool add_filter, const std::vector& expected) { + PGConn& conn, bool add_filter, const std::vector& expected, int limit) { auto result = ASSERT_RESULT((conn.FetchRows(Format( "SELECT * FROM test $0 ORDER BY embedding <-> '[0.0, 0.0, 0.0]' LIMIT $1", add_filter ? "WHERE id + 3 <= 5" : "", - expected.size())))); + limit == -1 ? expected.size() : make_unsigned(limit))))); EXPECT_EQ(result.size(), expected.size()); for (size_t i = 0; i != std::min(result.size(), expected.size()); ++i) { SCOPED_TRACE(Format("Row $0", i)); @@ -188,17 +195,12 @@ void PgVectorIndexTest::VerifyRows( } } -Status PgVectorIndexTest::VerifyRead(PGConn& conn, int limit, bool add_filter) { - auto result = VERIFY_RESULT((conn.FetchRows(Format( - "SELECT * FROM test $0 ORDER BY embedding <-> '[0.0, 0.0, 0.0]' LIMIT $1", - add_filter ? "WHERE id + 3 <= 5" : "", - limit)))); +void PgVectorIndexTest::VerifyRead(PGConn& conn, int limit, bool add_filter) { std::vector expected; for (int i = 1; i <= limit; ++i) { expected.push_back(ExpectedRow(i)); } VerifyRows(conn, add_filter, expected); - return Status::OK(); } void PgVectorIndexTest::TestManyRows(bool add_filter) { @@ -206,7 +208,7 @@ void PgVectorIndexTest::TestManyRows(bool add_filter) { const int query_limit = add_filter ? 1 : 5; auto conn = ASSERT_RESULT(MakeIndexAndFill(kNumRows)); - ASSERT_OK(VerifyRead(conn, query_limit, add_filter)); + ASSERT_NO_FATALS(VerifyRead(conn, query_limit, add_filter)); } TEST_P(PgVectorIndexTest, Split) { @@ -219,7 +221,7 @@ TEST_P(PgVectorIndexTest, Split) { // Give some time for split to happen. std::this_thread::sleep_for(2s * kTimeMultiplier); - ASSERT_OK(VerifyRead(conn, kQueryLimit, false)); + ASSERT_NO_FATALS(VerifyRead(conn, kQueryLimit, false)); } TEST_P(PgVectorIndexTest, ManyRows) { @@ -261,12 +263,12 @@ void PgVectorIndexTest::TestRestart(tablet::FlushFlags flush_flags) { constexpr int kQueryLimit = 5; auto conn = ASSERT_RESULT(MakeIndexAndFill(kNumRows)); - ASSERT_OK(VerifyRead(conn, kQueryLimit, false)); + ASSERT_NO_FATALS(VerifyRead(conn, kQueryLimit, false)); ASSERT_OK(cluster_->FlushTablets(tablet::FlushMode::kSync, flush_flags)); DisableFlushOnShutdown(*cluster_, true); ASSERT_OK(RestartCluster()); conn = ASSERT_RESULT(Connect()); - ASSERT_OK(VerifyRead(conn, kQueryLimit, false)); + ASSERT_NO_FATALS(VerifyRead(conn, kQueryLimit, false)); } TEST_P(PgVectorIndexTest, Restart) { @@ -344,11 +346,41 @@ TEST_P(PgVectorIndexTest, RemoteBootstrap) { } if (leader) { LOG(INFO) << "Step down: " << leader->permanent_uuid(); - RETURN_NOT_OK(StepDown(leader, mts->fs_manager().uuid(), ForceStepDown::kTrue)); + WARN_NOT_OK(StepDown(leader, mts->fs_manager().uuid(), ForceStepDown::kTrue), + "StepDown failed"); } return false; }, 60s * kTimeMultiplier, "Wait desired leader")); - ASSERT_OK(VerifyRead(conn, kQueryLimit, false)); + ASSERT_NO_FATALS(VerifyRead(conn, kQueryLimit, false)); +} + +TEST_P(PgVectorIndexTest, SnapshotSchedule) { + constexpr int kNumRows = 128; + constexpr int kQueryLimit = 5; + + client::SnapshotTestUtil snapshot_util; + snapshot_util.SetProxy(&client_->proxy_cache()); + snapshot_util.SetCluster(cluster_.get()); + + auto conn = ASSERT_RESULT(MakeIndexAndFill(kNumRows)); + + auto schedule_id = ASSERT_RESULT(snapshot_util.CreateSchedule( + nullptr, YQL_DATABASE_PGSQL, DbName(), + client::WaitSnapshot::kTrue, 1s * kTimeMultiplier, 60s * kTimeMultiplier)); + + ASSERT_NO_FATALS(VerifyRead(conn, kQueryLimit, false)); + + auto hybrid_time = cluster_->mini_master(0)->Now(); + ASSERT_OK(snapshot_util.WaitScheduleSnapshot(schedule_id, hybrid_time)); + + ASSERT_OK(conn.Execute("DELETE FROM test")); + ASSERT_NO_FATALS(VerifyRows(conn, false, {}, 10)); + + auto snapshot_id = ASSERT_RESULT(snapshot_util.PickSuitableSnapshot( + schedule_id, hybrid_time)); + ASSERT_OK(snapshot_util.RestoreSnapshot(snapshot_id, hybrid_time)); + + ASSERT_NO_FATALS(VerifyRead(conn, kQueryLimit, false)); } std::string ColocatedToString(const testing::TestParamInfo& param_info) {