Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support cancel in AsyncTasks #8581

Merged
merged 24 commits into from
Dec 26, 2023
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbms/src/Storages/KVStore/MultiRaft/ApplySnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ void KVStore::checkAndApplyPreHandledSnapshot(const RegionPtrWrap & new_region,
fap_ctx->cleanCheckpointIngestInfo(tmt, new_region->id());
}
// Another FAP will not take place if this stage is not finished.
if (fap_ctx->tasks_trace->discardTask(new_region->id()))
if (fap_ctx->tasks_trace->leakingDiscardTask(new_region->id()))
{
LOG_ERROR(log, "FastAddPeer: find old fap task, region_id={}", new_region->id());
}
Expand Down
9 changes: 8 additions & 1 deletion dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -337,9 +337,16 @@ FastAddPeerRes FastAddPeerImpl(
UInt64 new_peer_id,
UInt64 start_time)
{
auto log = Logger::get("FastAddPeer");
try
{
auto elapsed = fap_ctx->tasks_trace->queryElapsed(region_id);
auto maybe_elapsed = fap_ctx->tasks_trace->queryElapsed(region_id);
if unlikely (!maybe_elapsed.has_value())
{
GET_METRIC(tiflash_fap_task_result, type_failed_cancel).Increment();
LOG_INFO(log, "FAP is canceled at beginning region_id={} new_peer_id={}", region_id, new_peer_id);
}
Comment on lines +343 to +348
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In what situation will we run into this branch?

Copy link
Member Author

@CalvinNeo CalvinNeo Dec 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When a cancel happens in addTaskWithCancel between if (cancel_handle->canceled()), and the inner invokable object of (*p)() actually called(after the addTaskWithCancel quitted and release the lock).

It is very slightly chance.

auto elapsed = maybe_elapsed.value();
GET_METRIC(tiflash_fap_task_duration_seconds, type_queue_stage).Observe(elapsed / 1000.0);
GET_METRIC(tiflash_fap_task_state, type_queueing_stage).Decrement();
auto res = FastAddPeerImplSelect(tmt, proxy_helper, region_id, new_peer_id);
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Storages/KVStore/MultiRaft/RaftCommandsKVS.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

#include <Common/FmtUtils.h>
#include <Common/Stopwatch.h>
#include <Common/SyncPoint/SyncPoint.h>
#include <Common/TiFlashMetrics.h>
#include <Common/setThreadName.h>
#include <Interpreters/Context.h>
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Storages/KVStore/MultiRaft/RegionMeta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -524,4 +524,10 @@ const RegionState & RegionMeta::getRegionState() const
std::lock_guard lock(mutex);
return region_state;
}

RegionState & RegionMeta::debugMutRegionState()
{
std::lock_guard lock(mutex);
return region_state;
}
} // namespace DB
1 change: 1 addition & 0 deletions dbms/src/Storages/KVStore/MultiRaft/RegionMeta.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ class RegionMeta
const raft_serverpb::MergeState & getMergeState() const;
raft_serverpb::MergeState cloneMergeState() const;
const RegionState & getRegionState() const;
RegionState & debugMutRegionState();
RegionMeta clone() const
{
std::lock_guard lock(mutex);
Expand Down
67 changes: 35 additions & 32 deletions dbms/src/Storages/KVStore/Region.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class Region : public std::enable_shared_from_this<Region>
std::unique_lock<std::shared_mutex> lock; // A unique_lock so that we can safely remove committed data.
};

public:
public: // Simple Read and Write
explicit Region(RegionMeta && meta_);
explicit Region(RegionMeta && meta_, const TiFlashRaftProxyHelper *);
~Region();
Expand All @@ -129,12 +129,13 @@ class Region : public std::enable_shared_from_this<Region>
// Directly drop all data in this Region object.
void clearAllData();

CommittedScanner createCommittedScanner(bool use_lock, bool need_value);
CommittedRemover createCommittedRemover(bool use_lock = true);
void mergeDataFrom(const Region & other);
RegionMeta & mutMeta() { return meta; }

std::tuple<size_t, UInt64> serialize(WriteBuffer & buf) const;
static RegionPtr deserialize(ReadBuffer & buf, const TiFlashRaftProxyHelper * proxy_helper = nullptr);
// Assign data and meta by moving from `new_region`.
void assignRegion(Region && new_region);

public: // Stats
RegionID id() const;
ImutRegionRangePtr getRange() const;

Expand All @@ -161,6 +162,9 @@ class Region : public std::enable_shared_from_this<Region>
std::pair<UInt64, UInt64> getRaftLogEagerGCRange() const;
void updateRaftLogEagerIndex(UInt64 new_truncate_index);

std::tuple<size_t, UInt64> serialize(WriteBuffer & buf) const;
static RegionPtr deserialize(ReadBuffer & buf, const TiFlashRaftProxyHelper * proxy_helper = nullptr);

