Skip to content

Commit

Permalink
Storage: Add error message when fail to build local index (pingcap#288)
Browse files Browse the repository at this point in the history
  • Loading branch information
JaySon-Huang authored Sep 25, 2024
1 parent 26c821f commit 84a1207
Show file tree
Hide file tree
Showing 21 changed files with 451 additions and 134 deletions.
2 changes: 2 additions & 0 deletions dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ namespace DB
M(force_remote_read_for_batch_cop_once) \
M(exception_new_dynamic_thread) \
M(force_wait_index_timeout) \
M(force_local_index_task_memory_limit_exceeded) \
M(exception_build_local_index_for_file) \
M(force_not_support_vector_index) \
M(sync_schema_request_failure)

Expand Down
12 changes: 6 additions & 6 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
#include <Storages/DeltaMerge/File/DMFile.h>
#include <Storages/DeltaMerge/Filter/PushDownFilter.h>
#include <Storages/DeltaMerge/Filter/RSOperator.h>
#include <Storages/DeltaMerge/Index/IndexInfo.h>
#include <Storages/DeltaMerge/Index/LocalIndexInfo.h>
#include <Storages/DeltaMerge/LocalIndexerScheduler.h>
#include <Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.h>
#include <Storages/DeltaMerge/ReadThread/UnorderedInputStream.h>
Expand Down Expand Up @@ -355,7 +355,7 @@ DeltaMergeStorePtr DeltaMergeStore::create(
settings_,
thread_pool);
std::shared_ptr<DeltaMergeStore> store_shared_ptr(store);
store_shared_ptr->checkAllSegmentsLocalIndex();
store_shared_ptr->checkAllSegmentsLocalIndex({});
return store_shared_ptr;
}

Expand Down Expand Up @@ -1979,20 +1979,20 @@ void DeltaMergeStore::applySchemaChanges(TableInfo & table_info)
void DeltaMergeStore::applyLocalIndexChange(const TiDB::TableInfo & new_table_info)
{
// Get a snapshot on the local_index_infos to check whether any new index is created
auto new_local_index_infos = generateLocalIndexInfos(getLocalIndexInfosSnapshot(), new_table_info, log);
auto changeset = generateLocalIndexInfos(getLocalIndexInfosSnapshot(), new_table_info, log);

// no index is created or dropped
if (!new_local_index_infos)
if (!changeset.new_local_index_infos)
return;

{
// new index created, update the info in-memory thread safety between `getLocalIndexInfosSnapshot`
std::unique_lock index_write_lock(mtx_local_index_infos);
local_index_infos.swap(new_local_index_infos);
local_index_infos.swap(changeset.new_local_index_infos);
}

// generate async tasks for building local index for all segments
checkAllSegmentsLocalIndex();
checkAllSegmentsLocalIndex(std::move(changeset.dropped_indexes));
}

SortDescription DeltaMergeStore::getPrimarySortDescription() const
Expand Down
17 changes: 10 additions & 7 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
#include <Storages/DeltaMerge/DeltaMergeDefines.h>
#include <Storages/DeltaMerge/DeltaMergeInterfaces.h>
#include <Storages/DeltaMerge/Filter/PushDownFilter.h>
#include <Storages/DeltaMerge/Index/IndexInfo.h>
#include <Storages/DeltaMerge/Index/LocalIndexInfo.h>
#include <Storages/DeltaMerge/Remote/DisaggSnapshot_fwd.h>
#include <Storages/DeltaMerge/RowKeyRange.h>
#include <Storages/DeltaMerge/ScanContext_fwd.h>
Expand Down Expand Up @@ -72,6 +72,7 @@ using NotCompress = std::unordered_set<ColId>;
using SegmentIdSet = std::unordered_set<UInt64>;
struct ExternalDTFileInfo;
struct GCOptions;
struct LocalIndexBuildInfo;

namespace tests
{
Expand Down Expand Up @@ -186,6 +187,9 @@ struct LocalIndexStats
UInt64 rows_stable_not_indexed{}; // Total rows
UInt64 rows_delta_indexed{}; // Total rows
UInt64 rows_delta_not_indexed{}; // Total rows

// If the index is finally failed to be built, then this is not empty
String error_message{};
};
using LocalIndexesStats = std::vector<LocalIndexStats>;

