Skip to content

Commit

Permalink
storage: Optimize Vector Index (pingcap#232)
Browse files Browse the repository at this point in the history
Signed-off-by: Wish <breezewish@outlook.com>
Co-authored-by: Lloyd-Pottiger <60744015+Lloyd-Pottiger@users.noreply.github.com>
  • Loading branch information
breezewish and Lloyd-Pottiger committed Sep 20, 2024
1 parent 19cecc1 commit 8568116
Show file tree
Hide file tree
Showing 19 changed files with 506 additions and 41 deletions.
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,6 @@
[submodule "contrib/simsimd"]
path = contrib/simsimd
url = https://github.com/ashvardanian/SimSIMD
[submodule "contrib/highfive"]
path = contrib/highfive
url = https://github.com/BlueBrain/HighFive
4 changes: 4 additions & 0 deletions contrib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -202,3 +202,7 @@ add_subdirectory(fastpforlib)
add_subdirectory(usearch-cmake)

add_subdirectory(simsimd-cmake)

# Builds HDF5 as an external project and exposes it as tiflash_contrib::hdf5.
add_subdirectory(hdf5-cmake)

# Header-only HighFive wrapper (tiflash_contrib::highfive); links tiflash_contrib::hdf5.
add_subdirectory(highfive-cmake)
1 change: 1 addition & 0 deletions contrib/hdf5-cmake/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/download/*
41 changes: 41 additions & 0 deletions contrib/hdf5-cmake/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
include(ExternalProject)

# File name of the static HDF5 C library produced by the external build.
# CMAKE_STATIC_LIBRARY_PREFIX / CMAKE_STATIC_LIBRARY_SUFFIX describe how static
# libraries are named for the target platform. CMAKE_FIND_LIBRARY_PREFIXES is a
# search *list* intended for find_library() and must not be spliced into a path.
# The name is computed once so that BUILD_BYPRODUCTS and IMPORTED_LOCATION below
# can never drift apart.
set(HDF5_STATIC_LIBRARY_NAME "${CMAKE_STATIC_LIBRARY_PREFIX}hdf5${CMAKE_STATIC_LIBRARY_SUFFIX}")

# hdf5 is too large. Instead of adding as a submodule, let's simply download from GitHub.
# The archive is cached in the source tree (contrib/hdf5-cmake/download) so that
# it is not re-downloaded for every fresh build directory.
ExternalProject_Add(hdf5-external
    PREFIX ${CMAKE_CURRENT_BINARY_DIR}
    DOWNLOAD_DIR ${TiFlash_SOURCE_DIR}/contrib/hdf5-cmake/download
    URL https://github.com/HDFGroup/hdf5/archive/refs/tags/hdf5_1.14.4.3.zip
    URL_HASH MD5=bc987d22e787290127aacd7b99b4f31e
    CMAKE_ARGS
        -DCMAKE_BUILD_TYPE=Release
        -DCMAKE_INSTALL_PREFIX=<INSTALL_DIR>
        -DBUILD_STATIC_LIBS=ON
        -DBUILD_SHARED_LIBS=OFF
        -DBUILD_TESTING=OFF
        -DHDF5_BUILD_HL_LIB=OFF
        -DHDF5_BUILD_TOOLS=OFF
        -DHDF5_BUILD_CPP_LIB=ON
        -DHDF5_BUILD_EXAMPLES=OFF
        -DHDF5_ENABLE_Z_LIB_SUPPORT=OFF
        -DHDF5_ENABLE_SZIP_SUPPORT=OFF
    # Workaround for Ninja: the imported library file must be declared as an
    # output of this external build, otherwise Ninja has no rule to produce it.
    BUILD_BYPRODUCTS <INSTALL_DIR>/lib/${HDF5_STATIC_LIBRARY_NAME}
    USES_TERMINAL_DOWNLOAD TRUE
    USES_TERMINAL_CONFIGURE TRUE
    USES_TERMINAL_BUILD TRUE
    USES_TERMINAL_INSTALL TRUE
    EXCLUDE_FROM_ALL TRUE
    DOWNLOAD_EXTRACT_TIMESTAMP TRUE
)

ExternalProject_Get_Property(hdf5-external INSTALL_DIR)

# Expose the externally built static library as an imported target.
add_library(tiflash_contrib::hdf5 STATIC IMPORTED GLOBAL)
set_target_properties(tiflash_contrib::hdf5 PROPERTIES
    IMPORTED_LOCATION "${INSTALL_DIR}/lib/${HDF5_STATIC_LIBRARY_NAME}"
)
# Make sure the external project is built before anything that links the target.
add_dependencies(tiflash_contrib::hdf5 hdf5-external)

# The include directory is only populated by the install step of the external
# project, but CMake requires include directories of imported targets to exist
# when the build system is generated, so create it up front.
file(MAKE_DIRECTORY "${INSTALL_DIR}/include")
target_include_directories(tiflash_contrib::hdf5 SYSTEM INTERFACE
    "${INSTALL_DIR}/include"
)
1 change: 1 addition & 0 deletions contrib/highfive
Submodule highfive added at 0d0259
18 changes: 18 additions & 0 deletions contrib/highfive-cmake/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# HighFive is header-only: expose its include directory through an INTERFACE
# target that also pulls in the HDF5 library it wraps.
set(HIGHFIVE_PROJECT_DIR "${TiFlash_SOURCE_DIR}/contrib/highfive")
set(HIGHFIVE_SOURCE_DIR "${HIGHFIVE_PROJECT_DIR}/include")

# Fail early when the git submodule has not been checked out.
if (NOT EXISTS "${HIGHFIVE_SOURCE_DIR}/highfive/highfive.hpp")
    message (FATAL_ERROR "submodule contrib/highfive not found")
endif()

add_library(_highfive INTERFACE)
add_library(tiflash_contrib::highfive ALIAS _highfive)

# SYSTEM suppresses warnings coming from the third-party headers.
target_include_directories(_highfive SYSTEM INTERFACE ${HIGHFIVE_SOURCE_DIR})
target_link_libraries(_highfive INTERFACE tiflash_contrib::hdf5)
13 changes: 12 additions & 1 deletion dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,18 @@ if (ENABLE_TESTS)
)
target_include_directories(bench_dbms BEFORE PRIVATE ${SPARCEHASH_INCLUDE_DIR} ${benchmark_SOURCE_DIR}/include)
target_compile_definitions(bench_dbms PUBLIC DBMS_PUBLIC_GTEST)
target_link_libraries(bench_dbms gtest dbms test_util_bench_main benchmark tiflash_functions server_for_test delta_merge kvstore tiflash_aggregate_functions)
target_link_libraries(bench_dbms
gtest
benchmark
tiflash_contrib::highfive

dbms
test_util_bench_main
tiflash_functions
server_for_test
delta_merge
tiflash_aggregate_functions
kvstore)

add_check(bench_dbms)
endif ()
Expand Down
10 changes: 9 additions & 1 deletion dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilterView.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,19 @@ class BitmapFilterView
RUNTIME_CHECK(filter_offset + filter_size <= filter->size(), filter_offset, filter_size, filter->size());
}

/**
 * @brief Test helper: constructs a new BitmapFilter from (size, default_value)
 * and returns a BitmapFilterView over it with offset 0 and length `size`.
 * Should be only used in tests.
 */
static BitmapFilterView createWithFilter(UInt32 size, bool default_value)
{
    auto backing_filter = std::make_shared<BitmapFilter>(size, default_value);
    return BitmapFilterView(std::move(backing_filter), 0, size);
}

// Reads position `n` of this view, which is position `filter_offset + n` of the
// underlying filter. Caller should ensure n in [0, size).
bool get(UInt32 n) const
{
    return filter->get(filter_offset + n);
}

// Subscript form of get(); the same precondition on `n` applies.
bool operator[](UInt32 n) const
{
    return get(n);
}

// Length of this view (`filter_size`).
UInt32 size() const
{
    return filter_size;
}

// Start position of this view inside the underlying filter (`filter_offset`).
UInt32 offset() const
{
    return filter_offset;
}
Expand Down
51 changes: 40 additions & 11 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ extern const Metric DT_SnapshotOfSegmentIngest;
extern const Metric DT_SnapshotOfSegmentIngestIndex;
} // namespace CurrentMetrics

