Skip to content

Commit

Permalink
FAP: Avoid doing IO with latch (#8439)
Browse files Browse the repository at this point in the history
ref #8382
  • Loading branch information
JaySon-Huang authored Nov 30, 2023
1 parent e5eaf31 commit a4b6535
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 121 deletions.
93 changes: 35 additions & 58 deletions dbms/src/Storages/KVStore/MultiRaft/Disagg/CheckpointIngestInfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,64 +28,31 @@
namespace DB
{

static constexpr uint8_t FAP_INGEST_INFO_PERSIST_FMT_VER = 1;

CheckpointIngestInfoPtr CheckpointIngestInfo::restore(
TMTContext & tmt,
const TiFlashRaftProxyHelper * proxy_helper,
UInt64 region_id,
UInt64 peer_id)
{
auto ptr = std::shared_ptr<CheckpointIngestInfo>(new CheckpointIngestInfo(tmt, region_id, peer_id));
if (!ptr->loadFromLocal(proxy_helper))
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Failed to restore CheckpointIngestInfo, region_id={} peer_id={} store_id={}",
region_id,
peer_id,
tmt.getKVStore()->getStoreID(std::memory_order_relaxed));
}
return ptr;
}

CheckpointIngestInfo::CheckpointIngestInfo(TMTContext & tmt_, UInt64 region_id_, UInt64 peer_id_)
: tmt(tmt_)
, region_id(region_id_)
, peer_id(peer_id_)
, remote_store_id(0)
, begin_time(0)
{
log = DB::Logger::get("CheckpointIngestInfo");
}

DM::Segments CheckpointIngestInfo::getRestoredSegments() const
{
return restored_segments;
}

UInt64 CheckpointIngestInfo::getRemoteStoreId() const
{
return remote_store_id;
}

RegionPtr CheckpointIngestInfo::getRegion() const
{
return region;
}
StoreID remote_store_id;
RegionPtr region;
DM::Segments restored_segments;

static constexpr uint8_t FAP_INGEST_INFO_PERSIST_FMT_VER = 1;

bool CheckpointIngestInfo::loadFromLocal(const TiFlashRaftProxyHelper * proxy_helper)
{
auto uni_ps = tmt.getContext().getWriteNodePageStorage();
auto snapshot = uni_ps->getSnapshot(fmt::format("read_fap_i_{}", region_id));
auto page_id
= UniversalPageIdFormat::toLocalKVPrefix(UniversalPageIdFormat::LocalKVKeyType::FAPIngestInfo, region_id);
Page page = uni_ps->read(page_id, nullptr, snapshot, /*throw_on_not_exist*/ false);

if (!page.isValid())
{
LOG_ERROR(log, "Can't read from CheckpointIngestInfo, page_id={} region_id={}", page_id, region_id);
return false;
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Failed to restore CheckpointIngestInfo, region_id={} peer_id={} store_id={}",
region_id,
peer_id,
tmt.getKVStore()->getStoreID(std::memory_order_relaxed));
}
ReadBufferFromMemory buf(page.data.begin(), page.data.size());
RUNTIME_CHECK_MSG(readBinary2<UInt8>(buf) == FAP_INGEST_INFO_PERSIST_FMT_VER, "wrong fap ingest info format");
Expand All @@ -107,6 +74,7 @@ bool CheckpointIngestInfo::loadFromLocal(const TiFlashRaftProxyHelper * proxy_he
auto table_id = region->getMappedTableID();
auto storage = storages.get(keyspace_id, table_id);

auto log = DB::Logger::get("CheckpointIngestInfo");
if (storage && storage->engineType() == TiDB::StorageEngine::DT)
{
auto dm_storage = std::dynamic_pointer_cast<StorageDeltaMerge>(storage);
Expand All @@ -128,10 +96,18 @@ bool CheckpointIngestInfo::loadFromLocal(const TiFlashRaftProxyHelper * proxy_he
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "unsupported storage engine");
}
return true;

return std::make_shared<CheckpointIngestInfo>(
tmt,
region_id,
peer_id,
remote_store_id,
region,
std::move(restored_segments),
/*begin_time_*/ 0);
}

