Skip to content

Commit

Permalink
[#25389] DocDB: Add point in time restore support for vector indexes
Browse files Browse the repository at this point in the history
Summary:
This diff adds point-in-time restore support for vector indexes.
Jira: DB-14618

Test Plan: PgVectorIndexTest.SnapshotSchedule/*

Reviewers: arybochkin, aleksandr.ponomarenko, zdrudi, xCluster, hsunder

Reviewed By: arybochkin, zdrudi

Subscribers: yql, ybase

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D40825
  • Loading branch information
spolitov committed Dec 22, 2024
1 parent 4ad8ee6 commit 124135a
Show file tree
Hide file tree
Showing 19 changed files with 151 additions and 91 deletions.
2 changes: 1 addition & 1 deletion src/yb/client/client-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2560,7 +2560,7 @@ class CompactionClientTest : public ClientTest {
ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_level0_file_num_compaction_trigger) = -1;
ClientTest::SetUp();
time_before_compaction_ =
ASSERT_RESULT(cluster_->GetLeaderMiniMaster())->master()->clock()->Now();
ASSERT_RESULT(cluster_->GetLeaderMiniMaster())->Now();
}

protected:
Expand Down
14 changes: 6 additions & 8 deletions src/yb/client/clone_namespace-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

#include "yb/common/wire_protocol.h"

#include "yb/master/master.h"

#include "yb/util/backoff_waiter.h"

DECLARE_bool(enable_db_clone);
Expand Down Expand Up @@ -88,10 +86,10 @@ TEST_F(CloneNamespaceTest, Clone) {
// Write two sets of rows.
ASSERT_NO_FATALS(WriteData(WriteOpType::INSERT, 0 /* transaction */));
auto row_count1 = CountTableRows(table_);
auto ht1 = cluster_->mini_master()->master()->clock()->Now();
auto ht1 = cluster_->mini_master()->Now();
ASSERT_NO_FATALS(WriteData(WriteOpType::INSERT, 1) /* transaction */);
auto row_count2 = CountTableRows(table_);
auto ht2 = cluster_->mini_master()->master()->clock()->Now();
auto ht2 = cluster_->mini_master()->Now();

