Skip to content

Commit

Permalink
Support cancel in AsyncTasks (#8581)
Browse files Browse the repository at this point in the history
close #8382
  • Loading branch information
CalvinNeo authored Dec 26, 2023
1 parent ca9152e commit 03c3c49
Show file tree
Hide file tree
Showing 19 changed files with 700 additions and 142 deletions.
2 changes: 1 addition & 1 deletion dbms/src/Storages/KVStore/MultiRaft/ApplySnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ void KVStore::checkAndApplyPreHandledSnapshot(const RegionPtrWrap & new_region,
fap_ctx->cleanCheckpointIngestInfo(tmt, new_region->id());
}
// Another FAP will not take place if this stage is not finished.
if (fap_ctx->tasks_trace->discardTask(new_region->id()))
if (fap_ctx->tasks_trace->leakingDiscardTask(new_region->id()))
{
LOG_ERROR(log, "FastAddPeer: find old fap task, region_id={}", new_region->id());
}
Expand Down
9 changes: 8 additions & 1 deletion dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -337,9 +337,16 @@ FastAddPeerRes FastAddPeerImpl(
UInt64 new_peer_id,
UInt64 start_time)
{
auto log = Logger::get("FastAddPeer");
try
{
auto elapsed = fap_ctx->tasks_trace->queryElapsed(region_id);
auto maybe_elapsed = fap_ctx->tasks_trace->queryElapsed(region_id);
if unlikely (!maybe_elapsed.has_value())
{
GET_METRIC(tiflash_fap_task_result, type_failed_cancel).Increment();
LOG_INFO(log, "FAP is canceled at beginning region_id={} new_peer_id={}", region_id, new_peer_id);
}
auto elapsed = maybe_elapsed.value();
GET_METRIC(tiflash_fap_task_duration_seconds, type_queue_stage).Observe(elapsed / 1000.0);
GET_METRIC(tiflash_fap_task_state, type_queueing_stage).Decrement();
auto res = FastAddPeerImplSelect(tmt, proxy_helper, region_id, new_peer_id);
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Storages/KVStore/MultiRaft/RaftCommandsKVS.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

#include <Common/FmtUtils.h>
#include <Common/Stopwatch.h>
#include <Common/SyncPoint/SyncPoint.h>
#include <Common/TiFlashMetrics.h>
#include <Common/setThreadName.h>
#include <Interpreters/Context.h>
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Storages/KVStore/MultiRaft/RegionMeta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -524,4 +524,10 @@ const RegionState & RegionMeta::getRegionState() const
std::lock_guard lock(mutex);
return region_state;
}

RegionState & RegionMeta::debugMutRegionState()
{
std::lock_guard lock(mutex);
return region_state;
}
} // namespace DB
1 change: 1 addition & 0 deletions dbms/src/Storages/KVStore/MultiRaft/RegionMeta.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ class RegionMeta
const raft_serverpb::MergeState & getMergeState() const;
raft_serverpb::MergeState cloneMergeState() const;
const RegionState & getRegionState() const;
RegionState & debugMutRegionState();
RegionMeta clone() const
{
std::lock_guard lock(mutex);
Expand Down
67 changes: 35 additions & 32 deletions dbms/src/Storages/KVStore/Region.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class Region : public std::enable_shared_from_this<Region>
std::unique_lock<std::shared_mutex> lock; // A unique_lock so that we can safely remove committed data.
};

public:
public: // Simple Read and Write
explicit Region(RegionMeta && meta_);
explicit Region(RegionMeta && meta_, const TiFlashRaftProxyHelper *);
~Region();
Expand All @@ -129,12 +129,13 @@ class Region : public std::enable_shared_from_this<Region>
// Directly drop all data in this Region object.
void clearAllData();

CommittedScanner createCommittedScanner(bool use_lock, bool need_value);
CommittedRemover createCommittedRemover(bool use_lock = true);
void mergeDataFrom(const Region & other);
RegionMeta & mutMeta() { return meta; }

std::tuple<size_t, UInt64> serialize(WriteBuffer & buf) const;
static RegionPtr deserialize(ReadBuffer & buf, const TiFlashRaftProxyHelper * proxy_helper = nullptr);
// Assign data and meta by moving from `new_region`.
void assignRegion(Region && new_region);

public: // Stats
RegionID id() const;
ImutRegionRangePtr getRange() const;

Expand All @@ -161,6 +162,9 @@ class Region : public std::enable_shared_from_this<Region>
std::pair<UInt64, UInt64> getRaftLogEagerGCRange() const;
void updateRaftLogEagerIndex(UInt64 new_truncate_index);

std::tuple<size_t, UInt64> serialize(WriteBuffer & buf) const;
static RegionPtr deserialize(ReadBuffer & buf, const TiFlashRaftProxyHelper * proxy_helper = nullptr);

friend bool operator==(const Region & region1, const Region & region2)
{
std::shared_lock<std::shared_mutex> lock1(region1.mutex);
Expand All @@ -169,15 +173,6 @@ class Region : public std::enable_shared_from_this<Region>
return region1.meta == region2.meta && region1.data == region2.data;
}

// Check if we can read by this index.
bool checkIndex(UInt64 index) const;
// Return <WaitIndexStatus, time cost(seconds)> for wait-index.
std::tuple<WaitIndexStatus, double> waitIndex(
UInt64 index,
UInt64 timeout_ms,
std::function<bool(void)> && check_running,
const LoggerPtr & log);

// Requires RegionMeta's lock
UInt64 appliedIndex() const;
// Requires RegionMeta's lock
Expand All @@ -189,10 +184,33 @@ class Region : public std::enable_shared_from_this<Region>
RegionVersion version() const;
RegionVersion confVer() const;

RegionMetaSnapshot dumpRegionMetaSnapshot() const;
TableID getMappedTableID() const;
KeyspaceID getKeyspaceID() const;

// Assign data and meta by moving from `new_region`.
void assignRegion(Region && new_region);
/// get approx rows, bytes info about mem cache.
std::pair<size_t, size_t> getApproxMemCacheInfo() const;
void cleanApproxMemCacheInfo() const;

// Check the raftstore cluster version of this region.
// Currently, all version in the same TiFlash store should be the same.
RaftstoreVer getClusterRaftstoreVer();
RegionData::OrphanKeysInfo & orphanKeysInfo() { return data.orphan_keys_info; }
const RegionData::OrphanKeysInfo & orphanKeysInfo() const { return data.orphan_keys_info; }

public: // Raft Read and Write
CommittedScanner createCommittedScanner(bool use_lock, bool need_value);
CommittedRemover createCommittedRemover(bool use_lock = true);

// Check if we can read by this index.
bool checkIndex(UInt64 index) const;
// Return <WaitIndexStatus, time cost(seconds)> for wait-index.
std::tuple<WaitIndexStatus, double> waitIndex(
UInt64 index,
UInt64 timeout_ms,
std::function<bool(void)> && check_running,
const LoggerPtr & log);

RegionMetaSnapshot dumpRegionMetaSnapshot() const;

void tryCompactionFilter(Timestamp safe_point);

Expand All @@ -202,36 +220,21 @@ class Region : public std::enable_shared_from_this<Region>
raft_serverpb::MergeState cloneMergeState() const;
const raft_serverpb::MergeState & getMergeState() const;

TableID getMappedTableID() const;
KeyspaceID getKeyspaceID() const;
std::pair<EngineStoreApplyRes, DM::WriteResult> handleWriteRaftCmd(
const WriteCmdsView & cmds,
UInt64 index,
UInt64 term,
TMTContext & tmt);

/// get approx rows, bytes info about mem cache.
std::pair<size_t, size_t> getApproxMemCacheInfo() const;
void cleanApproxMemCacheInfo() const;

RegionMeta & mutMeta() { return meta; }

UInt64 getSnapshotEventFlag() const { return snapshot_event_flag; }

// IngestSST will first be applied to the `temp_region`, then we need to
// copy the key-values from `temp_region` and move forward the `index` and `term`
void finishIngestSSTByDTFile(RegionPtr && temp_region, UInt64 index, UInt64 term);

// Check the raftstore cluster version of this region.
// Currently, all version in the same TiFlash store should be the same.
RaftstoreVer getClusterRaftstoreVer();
// Methods to handle orphan keys under raftstore v2.
void beforePrehandleSnapshot(uint64_t region_id, std::optional<uint64_t> deadline_index);
void afterPrehandleSnapshot(int64_t ongoing);
RegionData::OrphanKeysInfo & orphanKeysInfo() { return data.orphan_keys_info; }
const RegionData::OrphanKeysInfo & orphanKeysInfo() const { return data.orphan_keys_info; }

void mergeDataFrom(const Region & other);

Region() = delete;

Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Storages/KVStore/TMTContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,11 @@ uint64_t TMTContext::readIndexWorkerTick() const
return read_index_worker_tick_ms.load(std::memory_order_relaxed);
}

void TMTContext::debugSetKVStore(const KVStorePtr & new_kvstore)
{
kvstore = new_kvstore;
}

const std::string & IntoStoreStatusName(TMTContext::StoreStatus status)
{
static const std::string StoreStatusName[] = {
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/KVStore/TMTContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class TMTContext : private boost::noncopyable
public:
const KVStorePtr & getKVStore() const;
KVStorePtr & getKVStore();
void debugSetKVStore(const KVStorePtr & new_kvstore) { kvstore = new_kvstore; }
void debugSetKVStore(const KVStorePtr &);

const ManagedStorages & getStorages() const;
ManagedStorages & getStorages();
Expand Down
Loading

0 comments on commit 03c3c49

Please sign in to comment.