Skip to content

Commit

Permalink
storage: Optimize Vector Index (pingcap#232)
Browse files Browse the repository at this point in the history
Signed-off-by: Wish <breezewish@outlook.com>
Co-authored-by: Lloyd-Pottiger <60744015+Lloyd-Pottiger@users.noreply.github.com>
  • Loading branch information
breezewish and Lloyd-Pottiger authored Jul 8, 2024
1 parent 157c6ed commit fce6ea2
Show file tree
Hide file tree
Showing 21 changed files with 511 additions and 40 deletions.
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,6 @@
[submodule "contrib/simsimd"]
path = contrib/simsimd
url = https://github.com/ashvardanian/SimSIMD
[submodule "contrib/highfive"]
path = contrib/highfive
url = https://github.com/BlueBrain/HighFive
4 changes: 4 additions & 0 deletions contrib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -194,3 +194,7 @@ add_subdirectory(aws-cmake)
add_subdirectory(usearch-cmake)

add_subdirectory(simsimd-cmake)

add_subdirectory(hdf5-cmake)

add_subdirectory(highfive-cmake)
1 change: 1 addition & 0 deletions contrib/hdf5-cmake/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/download/*
40 changes: 40 additions & 0 deletions contrib/hdf5-cmake/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Builds HDF5 as an external project and exposes the resulting static library
# as the imported target `tiflash_contrib::hdf5`.

include(ExternalProject)

# File name of the static HDF5 library produced by the external build.
# CMAKE_STATIC_LIBRARY_PREFIX/SUFFIX are the variables that describe static
# library naming; CMAKE_FIND_LIBRARY_PREFIXES is a *list* of search prefixes
# for find_library() and produces a broken path when it has more than one entry.
set(HDF5_STATIC_LIBRARY_NAME "${CMAKE_STATIC_LIBRARY_PREFIX}hdf5${CMAKE_STATIC_LIBRARY_SUFFIX}")

# hdf5 is too large. Instead of adding as a submodule, let's simply download from GitHub.
# NOTE(review): only the C library (hdf5) is imported below; confirm whether
# HDF5_BUILD_CPP_LIB=ON is still required.
ExternalProject_Add(hdf5-external
    PREFIX ${CMAKE_CURRENT_BINARY_DIR}
    DOWNLOAD_DIR ${TiFlash_SOURCE_DIR}/contrib/hdf5-cmake/download
    URL https://github.com/HDFGroup/hdf5/archive/refs/tags/hdf5_1.14.4.3.zip
    URL_HASH MD5=bc987d22e787290127aacd7b99b4f31e
    CMAKE_ARGS
        -DCMAKE_BUILD_TYPE=Release
        -DCMAKE_INSTALL_PREFIX=<INSTALL_DIR>
        -DBUILD_STATIC_LIBS=ON
        -DBUILD_SHARED_LIBS=OFF
        -DBUILD_TESTING=OFF
        -DHDF5_BUILD_HL_LIB=OFF
        -DHDF5_BUILD_TOOLS=OFF
        -DHDF5_BUILD_CPP_LIB=ON
        -DHDF5_BUILD_EXAMPLES=OFF
        -DHDF5_ENABLE_Z_LIB_SUPPORT=OFF
        -DHDF5_ENABLE_SZIP_SUPPORT=OFF
    BUILD_BYPRODUCTS <INSTALL_DIR>/lib/${HDF5_STATIC_LIBRARY_NAME} # Workaround for Ninja
    USES_TERMINAL_DOWNLOAD TRUE
    USES_TERMINAL_CONFIGURE TRUE
    USES_TERMINAL_BUILD TRUE
    USES_TERMINAL_INSTALL TRUE
    EXCLUDE_FROM_ALL TRUE
)

# Resolve the install directory chosen by ExternalProject into INSTALL_DIR.
ExternalProject_Get_Property(hdf5-external INSTALL_DIR)

# Imported target pointing at the library that the external build will install.
add_library(tiflash_contrib::hdf5 STATIC IMPORTED GLOBAL)
set_target_properties(tiflash_contrib::hdf5 PROPERTIES
    IMPORTED_LOCATION ${INSTALL_DIR}/lib/${HDF5_STATIC_LIBRARY_NAME}
)
# Make sure the external project is built before anything that links the imported target.
add_dependencies(tiflash_contrib::hdf5 hdf5-external)

# The include directory only appears after the external project is installed,
# so create it up front: CMake rejects non-existent include directories on
# imported targets at generate time.
file(MAKE_DIRECTORY ${INSTALL_DIR}/include)
target_include_directories(tiflash_contrib::hdf5 SYSTEM INTERFACE
    ${INSTALL_DIR}/include
)
1 change: 1 addition & 0 deletions contrib/highfive
Submodule highfive added at 0d0259
18 changes: 18 additions & 0 deletions contrib/highfive-cmake/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Exposes the HighFive headers from the contrib/highfive submodule as the
# interface target `tiflash_contrib::highfive`, linked against tiflash_contrib::hdf5.

# Location of the submodule checkout and of its public headers.
set(HIGHFIVE_PROJECT_DIR "${TiFlash_SOURCE_DIR}/contrib/highfive")
set(HIGHFIVE_SOURCE_DIR "${HIGHFIVE_PROJECT_DIR}/include")

# Fail early at configure time when the submodule has not been checked out.
if (NOT EXISTS "${HIGHFIVE_SOURCE_DIR}/highfive/highfive.hpp")
message (FATAL_ERROR "submodule contrib/highfive not found")
endif()

# INTERFACE library: nothing is compiled here, the target only carries usage requirements.
add_library(_highfive INTERFACE)

# SYSTEM marks the headers as system includes for consumers.
target_include_directories(_highfive SYSTEM INTERFACE
${HIGHFIVE_SOURCE_DIR}
)

# Consumers of HighFive transitively link the HDF5 library.
target_link_libraries(_highfive INTERFACE
tiflash_contrib::hdf5
)

# Namespaced alias used by the rest of the build.
add_library(tiflash_contrib::highfive ALIAS _highfive)
12 changes: 11 additions & 1 deletion dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,17 @@ if (ENABLE_TESTS)
)
target_include_directories(bench_dbms BEFORE PRIVATE ${SPARCEHASH_INCLUDE_DIR} ${benchmark_SOURCE_DIR}/include)
target_compile_definitions(bench_dbms PUBLIC DBMS_PUBLIC_GTEST)
target_link_libraries(bench_dbms gtest dbms test_util_bench_main benchmark tiflash_functions server_for_test delta_merge kvstore)
target_link_libraries(bench_dbms
gtest
benchmark
tiflash_contrib::highfive

dbms
test_util_bench_main
tiflash_functions
server_for_test
delta_merge
kvstore)

add_check(bench_dbms)
endif ()
Expand Down
9 changes: 9 additions & 0 deletions dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilterView.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,15 @@ class BitmapFilterView
RUNTIME_CHECK(filter_offset + filter_size <= filter->size(), filter_offset, filter_size, filter->size());
}

/**
 * @brief Test-only helper: allocates a fresh BitmapFilter holding `size` entries
 * initialized to `default_value`, and returns a view spanning the whole filter
 * (offset 0, length `size`).
 */
static BitmapFilterView createWithFilter(UInt32 size, bool default_value)
{
    const auto backing_filter = std::make_shared<BitmapFilter>(size, default_value);
    return BitmapFilterView(backing_filter, 0, size);
}

inline bool get(UInt32 n) const
{
RUNTIME_CHECK(n < filter_size);
Expand Down
51 changes: 40 additions & 11 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ extern const Metric DT_SnapshotOfSegmentIngest;
extern const Metric DT_SnapshotOfSegmentIngestIndex;
} // namespace CurrentMetrics

namespace DB::ErrorCodes
{
// Declared here (defined elsewhere) so this file can compare Exception::code()
// against it and recognize an aborted index build.
extern const int ABORTED;
}

namespace DB
{
namespace DM
Expand Down Expand Up @@ -680,18 +685,20 @@ void DeltaMergeStore::segmentEnsureStableIndex(
RUNTIME_CHECK(dm_files.size() == 1); // size > 1 is currently not supported.
const auto & dm_file = dm_files[0];

// 2. Check whether the DMFile has been referenced by any valid segment.
{
auto is_file_valid = [this, dm_file] {
std::shared_lock lock(read_write_mutex);
auto segment_ids = dmfile_id_to_segment_ids.get(dm_file->fileId());
if (segment_ids.empty())
{
LOG_DEBUG(
log,
"EnsureStableIndex - Give up because no segment to update, source_segment={}",
source_segment_info);
return;
}
return !segment_ids.empty();
};

// 2. Check whether the DMFile has been referenced by any valid segment.
if (!is_file_valid())
{
LOG_DEBUG(
log,
"EnsureStableIndex - Give up because no segment to update, source_segment={}",
source_segment_info);
return;
}

LOG_INFO(
Expand All @@ -712,7 +719,29 @@ void DeltaMergeStore::segmentEnsureStableIndex(
.is_common_handle = dm_context.is_common_handle,
.rowkey_column_size = dm_context.rowkey_column_size,
});
auto new_dmfiles = iw.build();

DMFiles new_dmfiles{};

try
{
// When file is not valid we need to abort the index build.
new_dmfiles = iw.build(is_file_valid);
}
catch (const Exception & e)
{
if (e.code() == ErrorCodes::ABORTED)
{
LOG_INFO(
log,
"EnsureStableIndex - Build index aborted because DMFile is no longer valid, dm_files={} "
"source_segment={}",
DMFile::info(dm_files),
source_segment_info);
return;
}
throw;
}

RUNTIME_CHECK(!new_dmfiles.empty());

LOG_INFO(
Expand Down
17 changes: 13 additions & 4 deletions dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Exception.h>
#include <Interpreters/SharedContexts/Disagg.h>
#include <Storages/DeltaMerge/DeltaMergeDefines.h>
#include <Storages/DeltaMerge/File/DMFile.h>
Expand All @@ -21,6 +22,11 @@
#include <Storages/DeltaMerge/Index/VectorIndex.h>
#include <Storages/PathPool.h>

namespace DB::ErrorCodes
{
// Declared here (defined elsewhere); thrown by the index build loop when the
// proceed check returns false.
extern const int ABORTED;
}

namespace DB::DM
{

Expand Down Expand Up @@ -64,7 +70,7 @@ DMFileIndexWriter::LocalIndexBuildInfo DMFileIndexWriter::getLocalIndexBuildInfo
return build;
}

void DMFileIndexWriter::buildIndexForFile(const DMFilePtr & dm_file_mutable) const
void DMFileIndexWriter::buildIndexForFile(const DMFilePtr & dm_file_mutable, ProceedCheckFn should_proceed) const
{
const auto column_defines = dm_file_mutable->getColumnDefines();
const auto del_cd_iter = std::find_if(column_defines.cbegin(), column_defines.cend(), [](const ColumnDefine & cd) {
Expand Down Expand Up @@ -132,6 +138,9 @@ void DMFileIndexWriter::buildIndexForFile(const DMFilePtr & dm_file_mutable) con
// Read all blocks and build index
while (true)
{
if (!should_proceed())
throw Exception(ErrorCodes::ABORTED, "Index build is interrupted");

auto block = read_stream->read();
if (!block)
break;
Expand All @@ -150,7 +159,7 @@ void DMFileIndexWriter::buildIndexForFile(const DMFilePtr & dm_file_mutable) con
const auto & col_with_type_and_name = block.safeGetByPosition(col_idx + 1);
RUNTIME_CHECK(col_with_type_and_name.column_id == read_columns[col_idx + 1].id);
const auto & col = col_with_type_and_name.column;
index_builder->addBlock(*col, del_mark);
index_builder->addBlock(*col, del_mark, should_proceed);
}
}

Expand Down Expand Up @@ -188,7 +197,7 @@ void DMFileIndexWriter::buildIndexForFile(const DMFilePtr & dm_file_mutable) con
iw->finalize(); // Note: There may be S3 uploads here.
}

DMFiles DMFileIndexWriter::build() const
DMFiles DMFileIndexWriter::build(ProceedCheckFn should_proceed) const
{
// Create a clone of existing DMFile instances by using DMFile::restore,
// because later we will mutate some fields and persist these mutations.
Expand All @@ -215,7 +224,7 @@ DMFiles DMFileIndexWriter::build() const

for (const auto & cloned_dmfile : cloned_dm_files)
{
buildIndexForFile(cloned_dmfile);
buildIndexForFile(cloned_dmfile, should_proceed);
// TODO: including the new index bytes in the file size.
// auto res = dm_context.path_pool->getStableDiskDelegator().updateDTFileSize(
// new_dmfile->fileId(),
Expand Down
11 changes: 9 additions & 2 deletions dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,24 @@ class DMFileIndexWriter
const size_t rowkey_column_size;
};

using ProceedCheckFn = std::function<bool()>;

/// Creates an index writer that keeps a copy of `options` and
/// obtains its logger from Logger::get().
explicit DMFileIndexWriter(const Options & options)
: logger(Logger::get())
, options(options)
{}

// Note: You cannot call build() multiple times, as duplicate meta version will result in exceptions.
// TODO: Add a better guard.
DMFiles build() const;
DMFiles build(ProceedCheckFn should_proceed) const;

/// Convenience overload: builds the index with a proceed check that always
/// returns true, i.e. the build is never interrupted by the check.
DMFiles build() const
{
    const ProceedCheckFn always_proceed = [] {
        return true;
    };
    return build(always_proceed);
}

private:
void buildIndexForFile(const DMFilePtr & dm_file_mutable) const;
void buildIndexForFile(const DMFilePtr & dm_file_mutable, ProceedCheckFn should_proceed) const;

private:
const LoggerPtr logger;
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Storages/DeltaMerge/File/DMFileWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#include <Storages/DeltaMerge/DMChecksumConfig.h>
#include <Storages/DeltaMerge/File/DMFile.h>
#include <Storages/DeltaMerge/Index/MinMaxIndex.h>
#include <Storages/DeltaMerge/Index/VectorIndex.h>

namespace DB
{
Expand Down
11 changes: 10 additions & 1 deletion dbms/src/Storages/DeltaMerge/Index/VectorIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <Storages/DeltaMerge/File/dtpb/dmfile.pb.h>
#include <Storages/DeltaMerge/Index/VectorIndex_fwd.h>
#include <TiDB/Schema/VectorIndex.h>

namespace DB::DM
{

Expand All @@ -33,6 +34,8 @@ class VectorIndexBuilder
/// The key is the row's offset in the DMFile.
using Key = UInt32;

using ProceedCheckFn = std::function<bool()>;

public:
static VectorIndexBuilderPtr create(const TiDB::VectorIndexDefinitionPtr & definition);

Expand All @@ -45,7 +48,11 @@ class VectorIndexBuilder

virtual ~VectorIndexBuilder() = default;

virtual void addBlock(const IColumn & column, const ColumnVector<UInt8> * del_mark) = 0;
virtual void addBlock( //
const IColumn & column,
const ColumnVector<UInt8> * del_mark,
ProceedCheckFn should_proceed)
= 0;

virtual void save(std::string_view path) const = 0;

Expand Down Expand Up @@ -80,6 +87,8 @@ class VectorIndexViewer
const RowFilter & valid_rows) const
= 0;

virtual size_t size() const = 0;

// Get the value (i.e. vector content) of a Key.
virtual void get(Key key, std::vector<Float32> & out) const = 0;

Expand Down
Loading

0 comments on commit fce6ea2

Please sign in to comment.