Skip to content

Commit

Permalink
[FLASH-358] Support Region DeleteRange (#139)
Browse files Browse the repository at this point in the history
  • Loading branch information
solotzg authored Jul 25, 2019
1 parent 605acbc commit 6e19709
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 15 deletions.
30 changes: 15 additions & 15 deletions dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1071,21 +1071,6 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(const Names & column_names_t
}
}

{
BlocksList blocks;
if (region_block_data[special_region_index])
blocks.emplace_back(std::move(region_block_data[special_region_index]));

if (!blocks.empty())
{
BlockInputStreamPtr region_input_stream = std::make_shared<BlocksListBlockInputStream>(std::move(blocks));

region_input_stream = func_make_version_filter_input(region_input_stream);

merging.emplace_back(region_input_stream);
}
}

if (log->debug())
{
std::stringstream ss;
Expand All @@ -1105,6 +1090,21 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(const Names & column_names_t
<< region_block_data[special_region_index].rows() << " rows from memory");
}

{
BlocksList blocks;
if (region_block_data[special_region_index])
blocks.emplace_back(std::move(region_block_data[special_region_index]));

if (!blocks.empty())
{
BlockInputStreamPtr region_input_stream = std::make_shared<BlocksListBlockInputStream>(std::move(blocks));

region_input_stream = func_make_version_filter_input(region_input_stream);

merging.emplace_back(region_input_stream);
}
}

if (!merging.empty())
{
union_regions_stream.emplace_back(func_make_multi_way_merge_sort_input(merging));
Expand Down
19 changes: 19 additions & 0 deletions dbms/src/Storages/Transaction/Region.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,19 @@ RaftCommandResult Region::onCommand(enginepb::CommandRequest && cmd)
case raft_cmdpb::CmdType::ReadIndex:
LOG_WARNING(log, toString(false) << " skip unsupported command: " << raft_cmdpb::CmdType_Name(type));
break;
case raft_cmdpb::CmdType::DeleteRange:
{
const auto & delete_range = req.delete_range();
const auto & cf = delete_range.cf();
const auto & start = static_cast<const TiKVKey &>(delete_range.start_key());
const auto & end = static_cast<const TiKVKey &>(delete_range.end_key());

LOG_INFO(log,
toString(false) << " start to execute " << raft_cmdpb::CmdType_Name(type) << ", CF: " << cf
<< ", start key in hex: " << start.toHex() << ", end key in hex: " << end.toHex());
doDeleteRange(cf, start, end);
break;
}
default:
{
throw Exception(
Expand Down Expand Up @@ -634,4 +647,10 @@ void Region::compareAndCompleteSnapshot(const Timestamp safe_point, const Region
compareAndCompleteSnapshot(handle_map, table_id, safe_point);
}

void Region::doDeleteRange(const std::string & cf, const TiKVKey & start_key, const TiKVKey & end_key)
{
auto type = getCf(cf);
return data.deleteRange(type, start_key, end_key);
}

} // namespace DB
1 change: 1 addition & 0 deletions dbms/src/Storages/Transaction/Region.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ class Region : public std::enable_shared_from_this<Region>

TableID doInsert(const std::string & cf, TiKVKey && key, TiKVValue && value);
TableID doRemove(const std::string & cf, const TiKVKey & key);
void doDeleteRange(const std::string & cf, const TiKVKey & start_key, const TiKVKey & end_key);

RegionDataReadInfo readDataByWriteIt(
const TableID & table_id, const RegionData::ConstWriteCFIter & write_it, bool need_value = true) const;
Expand Down
26 changes: 26 additions & 0 deletions dbms/src/Storages/Transaction/RegionCFDataBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,32 @@ typename RegionCFDataBase<Trait>::Data & RegionCFDataBase<Trait>::getDataMut()
return data;
}

template <typename Trait>
void RegionCFDataBase<Trait>::deleteRange(const TiKVKey & start_key, const TiKVKey & end_key)
{
for (auto data_it = data.begin(); data_it != data.end();)
{
auto & ori_map = data_it->second;

for (auto it = ori_map.begin(); it != ori_map.end();)
{
const auto & key = getTiKVKey(it->second);

bool ok = start_key ? key >= start_key : true;
ok = ok && (end_key ? key < end_key : true);
if (ok)
it = ori_map.erase(it);
else
++it;
}

if (ori_map.empty())
data_it = data.erase(data_it);
else
++data_it;
}
}

template struct RegionCFDataBase<RegionWriteCFDataTrait>;
template struct RegionCFDataBase<RegionDefaultCFDataTrait>;
template struct RegionCFDataBase<RegionLockCFDataTrait>;
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/Transaction/RegionCFDataBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ struct RegionCFDataBase

TableIDSet getAllTables() const;

void deleteRange(const TiKVKey & start_key, const TiKVKey & end_key);

private:
static bool shouldIgnoreInsert(const Value & value);
static bool shouldIgnoreRemove(const Value & value);
Expand Down
18 changes: 18 additions & 0 deletions dbms/src/Storages/Transaction/RegionData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,4 +197,22 @@ RegionData::RegionData(RegionData && data)

UInt8 RegionData::getWriteType(const ConstWriteCFIter & write_it) { return RegionWriteCFDataTrait::getWriteType(write_it->second); }

void RegionData::deleteRange(const ColumnFamilyType cf, const TiKVKey & start_key, const TiKVKey & end_key)
{
switch (cf)
{
case Write:
write_cf.deleteRange(start_key, end_key);
break;
case Default:
default_cf.deleteRange(start_key, end_key);
break;
case Lock:
lock_cf.deleteRange(start_key, end_key);
break;
default:
throw Exception("[RegionData::deleteRange] with undefined CF, should not happen", ErrorCodes::LOGICAL_ERROR);
}
}

} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/Storages/Transaction/RegionData.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ class RegionData

RegionData(RegionData && data);

void deleteRange(const ColumnFamilyType cf, const TiKVKey & start_key, const TiKVKey & end_key);

public:
static UInt8 getWriteType(const ConstWriteCFIter & write_it);

Expand Down

0 comments on commit 6e19709

Please sign in to comment.