friend bool operator==(const Region & region1, const Region & region2)
{
std::shared_lock<std::shared_mutex> lock1(region1.mutex);
Expand All @@ -169,15 +173,6 @@ class Region : public std::enable_shared_from_this<Region>
return region1.meta == region2.meta && region1.data == region2.data;
}

// Check if we can read by this index.
bool checkIndex(UInt64 index) const;
// Return <WaitIndexStatus, time cost(seconds)> for wait-index.
std::tuple<WaitIndexStatus, double> waitIndex(
UInt64 index,
UInt64 timeout_ms,
std::function<bool(void)> && check_running,
const LoggerPtr & log);

// Requires RegionMeta's lock
UInt64 appliedIndex() const;
// Requires RegionMeta's lock
Expand All @@ -189,10 +184,33 @@ class Region : public std::enable_shared_from_this<Region>
RegionVersion version() const;
RegionVersion confVer() const;

RegionMetaSnapshot dumpRegionMetaSnapshot() const;
TableID getMappedTableID() const;
KeyspaceID getKeyspaceID() const;

// Assign data and meta by moving from `new_region`.
void assignRegion(Region && new_region);
/// get approx rows, bytes info about mem cache.
std::pair<size_t, size_t> getApproxMemCacheInfo() const;
void cleanApproxMemCacheInfo() const;

// Check the raftstore cluster version of this region.
// Currently, all version in the same TiFlash store should be the same.
RaftstoreVer getClusterRaftstoreVer();
RegionData::OrphanKeysInfo & orphanKeysInfo() { return data.orphan_keys_info; }
const RegionData::OrphanKeysInfo & orphanKeysInfo() const { return data.orphan_keys_info; }

public: // Raft Read and Write
CommittedScanner createCommittedScanner(bool use_lock, bool need_value);
CommittedRemover createCommittedRemover(bool use_lock = true);

// Check if we can read by this index.
bool checkIndex(UInt64 index) const;
// Return <WaitIndexStatus, time cost(seconds)> for wait-index.
std::tuple<WaitIndexStatus, double> waitIndex(
UInt64 index,
UInt64 timeout_ms,
std::function<bool(void)> && check_running,
const LoggerPtr & log);

RegionMetaSnapshot dumpRegionMetaSnapshot() const;

void tryCompactionFilter(Timestamp safe_point);

Expand All @@ -202,36 +220,21 @@ class Region : public std::enable_shared_from_this<Region>
raft_serverpb::MergeState cloneMergeState() const;
const raft_serverpb::MergeState & getMergeState() const;

TableID getMappedTableID() const;
KeyspaceID getKeyspaceID() const;
std::pair<EngineStoreApplyRes, DM::WriteResult> handleWriteRaftCmd(
const WriteCmdsView & cmds,
UInt64 index,
UInt64 term,
TMTContext & tmt);

/// get approx rows, bytes info about mem cache.
std::pair<size_t, size_t> getApproxMemCacheInfo() const;
void cleanApproxMemCacheInfo() const;

RegionMeta & mutMeta() { return meta; }

UInt64 getSnapshotEventFlag() const { return snapshot_event_flag; }

// IngestSST will first be applied to the `temp_region`, then we need to
// copy the key-values from `temp_region` and move forward the `index` and `term`
void finishIngestSSTByDTFile(RegionPtr && temp_region, UInt64 index, UInt64 term);

// Check the raftstore cluster version of this region.
// Currently, all version in the same TiFlash store should be the same.
RaftstoreVer getClusterRaftstoreVer();
// Methods to handle orphan keys under raftstore v2.
void beforePrehandleSnapshot(uint64_t region_id, std::optional<uint64_t> deadline_index);
void afterPrehandleSnapshot(int64_t ongoing);
RegionData::OrphanKeysInfo & orphanKeysInfo() { return data.orphan_keys_info; }
const RegionData::OrphanKeysInfo & orphanKeysInfo() const { return data.orphan_keys_info; }

void mergeDataFrom(const Region & other);

Region() = delete;

Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Storages/KVStore/TMTContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,11 @@ uint64_t TMTContext::readIndexWorkerTick() const
return read_index_worker_tick_ms.load(std::memory_order_relaxed);
}

void TMTContext::debugSetKVStore(const KVStorePtr & new_kvstore)
{
kvstore = new_kvstore;
}

const std::string & IntoStoreStatusName(TMTContext::StoreStatus status)
{
static const std::string StoreStatusName[] = {
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/KVStore/TMTContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class TMTContext : private boost::noncopyable
public:
const KVStorePtr & getKVStore() const;
KVStorePtr & getKVStore();
void debugSetKVStore(const KVStorePtr & new_kvstore) { kvstore = new_kvstore; }
void debugSetKVStore(const KVStorePtr &);

const ManagedStorages & getStorages() const;
ManagedStorages & getStorages();
Expand Down
Loading