Skip to content

Commit

Permalink
Storage: Support multiple vec indexes on the same column (pingcap#279)
Browse files Browse the repository at this point in the history
  • Loading branch information
JaySon-Huang authored Sep 6, 2024
1 parent b7b4618 commit c04bddb
Show file tree
Hide file tree
Showing 39 changed files with 1,142 additions and 282 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ jobs:
run: |
chmod +x /tmp/gtests_dbms
cd /tmp
/tmp/gtests_dbms --gtest_filter="Vector*:FileCacheTest*:DeltaMergeStoreVectorTest*" 2>/tmp/gtests_dbms.log
/tmp/gtests_dbms --gtest_filter="Vector*:FileCacheTest*:DeltaMergeStoreVectorTest*:SchemaSyncTest*:LocalIndexInfoTest*:LocalDMFile*" 2>/tmp/gtests_dbms.log
- name: Detailed Test Log
if: failure()
Expand Down
67 changes: 49 additions & 18 deletions dbms/src/Storages/DeltaMerge/ColumnStat.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,17 @@
#include <IO/WriteHelpers.h>
#include <Storages/DeltaMerge/DeltaMergeDefines.h>
#include <Storages/DeltaMerge/dtpb/dmfile.pb.h>
#include <Storages/KVStore/Types.h>

namespace DB
{
namespace DM
namespace DB::DM
{
struct ColumnStat
{
ColId col_id;
DataTypePtr type;
// The average size of values. A hint for speeding up deserialize.
double avg_size;
// The serialized size of the column data on disk.
// The serialized size of the column data on disk. (including column data and nullmap)
size_t serialized_bytes = 0;

// These members are only useful when using metav2
Expand All @@ -41,9 +40,9 @@ struct ColumnStat
size_t array_sizes_bytes = 0;
size_t array_sizes_mark_bytes = 0;

std::optional<dtpb::VectorIndexFileProps> vector_index = std::nullopt;
std::vector<dtpb::VectorIndexFileProps> vector_index;

String additional_data_for_test{};
String additional_data_for_test;

dtpb::ColumnStat toProto() const
{
Expand All @@ -60,8 +59,11 @@ struct ColumnStat
stat.set_array_sizes_bytes(array_sizes_bytes);
stat.set_array_sizes_mark_bytes(array_sizes_mark_bytes);

if (vector_index.has_value())
stat.mutable_vector_index()->CopyFrom(vector_index.value());
for (const auto & vec_idx : vector_index)
{
auto * pb_idx = stat.add_vector_indexes();
pb_idx->CopyFrom(vec_idx);
}

stat.set_additional_data_for_test(additional_data_for_test);

Expand All @@ -83,13 +85,27 @@ struct ColumnStat
array_sizes_mark_bytes = proto.array_sizes_mark_bytes();

if (proto.has_vector_index())
vector_index = proto.vector_index();
{
// For backward compatibility, load the legacy `vector_index` into `vector_indexes`
// with index_id == EmptyIndexID
vector_index.emplace_back(proto.vector_index());
auto & idx = vector_index.back();
idx.set_index_id(EmptyIndexID);
idx.set_index_bytes(index_bytes);
}
vector_index.reserve(vector_index.size() + proto.vector_indexes_size());
for (const auto & pb_idx : proto.vector_indexes())
{
vector_index.emplace_back(pb_idx);
}

additional_data_for_test = proto.additional_data_for_test();
}

// @deprecated. New fields should be added via protobuf. Use `toProto` instead
void serializeToBuffer(WriteBuffer & buf) const
// New fields should be added via protobuf. Use `toProto` instead
[[deprecated("Use ColumnStat::toProto instead")]] //
void
serializeToBuffer(WriteBuffer & buf) const
{
writeIntBinary(col_id, buf);
writeStringBinary(type->getName(), buf);
Expand All @@ -102,8 +118,10 @@ struct ColumnStat
writeIntBinary(index_bytes, buf);
}

// @deprecated. This only presents for reading with old data. Use `mergeFromProto` instead
void parseFromBuffer(ReadBuffer & buf)
// This is only present for reading old data. Use `mergeFromProto` instead
[[deprecated("Use ColumnStat::mergeFromProto instead")]] //
void
parseFromBuffer(ReadBuffer & buf)
{
readIntBinary(col_id, buf);
String type_name;
Expand All @@ -121,7 +139,9 @@ struct ColumnStat

using ColumnStats = std::unordered_map<ColId, ColumnStat>;

inline void readText(ColumnStats & column_sats, DMFileFormat::Version ver, ReadBuffer & buf)
[[deprecated("Used by DMFileMeta v1. Use ColumnStat::mergeFromProto instead")]] //
inline void
readText(ColumnStats & column_sats, DMFileFormat::Version ver, ReadBuffer & buf)
{
DataTypeFactory & data_type_factory = DataTypeFactory::instance();

Expand Down Expand Up @@ -149,11 +169,23 @@ inline void readText(ColumnStats & column_sats, DMFileFormat::Version ver, ReadB
DB::assertChar('\n', buf);

auto type = data_type_factory.getOrSet(type_name);
column_sats.emplace(id, ColumnStat{id, type, avg_size, serialized_bytes});
column_sats.emplace(
id,
ColumnStat{
.col_id = id,
.type = type,
.avg_size = avg_size,
.serialized_bytes = serialized_bytes,
// ... here ignore some fields with default initializers
.vector_index = {},
.additional_data_for_test = {},
});
}
}

inline void writeText(const ColumnStats & column_sats, DMFileFormat::Version ver, WriteBuffer & buf)
[[deprecated("Used by DMFileMeta v1. Use ColumnStat::toProto instead")]] //
inline void
writeText(const ColumnStats & column_sats, DMFileFormat::Version ver, WriteBuffer & buf)
{
DB::writeString("Columns: ", buf);
DB::writeText(column_sats.size(), buf);
Expand All @@ -176,5 +208,4 @@ inline void writeText(const ColumnStats & column_sats, DMFileFormat::Version ver
}
}

} // namespace DM
} // namespace DB
} // namespace DB::DM
12 changes: 4 additions & 8 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1967,24 +1967,20 @@ void DeltaMergeStore::applySchemaChanges(TableInfo & table_info)
original_table_columns.swap(new_original_table_columns);
store_columns.swap(new_store_columns);

// copy the local_index_infos to check whether any new index is created
LocalIndexInfosPtr local_index_infos_copy = nullptr;
{
std::shared_lock index_read_lock(mtx_local_index_infos);
local_index_infos_copy = std::shared_ptr<LocalIndexInfos>(local_index_infos);
}
// Get a snapshot on the local_index_infos to check whether any new index is created
LocalIndexInfosSnapshot local_index_infos_snap = getLocalIndexInfosSnapshot();

std::atomic_store(&original_table_header, std::make_shared<Block>(toEmptyBlock(original_table_columns)));

// release the lock because `applyLocalIndexChange` will try to acquire the lock
// and generate tasks on segments
lock.unlock();

auto new_local_index_infos = generateLocalIndexInfos(local_index_infos_copy, table_info, log);
auto new_local_index_infos = generateLocalIndexInfos(local_index_infos_snap, table_info, log);
if (new_local_index_infos)
{
{
// new index created, update the info in-memory
// new index created, update the in-memory info. The write lock keeps this thread safe with `getLocalIndexInfosSnapshot`
std::unique_lock index_write_lock(mtx_local_index_infos);
local_index_infos.swap(new_local_index_infos);
}
Expand Down
10 changes: 6 additions & 4 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ struct StoreStats

struct LocalIndexStats
{
String column_name{};
UInt64 column_id{};
UInt64 index_id{};
String index_kind{};
Expand Down Expand Up @@ -857,13 +856,16 @@ class DeltaMergeStore
const SegmentPtr & old_segment,
const SegmentPtr & new_segment);

// Get a snap of local_index_infos to check whether any new index is created.
LocalIndexInfosPtr getLocalIndexInfosSnapshot() const
// Get a snap of local_index_infos for checking.
// Note that this is just a shallow copy of `local_index_infos`, do not
// modify the local indexes inside the snapshot.
LocalIndexInfosSnapshot getLocalIndexInfosSnapshot() const
{
std::shared_lock index_read_lock(mtx_local_index_infos);
if (!local_index_infos || local_index_infos->empty())
return nullptr;
return std::make_shared<LocalIndexInfos>(*local_index_infos);
// making only a shallow copy of the shared_ptr is OK
return local_index_infos;
}

/**
Expand Down
15 changes: 6 additions & 9 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ void DeltaMergeStore::checkAllSegmentsLocalIndex()
}
LOG_INFO(
log,
"CheckAllSegmentsLocalIndex - Finish, updated_meta={}, elapsed={}",
"CheckAllSegmentsLocalIndex - Finish, updated_meta={}, elapsed={:.3f}s",
segments_updated_meta,
watch.elapsedSeconds());
}
Expand Down Expand Up @@ -535,7 +535,6 @@ bool DeltaMergeStore::segmentEnsureStableIndexAsync(const SegmentPtr & segment)
{
RUNTIME_CHECK(segment != nullptr);

// TODO(local index): There could be some indexes are built while some indexes is not yet built after DDL
auto local_index_infos_snap = getLocalIndexInfosSnapshot();
if (!local_index_infos_snap)
return false;
Expand Down Expand Up @@ -576,7 +575,6 @@ bool DeltaMergeStore::segmentWaitStableIndexReady(const SegmentPtr & segment) co
{
RUNTIME_CHECK(segment != nullptr);

// TODO(local index): There could be some indexes are built while some indexes is not yet built after DDL
auto local_index_infos_snap = getLocalIndexInfosSnapshot();
if (!local_index_infos_snap)
return true;
Expand Down Expand Up @@ -604,12 +602,11 @@ bool DeltaMergeStore::segmentWaitStableIndexReady(const SegmentPtr & segment) co
bool all_indexes_built = true;
for (const auto & index : *build_info.indexes_to_build)
{
auto col_id = index.column_id;
// The dmfile may be built before col_id is added. Skip build indexes for it
if (!dmfile->isColumnExist(col_id))
continue;

all_indexes_built = all_indexes_built && dmfile->getColumnStat(col_id).index_bytes > 0;
const auto [state, bytes] = dmfile->getLocalIndexState(index.column_id, index.index_id);
UNUSED(bytes);
all_indexes_built = all_indexes_built
// dmfile built before the column_id added or index already built
&& (state == DMFileMeta::LocalIndexState::NoNeed || state == DMFileMeta::LocalIndexState::IndexBuilt);
}
if (all_indexes_built)
return true;
Expand Down
14 changes: 7 additions & 7 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ LocalIndexesStats DeltaMergeStore::getLocalIndexStats()
LocalIndexStats index_stats;
index_stats.column_id = index_info.column_id;
index_stats.index_id = index_info.index_id;
index_stats.column_name = index_info.column_name;
index_stats.index_kind = "HNSW"; // TODO: Support more.

for (const auto & [handle, segment] : segments)
Expand All @@ -221,13 +220,14 @@ LocalIndexesStats DeltaMergeStore::getLocalIndexStats()
bool is_stable_indexed = true;
for (const auto & dmfile : stable->getDMFiles())
{
if (!dmfile->isColumnExist(index_info.column_id))
continue; // Regard as indexed, because column does not need any index

auto column_stat = dmfile->getColumnStat(index_info.column_id);

if (column_stat.index_bytes == 0 && column_stat.data_bytes > 0)
const auto [state, bytes] = dmfile->getLocalIndexState(index_info.column_id, index_info.index_id);
UNUSED(bytes);
switch (state)
{
case DMFileMeta::LocalIndexState::NoNeed: // Regard as indexed, because column does not need any index
case DMFileMeta::LocalIndexState::IndexBuilt:
break;
case DMFileMeta::LocalIndexState::IndexPending:
is_stable_indexed = false;
break;
}
Expand Down
32 changes: 29 additions & 3 deletions dbms/src/Storages/DeltaMerge/File/DMFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@

#pragma once

#include <DataTypes/IDataType.h>
#include <IO/WriteHelpers.h>
#include <Poco/File.h>
#include <Storages/DeltaMerge/DeltaMergeDefines.h>
#include <Storages/DeltaMerge/File/DMFileMetaV2.h>
#include <Storages/DeltaMerge/File/DMFileUtil.h>
#include <Storages/DeltaMerge/File/DMFileV3IncrementWriter_fwd.h>
#include <Storages/DeltaMerge/File/dtpb/dmfile.pb.h>
#include <Storages/FormatVersion.h>
Expand Down Expand Up @@ -126,8 +129,6 @@ class DMFile : private boost::noncopyable
size_t getPacks() const { return meta->pack_stats.size(); }
const DMFileMeta::PackStats & getPackStats() const { return meta->pack_stats; }
const DMFileMeta::PackProperties & getPackProperties() const { return meta->pack_properties; }
const ColumnStats & getColumnStats() const { return meta->column_stats; }
const std::unordered_set<ColId> & getColumnIndices() const { return meta->column_indices; }

// only used in gtest
void clearPackProperties() const { meta->pack_properties.clear_property(); }
Expand All @@ -142,6 +143,29 @@ class DMFile : private boost::noncopyable
}
bool isColumnExist(ColId col_id) const { return meta->column_stats.contains(col_id); }

// Look up the state of the local index identified by (col_id, index_id) on this dmfile.
// This simply forwards to the dmfile meta and returns its answer unchanged.
// The first element is the index state; the second is a size_t that callers bind
// as `bytes` (presumably the index size in bytes -- confirm against DMFileMeta).
std::tuple<DMFileMeta::LocalIndexState, size_t> getLocalIndexState(ColId col_id, IndexID index_id) const
{
    auto state_and_bytes = meta->getLocalIndexState(col_id, index_id);
    return state_and_bytes;
}

// Tell whether the local index identified by (col_id, index_id) has already been built on this dmfile.
// The answer is false when
// - the col_id does not exist in the dmfile, or
// - the index has not been built yet
bool isLocalIndexExist(ColId col_id, IndexID index_id) const
{
    const auto state_and_bytes = meta->getLocalIndexState(col_id, index_id);
    // Only the state matters here; the second element of the tuple is ignored.
    return std::get<0>(state_and_bytes) == DMFileMeta::LocalIndexState::IndexBuilt;
}

// Fetch the file properties of the local index identified by (col_id, index_id).
// The result is std::nullopt when
// - the col_id does not exist in the dmfile, or
// - the index has not been built yet
std::optional<dtpb::VectorIndexFileProps> getLocalIndex(ColId col_id, IndexID index_id) const
{
    auto index_props = meta->getLocalIndex(col_id, index_id);
    return index_props;
}

/*
* TODO: This function is currently unused. We could use it when:
* 1. The content is polished (e.g. including at least file ID, and use a format easy for grep).
Expand Down Expand Up @@ -177,7 +201,6 @@ class DMFile : private boost::noncopyable
void switchToRemote(const S3::DMFileOID & oid) const;

UInt64 metaVersion() const { return meta->metaVersion(); }
UInt64 bumpMetaVersion() const { return meta->bumpMetaVersion(); }

#ifndef DBMS_PUBLIC_GTEST
private:
Expand Down Expand Up @@ -273,6 +296,9 @@ class DMFile : private boost::noncopyable
return IDataType::getFileNameForStream(DB::toString(col_id), substream);
}

// File name used for the vector index `index_id`, in the form "idx_<index_id>.vector".
static String vectorIndexFileName(IndexID index_id)
{
    return fmt::format("idx_{}.vector", index_id);
}
// Path of the vector index file for `index_id`: the name from `vectorIndexFileName`
// resolved through `subFilePath`.
String vectorIndexPath(IndexID index_id) const
{
    return subFilePath(vectorIndexFileName(index_id));
}

void addPack(const DMFileMeta::PackStat & pack_stat) const { meta->pack_stats.push_back(pack_stat); }

DMFileStatus getStatus() const { return meta->status; }
Expand Down
7 changes: 5 additions & 2 deletions dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ SkippableBlockInputStreamPtr DMFileBlockInputStreamBuilder::build2(
bool is_matching_ann_query = false;
for (const auto & cd : read_columns)
{
// Note that this requires ann_query_info->column_id to match the id of a column being read
if (cd.id == ann_query_info->column_id())
{
is_matching_ann_query = true;
Expand Down Expand Up @@ -167,8 +168,10 @@ SkippableBlockInputStreamPtr DMFileBlockInputStreamBuilder::build2(

RUNTIME_CHECK(rest_columns.size() + 1 == read_columns.size(), rest_columns.size(), read_columns.size());

const auto & vec_column_stat = dmfile->getColumnStat(vec_column->id);
if (vec_column_stat.index_bytes == 0 || !vec_column_stat.vector_index.has_value())
const IndexID ann_query_info_index_id = ann_query_info->index_id() > 0 //
? ann_query_info->index_id()
: EmptyIndexID;
if (!dmfile->isLocalIndexExist(vec_column->id, ann_query_info_index_id))
// Vector index is defined but does not exist on the data file,
// or there is no data at all
return fallback();
Expand Down
Loading

0 comments on commit c04bddb

Please sign in to comment.