From 88e1ac44e014b62d60e8753b6d6fa54b909bef75 Mon Sep 17 00:00:00 2001 From: Tong Zhigao Date: Wed, 4 Sep 2019 16:04:02 +0800 Subject: [PATCH 1/4] finish --- dbms/src/Storages/Transaction/KVStore.cpp | 31 ++++++++---- dbms/src/Storages/Transaction/KVStore.h | 17 +++++++ .../Storages/Transaction/RaftCommandResult.h | 2 +- dbms/src/Storages/Transaction/Region.cpp | 50 ++++++++++--------- dbms/src/Storages/Transaction/Region.h | 37 ++++++++++---- dbms/src/Storages/Transaction/RegionManager.h | 21 ++++++-- dbms/src/Storages/Transaction/RegionMeta.cpp | 19 +++++-- dbms/src/Storages/Transaction/RegionMeta.h | 27 +++++++--- dbms/src/Storages/Transaction/RegionTable.cpp | 10 +++- dbms/src/Storages/Transaction/RegionTable.h | 1 + dbms/src/Storages/Transaction/TMTContext.cpp | 1 + .../Storages/Transaction/applySnapshot.cpp | 2 +- 12 files changed, 150 insertions(+), 68 deletions(-) diff --git a/dbms/src/Storages/Transaction/KVStore.cpp b/dbms/src/Storages/Transaction/KVStore.cpp index 2215d24e5ad..197751449a5 100644 --- a/dbms/src/Storages/Transaction/KVStore.cpp +++ b/dbms/src/Storages/Transaction/KVStore.cpp @@ -15,7 +15,9 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } -KVStore::KVStore(const std::string & data_dir) : region_persister(data_dir, region_manager), log(&Logger::get("KVStore")) {} +KVStore::KVStore(const std::string & data_dir) + : region_persister(data_dir, region_manager), raft_cmd_res(std::make_unique()), log(&Logger::get("KVStore")) +{} void KVStore::restore(const RegionClientCreateFunc & region_client_create) { @@ -46,15 +48,20 @@ RegionPtr KVStore::getRegion(const RegionID region_id) const return nullptr; } -RegionManager::RegionTaskElementPtr RegionManager::getRegionTaskCtrl(const RegionID region_id) const +const RegionManager::RegionTaskElement & RegionManager::getRegionTaskCtrl(const RegionID region_id) const { std::lock_guard lock(mutex); - auto & p = regions_ctrl[region_id]; - return p ? p : (p = std::make_shared()); + if (auto it = regions_ctrl.find(region_id); it != regions_ctrl.end()) + return it->second; + + return regions_ctrl.try_emplace(region_id).first->second; } -RegionTaskLock RegionManager::genRegionTaskLock(const RegionID region_id) const { return RegionTaskLock(*getRegionTaskCtrl(region_id)); } +RegionTaskLock RegionManager::genRegionTaskLock(const RegionID region_id) const +{ + return RegionTaskLock(getRegionTaskCtrl(region_id).mutex); +} size_t KVStore::regionSize() const { @@ -77,16 +84,16 @@ bool KVStore::onSnapshot(RegionPtr new_region, Context * context) region_persister.persist(*new_region, region_lock); } { - std::lock_guard lock(task_mutex); + auto task_lock = genTaskLock(); auto region_lock = region_manager.genRegionTaskLock(region_id); RegionPtr old_region = getRegion(region_id); if (old_region != nullptr) { - UInt64 old_index = old_region->getProbableIndex(); + UInt64 old_index = old_region->appliedIndex(); LOG_DEBUG(log, "KVStore::onSnapshot previous " << old_region->toString(true) << " ; new " << new_region->toString(true)); - if (old_index >= new_region->getProbableIndex()) + if (old_index >= new_region->appliedIndex()) { LOG_INFO(log, "KVStore::onSnapshot discard new region because of index is outdated"); return false; @@ -126,7 +133,7 @@ void KVStore::onServiceCommand(enginepb::CommandRequestBatch && cmds, RaftContex LOG_INFO(log, "Report [region " << region_id << "] destroyed"); }; - std::lock_guard lock(task_mutex); + auto task_lock = genTaskLock(); for (auto && cmd : *cmds.mutable_requests()) { @@ -155,7 +162,8 @@ void KVStore::onServiceCommand(enginepb::CommandRequestBatch && cmds, RaftContex continue; } - RaftCommandResult result = curr_region.onCommand(std::move(cmd)); + curr_region.makeRaftCommandDelegate(task_lock).onCommand(std::move(cmd), *this, region_table, *raft_cmd_res); + RaftCommandResult & result = *raft_cmd_res; const auto region_report = [&]() { *(responseBatch.add_responses()) = curr_region.toCommandResponse(); }; @@ -269,7 +277,7 @@ void KVStore::onServiceCommand(enginepb::CommandRequestBatch && cmds, RaftContex void KVStore::report(RaftContext & raft_ctx) { - std::lock_guard lock(task_mutex); + auto lock = genTaskLock(); enginepb::CommandResponseBatch responseBatch; { @@ -364,6 +372,7 @@ void KVStore::updateRegionTableBySnapshot(RegionTable & region_table) LOG_INFO(log, "update RegionTable done"); } +KVStoreTaskLock KVStore::genTaskLock() const { return KVStoreTaskLock(task_mutex); } RegionMap & KVStore::regions() { return region_manager.regions; } const RegionMap & KVStore::regions() const { return region_manager.regions; } std::mutex & KVStore::mutex() const { return region_manager.mutex; } diff --git a/dbms/src/Storages/Transaction/KVStore.h b/dbms/src/Storages/Transaction/KVStore.h index c78edaec726..87589bf053a 100644 --- a/dbms/src/Storages/Transaction/KVStore.h +++ b/dbms/src/Storages/Transaction/KVStore.h @@ -11,6 +11,8 @@ namespace DB static const Seconds REGION_PERSIST_PERIOD(300); // 5 minutes static const Seconds KVSTORE_TRY_PERSIST_PERIOD(180); // 3 minutes +class Context; + class KVStore; using KVStorePtr = std::shared_ptr; @@ -19,6 +21,8 @@ struct RaftContext; class Region; using RegionPtr = std::shared_ptr; +struct RaftCommandResult; +class KVStoreTaskLock; struct MockTiDBTable; @@ -51,6 +55,8 @@ class KVStore final : private boost::noncopyable void updateRegionTableBySnapshot(RegionTable & region_table); + KVStoreTaskLock genTaskLock() const; + private: friend class MockTiDB; friend struct MockTiDBTable; @@ -70,7 +76,18 @@ class KVStore final : private boost::noncopyable // onServiceCommand and onSnapshot should not be called concurrently mutable std::mutex task_mutex; + // raft_cmd_res stores the result of applying raft cmd. It must be protected by task_mutex. + std::unique_ptr raft_cmd_res; + Logger * log; }; +/// Encapsulation of lock guard of task mutex in KVStore +class KVStoreTaskLock : private boost::noncopyable +{ + friend KVStoreTaskLock KVStore::genTaskLock() const; + KVStoreTaskLock(std::mutex & mutex_) : lock(mutex_) {} + std::lock_guard lock; +}; + } // namespace DB diff --git a/dbms/src/Storages/Transaction/RaftCommandResult.h b/dbms/src/Storages/Transaction/RaftCommandResult.h index 8d3eeb912fb..edb8593423a 100644 --- a/dbms/src/Storages/Transaction/RaftCommandResult.h +++ b/dbms/src/Storages/Transaction/RaftCommandResult.h @@ -8,7 +8,7 @@ namespace DB class Region; using RegionPtr = std::shared_ptr; -struct RaftCommandResult +struct RaftCommandResult : private boost::noncopyable { enum Type { diff --git a/dbms/src/Storages/Transaction/Region.cpp b/dbms/src/Storages/Transaction/Region.cpp index 3583fa39fec..2e2e6e929ce 100644 --- a/dbms/src/Storages/Transaction/Region.cpp +++ b/dbms/src/Storages/Transaction/Region.cpp @@ -34,7 +34,7 @@ RegionDataReadInfo Region::readDataByWriteIt(const TableID & table_id, const Reg LockInfoPtr Region::getLockInfo(TableID expected_table_id, UInt64 start_ts) const { return data.getLockInfo(expected_table_id, start_ts); } -TableID Region::insert(const std::string & cf, TiKVKey key, TiKVValue value) +TableID Region::insert(const std::string & cf, TiKVKey && key, TiKVValue && value) { std::unique_lock lock(mutex); return doInsert(cf, std::move(key), std::move(value)); @@ -86,13 +86,7 @@ TableID Region::doRemove(const std::string & cf, const TiKVKey & key) return table_id; } -UInt64 Region::getIndex() const -{ - std::shared_lock lock(mutex); - return meta.appliedIndex(); -} - -UInt64 Region::getProbableIndex() const { return meta.appliedIndex(); } +UInt64 Region::appliedIndex() const { return meta.appliedIndex(); } RegionPtr Region::splitInto(RegionMeta meta) { @@ -111,17 +105,17 @@ RegionPtr Region::splitInto(RegionMeta meta) return new_region; } -void Region::execChangePeer( +void RegionRaftCommandDelegate::execChangePeer( const raft_cmdpb::AdminRequest & request, const raft_cmdpb::AdminResponse & response, const UInt64 index, const UInt64 term) { const auto & change_peer_request = request.change_peer(); LOG_INFO(log, toString(false) << " execute change peer type: " << eraftpb::ConfChangeType_Name(change_peer_request.change_type())); - meta.execChangePeer(request, response, index, term); + meta.makeRaftCommandDelegate().execChangePeer(request, response, index, term); } -Regions Region::execBatchSplit( +Regions RegionRaftCommandDelegate::execBatchSplit( const raft_cmdpb::AdminRequest &, const raft_cmdpb::AdminResponse & response, const UInt64 index, const UInt64 term) { const auto & new_region_infos = response.splits().regions(); @@ -174,30 +168,30 @@ Regions Region::execBatchSplit( return split_regions; } -void Region::execCompactLog( +void RegionRaftCommandDelegate::execCompactLog( const raft_cmdpb::AdminRequest & request, const raft_cmdpb::AdminResponse & response, const UInt64 index, const UInt64 term) { const auto & compact_log_request = request.compact_log(); - LOG_INFO(log, - toString(false) << " execute compact log, compact_term: " << compact_log_request.compact_term() - << ", compact_index: " << compact_log_request.compact_index()); + const auto compact_index = compact_log_request.compact_index(); + const auto compact_term = compact_log_request.compact_term(); + + LOG_INFO(log, toString(false) << " execute compact log, compact_term: " << compact_term << ", compact_index: " << compact_index); - meta.execCompactLog(request, response, index, term); + meta.makeRaftCommandDelegate().execCompactLog(request, response, index, term); } -RaftCommandResult Region::onCommand(enginepb::CommandRequest && cmd) +void RegionRaftCommandDelegate::onCommand(enginepb::CommandRequest && cmd, const KVStore &, RegionTable *, RaftCommandResult & result) { const auto & header = cmd.header(); UInt64 term = header.term(); UInt64 index = header.index(); bool sync_log = header.sync_log(); - RaftCommandResult result; + result.type = RaftCommandResult::Type::Default; result.sync_log = sync_log; { - auto applied_index = meta.appliedIndex(); - if (index <= applied_index) + if (index <= appliedIndex()) { result.type = RaftCommandResult::Type::IndexError; if (term == 0 && index == 0) @@ -206,7 +200,7 @@ RaftCommandResult Region::onCommand(enginepb::CommandRequest && cmd) } else LOG_WARNING(log, toString() << " ignore outdated raft log [term: " << term << ", index: " << index << "]"); - return result; + return; } } @@ -361,8 +355,6 @@ RaftCommandResult Region::onCommand(enginepb::CommandRequest && cmd) if (is_dirty) incDirtyFlag(); - - return result; } std::tuple Region::serialize(WriteBuffer & buf) const @@ -417,7 +409,7 @@ ColumnFamilyType Region::getCf(const std::string & cf) RegionID Region::id() const { return meta.regionId(); } -bool Region::isPendingRemove() const { return meta.peerState() == raft_serverpb::PeerState::Tombstone; } +bool Region::isPendingRemove() const { return peerState() == raft_serverpb::PeerState::Tombstone; } void Region::setPendingRemove() { @@ -425,6 +417,8 @@ void Region::setPendingRemove() meta.notifyAll(); } +raft_serverpb::PeerState Region::peerState() const { return meta.peerState(); } + size_t Region::dataSize() const { return data.dataSize(); } size_t Region::writeCFCount() const @@ -591,6 +585,14 @@ void Region::compareAndCompleteSnapshot(HandleMap & handle_map, const TableID ta LOG_INFO(log, "[compareAndCompleteSnapshot] add deleted gc: " << deleted_gc_cnt); } +RegionRaftCommandDelegate & Region::makeRaftCommandDelegate(const KVStoreTaskLock & lock) +{ + static_assert(sizeof(RegionRaftCommandDelegate) == sizeof(Region)); + // lock is useless, just to make sure the task mutex of KVStore is locked + std::ignore = lock; + return static_cast(*this); +} + void Region::compareAndCompleteSnapshot(const Timestamp safe_point, const Region & source_region) { const auto & [start_key, end_key] = getRange(); diff --git a/dbms/src/Storages/Transaction/Region.h b/dbms/src/Storages/Transaction/Region.h index f5fa11ba0f4..eb5d8eea1f9 100644 --- a/dbms/src/Storages/Transaction/Region.h +++ b/dbms/src/Storages/Transaction/Region.h @@ -16,6 +16,10 @@ using RegionPtr = std::shared_ptr; using Regions = std::vector; struct RaftCommandResult; +class KVStore; +class RegionTable; +class RegionRaftCommandDelegate; +class KVStoreTaskLock; /// Store all kv data of one region. Including 'write', 'data' and 'lock' column families. /// TODO: currently the synchronize mechanism is broken and need to fix. @@ -98,11 +102,9 @@ class Region : public std::enable_shared_from_this : meta(std::move(meta_)), client(region_client_create(meta.getRegionVerID())), log(&Logger::get(log_name)) {} - TableID insert(const std::string & cf, TiKVKey key, TiKVValue value); + TableID insert(const std::string & cf, TiKVKey && key, TiKVValue && value); TableID remove(const std::string & cf, const TiKVKey & key); - RaftCommandResult onCommand(enginepb::CommandRequest && cmd); - CommittedScanner createCommittedScanner(TableID expected_table_id); CommittedRemover createCommittedRemover(TableID expected_table_id); @@ -118,6 +120,7 @@ class Region : public std::enable_shared_from_this bool isPendingRemove() const; void setPendingRemove(); bool isPeerRemoved() const; + raft_serverpb::PeerState peerState() const; size_t dataSize() const; size_t writeCFCount() const; @@ -141,8 +144,7 @@ class Region : public std::enable_shared_from_this void waitIndex(UInt64 index); - UInt64 getIndex() const; - UInt64 getProbableIndex() const; + UInt64 appliedIndex() const; RegionVersion version() const; RegionVersion confVer() const; @@ -163,11 +165,13 @@ class Region : public std::enable_shared_from_this void compareAndCompleteSnapshot(const Timestamp safe_point, const Region & source_region); static ColumnFamilyType getCf(const std::string & cf); + RegionRaftCommandDelegate & makeRaftCommandDelegate(const KVStoreTaskLock &); void tryPreDecodeTiKVValue(); private: Region() = delete; + friend class RegionRaftCommandDelegate; // Private methods no need to lock mutex, normally @@ -182,12 +186,6 @@ class Region : public std::enable_shared_from_this LockInfoPtr getLockInfo(TableID expected_table_id, UInt64 start_ts) const; RegionPtr splitInto(RegionMeta meta); - Regions execBatchSplit( - const raft_cmdpb::AdminRequest & request, const raft_cmdpb::AdminResponse & response, const UInt64 index, const UInt64 term); - void execChangePeer( - const raft_cmdpb::AdminRequest & request, const raft_cmdpb::AdminResponse & response, const UInt64 index, const UInt64 term); - void execCompactLog( - const raft_cmdpb::AdminRequest & request, const raft_cmdpb::AdminResponse & response, const UInt64 index, const UInt64 term); private: RegionData data; @@ -205,4 +203,21 @@ class Region : public std::enable_shared_from_this Logger * log; }; +class RegionRaftCommandDelegate : public Region, private boost::noncopyable +{ +public: + /// Only after the task mutex of KVStore is locked, region can apply raft command. + void onCommand(enginepb::CommandRequest &&, const KVStore &, RegionTable *, RaftCommandResult &); + +private: + RegionRaftCommandDelegate() = delete; + + Regions execBatchSplit( + const raft_cmdpb::AdminRequest & request, const raft_cmdpb::AdminResponse & response, const UInt64 index, const UInt64 term); + void execChangePeer( + const raft_cmdpb::AdminRequest & request, const raft_cmdpb::AdminResponse & response, const UInt64 index, const UInt64 term); + void execCompactLog( + const raft_cmdpb::AdminRequest & request, const raft_cmdpb::AdminResponse & response, const UInt64 index, const UInt64 term); +}; + } // namespace DB diff --git a/dbms/src/Storages/Transaction/RegionManager.h b/dbms/src/Storages/Transaction/RegionManager.h index 99fab22422b..9570d9ff3d7 100644 --- a/dbms/src/Storages/Transaction/RegionManager.h +++ b/dbms/src/Storages/Transaction/RegionManager.h @@ -11,23 +11,34 @@ using RegionMap = std::unordered_map; class RegionTaskLock; +/// RegionManager is used to store region instance and mutex for region to execute raft cmd/task. class RegionManager : private boost::noncopyable { public: + /// Encapsulate the task lock for region RegionTaskLock genRegionTaskLock(const RegionID region_id) const; private: friend class KVStore; - using RegionTaskElement = std::mutex; - using RegionTaskElementPtr = std::shared_ptr; - RegionTaskElementPtr getRegionTaskCtrl(const RegionID region_id) const; - mutable std::unordered_map regions_ctrl; - RegionMap regions; + struct RegionTaskElement : private boost::noncopyable + { + mutable std::mutex mutex; + }; + + /// The life time of each RegionTaskElement element should be as long as RegionManager, just return const ref. + const RegionTaskElement & getRegionTaskCtrl(const RegionID region_id) const; + + /// RegionManager can only be constructed by KVStore. + RegionManager() = default; +private: + mutable std::unordered_map regions_ctrl; + RegionMap regions; mutable std::mutex mutex; }; +/// Task lock for region to prevent other thread persist middle state during applying raft cmd. class RegionTaskLock : private boost::noncopyable { friend RegionTaskLock RegionManager::genRegionTaskLock(const RegionID region_id) const; diff --git a/dbms/src/Storages/Transaction/RegionMeta.cpp b/dbms/src/Storages/Transaction/RegionMeta.cpp index 09132402b06..f5cc3d5f330 100644 --- a/dbms/src/Storages/Transaction/RegionMeta.cpp +++ b/dbms/src/Storages/Transaction/RegionMeta.cpp @@ -83,7 +83,7 @@ void RegionMeta::doSetApplied(UInt64 index, UInt64 term) applied_term = term; } -void RegionMeta::notifyAll() { cv.notify_all(); } +void RegionMeta::notifyAll() const { cv.notify_all(); } UInt64 RegionMeta::appliedIndex() const { @@ -154,13 +154,13 @@ void RegionMeta::setPeerState(const raft_serverpb::PeerState peer_state_) region_state.set_state(peer_state_); } -void RegionMeta::waitIndex(UInt64 index) +void RegionMeta::waitIndex(UInt64 index) const { std::unique_lock lock(mutex); cv.wait(lock, [this, index] { return doCheckIndex(index); }); } -bool RegionMeta::checkIndex(UInt64 index) +bool RegionMeta::checkIndex(UInt64 index) const { std::lock_guard lock(mutex); return doCheckIndex(index); @@ -196,7 +196,7 @@ void RegionMeta::assignRegionMeta(RegionMeta && rhs) region_state = std::move(rhs.region_state); } -void RegionMeta::execChangePeer( +void MetaRaftCommandDelegate::execChangePeer( const raft_cmdpb::AdminRequest & request, const raft_cmdpb::AdminResponse & response, UInt64 index, UInt64 term) { const auto & change_peer_request = request.change_peer(); @@ -233,7 +233,7 @@ void RegionMeta::execChangePeer( } } -void RegionMeta::execCompactLog( +void MetaRaftCommandDelegate::execCompactLog( const raft_cmdpb::AdminRequest & request, const raft_cmdpb::AdminResponse &, const UInt64 index, const UInt64 term) { const auto & compact_log_request = request.compact_log(); @@ -263,6 +263,9 @@ bool RegionMeta::isPeerRemoved() const bool operator==(const RegionMeta & meta1, const RegionMeta & meta2) { + std::lock_guard lock1(meta1.mutex); + std::lock_guard lock2(meta2.mutex); + return meta1.peer == meta2.peer && meta1.apply_state == meta2.apply_state && meta1.applied_term == meta2.applied_term && meta1.region_state == meta2.region_state; } @@ -274,4 +277,10 @@ std::tuple RegionMeta::dumpVersionRan std::make_pair(TiKVKey::copyFrom(region_state.region().start_key()), TiKVKey::copyFrom(region_state.region().end_key()))}; } +MetaRaftCommandDelegate & RegionMeta::makeRaftCommandDelegate() +{ + static_assert(sizeof(MetaRaftCommandDelegate) == sizeof(RegionMeta)); + return static_cast(*this); +} + } // namespace DB diff --git a/dbms/src/Storages/Transaction/RegionMeta.h b/dbms/src/Storages/Transaction/RegionMeta.h index b4bee48c6fb..0370d0949b0 100644 --- a/dbms/src/Storages/Transaction/RegionMeta.h +++ b/dbms/src/Storages/Transaction/RegionMeta.h @@ -19,6 +19,7 @@ namespace DB { using RegionRange = std::pair; +class MetaRaftCommandDelegate; class RegionMeta { @@ -62,7 +63,7 @@ class RegionMeta raft_serverpb::RaftApplyState getApplyState() const; void setApplied(UInt64 index, UInt64 term); - void notifyAll(); + void notifyAll() const; std::string toString(bool dump_status = true) const; @@ -79,20 +80,20 @@ class RegionMeta friend bool operator==(const RegionMeta & meta1, const RegionMeta & meta2); - void waitIndex(UInt64 index); - bool checkIndex(UInt64 index); + void waitIndex(UInt64 index) const; + bool checkIndex(UInt64 index) const; bool isPeerRemoved() const; - void execChangePeer(const raft_cmdpb::AdminRequest & request, const raft_cmdpb::AdminResponse & response, UInt64 index, UInt64 term); - void execCompactLog(const raft_cmdpb::AdminRequest & request, const raft_cmdpb::AdminResponse & response, UInt64 index, UInt64 term); std::tuple dumpVersionRange() const; + MetaRaftCommandDelegate & makeRaftCommandDelegate(); private: - void doSetRegion(const metapb::Region & region); + RegionMeta() = delete; + friend class MetaRaftCommandDelegate; + void doSetRegion(const metapb::Region & region); void doSetApplied(UInt64 index, UInt64 term); - bool doCheckIndex(UInt64 index) const; private: @@ -105,7 +106,7 @@ class RegionMeta raft_serverpb::RegionLocalState region_state; mutable std::mutex mutex; - std::condition_variable cv; + mutable std::condition_variable cv; const RegionID region_id; }; @@ -125,4 +126,14 @@ inline raft_serverpb::RaftApplyState initialApplyState() return state; } +class MetaRaftCommandDelegate : public RegionMeta, private boost::noncopyable +{ + friend class RegionRaftCommandDelegate; + + MetaRaftCommandDelegate() = delete; + + void execChangePeer(const raft_cmdpb::AdminRequest & request, const raft_cmdpb::AdminResponse & response, UInt64 index, UInt64 term); + void execCompactLog(const raft_cmdpb::AdminRequest & request, const raft_cmdpb::AdminResponse & response, UInt64 index, UInt64 term); +}; + } // namespace DB diff --git a/dbms/src/Storages/Transaction/RegionTable.cpp b/dbms/src/Storages/Transaction/RegionTable.cpp index 941262cf368..9701ea98a2e 100644 --- a/dbms/src/Storages/Transaction/RegionTable.cpp +++ b/dbms/src/Storages/Transaction/RegionTable.cpp @@ -359,7 +359,7 @@ void RegionTable::removeRegion(const RegionID region_id) void RegionTable::tryFlushRegion(RegionID region_id) { - TableID table_id; + TableIDSet table_ids; { std::lock_guard lock(mutex); if (auto it = regions.find(region_id); it != regions.end()) @@ -370,7 +370,7 @@ void RegionTable::tryFlushRegion(RegionID region_id) return; } // maybe this region contains more than one table, just flush the first one. - table_id = *it->second.begin(); + table_ids = it->second; } else { @@ -379,6 +379,12 @@ void RegionTable::tryFlushRegion(RegionID region_id) } } + for (const auto table_id : table_ids) + tryFlushRegion(region_id, table_id); +} + +void RegionTable::tryFlushRegion(RegionID region_id, TableID table_id) +{ const auto func_update_region = [&](std::function && callback) -> bool { std::lock_guard lock(mutex); if (auto table_it = tables.find(table_id); table_it != tables.end()) diff --git a/dbms/src/Storages/Transaction/RegionTable.h b/dbms/src/Storages/Transaction/RegionTable.h index b438f1ff0ab..36ab104ca66 100644 --- a/dbms/src/Storages/Transaction/RegionTable.h +++ b/dbms/src/Storages/Transaction/RegionTable.h @@ -177,6 +177,7 @@ class RegionTable : private boost::noncopyable bool tryFlushRegions(); void tryFlushRegion(RegionID region_id); + void tryFlushRegion(RegionID region_id, TableID table_id); void traverseInternalRegions(std::function && callback); void traverseInternalRegionsByTable(const TableID table_id, std::function && callback); diff --git a/dbms/src/Storages/Transaction/TMTContext.cpp b/dbms/src/Storages/Transaction/TMTContext.cpp index b65db7d9183..fd34a0bf30f 100644 --- a/dbms/src/Storages/Transaction/TMTContext.cpp +++ b/dbms/src/Storages/Transaction/TMTContext.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include #include diff --git a/dbms/src/Storages/Transaction/applySnapshot.cpp b/dbms/src/Storages/Transaction/applySnapshot.cpp index 2d1a7689344..add7e60fa02 100644 --- a/dbms/src/Storages/Transaction/applySnapshot.cpp +++ b/dbms/src/Storages/Transaction/applySnapshot.cpp @@ -26,7 +26,7 @@ bool applySnapshot(const KVStorePtr & kvstore, RegionPtr new_region, Context * c if (old_region) { - if (old_region->getIndex() >= new_region->getIndex()) + if (old_region->appliedIndex() >= new_region->appliedIndex()) { LOG_WARNING(log, "Region " << new_region->id() << " already has newer index, " << old_region->toString(true)); return false; From 42023a585898bf416f712572573232423addf122 Mon Sep 17 00:00:00 2001 From: Tong Zhigao Date: Wed, 4 Sep 2019 18:57:03 +0800 Subject: [PATCH 2/4] make genTaskLock private --- dbms/src/Storages/Transaction/KVStore.h | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/dbms/src/Storages/Transaction/KVStore.h b/dbms/src/Storages/Transaction/KVStore.h index 87589bf053a..0fe6882c32f 100644 --- a/dbms/src/Storages/Transaction/KVStore.h +++ b/dbms/src/Storages/Transaction/KVStore.h @@ -55,12 +55,11 @@ class KVStore final : private boost::noncopyable void updateRegionTableBySnapshot(RegionTable & region_table); - KVStoreTaskLock genTaskLock() const; - private: friend class MockTiDB; friend struct MockTiDBTable; void removeRegion(const RegionID region_id, RegionTable * region_table); + KVStoreTaskLock genTaskLock() const; RegionMap & regions(); const RegionMap & regions() const; @@ -85,7 +84,7 @@ class KVStore final : private boost::noncopyable /// Encapsulation of lock guard of task mutex in KVStore class KVStoreTaskLock : private boost::noncopyable { - friend KVStoreTaskLock KVStore::genTaskLock() const; + friend class KVStore; KVStoreTaskLock(std::mutex & mutex_) : lock(mutex_) {} std::lock_guard lock; }; From b054cdf410fb8a43a6f713c3074176e439296528 Mon Sep 17 00:00:00 2001 From: Tong Zhigao Date: Wed, 4 Sep 2019 19:24:46 +0800 Subject: [PATCH 3/4] catch exception when try to flush region --- dbms/src/Storages/Transaction/RegionTable.cpp | 30 +++++++++++++++++-- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/dbms/src/Storages/Transaction/RegionTable.cpp b/dbms/src/Storages/Transaction/RegionTable.cpp index 9701ea98a2e..bcb36472aa1 100644 --- a/dbms/src/Storages/Transaction/RegionTable.cpp +++ b/dbms/src/Storages/Transaction/RegionTable.cpp @@ -423,7 +423,16 @@ void RegionTable::tryFlushRegion(RegionID region_id, TableID table_id) if (!status) return; - flushRegion(table_id, region_id, cache_bytes, false); + std::exception_ptr first_exception; + + try + { + flushRegion(table_id, region_id, cache_bytes, false); + } + catch (...) + { + first_exception = std::current_exception(); + } func_update_region([&](InternalRegion & region) -> bool { region.pause_flush = false; @@ -433,6 +442,9 @@ void RegionTable::tryFlushRegion(RegionID region_id, TableID table_id) region.last_flush_time = Clock::now(); return true; }); + + if (first_exception) + std::rethrow_exception(first_exception); } bool RegionTable::tryFlushRegions() @@ -449,8 +461,17 @@ bool RegionTable::tryFlushRegions() }); } - for (auto & [id, cache_bytes] : to_flush) - flushRegion(id.first, id.second, cache_bytes); + std::exception_ptr first_exception; + + try + { + for (auto & [id, cache_bytes] : to_flush) + flushRegion(id.first, id.second, cache_bytes); + } + catch (...) + { + first_exception = std::current_exception(); + } { // Now reset status information. Timepoint now = Clock::now(); @@ -466,6 +487,9 @@ bool RegionTable::tryFlushRegions() }); } + if (first_exception) + std::rethrow_exception(first_exception); + return !to_flush.empty(); } From 608215447f303e9716366ca704adcfd6ff8a80a0 Mon Sep 17 00:00:00 2001 From: Tong Zhigao Date: Wed, 4 Sep 2019 20:24:16 +0800 Subject: [PATCH 4/4] checkout to pingcap/tidb:v3.0.3 --- tests/docker/docker-compose.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/docker/docker-compose.yaml b/tests/docker/docker-compose.yaml index 2729a116178..1132ce1137f 100644 --- a/tests/docker/docker-compose.yaml +++ b/tests/docker/docker-compose.yaml @@ -67,7 +67,7 @@ services: - "pd0" restart: on-failure tidb0: - image: pingcap/tidb:latest + image: pingcap/tidb:v3.0.3 ports: - "4000:4000" - "10080:10080"