ASSERT_OK(CloneAndWait(
kTableName.namespace_name(), YQLDatabase::YQL_DATABASE_CQL, ht1, kTargetNamespaceName));
Expand All @@ -114,7 +112,7 @@ TEST_F(CloneNamespaceTest, Clone) {
TEST_F(CloneNamespaceTest, CloneWithNoSchedule) {
// Write one row.
ASSERT_NO_FATALS(WriteData(WriteOpType::INSERT, 0 /* transaction */));
auto ht = cluster_->mini_master()->master()->clock()->Now();
auto ht = cluster_->mini_master()->Now();

auto status = CloneAndWait(
kTableName.namespace_name(), YQLDatabase::YQL_DATABASE_CQL, ht, kTargetNamespaceName);
Expand All @@ -129,7 +127,7 @@ TEST_F(CloneNamespaceTest, CloneAfterDrop) {
ASSERT_OK(snapshot_util_->WaitScheduleSnapshot(schedule_id));

ASSERT_NO_FATALS(WriteData(WriteOpType::INSERT, 0 /* transaction */));
auto ht = cluster_->mini_master()->master()->clock()->Now();
auto ht = cluster_->mini_master()->Now();

ASSERT_OK(client_->DeleteTable(kTableName));

Expand All @@ -147,7 +145,7 @@ TEST_F(CloneNamespaceTest, DropClonedNamespace) {
snapshot_util_->CreateSchedule(table_, kTableName.namespace_type(),
kTableName.namespace_name()));
ASSERT_OK(snapshot_util_->WaitScheduleSnapshot(schedule_id));
auto ht = cluster_->mini_master()->master()->clock()->Now();
auto ht = cluster_->mini_master()->Now();

ASSERT_OK(CloneAndWait(
kTableName.namespace_name(), YQLDatabase::YQL_DATABASE_CQL, ht, kTargetNamespaceName));
Expand All @@ -171,7 +169,7 @@ TEST_F(CloneNamespaceTest, CloneFromOldestSnapshot) {

auto first_snapshot = ASSERT_RESULT(snapshot_util_->WaitScheduleSnapshot(schedule_id));
ASSERT_NO_FATALS(WriteData(WriteOpType::INSERT, 0 /* transaction */));
auto ht = cluster_->mini_master()->master()->clock()->Now();
auto ht = cluster_->mini_master()->Now();

// Wait for the first snapshot to be deleted and check that we can clone to a time between the
// first and the second snapshot's hybrid times.
Expand Down
21 changes: 10 additions & 11 deletions src/yb/client/snapshot_schedule-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
#include "yb/common/colocated_util.h"

#include "yb/common/wire_protocol.h"
#include "yb/master/master.h"
#include "yb/master/master_backup.pb.h"
#include "yb/master/master_types.pb.h"
#include "yb/master/mini_master.h"
Expand Down Expand Up @@ -241,7 +240,7 @@ TEST_F(SnapshotScheduleTest, Index) {
ASSERT_OK(snapshot_util_->WaitScheduleSnapshot(schedule_id));

CreateIndex(Transactional::kTrue, 1, false);
auto hybrid_time = cluster_->mini_master(0)->master()->clock()->Now();
auto hybrid_time = cluster_->mini_master(0)->Now();
constexpr int kTransaction = 0;
constexpr auto op_type = WriteOpType::INSERT;

Expand Down Expand Up @@ -307,7 +306,7 @@ TEST_F(SnapshotScheduleTest, RestoreSchema) {
auto schedule_id = ASSERT_RESULT(
snapshot_util_->CreateSchedule(table_, kTableName.namespace_type(),
kTableName.namespace_name()));
auto hybrid_time = cluster_->mini_master(0)->master()->clock()->Now();
auto hybrid_time = cluster_->mini_master(0)->Now();
auto old_schema = table_.schema();
auto alterer = client_->NewTableAlterer(table_.name());
auto* column = alterer->AddColumn("new_column");
Expand Down Expand Up @@ -342,9 +341,9 @@ TEST_F(SnapshotScheduleTest, RemoveNewTablets) {
auto schedule_id = ASSERT_RESULT(snapshot_util_->CreateSchedule(
table_, kTableName.namespace_type(), kTableName.namespace_name(),
WaitSnapshot::kTrue, kInterval, kRetention));
auto before_index_ht = cluster_->mini_master(0)->master()->clock()->Now();
auto before_index_ht = cluster_->mini_master(0)->Now();
CreateIndex(Transactional::kTrue, 1, false);
auto after_time_ht = cluster_->mini_master(0)->master()->clock()->Now();
auto after_time_ht = cluster_->mini_master(0)->Now();
ASSERT_OK(snapshot_util_->WaitScheduleSnapshot(schedule_id, after_time_ht));
auto snapshot_id = ASSERT_RESULT(snapshot_util_->PickSuitableSnapshot(schedule_id,
before_index_ht));
Expand Down Expand Up @@ -395,7 +394,7 @@ TEST_F(SnapshotScheduleTest, DeletedNamespace) {
ASSERT_TRUE(schedule[0].options().filter().tables().tables(0).namespace_().has_id());

// Restore should not fatal.
auto hybrid_time = cluster_->mini_master(0)->master()->clock()->Now();
auto hybrid_time = cluster_->mini_master(0)->Now();
ASSERT_OK(snapshot_util_->WaitScheduleSnapshot(schedule_id, hybrid_time));
auto snapshot_id = ASSERT_RESULT(snapshot_util_->PickSuitableSnapshot(
schedule_id, hybrid_time));
Expand All @@ -416,12 +415,12 @@ TEST_F(SnapshotScheduleTest, MasterHistoryRetentionNoSchedule) {
// Since FLAGS_timestamp_syscatalog_history_retention_interval_sec is 120,
// history retention should be t-120 where t is the current time obtained by
// GetRetentionDirective() call.
auto& sys_catalog = cluster_->mini_master(0)->master()->sys_catalog();
auto& sys_catalog = cluster_->mini_master(0)->sys_catalog();
auto tablet = ASSERT_RESULT(sys_catalog.tablet_peer()->shared_tablet_safe());
auto directive = tablet->RetentionPolicy()->GetRetentionDirective().history_cutoff;
// current_time-120 should be >= t-120 since current_time >= t.
// We bound this error by 1s * kTimeMultiplier.
HybridTime expect = cluster_->mini_master(0)->master()->clock()->Now().AddSeconds(
HybridTime expect = cluster_->mini_master(0)->Now().AddSeconds(
-FLAGS_timestamp_syscatalog_history_retention_interval_sec);
ASSERT_GE(expect, directive.primary_cutoff_ht);
ASSERT_LE(expect, directive.primary_cutoff_ht.AddSeconds(1 * kTimeMultiplier));
Expand All @@ -443,7 +442,7 @@ TEST_F(SnapshotScheduleTest, MasterHistoryRetentionNoSchedule) {
directive = tablet->RetentionPolicy()->GetRetentionDirective().history_cutoff;
// current_time-120 should be >= t-120 since current_time >= t.
// We bound this error by 1s * kTimeMultiplier.
expect = cluster_->mini_master(0)->master()->clock()->Now().AddSeconds(
expect = cluster_->mini_master(0)->Now().AddSeconds(
-FLAGS_timestamp_history_retention_interval_sec);
ASSERT_GE(expect, directive.primary_cutoff_ht);
ASSERT_LE(expect, directive.primary_cutoff_ht.AddSeconds(1 * kTimeMultiplier));
Expand Down Expand Up @@ -471,7 +470,7 @@ TEST_F(SnapshotScheduleTest, MasterHistoryRetentionWithSchedule) {
// last_snapshot_time for all the tables except docdb metadata table
// for which it should be t-kRetention where t is the current time
// obtained by AllowedHistoryCutoffProvider().
auto& sys_catalog = cluster_->mini_master(0)->master()->sys_catalog();
auto& sys_catalog = cluster_->mini_master(0)->sys_catalog();
auto tablet = ASSERT_RESULT(sys_catalog.tablet_peer()->shared_tablet_safe());
// Because the snapshot interval is quite high (10s), at some point
// the returned history retention should become equal to last snapshot time.
Expand All @@ -480,7 +479,7 @@ TEST_F(SnapshotScheduleTest, MasterHistoryRetentionWithSchedule) {
// GetRetentionDirective() very frequently, at some point it should catch up.
ASSERT_OK(WaitFor([&tablet, this, schedule_id, kRetention]() -> Result<bool> {
auto directive = tablet->RetentionPolicy()->GetRetentionDirective().history_cutoff;
auto expect = cluster_->mini_master(0)->master()->clock()->Now().AddDelta(-kRetention);
auto expect = cluster_->mini_master(0)->Now().AddDelta(-kRetention);
auto schedules = VERIFY_RESULT(snapshot_util_->ListSchedules(schedule_id));
RSTATUS_DCHECK_EQ(schedules.size(), 1, Corruption, "There should be only one schedule");
HybridTime most_recent = HybridTime::kMin;
Expand Down
6 changes: 3 additions & 3 deletions src/yb/integration-tests/auto_flags-itest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -681,15 +681,15 @@ TEST_F(AutoFlagsMiniClusterTest, AddTserverBeforeApplyDelay) {
ANNOTATE_UNPROTECTED_WRITE(FLAGS_auto_flags_apply_delay_ms) = kApplyDelayMs;

ASSERT_OK(RunSetUp());
auto leader_master = ASSERT_RESULT(cluster_->GetLeaderMiniMaster())->master();
auto now_ht = [&leader_master]() { return leader_master->clock()->Now(); };
auto leader_master = ASSERT_RESULT(cluster_->GetLeaderMiniMaster());
auto now_ht = [&leader_master]() { return leader_master->Now(); };
uint32 expected_config_version = 0;
CHECK_OK(ValidateConfigOnAllProcesses(expected_config_version));

AutoFlagsConfigPB config;
ASSERT_NO_FATALS(
config = PromoteFlagsAndValidate(
++expected_config_version, leader_master, AutoFlagClass::kExternal));
++expected_config_version, leader_master->master(), AutoFlagClass::kExternal));

ASSERT_TRUE(config.has_config_apply_time());
HybridTime config_apply_ht;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@
#include "yb/common/common_types.pb.h"
#include "yb/integration-tests/xcluster/xcluster_test_base.h"
#include "yb/integration-tests/xcluster/xcluster_ysql_test_base.h"
#include "yb/master/master.h"
#include "yb/master/mini_master.h"
#include "yb/tserver/mini_tablet_server.h"
#include "yb/tserver/tablet_server.h"
#include "yb/util/backoff_waiter.h"

DECLARE_bool(enable_xcluster_api_v2);
Expand Down Expand Up @@ -180,10 +178,9 @@ void XClusterDDLReplicationTestBase::InsertRowsIntoProducerTableAndVerifyConsume
}

Status XClusterDDLReplicationTestBase::WaitForSafeTimeToAdvanceToNowWithoutDDLQueue() {
auto producer_master = VERIFY_RESULT(producer_cluster()->GetLeaderMiniMaster())->master();
HybridTime now = producer_master->clock()->Now();
HybridTime now = VERIFY_RESULT(producer_cluster()->GetLeaderMiniMaster())->Now();
for (auto ts : producer_cluster()->mini_tablet_servers()) {
now.MakeAtLeast(ts->server()->clock()->Now());
now.MakeAtLeast(ts->Now());
}
auto namespace_id = VERIFY_RESULT(GetNamespaceId(consumer_client()));

Expand Down
6 changes: 2 additions & 4 deletions src/yb/integration-tests/xcluster/xcluster_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
#include "yb/integration-tests/cdc_test_util.h"
#include "yb/integration-tests/mini_cluster.h"
#include "yb/master/catalog_manager.h"
#include "yb/master/master.h"
#include "yb/master/master_ddl.pb.h"
#include "yb/master/master_ddl.proxy.h"
#include "yb/master/master_defaults.h"
Expand Down Expand Up @@ -916,10 +915,9 @@ Status XClusterTestBase::WaitForSafeTimeToAdvanceToNow(std::vector<NamespaceName
if (namespace_names.empty()) {
namespace_names = {namespace_name};
}
auto producer_master = VERIFY_RESULT(producer_cluster()->GetLeaderMiniMaster())->master();
HybridTime now = producer_master->clock()->Now();
HybridTime now = VERIFY_RESULT(producer_cluster()->GetLeaderMiniMaster())->Now();
for (auto ts : producer_cluster()->mini_tablet_servers()) {
now.MakeAtLeast(ts->server()->clock()->Now());
now.MakeAtLeast(ts->Now());
}

for (const auto& name : namespace_names) {
Expand Down
4 changes: 4 additions & 0 deletions src/yb/master/catalog_entity_info.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1182,6 +1182,10 @@ bool PersistentTableInfo::is_index() const {
return !indexed_table_id().empty();
}

// Reports whether the index info stored in this table's protobuf entry
// has vector index options set.
bool PersistentTableInfo::is_vector_index() const {
  const auto& index_info = pb.index_info();
  return index_info.has_vector_idx_options();
}

const std::string& PersistentTableInfo::indexed_table_id() const {
static const std::string kEmptyString;
return pb.has_index_info()
Expand Down
7 changes: 4 additions & 3 deletions src/yb/master/catalog_entity_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ struct ExternalUDTypeSnapshotData {
typedef std::unordered_map<UDTypeId, ExternalUDTypeSnapshotData> UDTypeMap;

struct TableDescription {
scoped_refptr<NamespaceInfo> namespace_info;
scoped_refptr<TableInfo> table_info;
NamespaceInfoPtr namespace_info;
TableInfoPtr table_info;
TabletInfos tablet_infos;
};

Expand Down Expand Up @@ -452,9 +452,10 @@ struct PersistentTableInfo : public Persistent<SysTablesEntryPB> {
return pb.schema();
}

const std::string& indexed_table_id() const;
const TableId& indexed_table_id() const;

bool is_index() const;
bool is_vector_index() const;

SchemaPB* mutable_schema() {
return pb.mutable_schema();
Expand Down
34 changes: 18 additions & 16 deletions src/yb/master/catalog_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12255,7 +12255,12 @@ Status CatalogManager::CollectTable(
return Status::OK();
}
if (flags.Test(CollectFlag::kIncludeParentColocatedTable) && lock->pb.colocated()) {
if (!IsColocationParentTableId(table_description.table_info->id())) {
bool add_parent = !IsColocationParentTableId(table_description.table_info->id());
if (add_parent && lock->is_vector_index()) {
auto indexed_table = VERIFY_RESULT(FindTableById(lock->indexed_table_id()));
add_parent = indexed_table->LockForRead()->pb.colocated();
}
if (add_parent) {
// If a table is colocated, add its parent colocated table as well.
TableId parent_table_id = VERIFY_RESULT(
GetParentTableIdForColocatedTable(table_description.table_info));
Expand Down Expand Up @@ -12665,31 +12670,28 @@ Result<TableId> CatalogManager::GetParentTableIdForColocatedTable(
}

Result<TableId> CatalogManager::GetParentTableIdForColocatedTableUnlocked(
const scoped_refptr<TableInfo>& table) {
const TableInfoPtr& table) {
DCHECK(table->colocated());
DCHECK(!IsColocationParentTableId(table->id()));

auto ns_info = VERIFY_RESULT(FindNamespaceByIdUnlocked(table->namespace_id()));
TableId parent_table_id;
auto table_lock = table->LockForRead();
auto ns_info = VERIFY_RESULT(FindNamespaceByIdUnlocked(table_lock->namespace_id()));

auto tablegroup = tablegroup_manager_->FindByTable(table->id());
if (ns_info->colocated()) {
// Two types of colocated database: (1) pre-Colocation GA (2) Colocation GA
// Colocated databases created before Colocation GA don't use tablegroup
// to manage their colocated tables.
if (tablegroup)
parent_table_id = GetColocationParentTableId(tablegroup->id());
else
parent_table_id = GetColocatedDbParentTableId(table->namespace_id());
} else {
RSTATUS_DCHECK(tablegroup != nullptr,
Corruption,
Format("Not able to find the tablegroup for a colocated table $0 whose database "
"is not colocated.", table->id()));
parent_table_id = GetTablegroupParentTableId(tablegroup->id());
if (tablegroup) {
return GetColocationParentTableId(tablegroup->id());
}
return GetColocatedDbParentTableId(table_lock->namespace_id());
}

return parent_table_id;
RSTATUS_DCHECK(tablegroup != nullptr,
Corruption,
Format("Not able to find the tablegroup for a colocated table $0 whose database "
"is not colocated.", table->id()));
return GetTablegroupParentTableId(tablegroup->id());
}

Result<std::optional<cdc::ConsumerRegistryPB>> CatalogManager::GetConsumerRegistry() {
Expand Down
4 changes: 4 additions & 0 deletions src/yb/master/mini_master.cc
Original file line number Diff line number Diff line change
Expand Up @@ -306,5 +306,9 @@ FsManager& MiniMaster::fs_manager() const {
return *master_->fs_manager();
}

// Convenience accessor: returns the current hybrid time as reported by the
// wrapped master's clock, so callers need not reach through master()->clock().
HybridTime MiniMaster::Now() const {
  const auto& master_clock = master_->clock();
  return master_clock->Now();
}

} // namespace master
} // namespace yb
4 changes: 4 additions & 0 deletions src/yb/master/mini_master.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
#include <string>
#include <vector>

#include "yb/common/hybrid_time.h"

#include "yb/gutil/macros.h"
#include "yb/gutil/port.h"

Expand Down Expand Up @@ -119,6 +121,8 @@ class MiniMaster {

std::string ToString() const;

HybridTime Now() const;

private:
Status StartDistributedMasterOnPorts(uint16_t rpc_port, uint16_t web_port,
const std::vector<uint16_t>& peer_ports);
Expand Down
4 changes: 2 additions & 2 deletions src/yb/qlexpr/index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ void IndexInfo::ToPB(IndexInfoPB* pb) const {
pb->mutable_where_predicate_spec()->CopyFrom(*where_predicate_spec_);
}

if (is_vector_idx()) {
if (is_vector_index()) {
*pb->mutable_vector_idx_options() = vector_idx_options();
}
}
Expand Down Expand Up @@ -282,7 +282,7 @@ size_t IndexInfo::DynamicMemoryUsage() const {
return size;
}

bool IndexInfo::is_vector_idx() const {
// Returns true when vector_idx_options_ holds a value, i.e. this index has
// vector index options set.
bool IndexInfo::is_vector_index() const {
return vector_idx_options_.has_value();
}

Expand Down
2 changes: 1 addition & 1 deletion src/yb/qlexpr/index.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ class IndexInfo {

size_t DynamicMemoryUsage() const;

bool is_vector_idx() const;
bool is_vector_index() const;

const PgVectorIdxOptionsPB& vector_idx_options() const {
return *vector_idx_options_;
Expand Down
Loading

0 comments on commit 124135a

Please sign in to comment.