Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLASH-358] Support Region DeleteRange #139

Merged
merged 3 commits into from
Jul 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1072,21 +1072,6 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(const Names & column_names_t
}
}

{
BlocksList blocks;
if (region_block_data[special_region_index])
blocks.emplace_back(std::move(region_block_data[special_region_index]));

if (!blocks.empty())
{
BlockInputStreamPtr region_input_stream = std::make_shared<BlocksListBlockInputStream>(std::move(blocks));

region_input_stream = func_make_version_filter_input(region_input_stream);

merging.emplace_back(region_input_stream);
}
}

if (log->debug())
{
std::stringstream ss;
Expand All @@ -1106,6 +1091,21 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(const Names & column_names_t
<< region_block_data[special_region_index].rows() << " rows from memory");
}

{
BlocksList blocks;
if (region_block_data[special_region_index])
blocks.emplace_back(std::move(region_block_data[special_region_index]));

if (!blocks.empty())
{
BlockInputStreamPtr region_input_stream = std::make_shared<BlocksListBlockInputStream>(std::move(blocks));

region_input_stream = func_make_version_filter_input(region_input_stream);

merging.emplace_back(region_input_stream);
}
}

if (!merging.empty())
{
union_regions_stream.emplace_back(func_make_multi_way_merge_sort_input(merging));
Expand Down
19 changes: 19 additions & 0 deletions dbms/src/Storages/Transaction/Region.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,19 @@ RaftCommandResult Region::onCommand(enginepb::CommandRequest && cmd)
case raft_cmdpb::CmdType::ReadIndex:
LOG_WARNING(log, toString(false) << " skip unsupported command: " << raft_cmdpb::CmdType_Name(type));
break;
case raft_cmdpb::CmdType::DeleteRange:
{
const auto & delete_range = req.delete_range();
const auto & cf = delete_range.cf();
const auto & start = static_cast<const TiKVKey &>(delete_range.start_key());
const auto & end = static_cast<const TiKVKey &>(delete_range.end_key());

LOG_INFO(log,
toString(false) << " start to execute " << raft_cmdpb::CmdType_Name(type) << ", CF: " << cf
<< ", start key in hex: " << start.toHex() << ", end key in hex: " << end.toHex());
doDeleteRange(cf, start, end);
break;
}
default:
{
throw Exception(
Expand Down Expand Up @@ -634,4 +647,10 @@ void Region::compareAndCompleteSnapshot(const Timestamp safe_point, const Region
compareAndCompleteSnapshot(handle_map, table_id, safe_point);
}

void Region::doDeleteRange(const std::string & cf, const TiKVKey & start_key, const TiKVKey & end_key)
{
auto type = getCf(cf);
return data.deleteRange(type, start_key, end_key);
}

} // namespace DB
1 change: 1 addition & 0 deletions dbms/src/Storages/Transaction/Region.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ class Region : public std::enable_shared_from_this<Region>

TableID doInsert(const std::string & cf, TiKVKey && key, TiKVValue && value);
TableID doRemove(const std::string & cf, const TiKVKey & key);
void doDeleteRange(const std::string & cf, const TiKVKey & start_key, const TiKVKey & end_key);

RegionDataReadInfo readDataByWriteIt(
const TableID & table_id, const RegionData::ConstWriteCFIter & write_it, bool need_value = true) const;
Expand Down
26 changes: 26 additions & 0 deletions dbms/src/Storages/Transaction/RegionCFDataBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,32 @@ typename RegionCFDataBase<Trait>::Data & RegionCFDataBase<Trait>::getDataMut()
return data;
}

template <typename Trait>
void RegionCFDataBase<Trait>::deleteRange(const TiKVKey & start_key, const TiKVKey & end_key)
{
for (auto data_it = data.begin(); data_it != data.end();)
{
auto & ori_map = data_it->second;

for (auto it = ori_map.begin(); it != ori_map.end();)
{
const auto & key = getTiKVKey(it->second);

bool ok = start_key ? key >= start_key : true;
ok = ok && (end_key ? key < end_key : true);
if (ok)
it = ori_map.erase(it);
else
++it;
}

if (ori_map.empty())
data_it = data.erase(data_it);
else
++data_it;
}
}

template struct RegionCFDataBase<RegionWriteCFDataTrait>;
template struct RegionCFDataBase<RegionDefaultCFDataTrait>;
template struct RegionCFDataBase<RegionLockCFDataTrait>;
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/Transaction/RegionCFDataBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ struct RegionCFDataBase

TableIDSet getAllTables() const;

void deleteRange(const TiKVKey & start_key, const TiKVKey & end_key);

private:
static bool shouldIgnoreInsert(const Value & value);
static bool shouldIgnoreRemove(const Value & value);
Expand Down
18 changes: 18 additions & 0 deletions dbms/src/Storages/Transaction/RegionData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,4 +197,22 @@ RegionData::RegionData(RegionData && data)

UInt8 RegionData::getWriteType(const ConstWriteCFIter & write_it) { return RegionWriteCFDataTrait::getWriteType(write_it->second); }

void RegionData::deleteRange(const ColumnFamilyType cf, const TiKVKey & start_key, const TiKVKey & end_key)
{
switch (cf)
{
case Write:
write_cf.deleteRange(start_key, end_key);
break;
case Default:
default_cf.deleteRange(start_key, end_key);
break;
case Lock:
lock_cf.deleteRange(start_key, end_key);
break;
default:
throw Exception("[RegionData::deleteRange] with undefined CF, should not happen", ErrorCodes::LOGICAL_ERROR);
}
}

} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/Storages/Transaction/RegionData.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ class RegionData

RegionData(RegionData && data);

void deleteRange(const ColumnFamilyType cf, const TiKVKey & start_key, const TiKVKey & end_key);

public:
static UInt8 getWriteType(const ConstWriteCFIter & write_it);

Expand Down