void CheckpointIngestInfo::persistToLocal()
void CheckpointIngestInfo::persistToLocal() const
{
if (region->isPendingRemove())
{
Expand All @@ -150,7 +126,7 @@ void CheckpointIngestInfo::persistToLocal()
{
size_t segment_data_size = 0;
segment_data_size += writeBinary2(restored_segments.size(), wb_buffer);
for (auto & restored_segment : restored_segments)
for (const auto & restored_segment : restored_segments)
{
data_size += writeBinary2(restored_segment->segmentId(), wb_buffer);
}
Expand Down Expand Up @@ -189,15 +165,10 @@ void CheckpointIngestInfo::persistToLocal()
region->getDebugString());
}

void CheckpointIngestInfo::removeFromLocal(TMTContext & tmt, UInt64 region_id, UInt64 peer_id, UInt64 remote_store_id)
void CheckpointIngestInfo::removeFromLocal(TMTContext & tmt, UInt64 region_id)
{
auto log = DB::Logger::get();
LOG_INFO(
log,
"Erase CheckpointIngestInfo from disk, region_id={} peer_id={} remote_store_id={}",
region_id,
peer_id,
remote_store_id);
LOG_INFO(log, "Erase CheckpointIngestInfo from disk, region_id={}", region_id);
auto uni_ps = tmt.getContext().getWriteNodePageStorage();
UniversalWriteBatch del_batch;
del_batch.delPage(
Expand All @@ -206,20 +177,26 @@ void CheckpointIngestInfo::removeFromLocal(TMTContext & tmt, UInt64 region_id, U
}

// Like removeFromLocal, but first checks that the ingest info exists on disk (the check is skipped when `pre_check` is false).
bool CheckpointIngestInfo::forciblyClean(TMTContext & tmt, UInt64 region_id)
bool CheckpointIngestInfo::forciblyClean(TMTContext & tmt, UInt64 region_id, bool pre_check)
{
if (!pre_check)
{
CheckpointIngestInfo::removeFromLocal(tmt, region_id);
return true;
}

auto uni_ps = tmt.getContext().getWriteNodePageStorage();
auto snapshot = uni_ps->getSnapshot(fmt::format("read_fap_i_{}", region_id));
auto page_id
= UniversalPageIdFormat::toLocalKVPrefix(UniversalPageIdFormat::LocalKVKeyType::FAPIngestInfo, region_id);
// For most cases, ingest infos are deleted in `removeFromLocal`.
Page page = uni_ps->read(page_id, nullptr, snapshot, /*throw_on_not_exist*/ false);
if unlikely (page.isValid())
if (unlikely(page.isValid()))
{
CheckpointIngestInfo::removeFromLocal(tmt, region_id, 0, 0);
CheckpointIngestInfo::removeFromLocal(tmt, region_id);
return true;
}
return false;
}

} // namespace DB
} // namespace DB
43 changes: 18 additions & 25 deletions dbms/src/Storages/KVStore/MultiRaft/Disagg/CheckpointIngestInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include <Common/Logger.h>
#include <common/types.h>

#include <memory>
Expand All @@ -38,19 +39,16 @@ using CheckpointIngestInfoPtr = std::shared_ptr<CheckpointIngestInfo>;
struct TiFlashRaftProxyHelper;
class FastAddPeerContext;

// Safety: The class is immutable once created.
struct CheckpointIngestInfo
{
// Safety: The class is immutable once created.

// Get segments from memory or restore from ps.
DM::Segments getRestoredSegments() const;
UInt64 getRemoteStoreId() const;
RegionPtr getRegion() const;
DM::Segments getRestoredSegments() const { return restored_segments; }
UInt64 getRemoteStoreId() const { return remote_store_id; }
RegionPtr getRegion() const { return region; }
UInt64 regionId() const { return region_id; }
UInt64 peerId() const { return peer_id; }
UInt64 beginTime() const { return begin_time; }

// Create from build
CheckpointIngestInfo(
TMTContext & tmt_,
UInt64 region_id_,
Expand All @@ -66,10 +64,8 @@ struct CheckpointIngestInfo
, region(region_)
, restored_segments(std::move(restored_segments_))
, begin_time(begin_time_)
{
log = DB::Logger::get("CheckpointIngestInfo");
persistToLocal();
}
, log(DB::Logger::get("CheckpointIngestInfo"))
{}

static CheckpointIngestInfoPtr restore(
TMTContext & tmt,
Expand All @@ -78,27 +74,24 @@ struct CheckpointIngestInfo
UInt64 peer_id);

// Only call to clean dangling CheckpointIngestInfo.
static bool forciblyClean(TMTContext & tmt, UInt64 region_id);
static bool forciblyClean(TMTContext & tmt, UInt64 region_id, bool pre_check = true);

private:
friend class FastAddPeerContext;
// Create from restore
CheckpointIngestInfo(TMTContext & tmt_, UInt64 region_id_, UInt64 peer_id_);
bool loadFromLocal(const TiFlashRaftProxyHelper * proxy_helper);
// Safety: raftstore ensures a region is handled in a single thread.
// `persistToLocal` is called at a fixed place in this thread.
void persistToLocal();
static void removeFromLocal(TMTContext & tmt, UInt64 region_id, UInt64 peer_id, UInt64 remote_store_id);
void persistToLocal() const;
static void removeFromLocal(TMTContext & tmt, UInt64 region_id);

private:
TMTContext & tmt;
UInt64 region_id = 0;
UInt64 peer_id = 0;
UInt64 remote_store_id;
RegionPtr region;
DM::Segments restored_segments;
// If restarted, `beginTime` is no longer meaningful.
UInt64 begin_time;
const UInt64 region_id = 0;
const UInt64 peer_id = 0;
const UInt64 remote_store_id;
const RegionPtr region;
const DM::Segments restored_segments;
// If restored, `beginTime` is no longer meaningful.
const UInt64 begin_time;
DB::LoggerPtr log;
};
} // namespace DB
} // namespace DB
2 changes: 1 addition & 1 deletion dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ void ApplyFapSnapshotImpl(TMTContext & tmt, TiFlashRaftProxyHelper * proxy_helpe
GET_METRIC(tiflash_fap_task_duration_seconds, type_ingest_stage).Observe(watch_ingest.elapsedSeconds());
auto begin = checkpoint_ingest_info->beginTime();
auto current = FAPAsyncTasks::getCurrentMillis();
if (begin)
if (begin != 0)
{
GET_METRIC(tiflash_fap_task_duration_seconds, type_total).Observe((current - begin) / 1000.0);
}
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class FastAddPeerContext
DM::Segments && segments,
UInt64 start_time);
void debugRemoveCheckpointIngestInfo(UInt64 region_id);
std::optional<CheckpointIngestInfoPtr> tryGetCheckpointIngestInfo(UInt64 region_id);
std::optional<CheckpointIngestInfoPtr> tryGetCheckpointIngestInfo(UInt64 region_id) const;
void cleanCheckpointIngestInfo(TMTContext & tmt, UInt64 region_id);

