Skip to content

Commit

Permalink
[FLASH-348] Optimize And Add TMT Property To DataPart (#121) (#142)
Browse files Browse the repository at this point in the history
  • Loading branch information
solotzg authored Jul 25, 2019
1 parent 4742bef commit e1539d4
Show file tree
Hide file tree
Showing 25 changed files with 371 additions and 133 deletions.
17 changes: 17 additions & 0 deletions dbms/src/Core/TMTPKType.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#pragma once

#include <DataTypes/IDataType.h>

namespace DB
{

/// Classification of a primary-key column's data type.
/// Values of this enum are returned by getTMTPKType() for a given IDataType.
enum TMTPKType
{
/// The primary key is a signed 64-bit integer column.
INT64,
/// The primary key is an unsigned 64-bit integer column.
UINT64,
/// The primary key is of any other type.
UNSPECIFIED,
};

TMTPKType getTMTPKType(const IDataType & rhs);

} // namespace DB
8 changes: 1 addition & 7 deletions dbms/src/Core/TMTSortCursor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <Columns/ColumnsNumber.h>
#include <Core/SortCursor.h>
#include <Core/TMTPKType.h>

namespace DB
{
Expand All @@ -14,13 +15,6 @@ union TMTCmpOptimizedRes

static_assert(sizeof(TMTCmpOptimizedRes) == 4);

/// Kind of primary-key column type: int64, uint64, or anything else.
enum TMTPKType
{
/// pk column is int64.
INT64,
/// pk column is uint64.
UINT64,
/// pk column is some other type.
UNSPECIFIED,
};

/// type of pk column will be int64, uint64 or others (int32, int8, uint32 ...).
/// type of version column is uint64.
/// type of delmark column is uint8.
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Interpreters/InterpreterSelectQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -699,8 +699,8 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(Pipeline

auto table_id = static_cast<StorageMergeTree*>(storage.get()) -> getTableInfo().id;

auto start_key = TiKVRange::getRangeHandle<true>(region.start_key(), table_id);
auto end_key = TiKVRange::getRangeHandle<false>(region.end_key(), table_id);
auto start_key = TiKVRange::getRangeHandle<true, true, std::string>(region.start_key(), table_id);
auto end_key = TiKVRange::getRangeHandle<false, true, std::string>(region.end_key(), table_id);
info.range_in_table = HandleRange<HandleID>(start_key, end_key);
query_info.mvcc_query_info->regions_query_info.push_back(info);
}
Expand Down
17 changes: 8 additions & 9 deletions dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
#include <Common/SimpleIncrement.h>
#include <Common/interpolate.h>
#include <Common/typeid_cast.h>

#include <Storages/MergeTree/TMTDataPartProperty.h>
#include <Storages/Transaction/TMTContext.h>
#include <Storages/Transaction/RegionTable.h>
#include <Storages/Transaction/CHTableHandle.h>
Expand Down Expand Up @@ -671,17 +671,10 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
std::this_thread::sleep_for(std::chrono::seconds(1));
}

bool pk_is_uint64 = false;
const bool pk_is_uint64 = getTMTPKType(*data.primary_key_data_types[0]) == TMTPKType::UINT64;

const auto handle_col_name = data.getPrimarySortDescription()[0].column_name;

{
const auto pk_type = data.getColumns().getPhysical(handle_col_name).type->getFamilyName();

if (std::strcmp(pk_type, TypeName<UInt64>::get()) == 0)
pk_is_uint64 = true;
}

std::vector<HandleRange<HandleID>> ranges;
tmt.getRegionTable().traverseInternalRegionsByTable(
data.table_info->id,
Expand Down Expand Up @@ -854,6 +847,12 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
for (const auto & part : parts)
new_data_part->minmax_idx.merge(part->minmax_idx);

if (data.merging_params.mode == MergeTreeData::MergingParams::Txn)
{
for (const auto & part : parts)
new_data_part->tmt_property->merge(*part->tmt_property);
}

/// Print overall profiling info. NOTE: it may duplicate previous messages
{
double elapsed_seconds = merge_entry->watch.elapsedSeconds();
Expand Down
18 changes: 17 additions & 1 deletion dbms/src/Storages/MergeTree/MergeTreeDataPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <Common/StringUtils/StringUtils.h>
#include <Storages/MergeTree/MergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/TMTDataPartProperty.h>

#include <Poco/File.h>
#include <Poco/Path.h>
Expand Down Expand Up @@ -133,15 +134,26 @@ void MergeTreeDataPart::MinMaxIndex::merge(const MinMaxIndex & other)
}
}

/// Constructs a data part whose path prefix is supplied explicitly by the caller.
/// A TMTDataPartProperty is allocated only when the owning storage merges in Txn mode;
/// for every other mode tmt_property is left empty.
MergeTreeDataPart::MergeTreeDataPart(
    MergeTreeData & storage_, const String & name_, const MergeTreePartInfo & info_, const String & parent_path)
    : storage(storage_)
    , name(name_)
    , info(info_)
    , full_path_prefix(parent_path)
{
    const bool is_txn_table = storage.merging_params.mode == MergeTreeData::MergingParams::Txn;
    if (is_txn_table)
    {
        tmt_property = std::make_unique<TMTDataPartProperty>();
    }
}

/// Constructs a data part from already-parsed part info; the path prefix is asked from
/// the context's part path selector rather than passed in.
/// A TMTDataPartProperty is allocated only when the owning storage merges in Txn mode;
/// for every other mode tmt_property is left empty.
MergeTreeDataPart::MergeTreeDataPart(
    MergeTreeData & storage_, const String & name_, const MergeTreePartInfo & info_)
    : storage(storage_)
    , name(name_)
    , info(info_)
{
    const bool is_txn_table = storage.merging_params.mode == MergeTreeData::MergingParams::Txn;
    if (is_txn_table)
    {
        tmt_property = std::make_unique<TMTDataPartProperty>();
    }

    // Resolved after the members above are set, because the selector receives `storage` and `name`.
    full_path_prefix = storage.context.getPartPathSelector().getPathForPart(storage, name);
}

/// Constructs a data part from its name alone: `info` is parsed from the name with
/// MergeTreePartInfo::fromPartName using the storage's format version, and the path prefix
/// is obtained from the context's part path selector.
/// NOTE(review): the member-initializer line below appears twice in this view; it looks like
/// the before/after pair of a whitespace-only change — confirm only one copy exists in the file.
MergeTreeDataPart::MergeTreeDataPart(MergeTreeData & storage_, const String & name_)
: storage(storage_), name(name_), info(MergeTreePartInfo::fromPartName(name_, storage.format_version))
: storage(storage_), name(name_), info(MergeTreePartInfo::fromPartName(name_, storage.format_version))
{
// The TMT property object is allocated only for storages merging in Txn mode.
if (storage.merging_params.mode == MergeTreeData::MergingParams::Txn)
tmt_property = std::make_unique<TMTDataPartProperty>();
full_path_prefix = storage.context.getPartPathSelector().getPathForPart(storage, name);
}

Expand Down Expand Up @@ -497,7 +509,11 @@ void MergeTreeDataPart::loadPartitionAndMinMaxIndex()
String full_path = getFullPath();
partition = MergeTreePartition(partition_name);
if (!isEmpty())
{
minmax_idx.load(storage, full_path);
if (storage.merging_params.mode == MergeTreeData::MergingParams::Txn)
tmt_property->load(storage, full_path);
}
}
else if (storage.format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
{
Expand Down
9 changes: 4 additions & 5 deletions dbms/src/Storages/MergeTree/MergeTreeDataPart.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,15 @@ namespace DB
{

class MergeTreeData;

struct TMTDataPartProperty;

/// Description of the data part.
struct MergeTreeDataPart
{
using Checksums = MergeTreeDataPartChecksums;
using Checksum = MergeTreeDataPartChecksums::Checksum;

MergeTreeDataPart(MergeTreeData & storage_, const String & name_, const MergeTreePartInfo & info_, const String & parent_path)
: storage(storage_), name(name_), info(info_), full_path_prefix(parent_path)
{
}
MergeTreeDataPart(MergeTreeData & storage_, const String & name_, const MergeTreePartInfo & info_, const String & parent_path);

MergeTreeDataPart(MergeTreeData & storage_, const String & name_, const MergeTreePartInfo & info_);

Expand Down Expand Up @@ -206,6 +203,8 @@ struct MergeTreeDataPart

MinMaxIndex minmax_idx;

std::unique_ptr<TMTDataPartProperty> tmt_property;

Checksums checksums;

/// Columns description.
Expand Down
26 changes: 17 additions & 9 deletions dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ struct numeric_limits<__uint128_t>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <Storages/MergeTree/MergeTreeReadPool.h>
#include <Storages/MergeTree/MergeTreeThreadBlockInputStream.h>
#include <Storages/MergeTree/TMTDataPartProperty.h>
#include <Storages/MutableSupport.h>
#include <Storages/RegionQueryInfo.h>
#include <Storages/Transaction/CHTableHandle.h>
Expand Down Expand Up @@ -148,6 +149,18 @@ static RelativeSize convertAbsoluteSampleSizeToRelative(const ASTPtr & node, siz
return std::min(RelativeSize(1), RelativeSize(absolute_sample_size) / RelativeSize(approx_total_rows));
}

/// Classifies a data type as a TMT primary-key kind.
/// Returns TMTPKType::INT64 when `rhs` equals DataTypeInt64, TMTPKType::UINT64 when it
/// equals DataTypeUInt64, and TMTPKType::UNSPECIFIED for every other type.
TMTPKType getTMTPKType(const IDataType & rhs)
{
    // Function-local prototypes, built once and used only as comparison targets.
    static const DataTypeInt64 int64_prototype{};
    static const DataTypeUInt64 uint64_prototype{};

    if (rhs.equals(int64_prototype))
        return TMTPKType::INT64;

    return rhs.equals(uint64_prototype) ? TMTPKType::UINT64 : TMTPKType::UNSPECIFIED;
}

BlockInputStreams MergeTreeDataSelectExecutor::read(const Names & column_names_to_return,
const SelectQueryInfo & query_info,
const Context & context,
Expand Down Expand Up @@ -248,12 +261,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(const Names & column_names_t
;
else if (!select.no_kvstore)
{
const auto pk_family_name = data.getColumns().getPhysical(handle_col_name).type->getFamilyName();

if (std::strcmp(pk_family_name, TypeName<UInt64>::get()) == 0)
pk_type = TMTPKType::UINT64;
else if (std::strcmp(pk_family_name, TypeName<Int64>::get()) == 0)
pk_type = TMTPKType::INT64;
pk_type = getTMTPKType(*data.primary_key_data_types[0]);

TMTContext & tmt = context.getTMTContext();

Expand Down Expand Up @@ -856,7 +864,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(const Names & column_names_t
std::sort(handle_ranges.begin(), handle_ranges.end(),
[](const UInt64RangeElement & a, const UInt64RangeElement & b) { return a.first < b.first; });

computeHandleRenges<UInt64>(region_group_mem_block[thread_idx],
computeHandleRanges<UInt64>(region_group_mem_block[thread_idx],
handle_ranges,
region_group_range_parts[thread_idx],
region_group_u64_handle_ranges[thread_idx],
Expand All @@ -880,7 +888,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(const Names & column_names_t

// handle_ranges is sorted.

computeHandleRenges<Int64>(region_group_mem_block[thread_idx],
computeHandleRanges<Int64>(region_group_mem_block[thread_idx],
handle_ranges,
region_group_range_parts[thread_idx],
region_group_handle_ranges[thread_idx],
Expand Down Expand Up @@ -1044,7 +1052,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(const Names & column_names_t

for (const RangesInDataPart & part : parts_with_ranges)
{
MarkRanges mark_ranges = markRangesFromRegionRange<UInt64>(part.data_part->index, handle_range.first,
MarkRanges mark_ranges = markRangesFromRegionRange<UInt64>(*part.data_part, handle_range.first,
handle_range.second, part.ranges, computeMinMarksForSeek(settings, data), settings);

if (mark_ranges.empty())
Expand Down
19 changes: 16 additions & 3 deletions dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutorCommon.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ static inline size_t computeMinMarksForSeek(const Settings & settings, const Mer
}

template <typename TargetType>
static inline MarkRanges markRangesFromRegionRange(const MergeTreeData::DataPart::Index & index,
static inline MarkRanges markRangesFromRegionRange(const MergeTreeData::DataPart & data_part,
const TiKVHandle::Handle<TargetType> & handle_begin,
const TiKVHandle::Handle<TargetType> & handle_end,
const MarkRanges & ori_mark_ranges,
Expand All @@ -54,6 +54,19 @@ static inline MarkRanges markRangesFromRegionRange(const MergeTreeData::DataPart
if (handle_end <= handle_begin)
return {};

if (data_part.tmt_property->initialized)
{
TiKVHandle::Handle<TargetType> index_right_handle;
{
UInt64 tmp = data_part.tmt_property->max_pk.get<UInt64>();
index_right_handle = static_cast<TargetType>(tmp);
}
if (handle_begin > index_right_handle)
return {};
}

const auto & index = data_part.index;

MarkRanges res;

size_t marks_count = index.at(0)->size();
Expand Down Expand Up @@ -100,7 +113,7 @@ static inline MarkRanges markRangesFromRegionRange(const MergeTreeData::DataPart
}

template <typename TargetType>
static inline void computeHandleRenges(std::vector<std::deque<size_t>> & block_data,
static inline void computeHandleRanges(std::vector<std::deque<size_t>> & block_data,
std::vector<std::pair<DB::HandleRange<TargetType>, size_t>> & handle_ranges,
std::vector<RangesInDataParts> & region_group_range_parts,
std::vector<DB::HandleRange<TargetType>> & region_group_handle_ranges,
Expand Down Expand Up @@ -139,7 +152,7 @@ static inline void computeHandleRenges(std::vector<std::deque<size_t>> & block_d
for (const RangesInDataPart & ranges : parts_with_ranges)
{
MarkRanges mark_ranges = markRangesFromRegionRange<TargetType>(
ranges.data_part->index, handle_range.first.first, handle_range.first.second, ranges.ranges, min_marks_for_seek, settings);
*ranges.data_part, handle_range.first.first, handle_range.first.second, ranges.ranges, min_marks_for_seek, settings);

if (mark_ranges.empty())
continue;
Expand Down
8 changes: 8 additions & 0 deletions dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include <Storages/MergeTree/MergeTreeDataWriter.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Storages/MergeTree/TMTDataPartProperty.h>
#include <Common/escapeForFileName.h>
#include <Common/HashTable/HashMap.h>
#include <Interpreters/AggregationCommon.h>
Expand Down Expand Up @@ -162,8 +163,13 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
Int64 temp_index = data.insert_increment.get();

MergeTreeDataPart::MinMaxIndex minmax_idx;
TMTDataPartProperty tmt_prop;

minmax_idx.update(block, data.minmax_idx_columns);

if (data.merging_params.mode == MergeTreeData::MergingParams::Txn)
tmt_prop.update(block, data.getPrimarySortDescription()[0].column_name);

MergeTreePartition partition(std::move(block_with_partition.partition));

MergeTreePartInfo new_part_info(partition.getID(data), temp_index, temp_index, 0);
Expand All @@ -189,6 +195,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(data, part_name, new_part_info);
new_data_part->partition = std::move(partition);
new_data_part->minmax_idx = std::move(minmax_idx);
if (data.merging_params.mode == MergeTreeData::MergingParams::Txn)
*new_data_part->tmt_property = std::move(tmt_prop);
new_data_part->relative_path = TMP_PREFIX + part_name;
new_data_part->is_temp = true;

Expand Down
8 changes: 7 additions & 1 deletion dbms/src/Storages/MergeTree/MergedBlockOutputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#include <Common/typeid_cast.h>
#include <Common/MemoryTracker.h>
#include <Poco/File.h>

#include <Storages/MergeTree/TMTDataPartProperty.h>

namespace DB
{
Expand Down Expand Up @@ -393,6 +393,12 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
if (new_part->minmax_idx.initialized)
new_part->minmax_idx.store(storage, part_path, checksums);

if (storage.merging_params.mode == MergeTreeData::MergingParams::Txn)
{
if (new_part->tmt_property->initialized)
new_part->tmt_property->store(storage, part_path, checksums);
}

WriteBufferFromFile count_out(part_path + "count.txt", 4096);
HashingWriteBuffer count_out_hashing(count_out);
writeIntText(rows_count, count_out_hashing);
Expand Down
Loading

0 comments on commit e1539d4

Please sign in to comment.