Expand Down Expand Up @@ -723,11 +727,9 @@ class DeltaMergeStore
MergeDeltaReason reason,
SegmentSnapshotPtr segment_snap = nullptr);

void segmentEnsureStableIndex(
DMContext & dm_context,
const LocalIndexInfosPtr & index_info,
const DMFiles & dm_files,
const String & source_segment_info);
void segmentEnsureStableIndex(DMContext & dm_context, const LocalIndexBuildInfo & index_build_info);

void segmentEnsureStableIndexWithErrorReport(DMContext & dm_context, const LocalIndexBuildInfo & index_build_info);

/**
* Ingest a DMFile into the segment, optionally causing a new segment being created.
Expand Down Expand Up @@ -870,8 +872,9 @@ class DeltaMergeStore

/**
* Check whether there are new local indexes should be built for all segments.
* If dropped_indexes is not empty, try to cleanup the dropped_indexes
*/
void checkAllSegmentsLocalIndex();
void checkAllSegmentsLocalIndex(std::vector<IndexID> && dropped_indexes);

/**
* Ensure the segment has stable index.
Expand Down
171 changes: 126 additions & 45 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Exception.h>
#include <Common/SyncPoint/SyncPoint.h>
#include <Common/TiFlashMetrics.h>
#include <Interpreters/Context.h>
Expand All @@ -20,10 +21,12 @@
#include <Storages/DeltaMerge/LocalIndexerScheduler.h>
#include <Storages/DeltaMerge/Segment.h>
#include <Storages/DeltaMerge/WriteBatchesImpl.h>
#include <common/logger_useful.h>

#include <magic_enum.hpp>
#include <memory>
#include <mutex>
#include <unordered_set>
#include <vector>


Expand Down Expand Up @@ -454,7 +457,7 @@ SegmentPtr DeltaMergeStore::segmentMerge(
return merged;
}

void DeltaMergeStore::checkAllSegmentsLocalIndex()
void DeltaMergeStore::checkAllSegmentsLocalIndex(std::vector<IndexID> && dropped_indexes)
{
if (!getLocalIndexInfosSnapshot())
{
Expand Down Expand Up @@ -519,6 +522,9 @@ void DeltaMergeStore::checkAllSegmentsLocalIndex()
for (const auto & [end, segment] : segments)
{
UNUSED(end);
// cleanup the index error messaage for dropped indexes
segment->clearIndexBuildError(dropped_indexes);

if (segmentEnsureStableIndexAsync(segment))
++segments_missing_indexes;
}
Expand All @@ -540,35 +546,61 @@ bool DeltaMergeStore::segmentEnsureStableIndexAsync(const SegmentPtr & segment)
return false;

// No lock is needed, stable meta is immutable.
auto dm_files = segment->getStable()->getDMFiles();
auto build_info = DMFileIndexWriter::getLocalIndexBuildInfo(local_index_infos_snap, dm_files);
if (!build_info.indexes_to_build || build_info.indexes_to_build->empty())
const auto build_info
= DMFileIndexWriter::getLocalIndexBuildInfo(local_index_infos_snap, segment->getStable()->getDMFiles());
if (!build_info.indexes_to_build || build_info.indexes_to_build->empty() || build_info.dm_files.empty())
return false;

auto store_weak_ptr = weak_from_this();
auto tracing_id = fmt::format("segmentEnsureStableIndex<{}>", log->identifier());
auto workload = [store_weak_ptr, build_info, dm_files, segment, tracing_id]() -> void {
auto tracing_id
= fmt::format("segmentEnsureStableIndex<{}> source_segment={}", log->identifier(), segment->simpleInfo());
auto workload = [store_weak_ptr, build_info, tracing_id]() -> void {
auto store = store_weak_ptr.lock();
if (store == nullptr) // Store is destroyed before the task is executed.
return;
auto dm_context = store->newDMContext( //
store->global_context,
store->global_context.getSettingsRef(),
tracing_id);
const auto source_segment_info = segment->simpleInfo();
store->segmentEnsureStableIndex(*dm_context, build_info.indexes_to_build, dm_files, source_segment_info);
store->segmentEnsureStableIndexWithErrorReport(*dm_context, build_info);
};

auto indexer_scheduler = global_context.getGlobalLocalIndexerScheduler();
RUNTIME_CHECK(indexer_scheduler != nullptr);
indexer_scheduler->pushTask(LocalIndexerScheduler::Task{
.keyspace_id = keyspace_id,
.table_id = physical_table_id,
.dmfile_ids = build_info.file_ids,
.request_memory = build_info.estimated_memory_bytes,
.workload = workload,
});
return true;
try
{
// new task of these index are generated, clear existing error_message in segment
segment->clearIndexBuildError(build_info.indexesIDs());

auto [ok, reason] = indexer_scheduler->pushTask(LocalIndexerScheduler::Task{
.keyspace_id = keyspace_id,
.table_id = physical_table_id,
.dmfile_ids = build_info.filesIDs(),
.request_memory = build_info.estimated_memory_bytes,
.workload = workload,
});
if (ok)
return true;

segment->setIndexBuildError(build_info.indexesIDs(), reason);
LOG_ERROR(
log->getChild(tracing_id),
"Failed to generate async segment stable index task, index_ids={} reason={}",
build_info.indexesIDs(),
reason);
return false;
}
catch (...)
{
const auto message = getCurrentExceptionMessage(false, false);
segment->setIndexBuildError(build_info.indexesIDs(), message);

tryLogCurrentException(log);

// catch and ignore the exception
// not able to push task to index scheduler
return false;
}
}

bool DeltaMergeStore::segmentWaitStableIndexReady(const SegmentPtr & segment) const
Expand All @@ -581,8 +613,8 @@ bool DeltaMergeStore::segmentWaitStableIndexReady(const SegmentPtr & segment) co

// No lock is needed, stable meta is immutable.
auto segment_id = segment->segmentId();
auto dm_files = segment->getStable()->getDMFiles();
auto build_info = DMFileIndexWriter::getLocalIndexBuildInfo(local_index_infos_snap, dm_files);
auto build_info
= DMFileIndexWriter::getLocalIndexBuildInfo(local_index_infos_snap, segment->getStable()->getDMFiles());
if (!build_info.indexes_to_build || build_info.indexes_to_build->empty())
return true;

Expand Down Expand Up @@ -657,11 +689,7 @@ SegmentPtr DeltaMergeStore::segmentUpdateMeta(
return new_segment;
}

void DeltaMergeStore::segmentEnsureStableIndex(
DMContext & dm_context,
const LocalIndexInfosPtr & index_info,
const DMFiles & dm_files,
const String & source_segment_info)
void DeltaMergeStore::segmentEnsureStableIndex(DMContext & dm_context, const LocalIndexBuildInfo & index_build_info)
{
// 1. Acquire a snapshot for PageStorage, and keep the snapshot until index is built.
// This helps keep DMFile valid during the index build process.
Expand All @@ -681,8 +709,10 @@ void DeltaMergeStore::segmentEnsureStableIndex(
dm_context.tracing_id,
/*snapshot_read*/ true);

