diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 31eeaa5e099..a02c54359fb 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -1072,21 +1072,6 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(const Names & column_names_t } } - { - BlocksList blocks; - if (region_block_data[special_region_index]) - blocks.emplace_back(std::move(region_block_data[special_region_index])); - - if (!blocks.empty()) - { - BlockInputStreamPtr region_input_stream = std::make_shared(std::move(blocks)); - - region_input_stream = func_make_version_filter_input(region_input_stream); - - merging.emplace_back(region_input_stream); - } - } - if (log->debug()) { std::stringstream ss; @@ -1106,6 +1091,21 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(const Names & column_names_t << region_block_data[special_region_index].rows() << " rows from memory"); } + { + BlocksList blocks; + if (region_block_data[special_region_index]) + blocks.emplace_back(std::move(region_block_data[special_region_index])); + + if (!blocks.empty()) + { + BlockInputStreamPtr region_input_stream = std::make_shared(std::move(blocks)); + + region_input_stream = func_make_version_filter_input(region_input_stream); + + merging.emplace_back(region_input_stream); + } + } + if (!merging.empty()) { union_regions_stream.emplace_back(func_make_multi_way_merge_sort_input(merging)); diff --git a/dbms/src/Storages/Transaction/Region.cpp b/dbms/src/Storages/Transaction/Region.cpp index 1212640b450..573c18f7141 100644 --- a/dbms/src/Storages/Transaction/Region.cpp +++ b/dbms/src/Storages/Transaction/Region.cpp @@ -341,6 +341,19 @@ RaftCommandResult Region::onCommand(enginepb::CommandRequest && cmd) case raft_cmdpb::CmdType::ReadIndex: LOG_WARNING(log, toString(false) << " skip unsupported command: " << raft_cmdpb::CmdType_Name(type)); break; + case raft_cmdpb::CmdType::DeleteRange: + { + const auto & delete_range = req.delete_range(); + const auto & cf = delete_range.cf(); + const auto & start = static_cast(delete_range.start_key()); + const auto & end = static_cast(delete_range.end_key()); + + LOG_INFO(log, + toString(false) << " start to execute " << raft_cmdpb::CmdType_Name(type) << ", CF: " << cf + << ", start key in hex: " << start.toHex() << ", end key in hex: " << end.toHex()); + doDeleteRange(cf, start, end); + break; + } default: { throw Exception( @@ -634,4 +647,10 @@ void Region::compareAndCompleteSnapshot(const Timestamp safe_point, const Region compareAndCompleteSnapshot(handle_map, table_id, safe_point); } +void Region::doDeleteRange(const std::string & cf, const TiKVKey & start_key, const TiKVKey & end_key) +{ + auto type = getCf(cf); + return data.deleteRange(type, start_key, end_key); +} + } // namespace DB diff --git a/dbms/src/Storages/Transaction/Region.h b/dbms/src/Storages/Transaction/Region.h index 9131a8465d4..4a91bedd6cf 100644 --- a/dbms/src/Storages/Transaction/Region.h +++ b/dbms/src/Storages/Transaction/Region.h @@ -173,6 +173,7 @@ class Region : public std::enable_shared_from_this TableID doInsert(const std::string & cf, TiKVKey && key, TiKVValue && value); TableID doRemove(const std::string & cf, const TiKVKey & key); + void doDeleteRange(const std::string & cf, const TiKVKey & start_key, const TiKVKey & end_key); RegionDataReadInfo readDataByWriteIt( const TableID & table_id, const RegionData::ConstWriteCFIter & write_it, bool need_value = true) const; diff --git a/dbms/src/Storages/Transaction/RegionCFDataBase.cpp b/dbms/src/Storages/Transaction/RegionCFDataBase.cpp index 907e4520c3f..ee7e40437c3 100644 --- a/dbms/src/Storages/Transaction/RegionCFDataBase.cpp +++ b/dbms/src/Storages/Transaction/RegionCFDataBase.cpp @@ -273,6 +273,32 @@ typename RegionCFDataBase::Data & RegionCFDataBase::getDataMut() return data; } +template +void RegionCFDataBase::deleteRange(const TiKVKey & start_key, const TiKVKey & end_key) +{ + for (auto data_it = data.begin(); data_it != data.end();) + { + auto & ori_map = data_it->second; + + for (auto it = ori_map.begin(); it != ori_map.end();) + { + const auto & key = getTiKVKey(it->second); + + bool ok = start_key ? key >= start_key : true; + ok = ok && (end_key ? key < end_key : true); + if (ok) + it = ori_map.erase(it); + else + ++it; + } + + if (ori_map.empty()) + data_it = data.erase(data_it); + else + ++data_it; + } +} + template struct RegionCFDataBase; template struct RegionCFDataBase; template struct RegionCFDataBase; diff --git a/dbms/src/Storages/Transaction/RegionCFDataBase.h b/dbms/src/Storages/Transaction/RegionCFDataBase.h index 6e0d7bd003a..d06b1e6bde7 100644 --- a/dbms/src/Storages/Transaction/RegionCFDataBase.h +++ b/dbms/src/Storages/Transaction/RegionCFDataBase.h @@ -67,6 +67,8 @@ struct RegionCFDataBase TableIDSet getAllTables() const; + void deleteRange(const TiKVKey & start_key, const TiKVKey & end_key); + private: static bool shouldIgnoreInsert(const Value & value); static bool shouldIgnoreRemove(const Value & value); diff --git a/dbms/src/Storages/Transaction/RegionData.cpp b/dbms/src/Storages/Transaction/RegionData.cpp index feb0c1ae251..a8297bbff03 100644 --- a/dbms/src/Storages/Transaction/RegionData.cpp +++ b/dbms/src/Storages/Transaction/RegionData.cpp @@ -197,4 +197,22 @@ RegionData::RegionData(RegionData && data) UInt8 RegionData::getWriteType(const ConstWriteCFIter & write_it) { return RegionWriteCFDataTrait::getWriteType(write_it->second); } +void RegionData::deleteRange(const ColumnFamilyType cf, const TiKVKey & start_key, const TiKVKey & end_key) +{ + switch (cf) + { + case Write: + write_cf.deleteRange(start_key, end_key); + break; + case Default: + default_cf.deleteRange(start_key, end_key); + break; + case Lock: + lock_cf.deleteRange(start_key, end_key); + break; + default: + throw Exception("[RegionData::deleteRange] with undefined CF, should not happen", ErrorCodes::LOGICAL_ERROR); + } +} + } // namespace DB diff --git a/dbms/src/Storages/Transaction/RegionData.h b/dbms/src/Storages/Transaction/RegionData.h index 16ea4554d79..1fdc1dec5d2 100644 --- a/dbms/src/Storages/Transaction/RegionData.h +++ b/dbms/src/Storages/Transaction/RegionData.h @@ -63,6 +63,8 @@ class RegionData RegionData(RegionData && data); + void deleteRange(const ColumnFamilyType cf, const TiKVKey & start_key, const TiKVKey & end_key); + public: static UInt8 getWriteType(const ConstWriteCFIter & write_it);