public:
Expand Down Expand Up @@ -96,7 +96,7 @@ class FastAddPeerContext
std::unordered_map<UInt64, std::pair<UInt64, CheckpointCacheElementPtr>> checkpoint_cache_map;

// Checkpoint that is persisted, but yet to be ingested into DeltaTree.
std::mutex ingest_info_mu;
mutable std::mutex ingest_info_mu;
// RegionID->CheckpointIngestInfoPtr
std::unordered_map<UInt64, CheckpointIngestInfoPtr> checkpoint_ingest_info_map;

Expand Down
77 changes: 42 additions & 35 deletions dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeerContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,15 +116,19 @@ CheckpointIngestInfoPtr FastAddPeerContext::getOrRestoreCheckpointIngestInfo(
UInt64 region_id,
UInt64 peer_id)
{
std::scoped_lock<std::mutex> lock(ingest_info_mu);
auto iter = checkpoint_ingest_info_map.find(region_id);
if (iter != checkpoint_ingest_info_map.end())
{
return iter->second;
std::scoped_lock<std::mutex> lock(ingest_info_mu);
if (auto iter = checkpoint_ingest_info_map.find(region_id); //
iter != checkpoint_ingest_info_map.end())
{
return iter->second;
}
}
else
{
// The caller ensures there are no concurrent operations on the same region_id, so
// we can call `restore` without holding `ingest_info_mu`.
auto info = CheckpointIngestInfo::restore(tmt, proxy_helper, region_id, peer_id);
std::scoped_lock<std::mutex> lock(ingest_info_mu);
checkpoint_ingest_info_map.emplace(region_id, info);
return info;
}
Expand All @@ -140,26 +144,23 @@ void FastAddPeerContext::cleanCheckpointIngestInfo(TMTContext & tmt, UInt64 regi
{
// TODO(fap) We can move checkpoint_ingest_info to a dedicated queue, and schedule a timed task to clean it, if this costs much.
// However, we have to make sure the clean task will not override if a new fap snapshot of the same region comes later.
bool pre_check = true;
{
// If it's still managed by fap context.
std::scoped_lock<std::mutex> lock(ingest_info_mu);
auto iter = checkpoint_ingest_info_map.find(region_id);
if (iter != checkpoint_ingest_info_map.end())
{
CheckpointIngestInfo::removeFromLocal(
tmt,
region_id,
iter->second->peerId(),
iter->second->getRemoteStoreId());
iter->second->forciblyClean(tmt, region_id);
checkpoint_ingest_info_map.erase(region_id);
return;
// The ingest info exists, so there is no need to check again later.
pre_check = false;
checkpoint_ingest_info_map.erase(iter);
}
}
CheckpointIngestInfo::forciblyClean(tmt, region_id);
// clean without locking `ingest_info_mu`
CheckpointIngestInfo::forciblyClean(tmt, region_id, pre_check);
}

std::optional<CheckpointIngestInfoPtr> FastAddPeerContext::tryGetCheckpointIngestInfo(UInt64 region_id)
std::optional<CheckpointIngestInfoPtr> FastAddPeerContext::tryGetCheckpointIngestInfo(UInt64 region_id) const
{
std::scoped_lock<std::mutex> lock(ingest_info_mu);
auto it = checkpoint_ingest_info_map.find(region_id);
Expand All @@ -177,29 +178,35 @@ void FastAddPeerContext::insertCheckpointIngestInfo(
DM::Segments && segments,
UInt64 start_time)
{
std::scoped_lock<std::mutex> lock(ingest_info_mu);
if unlikely (checkpoint_ingest_info_map.contains(region_id))
std::shared_ptr<CheckpointIngestInfo> info;
{
// 1. Two fap task of a same snapshot take place in parallel, not possible.
// 2. A previous fap task recovered from disk, while a new fap task is ongoing, not possible.
// 3. A previous fap task finished with result attached to `checkpoint_ingest_info_map`, however, the ingest stage failed to be triggered/handled due to some check in proxy's part. It could be possible.
LOG_ERROR(
log,
"Repeated ingest for region_id={} peer_id={} old_peer_id={}",
std::scoped_lock<std::mutex> lock(ingest_info_mu);
if (auto iter = checkpoint_ingest_info_map.find(region_id); unlikely(iter != checkpoint_ingest_info_map.end()))
{
// 1. Two fap task of a same snapshot take place in parallel, not possible.
// 2. A previous fap task recovered from disk, while a new fap task is ongoing, not possible.
// 3. A previous fap task finished with result attached to `checkpoint_ingest_info_map`, however, the ingest stage failed to be triggered/handled due to some check in proxy's part. It could be possible.
LOG_ERROR(
log,
"Repeated ingest for region_id={} peer_id={} old_peer_id={}",
region_id,
peer_id,
iter->second->peerId());
GET_METRIC(tiflash_fap_task_result, type_failed_repeated).Increment();
}

info = std::make_shared<CheckpointIngestInfo>(
tmt,
region_id,
peer_id,
checkpoint_ingest_info_map[region_id]->peerId());
GET_METRIC(tiflash_fap_task_result, type_failed_repeated).Increment();
remote_store_id,
region,
std::move(segments),
start_time);
checkpoint_ingest_info_map[region_id] = info;
}

checkpoint_ingest_info_map[region_id] = std::make_shared<CheckpointIngestInfo>(
tmt,
region_id,
peer_id,
remote_store_id,
region,
std::move(segments),
start_time);
// persist without locking on `ingest_info_mu`
info->persistToLocal();
}

} // namespace DB
} // namespace DB

0 comments on commit a4b6535

Please sign in to comment.