RUNTIME_CHECK(dm_files.size() == 1); // size > 1 is currently not supported.
const auto & dm_file = dm_files[0];
auto tracing_logger = log->getChild(getLogTracingId(dm_context));

RUNTIME_CHECK(index_build_info.dm_files.size() == 1); // size > 1 is currently not supported.
const auto & dm_file = index_build_info.dm_files[0];

auto is_file_valid = [this, dm_file] {
std::shared_lock lock(read_write_mutex);
Expand All @@ -693,27 +723,23 @@ void DeltaMergeStore::segmentEnsureStableIndex(
// 2. Check whether the DMFile has been referenced by any valid segment.
if (!is_file_valid())
{
LOG_DEBUG(
log,
"EnsureStableIndex - Give up because no segment to update, source_segment={}",
source_segment_info);
LOG_DEBUG(tracing_logger, "EnsureStableIndex - Give up because no segment to update");
return;
}

LOG_INFO(
log,
"EnsureStableIndex - Begin building index, dm_files={} source_segment={}",
DMFile::info(dm_files),
source_segment_info);
tracing_logger,
"EnsureStableIndex - Begin building index, dm_files={}",
DMFile::info(index_build_info.dm_files));

// 2. Build the index.
DMFileIndexWriter iw(DMFileIndexWriter::Options{
.path_pool = path_pool,
.file_provider = dm_context.db_context.getFileProvider(),
.write_limiter = dm_context.getWriteLimiter(),
.disagg_ctx = dm_context.db_context.getSharedContextDisagg(),
.index_infos = index_info,
.dm_files = dm_files,
.index_infos = index_build_info.indexes_to_build,
.dm_files = index_build_info.dm_files,
.db_context = dm_context.db_context,
.is_common_handle = dm_context.is_common_handle,
.rowkey_column_size = dm_context.rowkey_column_size,
Expand All @@ -731,11 +757,9 @@ void DeltaMergeStore::segmentEnsureStableIndex(
if (e.code() == ErrorCodes::ABORTED)
{
LOG_INFO(
log,
"EnsureStableIndex - Build index aborted because DMFile is no longer valid, dm_files={} "
"source_segment={}",
DMFile::info(dm_files),
source_segment_info);
tracing_logger,
"EnsureStableIndex - Build index aborted because DMFile is no longer valid, dm_files={}",
DMFile::info(index_build_info.dm_files));
return;
}
throw;
Expand All @@ -744,10 +768,9 @@ void DeltaMergeStore::segmentEnsureStableIndex(
RUNTIME_CHECK(!new_dmfiles.empty());

LOG_INFO(
log,
"EnsureStableIndex - Finish building index, dm_files={} source_segment={}",
DMFile::info(dm_files),
source_segment_info);
tracing_logger,
"EnsureStableIndex - Finish building index, dm_files={}",
DMFile::info(index_build_info.dm_files));

// 3. Update the meta version of the segments to the latest one.
// To avoid logical split between step 2 and 3, get lastest segments to update again.
Expand All @@ -763,8 +786,66 @@ void DeltaMergeStore::segmentEnsureStableIndex(
auto segment = id_to_segment[seg_id];
auto new_segment = segmentUpdateMeta(lock, dm_context, segment, new_dmfiles);
// Expect update meta always success, because the segment must be valid and bump meta should succeed.
RUNTIME_CHECK_MSG(new_segment != nullptr, "Update meta failed for segment {}", segment->simpleInfo());
RUNTIME_CHECK_MSG(
new_segment != nullptr,
"Update meta failed for segment {} ident={}",
segment->simpleInfo(),
tracing_logger->identifier());
}
}
}

// A wrapper of `segmentEnsureStableIndex`
// If any exception thrown, the error message will be recorded to
// the related segment(s)
void DeltaMergeStore::segmentEnsureStableIndexWithErrorReport(
DMContext & dm_context,
const LocalIndexBuildInfo & index_build_info)
{
auto handle_error = [this, &index_build_info](const std::vector<IndexID> & index_ids) {
const auto message = getCurrentExceptionMessage(false, false);
std::unordered_map<PageIdU64, SegmentPtr> segment_to_add_msg;
{
std::unique_lock lock(read_write_mutex);
for (const auto & dmf : index_build_info.dm_files)
{
const auto segment_ids = dmfile_id_to_segment_ids.get(dmf->fileId());
for (const auto & seg_id : segment_ids)
{
if (segment_to_add_msg.contains(seg_id))
continue;
segment_to_add_msg.emplace(seg_id, id_to_segment[seg_id]);
}
}
}

for (const auto & [seg_id, seg] : segment_to_add_msg)
{
UNUSED(seg_id);
seg->setIndexBuildError(index_ids, message);
}
};

try
{
segmentEnsureStableIndex(dm_context, index_build_info);
}
catch (DB::Exception & e)
{
const auto index_ids = index_build_info.indexesIDs();
e.addMessage(fmt::format("while building stable index for index_ids={}", index_ids));
handle_error(index_ids);

// rethrow
throw;
}
catch (...)
{
const auto index_ids = index_build_info.indexesIDs();
handle_error(index_ids);

// rethrow
throw;
}
}

Expand Down
Loading

0 comments on commit 84a1207

Please sign in to comment.