// Error codes referenced by this translation unit. Only declared here; the
// definitions live elsewhere.
namespace DB::ErrorCodes
{
extern const int ABORTED;
} // namespace DB::ErrorCodes

namespace DB::DM
{

Expand Down Expand Up @@ -673,18 +678,20 @@ void DeltaMergeStore::segmentEnsureStableIndex(
RUNTIME_CHECK(dm_files.size() == 1); // size > 1 is currently not supported.
const auto & dm_file = dm_files[0];

// 2. Check whether the DMFile has been referenced by any valid segment.
{
auto is_file_valid = [this, dm_file] {
std::shared_lock lock(read_write_mutex);
auto segment_ids = dmfile_id_to_segment_ids.get(dm_file->fileId());
if (segment_ids.empty())
{
LOG_DEBUG(
log,
"EnsureStableIndex - Give up because no segment to update, source_segment={}",
source_segment_info);
return;
}
return !segment_ids.empty();
};

// 2. Check whether the DMFile has been referenced by any valid segment.
if (!is_file_valid())
{
LOG_DEBUG(
log,
"EnsureStableIndex - Give up because no segment to update, source_segment={}",
source_segment_info);
return;
}

LOG_INFO(
Expand All @@ -700,7 +707,29 @@ void DeltaMergeStore::segmentEnsureStableIndex(
.dm_files = dm_files,
.dm_context = dm_context,
});
auto new_dmfiles = iw.build();

DMFiles new_dmfiles{};

try
{
// When file is not valid we need to abort the index build.
new_dmfiles = iw.build(is_file_valid);
}
catch (const Exception & e)
{
if (e.code() == ErrorCodes::ABORTED)
{
LOG_INFO(
log,
"EnsureStableIndex - Build index aborted because DMFile is no longer valid, dm_files={} "
"source_segment={}",
DMFile::info(dm_files),
source_segment_info);
return;
}
throw;
}

RUNTIME_CHECK(!new_dmfiles.empty());

LOG_INFO(
Expand Down
17 changes: 13 additions & 4 deletions dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Exception.h>
#include <Interpreters/SharedContexts/Disagg.h>
#include <Storages/DeltaMerge/DeltaMergeDefines.h>
#include <Storages/DeltaMerge/File/DMFile.h>
Expand All @@ -22,6 +23,11 @@
#include <Storages/DeltaMerge/ScanContext.h>
#include <Storages/PathPool.h>

// Error codes referenced by this translation unit. Only declared here; the
// definitions live elsewhere.
namespace DB::ErrorCodes
{
extern const int ABORTED;
} // namespace DB::ErrorCodes

namespace DB::DM
{

Expand Down Expand Up @@ -63,7 +69,7 @@ DMFileIndexWriter::LocalIndexBuildInfo DMFileIndexWriter::getLocalIndexBuildInfo
return build;
}

size_t DMFileIndexWriter::buildIndexForFile(const DMFilePtr & dm_file_mutable) const
size_t DMFileIndexWriter::buildIndexForFile(const DMFilePtr & dm_file_mutable, ProceedCheckFn should_proceed) const
{
const auto column_defines = dm_file_mutable->getColumnDefines();
const auto del_cd_iter = std::find_if(column_defines.cbegin(), column_defines.cend(), [](const ColumnDefine & cd) {
Expand Down Expand Up @@ -128,6 +134,9 @@ size_t DMFileIndexWriter::buildIndexForFile(const DMFilePtr & dm_file_mutable) c
// Read all blocks and build index
while (true)
{
if (!should_proceed())
throw Exception(ErrorCodes::ABORTED, "Index build is interrupted");

auto block = read_stream->read();
if (!block)
break;
Expand All @@ -146,7 +155,7 @@ size_t DMFileIndexWriter::buildIndexForFile(const DMFilePtr & dm_file_mutable) c
const auto & col_with_type_and_name = block.safeGetByPosition(col_idx + 1);
RUNTIME_CHECK(col_with_type_and_name.column_id == read_columns[col_idx + 1].id);
const auto & col = col_with_type_and_name.column;
index_builder->addBlock(*col, del_mark);
index_builder->addBlock(*col, del_mark, should_proceed);
}
}

Expand Down Expand Up @@ -187,7 +196,7 @@ size_t DMFileIndexWriter::buildIndexForFile(const DMFilePtr & dm_file_mutable) c
return total_built_index_bytes;
}

DMFiles DMFileIndexWriter::build() const
DMFiles DMFileIndexWriter::build(ProceedCheckFn should_proceed) const
{
RUNTIME_CHECK(!built);
// Create a clone of existing DMFile instances by using DMFile::restore,
Expand All @@ -214,7 +223,7 @@ DMFiles DMFileIndexWriter::build() const

for (const auto & cloned_dmfile : cloned_dm_files)
{
auto index_bytes = buildIndexForFile(cloned_dmfile);
auto index_bytes = buildIndexForFile(cloned_dmfile, should_proceed);
if (auto data_store = options.dm_context.global_context.getSharedContextDisagg()->remote_data_store;
!data_store)
{
Expand Down
13 changes: 10 additions & 3 deletions dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,23 @@ class DMFileIndexWriter
const DMContext & dm_context;
};

// Callback consulted while building an index. When it returns false, the build
// is interrupted with an ErrorCodes::ABORTED exception.
using ProceedCheckFn = std::function<bool()>;

// Creates a writer configured by `options`. The logger is obtained from
// Logger::get().
explicit DMFileIndexWriter(const Options & options)
: logger(Logger::get())
, options(options)
{}

// Note: This method can only be called once.
DMFiles build() const;
// Note: You cannot call build() multiple times, as duplicate meta version will result in exceptions.
DMFiles build(ProceedCheckFn should_proceed) const;

// Convenience overload: runs build(ProceedCheckFn) with a check that always
// returns true, so the proceed check never interrupts the build.
DMFiles build() const
{
    const ProceedCheckFn always_proceed = [] { return true; };
    return build(always_proceed);
}

private:
size_t buildIndexForFile(const DMFilePtr & dm_file_mutable) const;
size_t buildIndexForFile(const DMFilePtr & dm_file_mutable, ProceedCheckFn should_proceed) const;

private:
const LoggerPtr logger;
Expand Down
10 changes: 9 additions & 1 deletion dbms/src/Storages/DeltaMerge/Index/VectorIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ class VectorIndexBuilder
/// The key is the row's offset in the DMFile.
using Key = UInt32;

using ProceedCheckFn = std::function<bool()>;

public:
static VectorIndexBuilderPtr create(const TiDB::VectorIndexDefinitionPtr & definition);

Expand All @@ -47,7 +49,11 @@ class VectorIndexBuilder

virtual ~VectorIndexBuilder() = default;

virtual void addBlock(const IColumn & column, const ColumnVector<UInt8> * del_mark) = 0;
virtual void addBlock( //
const IColumn & column,
const ColumnVector<UInt8> * del_mark,
ProceedCheckFn should_proceed)
= 0;

virtual void save(std::string_view path) const = 0;

Expand Down Expand Up @@ -80,6 +86,8 @@ class VectorIndexViewer
// Invalid rows in `valid_rows` will be discarded when applying the search
virtual std::vector<Key> search(const ANNQueryInfoPtr & queryInfo, const RowFilter & valid_rows) const = 0;

virtual size_t size() const = 0;

// Get the value (i.e. vector content) of a Key.
virtual void get(Key key, std::vector<Float32> & out) const = 0;

Expand Down
Loading

0 comments on commit 8568116

Please sign in to comment.