diff --git a/dbms/src/DataStreams/RangesFilterBlockInputStream.cpp b/dbms/src/DataStreams/RangesFilterBlockInputStream.cpp index 936e765a09d..29364f12f90 100644 --- a/dbms/src/DataStreams/RangesFilterBlockInputStream.cpp +++ b/dbms/src/DataStreams/RangesFilterBlockInputStream.cpp @@ -10,9 +10,6 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } -template class RangesFilterBlockInputStream; -template class RangesFilterBlockInputStream; - struct PKColumnIterator : public std::iterator { PKColumnIterator & operator++() @@ -112,4 +109,7 @@ Block RangesFilterBlockInputStream::readImpl() } } +template class RangesFilterBlockInputStream; +template class RangesFilterBlockInputStream; + } // namespace DB diff --git a/dbms/src/DataStreams/ReplacingTMTSortedBlockInputStream.cpp b/dbms/src/DataStreams/ReplacingTMTSortedBlockInputStream.cpp index 5a9a69b973a..b0adc4d67eb 100644 --- a/dbms/src/DataStreams/ReplacingTMTSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/ReplacingTMTSortedBlockInputStream.cpp @@ -3,8 +3,6 @@ namespace DB { -template class ReplacingTMTSortedBlockInputStream; -template class ReplacingTMTSortedBlockInputStream; template void ReplacingTMTSortedBlockInputStream::insertRow(MutableColumns & merged_columns, size_t & merged_rows) @@ -181,4 +179,7 @@ void ReplacingTMTSortedBlockInputStream::logRowGoing(const std::stri << ". same=" << ((toString(curr_pk) == next_pk) ? "true" : "false") << ". why{" << msg << "}, output: " << is_output); } +template class ReplacingTMTSortedBlockInputStream; +template class ReplacingTMTSortedBlockInputStream; + } // namespace DB diff --git a/dbms/src/Debug/dbgFuncMockTiDBTable.cpp b/dbms/src/Debug/dbgFuncMockTiDBTable.cpp index a528aaf50a7..e762ba3bd13 100644 --- a/dbms/src/Debug/dbgFuncMockTiDBTable.cpp +++ b/dbms/src/Debug/dbgFuncMockTiDBTable.cpp @@ -1,21 +1,16 @@ +#include +#include +#include +#include #include #include #include #include #include #include - -#include -#include - #include #include -#include - -#include -#include - namespace DB { diff --git a/dbms/src/Debug/dbgFuncRegion.cpp b/dbms/src/Debug/dbgFuncRegion.cpp index 3bd27878aa3..7a74e86a448 100644 --- a/dbms/src/Debug/dbgFuncRegion.cpp +++ b/dbms/src/Debug/dbgFuncRegion.cpp @@ -1,23 +1,19 @@ #include - +#include +#include +#include +#include #include #include - #include +#include #include -#include -#include - #include +#include #include +#include #include -#include -#include -#include - -#include - namespace DB { diff --git a/dbms/src/Debug/dbgTools.cpp b/dbms/src/Debug/dbgTools.cpp index 9a5149141bd..3354de724fe 100644 --- a/dbms/src/Debug/dbgTools.cpp +++ b/dbms/src/Debug/dbgTools.cpp @@ -1,13 +1,12 @@ #include -#include - -#include - +#include #include #include +#include +#include #include - +#include #include #include #include diff --git a/dbms/src/Debug/dbgTools.h b/dbms/src/Debug/dbgTools.h index 7b922c70504..31ab68108ea 100644 --- a/dbms/src/Debug/dbgTools.h +++ b/dbms/src/Debug/dbgTools.h @@ -1,5 +1,12 @@ #pragma once +#include + +namespace TiDB +{ +struct TableInfo; +} + namespace DB { class Context; diff --git a/dbms/src/Raft/RaftService.cpp b/dbms/src/Raft/RaftService.cpp index 12e405d7e36..7cfeebc0d1f 100644 --- a/dbms/src/Raft/RaftService.cpp +++ b/dbms/src/Raft/RaftService.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include #include diff --git a/dbms/src/Raft/RaftService.h b/dbms/src/Raft/RaftService.h index 100fc082512..b3567dcee9a 100644 --- a/dbms/src/Raft/RaftService.h +++ b/dbms/src/Raft/RaftService.h @@ -1,7 +1,6 @@ #pragma once #include -#include #include #include diff --git a/dbms/src/Storages/StorageMergeTree.h b/dbms/src/Storages/StorageMergeTree.h index d83a00291a3..355d2da41c0 100644 --- a/dbms/src/Storages/StorageMergeTree.h +++ b/dbms/src/Storages/StorageMergeTree.h @@ -2,16 +2,19 @@ #include +#include #include +#include +#include #include +#include #include #include -#include -#include -#include -#include -#include +namespace TiDB +{ +struct TableInfo; +} namespace DB { @@ -20,8 +23,8 @@ namespace DB */ class StorageMergeTree : public ext::shared_ptr_helper, public IStorage { -friend class MergeTreeBlockOutputStream; -friend class TxnMergeTreeBlockOutputStream; + friend class MergeTreeBlockOutputStream; + friend class TxnMergeTreeBlockOutputStream; using TableInfo = TiDB::TableInfo; @@ -30,10 +33,7 @@ friend class TxnMergeTreeBlockOutputStream; void shutdown() override; ~StorageMergeTree() override; - std::string getName() const override - { - return data.merging_params.getModeName() + "MergeTree"; - } + std::string getName() const override { return data.merging_params.getModeName() + "MergeTree"; } std::string getTableName() const override { return table_name; } std::string getDatabaseName() const { return database_name; } @@ -47,18 +47,11 @@ friend class TxnMergeTreeBlockOutputStream; const ColumnsDescription & getColumns() const override { return data.getColumns(); } void setColumns(ColumnsDescription columns_) override { return data.setColumns(std::move(columns_)); } - NameAndTypePair getColumn(const String & column_name) const override - { - return data.getColumn(column_name); - } + NameAndTypePair getColumn(const String & column_name) const override { return data.getColumn(column_name); } - bool hasColumn(const String & column_name) const override - { - return data.hasColumn(column_name); - } + bool hasColumn(const String & column_name) const override { return data.hasColumn(column_name); } - BlockInputStreams read( - const Names & column_names, + BlockInputStreams read(const Names & column_names, const SelectQueryInfo & query_info, const Context & context, QueryProcessingStage::Enum & processed_stage, @@ -121,7 +114,7 @@ friend class TxnMergeTreeBlockOutputStream; Logger * log; - std::atomic shutdown_called {false}; + std::atomic shutdown_called{false}; BackgroundProcessingPool::TaskHandle merge_task_handle; @@ -134,7 +127,7 @@ friend class TxnMergeTreeBlockOutputStream; * Returns true if merge is finished successfully. */ bool merge(size_t aio_threshold, bool aggressive, const String & partition_id, bool final, bool deduplicate, - String * out_disable_reason = nullptr); + String * out_disable_reason = nullptr); bool mergeTask(); @@ -148,8 +141,7 @@ friend class TxnMergeTreeBlockOutputStream; * date_column_name - if not empty, the name of the column with the date used for partitioning by month; otherwise, partition_expr_ast is used as the partitioning expression; */ - StorageMergeTree( - const String & path_, + StorageMergeTree(const String & path_, const String & database_name_, const String & table_name_, const ColumnsDescription & columns_, @@ -166,4 +158,4 @@ friend class TxnMergeTreeBlockOutputStream; bool has_force_restore_data_flag); }; -} +} // namespace DB diff --git a/dbms/src/Storages/Transaction/KVStore.cpp b/dbms/src/Storages/Transaction/KVStore.cpp index ae7e49e2811..e6195cfe728 100644 --- a/dbms/src/Storages/Transaction/KVStore.cpp +++ b/dbms/src/Storages/Transaction/KVStore.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -69,7 +70,7 @@ void KVStore::onSnapshot(RegionPtr new_region, RegionTable * region_table) LOG_DEBUG(log, "KVStore::onSnapshot: discard new region because of index is outdated"); return; } - old_region->reset(std::move(*new_region)); + old_region->assignRegion(std::move(*new_region)); new_region = old_region; } else @@ -229,7 +230,7 @@ bool KVStore::tryPersistAndReport(RaftContext & context, const Seconds kvstore_t traverseRegions([&](const RegionID region_id, const RegionPtr & region) { if (now < (region->lastPersistTime() + region_persist_period)) return; - if (region->persistParm() == 0) + if (region->dirtyFlag() == 0) return; all_region_copy[region_id] = region; }); @@ -244,7 +245,7 @@ bool KVStore::tryPersistAndReport(RaftContext & context, const Seconds kvstore_t region_persister.persist(region, response); - ss << "(" << region_id << "," << region->persistParm() << ") "; + ss << "(" << region_id << "," << region->dirtyFlag() << ") "; } if (persist_job) diff --git a/dbms/src/Storages/Transaction/PartitionStreams.cpp b/dbms/src/Storages/Transaction/PartitionStreams.cpp index 31244332839..e0f5b3223cb 100644 --- a/dbms/src/Storages/Transaction/PartitionStreams.cpp +++ b/dbms/src/Storages/Transaction/PartitionStreams.cpp @@ -41,7 +41,7 @@ std::tuple RegionTab const Names & ordered_columns, bool learner_read, bool resolve_locks, - UInt64 start_ts, + Timestamp start_ts, RegionDataReadInfoList * data_list_for_remove) { if (!region) diff --git a/dbms/src/Storages/Transaction/Region.cpp b/dbms/src/Storages/Transaction/Region.cpp index 97e736ae34e..8fd629ad9e1 100644 --- a/dbms/src/Storages/Transaction/Region.cpp +++ b/dbms/src/Storages/Transaction/Region.cpp @@ -15,10 +15,10 @@ extern const int UNKNOWN_FORMAT_VERSION; const UInt32 Region::CURRENT_VERSION = 0; -const String Region::lock_cf_name = "lock"; -const String Region::default_cf_name = "default"; -const String Region::write_cf_name = "write"; -const String Region::log_name = "Region"; +const std::string Region::lock_cf_name = "lock"; +const std::string Region::default_cf_name = "default"; +const std::string Region::write_cf_name = "write"; +const std::string Region::log_name = "Region"; RegionData::WriteCFIter Region::removeDataByWriteIt(const TableID & table_id, const RegionData::WriteCFIter & write_it) { @@ -38,12 +38,12 @@ TableID Region::insert(const std::string & cf, const TiKVKey & key, const TiKVVa return doInsert(cf, key, value); } -void Region::batchInsert(std::function && f) +void Region::batchInsert(std::function && f) { std::unique_lock lock(mutex); for (;;) { - if (BatchInsertNode p; f(p)) + if (BatchInsertElement p; f(p)) { auto && [k, v, cf] = p; doInsert(*cf, *k, *v); @@ -53,10 +53,10 @@ void Region::batchInsert(std::function && f) } } -TableID Region::doInsert(const String & cf, const TiKVKey & key, const TiKVValue & value) +TableID Region::doInsert(const std::string & cf, const TiKVKey & key, const TiKVValue & value) { // Ignoring all keys other than records. - String raw_key = RecordKVFormat::decodeTiKVKey(key); + std::string raw_key = RecordKVFormat::decodeTiKVKey(key); if (!RecordKVFormat::isRecord(raw_key)) return InvalidTableID; @@ -68,16 +68,16 @@ TableID Region::doInsert(const String & cf, const TiKVKey & key, const TiKVValue return data.insert(type, key, raw_key, value); } -TableID Region::remove(const String & cf, const TiKVKey & key) +TableID Region::remove(const std::string & cf, const TiKVKey & key) { std::unique_lock lock(mutex); return doRemove(cf, key); } -TableID Region::doRemove(const String & cf, const TiKVKey & key) +TableID Region::doRemove(const std::string & cf, const TiKVKey & key) { // Ignoring all keys other than records. - String raw_key = RecordKVFormat::decodeTiKVKey(key); + std::string raw_key = RecordKVFormat::decodeTiKVKey(key); if (!RecordKVFormat::isRecord(raw_key)) return InvalidTableID; @@ -154,7 +154,7 @@ Regions Region::execBatchSplit( { std::unique_lock lock(mutex); - int new_region_index = 0; + int new_region_index = -1; for (int i = 0; i < new_region_infos.size(); ++i) { const auto & region_info = new_region_infos[i]; @@ -166,12 +166,20 @@ Regions Region::execBatchSplit( split_regions.emplace_back(split_region); } else - new_region_index = i; + { + if (new_region_index == -1) + new_region_index = i; + else + throw Exception("Region::execBatchSplit duplicate region index", ErrorCodes::LOGICAL_ERROR); + } } + if (new_region_index == -1) + throw Exception("Region::execBatchSplit region index not found", ErrorCodes::LOGICAL_ERROR); + RegionMeta new_meta(meta.getPeer(), new_region_infos[new_region_index], meta.getApplyState()); - meta.reset(std::move(new_meta)); - meta.setApplied(index, term); + new_meta.setApplied(index, term); + meta.assignRegionMeta(std::move(new_meta)); } std::stringstream ids; @@ -289,7 +297,7 @@ std::tuple, TableIDSet, bool> Region::onCommand(const eng meta.notifyAll(); if (need_persist) - incPersistParm(); + incDirtyFlag(); for (auto & region : split_regions) region->last_persist_time.store(last_persist_time); @@ -325,7 +333,7 @@ RegionPtr Region::deserialize(ReadBuffer & buf, const RegionClientCreateFunc * r RegionData::deserialize(buf, region->data); - region->persist_parm = 0; + region->dirty_flag = 0; return region; } @@ -374,11 +382,11 @@ void Region::markPersisted() { last_persist_time = Clock::now(); } Timepoint Region::lastPersistTime() const { return last_persist_time; } -size_t Region::persistParm() const { return persist_parm; } +size_t Region::dirtyFlag() const { return dirty_flag; } -void Region::decPersistParm(size_t x) { persist_parm -= x; } +void Region::decDirtyFlag(size_t x) { dirty_flag -= x; } -void Region::incPersistParm() { persist_parm++; } +void Region::incDirtyFlag() { dirty_flag++; } std::unique_ptr Region::createCommittedScanner(TableID expected_table_id) { @@ -422,15 +430,15 @@ HandleRange Region::getHandleRangeByTable(TableID table_id) const return TiKVRange::getHandleRangeByTable(getRange(), table_id); } -void Region::reset(Region && new_region) +void Region::assignRegion(Region && new_region) { std::unique_lock lock(mutex); - data.reset(std::move(new_region.data)); + data.assignRegionData(std::move(new_region.data)); - incPersistParm(); + incDirtyFlag(); - meta.reset(std::move(new_region.meta)); + meta.assignRegionMeta(std::move(new_region.meta)); meta.notifyAll(); } diff --git a/dbms/src/Storages/Transaction/Region.h b/dbms/src/Storages/Transaction/Region.h index 45cff96d416..f5cc229b107 100644 --- a/dbms/src/Storages/Transaction/Region.h +++ b/dbms/src/Storages/Transaction/Region.h @@ -22,10 +22,10 @@ class Region : public std::enable_shared_from_this public: const static UInt32 CURRENT_VERSION; - const static String lock_cf_name; - const static String default_cf_name; - const static String write_cf_name; - const static String log_name; + const static std::string lock_cf_name; + const static std::string default_cf_name; + const static std::string write_cf_name; + const static std::string log_name; static const auto PutFlag = RegionData::CFModifyFlag::PutFlag; static const auto DelFlag = RegionData::CFModifyFlag::DelFlag; @@ -110,8 +110,8 @@ class Region : public std::enable_shared_from_this TableID insert(const std::string & cf, const TiKVKey & key, const TiKVValue & value); TableID remove(const std::string & cf, const TiKVKey & key); - using BatchInsertNode = std::tuple; - void batchInsert(std::function && f); + using BatchInsertElement = std::tuple; + void batchInsert(std::function && f); std::tuple, TableIDSet, bool> onCommand(const enginepb::CommandRequest & cmd); @@ -135,9 +135,9 @@ class Region : public std::enable_shared_from_this void markPersisted(); Timepoint lastPersistTime() const; - size_t persistParm() const; - void decPersistParm(size_t x); - void incPersistParm(); + size_t dirtyFlag() const; + void decDirtyFlag(size_t x); + void incDirtyFlag(); friend bool operator==(const Region & region1, const Region & region2) { @@ -159,18 +159,18 @@ class Region : public std::enable_shared_from_this HandleRange getHandleRangeByTable(TableID table_id) const; - void reset(Region && new_region); + void assignRegion(Region && new_region); TableIDSet getCommittedRecordTableID() const; private: // Private methods no need to lock mutex, normally - TableID doInsert(const String & cf, const TiKVKey & key, const TiKVValue & value); - TableID doRemove(const String & cf, const TiKVKey & key); + TableID doInsert(const std::string & cf, const TiKVKey & key, const TiKVValue & value); + TableID doRemove(const std::string & cf, const TiKVKey & key); bool checkIndex(UInt64 index); - static ColumnFamilyType getCf(const String & cf); + static ColumnFamilyType getCf(const std::string & cf); RegionDataReadInfo readDataByWriteIt(const TableID & table_id, const RegionData::ConstWriteCFIter & write_it) const; RegionData::WriteCFIter removeDataByWriteIt(const TableID & table_id, const RegionData::WriteCFIter & write_it); @@ -191,7 +191,8 @@ class Region : public std::enable_shared_from_this std::atomic last_persist_time = Clock::now(); - std::atomic persist_parm = 1; + // dirty_flag is used to present whether this region need to be persisted. + std::atomic dirty_flag = 1; Logger * log; }; diff --git a/dbms/src/Storages/Transaction/RegionCFDataBase.cpp b/dbms/src/Storages/Transaction/RegionCFDataBase.cpp new file mode 100644 index 00000000000..e1bbb4848dd --- /dev/null +++ b/dbms/src/Storages/Transaction/RegionCFDataBase.cpp @@ -0,0 +1,243 @@ +#include +#include + +namespace DB +{ + +template +const TiKVKey & RegionCFDataBase::getTiKVKey(const Value & val) +{ + return std::get<0>(val); +} + +template +const TiKVValue & RegionCFDataBase::getTiKVValue(const Value & val) +{ + return std::get<1>(val); +} + +template +TableID RegionCFDataBase::insert(const TiKVKey & key, const TiKVValue & value) +{ + const String & raw_key = RecordKVFormat::decodeTiKVKey(key); + return insert(key, value, raw_key); +} + +template +TableID RegionCFDataBase::insert(const TiKVKey & key, const TiKVValue & value, const String & raw_key) +{ + TableID table_id = RecordKVFormat::getTableId(raw_key); + auto & map = data[table_id]; + auto [it, ok] = map.insert(Trait::genKVPair(key, raw_key, value)); + std::ignore = it; + if (!ok) + throw Exception(" found existing key [" + key.toString() + "]", ErrorCodes::LOGICAL_ERROR); + return table_id; +} + +template +size_t RegionCFDataBase::calcTiKVKeyValueSize(const Value & value) +{ + return calcTiKVKeyValueSize(getTiKVKey(value), getTiKVValue(value)); +} + +template +size_t RegionCFDataBase::calcTiKVKeyValueSize(const TiKVKey & key, const TiKVValue & value) +{ + if constexpr (std::is_same::value) + return 0; + else + return key.dataSize() + value.dataSize(); +} + +template +size_t RegionCFDataBase::remove(TableID table_id, const Key & key) +{ + auto & map = data[table_id]; + + if (auto it = map.find(key); it != map.end()) + { + const Value & value = it->second; + size_t size = calcTiKVKeyValueSize(value); + map.erase(it); + return size; + } + else + { + auto tikv_key = Trait::genTiKVKey(table_id, key); + throw Exception(" key not found [" + tikv_key.toString() + "]", ErrorCodes::LOGICAL_ERROR); + + return 0; + } +} + +template +bool RegionCFDataBase::cmp(const Map & a, const Map & b) +{ + if (a.size() != b.size()) + return false; + for (const auto & [key, value] : a) + { + if (auto it = b.find(key); it != b.end()) + { + if (getTiKVKey(value) != getTiKVKey(it->second) || getTiKVValue(value) != getTiKVValue(it->second)) + return false; + } + else + return false; + } + return true; +} + +template +bool RegionCFDataBase::operator==(const RegionCFDataBase & cf) const +{ + if (getSize() != cf.getSize()) + return false; + + const auto & cf_data = cf.data; + for (const auto & [table_id, map] : data) + { + if (map.empty()) + continue; + + if (auto it = cf_data.find(table_id); it != cf_data.end()) + { + if (!cmp(map, it->second)) + return false; + } + else + return false; + } + return true; +} + +template +size_t RegionCFDataBase::getSize() const +{ + size_t size = 0; + for (auto data_it = data.begin(); data_it != data.end(); ++data_it) + size += data_it->second.size(); + return size; +} + +template +RegionCFDataBase::RegionCFDataBase(RegionCFDataBase && region) : data(std::move(region.data)) +{} + +template +RegionCFDataBase & RegionCFDataBase::operator=(RegionCFDataBase && region) +{ + data = std::move(region.data); + return *this; +} + +template +size_t RegionCFDataBase::splitInto(const RegionRange & range, RegionCFDataBase & new_region_data) +{ + const auto & [start_key, end_key] = range; + size_t size_changed = 0; + + for (auto data_it = data.begin(); data_it != data.end();) + { + const auto & table_id = data_it->first; + auto & ori_map = data_it->second; + if (ori_map.empty()) + { + data_it = data.erase(data_it); + continue; + } + + auto & tar_map = new_region_data.data[table_id]; + + for (auto it = ori_map.begin(); it != ori_map.end();) + { + const auto & key = getTiKVKey(it->second); + + bool ok = start_key ? key >= start_key : true; + ok = ok && (end_key ? key < end_key : true); + if (ok) + { + size_changed += calcTiKVKeyValueSize(it->second); + tar_map.insert(std::move(*it)); + it = ori_map.erase(it); + } + else + ++it; + } + + ++data_it; + } + return size_changed; +} + +template +size_t RegionCFDataBase::serialize(WriteBuffer & buf) const +{ + size_t total_size = 0; + + size_t size = getSize(); + + total_size += writeBinary2(size, buf); + + for (const auto & [table_id, map] : data) + { + std::ignore = table_id; + for (const auto & ele : map) + { + const auto & key = getTiKVKey(ele.second); + const auto & value = getTiKVValue(ele.second); + total_size += key.serialize(buf); + total_size += value.serialize(buf); + } + } + + return total_size; +} + +template +size_t RegionCFDataBase::deserialize(ReadBuffer & buf, RegionCFDataBase & new_region_data) +{ + size_t size = readBinary2(buf); + size_t cf_data_size = 0; + for (size_t i = 0; i < size; ++i) + { + auto key = TiKVKey::deserialize(buf); + auto value = TiKVValue::deserialize(buf); + + new_region_data.insert(key, value); + cf_data_size += calcTiKVKeyValueSize(key, value); + } + return cf_data_size; +} + +template +TableIDSet RegionCFDataBase::getAllRecordTableID() const +{ + TableIDSet tables; + for (const auto & [table_id, map] : data) + { + if (map.empty()) + continue; + tables.insert(table_id); + } + return tables; +} + +template +const typename RegionCFDataBase::Data & RegionCFDataBase::getData() const +{ + return data; +} + +template +typename RegionCFDataBase::Data & RegionCFDataBase::getDataMut() +{ + return data; +} + +template struct RegionCFDataBase; +template struct RegionCFDataBase; +template struct RegionCFDataBase; + +} // namespace DB diff --git a/dbms/src/Storages/Transaction/RegionCFDataBase.h b/dbms/src/Storages/Transaction/RegionCFDataBase.h new file mode 100644 index 00000000000..0f756294e81 --- /dev/null +++ b/dbms/src/Storages/Transaction/RegionCFDataBase.h @@ -0,0 +1,60 @@ +#pragma once + +#include + +#include + +namespace DB +{ + +using RegionRange = std::pair; + +template +struct RegionCFDataBase +{ + using Key = typename Trait::Key; + using Value = typename Trait::Value; + using Map = typename Trait::Map; + using Data = std::unordered_map; + + static const TiKVKey & getTiKVKey(const Value & val); + + static const TiKVValue & getTiKVValue(const Value & val); + + TableID insert(const TiKVKey & key, const TiKVValue & value); + + TableID insert(const TiKVKey & key, const TiKVValue & value, const String & raw_key); + + static size_t calcTiKVKeyValueSize(const Value & value); + + static size_t calcTiKVKeyValueSize(const TiKVKey & key, const TiKVValue & value); + + size_t remove(TableID table_id, const Key & key); + + static bool cmp(const Map & a, const Map & b); + + bool operator==(const RegionCFDataBase & cf) const; + + size_t getSize() const; + + RegionCFDataBase() {} + RegionCFDataBase(RegionCFDataBase && region); + RegionCFDataBase & operator=(RegionCFDataBase && region); + + size_t splitInto(const RegionRange & range, RegionCFDataBase & new_region_data); + + size_t serialize(WriteBuffer & buf) const; + + static size_t deserialize(ReadBuffer & buf, RegionCFDataBase & new_region_data); + + const Data & getData() const; + + Data & getDataMut(); + + TableIDSet getAllRecordTableID() const; + +private: + Data data; +}; + +} // namespace DB diff --git a/dbms/src/Storages/Transaction/RegionCFDataTrait.h b/dbms/src/Storages/Transaction/RegionCFDataTrait.h new file mode 100644 index 00000000000..dc18fd56d22 --- /dev/null +++ b/dbms/src/Storages/Transaction/RegionCFDataTrait.h @@ -0,0 +1,72 @@ +#pragma once + +#include + +#include + +namespace DB +{ + +struct RegionWriteCFDataTrait +{ + using DecodedWriteCFValue = RecordKVFormat::DecodedWriteCFValue; + using Key = std::tuple; + using Value = std::tuple; + using Map = std::map; + + static std::pair genKVPair(const TiKVKey & key, const String & raw_key, const TiKVValue & value) + { + HandleID handle_id = RecordKVFormat::getHandle(raw_key); + Timestamp ts = RecordKVFormat::getTs(key); + return {Key{handle_id, ts}, Value{key, value, RecordKVFormat::decodeWriteCfValue(value)}}; + } + + static TiKVKey genTiKVKey(const TableID & table_id, const Key & key) + { + const auto & [handle_id, ts] = key; + auto tikv_key = RecordKVFormat::appendTs(RecordKVFormat::genKey(table_id, handle_id), ts); + return tikv_key; + } +}; + + +struct RegionDefaultCFDataTrait +{ + using Key = std::tuple; + using Value = std::tuple; + using Map = std::map; + + static std::pair genKVPair(const TiKVKey & key, const String & raw_key, const TiKVValue & value) + { + HandleID handle_id = RecordKVFormat::getHandle(raw_key); + Timestamp ts = RecordKVFormat::getTs(key); + return {Key{handle_id, ts}, Value{key, value}}; + } + static TiKVKey genTiKVKey(const TableID & table_id, const Key & key) + { + const auto & [handle_id, ts] = key; + auto tikv_key = RecordKVFormat::appendTs(RecordKVFormat::genKey(table_id, handle_id), ts); + return tikv_key; + } +}; + +struct RegionLockCFDataTrait +{ + using DecodedLockCFValue = RecordKVFormat::DecodedLockCFValue; + using Key = HandleID; + using Value = std::tuple; + using Map = std::map; + + static std::pair genKVPair(const TiKVKey & key, const String & raw_key, const TiKVValue & value) + { + HandleID handle_id = RecordKVFormat::getHandle(raw_key); + return {handle_id, Value{key, value, RecordKVFormat::decodeLockCfValue(value)}}; + } + static TiKVKey genTiKVKey(const TableID & table_id, const Key & key) + { + auto tikv_key = RecordKVFormat::genKey(table_id, key); + return tikv_key; + } +}; + +} // namespace DB diff --git a/dbms/src/Storages/Transaction/RegionData.cpp b/dbms/src/Storages/Transaction/RegionData.cpp new file mode 100644 index 00000000000..c18e6ed3c50 --- /dev/null +++ b/dbms/src/Storages/Transaction/RegionData.cpp @@ -0,0 +1,176 @@ +#include +#include +#include + +namespace DB +{ + +TableID RegionData::insert(ColumnFamilyType cf, const TiKVKey & key, const String & raw_key, const TiKVValue & value) +{ + switch (cf) + { + case Write: + { + auto table_id = write_cf.insert(key, value, raw_key); + cf_data_size += key.dataSize() + value.dataSize(); + return table_id; + } + case Default: + { + auto table_id = default_cf.insert(key, value, raw_key); + cf_data_size += key.dataSize() + value.dataSize(); + return table_id; + } + case Lock: + { + return lock_cf.insert(key, value, raw_key); + } + default: + throw Exception(" should not happen", ErrorCodes::LOGICAL_ERROR); + } +} + +TableID RegionData::removeLockCF(const TableID & table_id, const String & raw_key) +{ + HandleID handle_id = RecordKVFormat::getHandle(raw_key); + lock_cf.remove(table_id, handle_id); + return table_id; +} + +RegionData::WriteCFIter RegionData::removeDataByWriteIt(const TableID & table_id, const WriteCFIter & write_it) +{ + const auto & [key, value, decoded_val] = write_it->second; + const auto & [handle, ts] = write_it->first; + const auto & [write_type, prewrite_ts, short_str] = decoded_val; + + std::ignore = ts; + std::ignore = value; + + if (write_type == PutFlag && !short_str) + { + auto & map = default_cf.getDataMut()[table_id]; + + if (auto data_it = map.find({handle, prewrite_ts}); data_it != map.end()) + { + cf_data_size -= RegionDefaultCFData::calcTiKVKeyValueSize(data_it->second); + map.erase(data_it); + } + else + throw Exception(" key [" + key.toString() + "] not found in data cf when removing", ErrorCodes::LOGICAL_ERROR); + } + + cf_data_size -= RegionWriteCFData::calcTiKVKeyValueSize(write_it->second); + + return write_cf.getDataMut()[table_id].erase(write_it); +} + +RegionDataReadInfo RegionData::readDataByWriteIt(const TableID & table_id, const ConstWriteCFIter & write_it) const +{ + const auto & [key, value, decoded_val] = write_it->second; + const auto & [handle, ts] = write_it->first; + + std::ignore = value; + + const auto & [write_type, prewrite_ts, short_value] = decoded_val; + + if (write_type != PutFlag) + return std::make_tuple(handle, write_type, ts, TiKVValue()); + + if (short_value) + return std::make_tuple(handle, write_type, ts, TiKVValue(*short_value)); + + if (auto map_it = default_cf.getData().find(table_id); map_it != default_cf.getData().end()) + { + const auto & map = map_it->second; + if (auto data_it = map.find({handle, prewrite_ts}); data_it != map.end()) + return std::make_tuple(handle, write_type, ts, RegionDefaultCFData::getTiKVValue(data_it->second)); + else + throw Exception(" key [" + key.toString() + "] not found in data cf", ErrorCodes::LOGICAL_ERROR); + } + else + throw Exception(" table [" + toString(table_id) + "] not found in data cf", ErrorCodes::LOGICAL_ERROR); +} + +LockInfoPtr RegionData::getLockInfo(TableID expected_table_id, Timestamp start_ts) const +{ + if (auto it = lock_cf.getData().find(expected_table_id); it != lock_cf.getData().end()) + { + for (const auto & [handle, value] : it->second) + { + std::ignore = handle; + + const auto & [tikv_key, tikv_val, decoded_val] = value; + const auto & [lock_type, primary, ts, ttl, data] = decoded_val; + std::ignore = tikv_val; + std::ignore = data; + + if (lock_type == DelFlag || ts > start_ts) + continue; + + return std::make_unique(LockInfo{primary, ts, RecordKVFormat::decodeTiKVKey(tikv_key), ttl}); + } + + return nullptr; + } + else + return nullptr; +} + +void RegionData::splitInto(const RegionRange & range, RegionData & new_region_data) +{ + size_t size_changed = 0; + size_changed += default_cf.splitInto(range, new_region_data.default_cf); + size_changed += write_cf.splitInto(range, new_region_data.write_cf); + size_changed += lock_cf.splitInto(range, new_region_data.lock_cf); + cf_data_size -= size_changed; + new_region_data.cf_data_size += size_changed; +} + +size_t RegionData::dataSize() const { return cf_data_size; } + +void RegionData::assignRegionData(RegionData && new_region_data) +{ + default_cf = std::move(new_region_data.default_cf); + write_cf = std::move(new_region_data.write_cf); + lock_cf = std::move(new_region_data.lock_cf); + + cf_data_size = new_region_data.cf_data_size.load(); +} + +size_t RegionData::serialize(WriteBuffer & buf) const +{ + size_t total_size = 0; + + total_size += default_cf.serialize(buf); + total_size += write_cf.serialize(buf); + total_size += lock_cf.serialize(buf); + + return total_size; +} + +void RegionData::deserialize(ReadBuffer & buf, RegionData & region_data) +{ + size_t total_size = 0; + total_size += RegionDefaultCFData::deserialize(buf, region_data.default_cf); + total_size += RegionWriteCFData::deserialize(buf, region_data.write_cf); + total_size += RegionLockCFData::deserialize(buf, region_data.lock_cf); + + region_data.cf_data_size += total_size; +} + +RegionWriteCFData & RegionData::writeCFMute() { return write_cf; } + +const RegionWriteCFData & RegionData::writeCF() const { return write_cf; } + +TableIDSet RegionData::getCommittedRecordTableID() const { return writeCF().getAllRecordTableID(); } + +bool RegionData::isEqual(const RegionData & r2) const +{ + return default_cf == r2.default_cf && write_cf == r2.write_cf && lock_cf == r2.lock_cf && cf_data_size == r2.cf_data_size; +} + +RegionData::RegionData(RegionData && data) + : write_cf(std::move(data.write_cf)), default_cf(std::move(data.default_cf)), lock_cf(std::move(data.lock_cf)) +{} + +} // namespace DB diff --git a/dbms/src/Storages/Transaction/RegionData.h b/dbms/src/Storages/Transaction/RegionData.h index b81d2e13f05..297c036c50e 100644 --- a/dbms/src/Storages/Transaction/RegionData.h +++ b/dbms/src/Storages/Transaction/RegionData.h @@ -1,19 +1,13 @@ #pragma once -#include -#include - -#include -#include +#include +#include #include -#include -#include +#include namespace DB { -using RegionRange = std::pair; - enum ColumnFamilyType { Write, @@ -21,291 +15,6 @@ enum ColumnFamilyType Lock, }; -struct RegionWriteCFDataTrait -{ - using DecodedWriteCFValue = RecordKVFormat::DecodedWriteCFValue; - using Key = std::tuple; - using Value = std::tuple; - using Map = std::map; - - static std::pair genKVPair(const TiKVKey & key, const String & raw_key, const TiKVValue & value) - { - HandleID handle_id = RecordKVFormat::getHandle(raw_key); - Timestamp ts = RecordKVFormat::getTs(key); - return {Key{handle_id, ts}, Value{key, value, RecordKVFormat::decodeWriteCfValue(value)}}; - } - static TiKVKey genTiKVKey(const TableID & table_id, const Key & key) - { - const auto & [handle_id, ts] = key; - auto tikv_key = RecordKVFormat::appendTs(RecordKVFormat::genKey(table_id, handle_id), ts); - return tikv_key; - } -}; - -struct RegionDefaultCFDataTrait -{ - using Key = std::tuple; - using Value = std::tuple; - using Map = std::map; - - static std::pair genKVPair(const TiKVKey & key, const String & raw_key, const TiKVValue & value) - { - HandleID handle_id = RecordKVFormat::getHandle(raw_key); - Timestamp ts = RecordKVFormat::getTs(key); - return {Key{handle_id, ts}, Value{key, value}}; - } - static TiKVKey genTiKVKey(const TableID & table_id, const Key & key) - { - const auto & [handle_id, ts] = key; - auto tikv_key = RecordKVFormat::appendTs(RecordKVFormat::genKey(table_id, handle_id), ts); - return tikv_key; - } -}; - -struct RegionLockCFDataTrait -{ - using DecodedLockCFValue = RecordKVFormat::DecodedLockCFValue; - using Key = HandleID; - using Value = std::tuple; - using Map = std::map; - - static std::pair genKVPair(const TiKVKey & key, const String & raw_key, const TiKVValue & value) - { - HandleID handle_id = RecordKVFormat::getHandle(raw_key); - return {handle_id, Value{key, value, RecordKVFormat::decodeLockCfValue(value)}}; - } - static TiKVKey genTiKVKey(const TableID & table_id, const Key & key) - { - auto tikv_key = RecordKVFormat::genKey(table_id, key); - return tikv_key; - } -}; - -template -struct RegionCFDataBase -{ - using Key = typename Trait::Key; - using Value = typename Trait::Value; - using Map = typename Trait::Map; - using Data = std::unordered_map; - - static const TiKVKey & getTiKVKey(const Value & val) - { - return std::get<0>(val); - } - - static const TiKVValue & getTiKVValue(const Value & val) - { - return std::get<1>(val); - } - - TableID insert(const TiKVKey & key, const TiKVValue & value) - { - const String & raw_key = RecordKVFormat::decodeTiKVKey(key); - return insert(key, value, raw_key); - } - - TableID insert(const TiKVKey & key, const TiKVValue & value, const String & raw_key) - { - TableID table_id = RecordKVFormat::getTableId(raw_key); - auto & map = data[table_id]; - auto [it, ok] = map.insert(Trait::genKVPair(key, raw_key, value)); - std::ignore = it; - if (!ok) - throw Exception(" found existing key [" + key.toString() + "]", ErrorCodes::LOGICAL_ERROR); - return table_id; - } - - static size_t calcTiKVKeyValueSize(const Value & value) - { - return calcTiKVKeyValueSize(getTiKVKey(value), getTiKVValue(value)); - } - - static size_t calcTiKVKeyValueSize(const TiKVKey & key, const TiKVValue & value) - { - if constexpr (std::is_same::value) - return 0; - else - return key.dataSize() + value.dataSize(); - } - - size_t remove(TableID table_id, const Key & key) - { - auto & map = data[table_id]; - - if (auto it = map.find(key); it != map.end()) - { - const Value & value = it->second; - size_t size = calcTiKVKeyValueSize(value); - map.erase(it); - return size; - } - else - { - auto tikv_key = Trait::genTiKVKey(table_id, key); - throw Exception(" key not found [" + tikv_key.toString() + "]", ErrorCodes::LOGICAL_ERROR); - - return 0; - } - } - - static bool cmp(const Map & a, const Map & b) - { - if (a.size() != b.size()) - return false; - for (const auto & [key, value] : a) - { - if (auto it = b.find(key); it != b.end()) - { - if (getTiKVKey(value) != getTiKVKey(it->second) || getTiKVValue(value) != getTiKVValue(it->second)) - return false; - } - else - return false; - } - return true; - } - - bool operator == (const RegionCFDataBase & cf) const - { - if (getSize() != cf.getSize()) - return false; - - const auto & cf_data = cf.data; - for (const auto & [table_id, map] : data) - { - if (map.empty()) - continue; - - if (auto it = cf_data.find(table_id); it != cf_data.end()) - { - if (!cmp(map, it->second)) - return false; - } - else - return false; - } - return true; - } - - size_t getSize() const - { - size_t size = 0; - for (auto data_it = data.begin(); data_it != data.end(); ++data_it) - size += data_it->second.size(); - return size; - } - - RegionCFDataBase(){} - RegionCFDataBase(RegionCFDataBase && region): data(std::move(region.data)) {} - RegionCFDataBase & operator = (RegionCFDataBase && region) - { - data = std::move(region.data); - return *this; - } - - size_t splitInto(const RegionRange & range, RegionCFDataBase & new_region_data) - { - const auto & [start_key, end_key] = range; - size_t size_changed = 0; - - for (auto data_it = data.begin(); data_it != data.end(); ) - { - const auto & table_id = data_it->first; - auto & ori_map = data_it->second; - if (ori_map.empty()) - { - data_it = data.erase(data_it); - continue; - } - - auto & tar_map = new_region_data.data[table_id]; - - for (auto it = ori_map.begin(); it != ori_map.end();) - { - const auto & key = getTiKVKey(it->second); - - bool ok = start_key ? key >= start_key : true; - ok = ok && (end_key ? key < end_key : true); - if (ok) - { - size_changed += calcTiKVKeyValueSize(it->second); - tar_map.insert(std::move(*it)); - it = ori_map.erase(it); - } - else - ++it; - } - - ++data_it; - } - return size_changed; - } - - size_t serialize(WriteBuffer & buf) const - { - size_t total_size = 0; - - size_t size = getSize(); - - total_size += writeBinary2(size, buf); - - for (const auto & [table_id, map] : data) - { - std::ignore = table_id; - for (const auto & ele : map) - { - const auto & key = getTiKVKey(ele.second); - const auto & value = getTiKVValue(ele.second); - total_size += key.serialize(buf); - total_size += value.serialize(buf); - } - } - - return total_size; - } - - static size_t deserialize(ReadBuffer & buf, RegionCFDataBase & new_region_data) - { - size_t size = readBinary2(buf); - size_t cf_data_size = 0; - for (size_t i = 0; i < size; ++i) - { - auto key = TiKVKey::deserialize(buf); - auto value = TiKVValue::deserialize(buf); - - new_region_data.insert(key, value); - cf_data_size += calcTiKVKeyValueSize(key, value); - } - return cf_data_size; - } - - const auto & getData() const - { - return data; - } - - auto & getDataMut() - { - return data; - } - - TableIDSet getAllRecordTableID() const - { - TableIDSet tables; - for (const auto & [table_id, map] : data) - { - if (map.empty()) - continue; - tables.insert(table_id); - } - return tables; - } - -private: - Data data; -}; - using RegionWriteCFData = RegionCFDataBase; using RegionDefaultCFData = RegionCFDataBase; using RegionLockCFData = RegionCFDataBase; @@ -329,183 +38,39 @@ class RegionData using WriteCFIter = RegionWriteCFData::Map::iterator; using ConstWriteCFIter = RegionWriteCFData::Map::const_iterator; - TableID insert(ColumnFamilyType cf, const TiKVKey & key, const String & raw_key, const TiKVValue & value) - { - switch(cf) - { - case Write: - { - auto table_id = write_cf.insert(key, value, raw_key); - cf_data_size += key.dataSize() + value.dataSize(); - return table_id; - } - case Default: - { - auto table_id = default_cf.insert(key, value, raw_key); - cf_data_size += key.dataSize() + value.dataSize(); - return table_id; - } - case Lock: - { - return lock_cf.insert(key, value, raw_key); - } - default: - throw Exception(" should not happen", ErrorCodes::LOGICAL_ERROR); - } - } - - TableID removeLockCF(const TableID & table_id, const String & raw_key) - { - HandleID handle_id = RecordKVFormat::getHandle(raw_key); - lock_cf.remove(table_id, handle_id); - return table_id; - } - - WriteCFIter removeDataByWriteIt(const TableID & table_id, const WriteCFIter & write_it) - { - const auto & [key, value, decoded_val] = write_it->second; - const auto & [handle, ts] = write_it->first; - const auto & [write_type, prewrite_ts, short_str] = decoded_val; - - std::ignore = ts; - std::ignore = value; - - if (write_type == PutFlag && !short_str) - { - auto & map = default_cf.getDataMut()[table_id]; - - if (auto data_it = map.find({handle, prewrite_ts}); data_it != map.end()) - { - cf_data_size -= RegionDefaultCFData::calcTiKVKeyValueSize(data_it->second); - map.erase(data_it); - } - else - throw Exception(" key [" + key.toString() + "] not found in data cf when removing", ErrorCodes::LOGICAL_ERROR); - } - - cf_data_size -= RegionWriteCFData::calcTiKVKeyValueSize(write_it->second); - - return write_cf.getDataMut()[table_id].erase(write_it); - } - - RegionDataReadInfo readDataByWriteIt(const TableID & table_id, const ConstWriteCFIter & write_it) const - { - const auto & [key, value, decoded_val] = write_it->second; - const auto & [handle, ts] = write_it->first; - - std::ignore = value; - - const auto & [write_type, prewrite_ts, short_value] = decoded_val; - - if (write_type != PutFlag) - return std::make_tuple(handle, write_type, ts, TiKVValue()); - - if (short_value) - return std::make_tuple(handle, write_type, ts, TiKVValue(*short_value)); - - if (auto map_it = default_cf.getData().find(table_id); map_it != default_cf.getData().end()) - { - const auto & map = map_it->second; - if (auto data_it = map.find({handle, prewrite_ts}); data_it != map.end()) - return std::make_tuple(handle, write_type, ts, RegionDefaultCFData::getTiKVValue(data_it->second)); - else - throw Exception(" key [" + key.toString() + "] not found in data cf", ErrorCodes::LOGICAL_ERROR); - } - else - throw Exception(" table [" + toString(table_id) + "] not found in data cf", ErrorCodes::LOGICAL_ERROR); - } + TableID insert(ColumnFamilyType cf, const TiKVKey & key, const String & raw_key, const TiKVValue & value); - LockInfoPtr getLockInfo(TableID expected_table_id, Timestamp start_ts) const - { - if (auto it = lock_cf.getData().find(expected_table_id); it != lock_cf.getData().end()) - { - for (const auto & [handle, value] : it->second) - { - std::ignore = handle; + TableID removeLockCF(const TableID & table_id, const String & raw_key); - const auto & [tikv_key, tikv_val, decoded_val] = value; - const auto & [lock_type, primary, ts, ttl, data] = decoded_val; - std::ignore = tikv_val; - std::ignore = data; + WriteCFIter removeDataByWriteIt(const TableID & table_id, const WriteCFIter & write_it); - if (lock_type == DelFlag || ts > start_ts) - continue; + RegionDataReadInfo readDataByWriteIt(const TableID & table_id, const ConstWriteCFIter & write_it) const; - return std::make_unique(LockInfo{primary, ts, RecordKVFormat::decodeTiKVKey(tikv_key), ttl}); - } + LockInfoPtr getLockInfo(TableID expected_table_id, Timestamp start_ts) const; - return nullptr; - } - else - return nullptr; - } + void splitInto(const RegionRange & range, RegionData & new_region_data); - void splitInto(const RegionRange & range, RegionData & new_region_data) - { - size_t size_changed = 0; - size_changed += default_cf.splitInto(range, new_region_data.default_cf); - size_changed += write_cf.splitInto(range, new_region_data.write_cf); - size_changed += lock_cf.splitInto(range, new_region_data.lock_cf); - cf_data_size -= size_changed; - new_region_data.cf_data_size += size_changed; - } + size_t dataSize() const; - size_t dataSize() const { return cf_data_size; } + void assignRegionData(RegionData && new_region_data); - void reset(RegionData && new_region_data) - { - default_cf = std::move(new_region_data.default_cf); - write_cf = std::move(new_region_data.write_cf); - lock_cf = std::move(new_region_data.lock_cf); + size_t serialize(WriteBuffer & buf) const; - cf_data_size = new_region_data.cf_data_size.load(); - } + static void deserialize(ReadBuffer & buf, RegionData & region_data); - size_t serialize(WriteBuffer & buf) const - { - size_t total_size = 0; + friend bool operator==(const RegionData & r1, const RegionData & r2) { return r1.isEqual(r2); } - total_size += default_cf.serialize(buf); - total_size += write_cf.serialize(buf); - total_size += lock_cf.serialize(buf); + bool isEqual(const RegionData & r2) const; - return total_size; - } + RegionWriteCFData & writeCFMute(); - static void deserialize(ReadBuffer & buf, RegionData & region_data) - { - size_t total_size = 0; - total_size += RegionDefaultCFData::deserialize(buf, region_data.default_cf); - total_size += RegionWriteCFData::deserialize(buf, region_data.write_cf); - total_size += RegionLockCFData::deserialize(buf, region_data.lock_cf); - - region_data.cf_data_size += total_size; - } + const RegionWriteCFData & writeCF() const; - friend bool operator==(const RegionData & r1, const RegionData & r2) - { - return r1.default_cf == r2.default_cf && r1.write_cf == r2.write_cf - && r1.lock_cf == r2.lock_cf && r1.cf_data_size == r2.cf_data_size; - } - - RegionWriteCFData & writeCFMute() - { - return write_cf; - } - - const RegionWriteCFData & writeCF() const - { - return write_cf; - } - - TableIDSet getCommittedRecordTableID() const - { - return writeCF().getAllRecordTableID(); - } + TableIDSet getCommittedRecordTableID() const; RegionData() {} - RegionData(RegionData && data):write_cf(std::move(data.write_cf)),default_cf(std::move(data.default_cf)),lock_cf(std::move(data.lock_cf)) {} + RegionData(RegionData && data); private: RegionWriteCFData write_cf; diff --git a/dbms/src/Storages/Transaction/RegionLockInfo.h b/dbms/src/Storages/Transaction/RegionLockInfo.h index aee723b1625..253a79f03ae 100644 --- a/dbms/src/Storages/Transaction/RegionLockInfo.h +++ b/dbms/src/Storages/Transaction/RegionLockInfo.h @@ -1,5 +1,7 @@ #pragma once +#include + namespace DB { diff --git a/dbms/src/Storages/Transaction/RegionMeta.cpp b/dbms/src/Storages/Transaction/RegionMeta.cpp index 592ce3bd988..89e9235f061 100644 --- a/dbms/src/Storages/Transaction/RegionMeta.cpp +++ b/dbms/src/Storages/Transaction/RegionMeta.cpp @@ -183,12 +183,12 @@ UInt64 RegionMeta::confVer() const return region.region_epoch().conf_ver(); } -void RegionMeta::reset(RegionMeta && rhs) +void RegionMeta::assignRegionMeta(RegionMeta && rhs) { std::lock_guard lock(mutex); if (regionId() != rhs.regionId()) - throw Exception("RegionMeta::reset region_id not equal, should not happen", ErrorCodes::LOGICAL_ERROR); + throw Exception("RegionMeta::assignRegionMeta region_id not equal, should not happen", ErrorCodes::LOGICAL_ERROR); peer = std::move(rhs.peer); region = std::move(rhs.region); diff --git a/dbms/src/Storages/Transaction/RegionMeta.h b/dbms/src/Storages/Transaction/RegionMeta.h index 646c0c1c9d0..81d24e7c428 100644 --- a/dbms/src/Storages/Transaction/RegionMeta.h +++ b/dbms/src/Storages/Transaction/RegionMeta.h @@ -78,7 +78,7 @@ class RegionMeta bool isPendingRemove() const; void setPendingRemove(); - void reset(RegionMeta && other); + void assignRegionMeta(RegionMeta && other); friend bool operator==(const RegionMeta & meta1, const RegionMeta & meta2) { diff --git a/dbms/src/Storages/Transaction/RegionPersister.cpp b/dbms/src/Storages/Transaction/RegionPersister.cpp index 2877cceb6ba..01a677df706 100644 --- a/dbms/src/Storages/Transaction/RegionPersister.cpp +++ b/dbms/src/Storages/Transaction/RegionPersister.cpp @@ -22,10 +22,10 @@ void RegionPersister::persist(const RegionPtr & region, enginepb::CommandRespons // Support only on thread persist. std::lock_guard lock(mutex); - size_t persist_parm = region->persistParm(); + size_t dirty_flag = region->dirtyFlag(); doPersist(region, response); region->markPersisted(); - region->decPersistParm(persist_parm); + region->decDirtyFlag(dirty_flag); } void RegionPersister::doPersist(const RegionPtr & region, enginepb::CommandResponse * response) diff --git a/dbms/src/Storages/Transaction/RegionTable.cpp b/dbms/src/Storages/Transaction/RegionTable.cpp index 367557f1c96..f5f46b172a7 100644 --- a/dbms/src/Storages/Transaction/RegionTable.cpp +++ b/dbms/src/Storages/Transaction/RegionTable.cpp @@ -194,7 +194,7 @@ void RegionTable::flushRegion(TableID table_id, RegionID region_id, size_t & cac cache_size = region->dataSize(); if (cache_size == 0) - region->incPersistParm(); + region->incDirtyFlag(); LOG_DEBUG( log, "Flush region - table_id: " << table_id << ", region_id: " << region_id << ", after flush " << cache_size << " bytes"); diff --git a/dbms/src/Storages/Transaction/TMTContext.cpp b/dbms/src/Storages/Transaction/TMTContext.cpp index 8cb301ea801..9b0261a07a9 100644 --- a/dbms/src/Storages/Transaction/TMTContext.cpp +++ b/dbms/src/Storages/Transaction/TMTContext.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include @@ -15,12 +16,14 @@ TMTContext::TMTContext(Context & context, std::vector addrs, std::string region_cache(std::make_shared(pd_client, learner_key_, learner_value_)), rpc_client(std::make_shared()) { + std::vector regions_to_remove; + kvstore->restore( [&](pingcap::kv::RegionVerID id) -> pingcap::kv::RegionClientPtr { return this->createRegionClient(id); }, ®ions_to_remove); region_table.restore(std::bind(&KVStore::getRegion, kvstore.get(), std::placeholders::_1)); for (RegionID id : regions_to_remove) kvstore->removeRegion(id, ®ion_table); - regions_to_remove.clear(); + kvstore->updateRegionTableBySnapshot(region_table); } diff --git a/dbms/src/Storages/Transaction/TMTContext.h b/dbms/src/Storages/Transaction/TMTContext.h index 611ad34647c..1d617153572 100644 --- a/dbms/src/Storages/Transaction/TMTContext.h +++ b/dbms/src/Storages/Transaction/TMTContext.h @@ -1,7 +1,6 @@ #pragma once #include -#include #include #pragma GCC diagnostic push @@ -17,6 +16,9 @@ class Context; class KVStore; using KVStorePtr = std::shared_ptr; +class SchemaSyncer; +using SchemaSyncerPtr = std::shared_ptr; + class TMTContext { public: @@ -41,8 +43,6 @@ class TMTContext pingcap::kv::RpcClientPtr getRpcClient(); private: - std::vector regions_to_remove = {}; - SchemaSyncerPtr schema_syncer; pingcap::pd::ClientPtr pd_client; pingcap::kv::RegionCachePtr region_cache; diff --git a/dbms/src/Storages/Transaction/TMTStorages.cpp b/dbms/src/Storages/Transaction/TMTStorages.cpp new file mode 100644 index 00000000000..c9b52f729f0 --- /dev/null +++ b/dbms/src/Storages/Transaction/TMTStorages.cpp @@ -0,0 +1,41 @@ +#include +#include + +namespace DB +{ + +void TMTStorages::put(StoragePtr storage) +{ + std::lock_guard lock(mutex); + + const StorageMergeTree * merge_tree = dynamic_cast(storage.get()); + if (!merge_tree) + throw Exception{storage->getName() + " is not in MergeTree family", ErrorCodes::LOGICAL_ERROR}; + + TableID table_id = merge_tree->getTableInfo().id; + if (storages.find(table_id) != storages.end()) + return; + storages.emplace(table_id, storage); +} + +StoragePtr TMTStorages::get(TableID table_id) +{ + std::lock_guard lock(mutex); + + auto it = storages.find(table_id); + if (it == storages.end()) + return nullptr; + return it->second; +} + +void TMTStorages::remove(TableID table_id) +{ + std::lock_guard lock(mutex); + + auto it = storages.find(table_id); + if (it == storages.end()) + return; + storages.erase(it); +} + +} // namespace DB diff --git a/dbms/src/Storages/Transaction/TMTStorages.h b/dbms/src/Storages/Transaction/TMTStorages.h index 16ff56c4f6e..4e4558fedbc 100644 --- a/dbms/src/Storages/Transaction/TMTStorages.h +++ b/dbms/src/Storages/Transaction/TMTStorages.h @@ -1,50 +1,29 @@ #pragma once -#include +#include +#include +#include + +#include namespace DB { +class IStorage; +using StoragePtr = std::shared_ptr; + class TMTStorages { public: - void put(StoragePtr storage) - { - std::lock_guard lock(mutex); - - const StorageMergeTree * merge_tree = dynamic_cast(storage.get()); - if (!merge_tree) - throw Exception{storage->getName() + " is not in MergeTree family", ErrorCodes::LOGICAL_ERROR}; - - TableID table_id = merge_tree->getTableInfo().id; - if (storages.find(table_id) != storages.end()) - return; - storages.emplace(table_id, storage); - } - - StoragePtr get(TableID table_id) - { - std::lock_guard lock(mutex); - - auto it = storages.find(table_id); - if (it == storages.end()) - return nullptr; - return it->second; - } - - void remove(TableID table_id) - { - std::lock_guard lock(mutex); - - auto it = storages.find(table_id); - if (it == storages.end()) - return; - storages.erase(it); - } + void put(StoragePtr storage); + + StoragePtr get(TableID table_id); + + void remove(TableID table_id); private: std::unordered_map storages; std::mutex mutex; }; -} +} // namespace DB diff --git a/dbms/src/Storages/Transaction/applySnapshot.cpp b/dbms/src/Storages/Transaction/applySnapshot.cpp index 03ac8ec9294..4f754de5883 100644 --- a/dbms/src/Storages/Transaction/applySnapshot.cpp +++ b/dbms/src/Storages/Transaction/applySnapshot.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -17,8 +18,12 @@ void applySnapshot(KVStorePtr kvstore, RequestReader read, Context * context) enginepb::SnapshotRequest request; auto ok = read(&request); - if (!ok || !request.has_state()) + if (!ok) + throw Exception("Read snapshot fail", ErrorCodes::LOGICAL_ERROR); + + if (!request.has_state()) throw Exception("Failed to read snapshot state", ErrorCodes::LOGICAL_ERROR); + const auto & state = request.state(); pingcap::kv::RegionClientPtr region_client = nullptr; auto meta = RegionMeta(state.peer(), state.region(), state.apply_state()); @@ -46,12 +51,12 @@ void applySnapshot(KVStorePtr kvstore, RequestReader read, Context * context) auto cf_name = data.cf(); auto key = TiKVKey(); auto value = TiKVValue(); - region->batchInsert([&](Region::BatchInsertNode & node) -> bool { + region->batchInsert([&](Region::BatchInsertElement & node) -> bool { if (it == cf_data.end()) return false; key = TiKVKey(it->key()); value = TiKVValue(it->value()); - node = Region::BatchInsertNode(&key, &value, &cf_name); + node = Region::BatchInsertElement(&key, &value, &cf_name); ++it; return true; });