Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLASH-348] Optimize Some Modules #119

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions dbms/src/Interpreters/InterpreterSelectQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -699,8 +699,8 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(Pipeline

auto table_id = static_cast<StorageMergeTree*>(storage.get()) -> getTableInfo().id;

auto start_key = TiKVRange::getRangeHandle<true>(region.start_key(), table_id);
auto end_key = TiKVRange::getRangeHandle<false>(region.end_key(), table_id);
auto start_key = TiKVRange::getRangeHandle<true, true, std::string>(region.start_key(), table_id);
auto end_key = TiKVRange::getRangeHandle<false, true, std::string>(region.end_key(), table_id);
info.range_in_table = HandleRange<HandleID>(start_key, end_key);
query_info.mvcc_query_info->regions_query_info.push_back(info);
}
Expand Down
21 changes: 12 additions & 9 deletions dbms/src/Storages/MergeTree/TxnMergeTreeBlockOutputStream.cpp
Original file line number Diff line number Diff line change
@@ -1,31 +1,34 @@
#include <Interpreters/PartLog.h>
#include <Storages/MergeTree/TxnMergeTreeBlockOutputStream.h>
#include <Storages/StorageMergeTree.h>
#include <Interpreters/PartLog.h>

namespace DB
{

Block TxnMergeTreeBlockOutputStream::getHeader() const
{
return storage.getSampleBlock();
}
Block TxnMergeTreeBlockOutputStream::getHeader() const { return storage.getSampleBlock(); }

void TxnMergeTreeBlockOutputStream::write(const Block & block)
void TxnMergeTreeBlockOutputStream::write(Block && block)
{
storage.data.delayInsertIfNeeded();

Row partition(1, Field(UInt64(partition_id)));
Block block_copy = block;
BlockWithPartition part_block(std::move(block_copy), std::move(partition));

BlockWithPartition part_block(std::move(block), std::move(partition));

Stopwatch watch;

MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(part_block);
storage.data.renameTempPartAndAdd(part, &storage.increment);

PartLog::addNewPartToTheLog(storage.context, * part, watch.elapsed());
PartLog::addNewPartToTheLog(storage.context, *part, watch.elapsed());

storage.merge_task_handle->wake();
}

void TxnMergeTreeBlockOutputStream::write(const Block & block)
{
Block block_copy = block;
write(std::move(block_copy));
}

} // namespace DB
14 changes: 7 additions & 7 deletions dbms/src/Storages/MergeTree/TxnMergeTreeBlockOutputStream.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#pragma once

#include <DataStreams/IBlockOutputStream.h>
#include <Core/Row.h>
#include <DataStreams/IBlockOutputStream.h>
#include <common/logger_useful.h>

namespace DB
Expand All @@ -13,18 +13,18 @@ class StorageMergeTree;
class TxnMergeTreeBlockOutputStream : public IBlockOutputStream
{
public:
TxnMergeTreeBlockOutputStream(StorageMergeTree & storage_, UInt64 partition_id_ = 0) :
storage(storage_), log(&Logger::get("TxnMergeTreeBlockOutputStream")), partition_id(partition_id_)
{
}
TxnMergeTreeBlockOutputStream(StorageMergeTree & storage_, UInt64 partition_id_ = 0)
: storage(storage_), log(&Logger::get("TxnMergeTreeBlockOutputStream")), partition_id(partition_id_)
{}

Block getHeader() const override;
void write(const Block & block) override;
void write(Block && block);

private:
StorageMergeTree & storage;
Logger *log;
Logger * log;
size_t partition_id;
};

}
} // namespace DB
3 changes: 0 additions & 3 deletions dbms/src/Storages/Transaction/Codec.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@ static const size_t ENC_GROUP_SIZE = 8;
static const UInt8 ENC_MARKER = static_cast<UInt8>(0xff);
static const char ENC_ASC_PADDING[ENC_GROUP_SIZE] = {0};

static const size_t KEY_SIZE_WITHOUT_TS = ((1 + 8 + 2 + 8)/ENC_GROUP_SIZE+1)*(ENC_GROUP_SIZE+1);


template<typename T>
T DecodeInt(size_t & cursor, const String & raw_value)
{
Expand Down
30 changes: 22 additions & 8 deletions dbms/src/Storages/Transaction/PartitionStreams.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,14 @@
namespace DB
{

using BlockOption = std::optional<Block>;

std::tuple<BlockOption, RegionTable::RegionReadStatus> RegionTable::getBlockInputStreamByRegion(TableID table_id,
RegionTable::BlockOption RegionTable::getBlockInputStreamByRegion(TableID table_id,
RegionPtr region,
const TiDB::TableInfo & table_info,
const ColumnsDescription & columns,
const Names & ordered_columns,
RegionDataReadInfoList & data_list_for_remove)
{
return getBlockInputStreamByRegion(table_id,
return std::get<0>(getBlockInputStreamByRegion(table_id,
region,
InvalidRegionVersion,
InvalidRegionVersion,
Expand All @@ -29,10 +27,11 @@ std::tuple<BlockOption, RegionTable::RegionReadStatus> RegionTable::getBlockInpu
false,
false,
0,
&data_list_for_remove);
&data_list_for_remove,
log));
}

std::tuple<BlockOption, RegionTable::RegionReadStatus> RegionTable::getBlockInputStreamByRegion(TableID table_id,
std::tuple<RegionTable::BlockOption, RegionTable::RegionReadStatus> RegionTable::getBlockInputStreamByRegion(TableID table_id,
RegionPtr region,
const RegionVersion region_version,
const RegionVersion conf_version,
Expand All @@ -42,7 +41,8 @@ std::tuple<BlockOption, RegionTable::RegionReadStatus> RegionTable::getBlockInpu
bool learner_read,
bool resolve_locks,
Timestamp start_ts,
RegionDataReadInfoList * data_list_for_remove)
RegionDataReadInfoList * data_list_for_remove,
Logger * log)
{
if (!region)
return {BlockOption{}, NOT_FOUND};
Expand All @@ -63,6 +63,8 @@ std::tuple<BlockOption, RegionTable::RegionReadStatus> RegionTable::getBlockInpu
if (ordered_columns->size() == 3)
need_value = false;

auto start_time = Clock::now();

{
auto scanner = region->createCommittedScanner(table_id);

Expand Down Expand Up @@ -92,12 +94,24 @@ std::tuple<BlockOption, RegionTable::RegionReadStatus> RegionTable::getBlockInpu
} while (scanner->hasNext());
}

const auto scan_cost = std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - start_time).count();
start_time = Clock::now();

auto block = RegionBlockRead(*table_info, *columns, *ordered_columns, data_list);

auto compute_cost = std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - start_time).count();

if (log)
{
LOG_TRACE(log,
region->toString(false) << " read " << data_list.size() << " rows, cost [scan " << scan_cost << ", compute " << compute_cost
<< "] ms");
}

if (data_list_for_remove)
*data_list_for_remove = std::move(data_list);

return {block, OK};
return {std::move(block), OK};
}
}

Expand Down
28 changes: 21 additions & 7 deletions dbms/src/Storages/Transaction/RegionTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,16 @@ StoragePtr RegionTable::getOrCreateStorage(TableID table_id)

RegionTable::InternalRegion & RegionTable::insertRegion(Table & table, const Region & region)
{
auto region_id = region.id();
const auto range = region.getRange();
return insertRegion(table, range.first, range.second, region.id());
}

RegionTable::InternalRegion & RegionTable::insertRegion(Table & table, const TiKVKey & start, const TiKVKey & end, const RegionID region_id)
{
auto & table_regions = table.regions;
// Insert table mapping.
auto [it, ok] = table_regions.emplace(region_id, InternalRegion(region_id, region.getHandleRangeByTable(table.table_id)));
auto [it, ok]
= table_regions.emplace(region_id, InternalRegion(region_id, TiKVRange::getHandleRangeByTable(start, end, table.table_id)));
if (!ok)
throw Exception(
"[RegionTable::insertRegion] insert duplicate internal region " + DB::toString(region_id), ErrorCodes::LOGICAL_ERROR);
Expand Down Expand Up @@ -172,6 +178,8 @@ void RegionTable::flushRegion(TableID table_id, RegionID region_id, size_t & cac

LOG_DEBUG(log, "[flushRegion] table " << table_id << ", [region " << region_id << "] original " << region->dataSize() << " bytes");

UInt64 mem_read_cost = -1, write_part_cost = -1;

RegionDataReadInfoList data_list;
if (storage == nullptr)
{
Expand All @@ -193,18 +201,23 @@ void RegionTable::flushRegion(TableID table_id, RegionID region_id, size_t & cac
if (names.size() < 3)
throw Exception("[flushRegion] size of merge tree columns < 3, should not happen", ErrorCodes::LOGICAL_ERROR);

auto [block, status] = getBlockInputStreamByRegion(table_id, region, table_info, columns, names, data_list);
auto start_time = Clock::now();

auto block = getBlockInputStreamByRegion(table_id, region, table_info, columns, names, data_list);
if (!block)
{
// no data in region for table. update cache size.
cache_size = region->dataSize();
return;
}

std::ignore = status;
mem_read_cost = std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - start_time).count();
start_time = Clock::now();

TxnMergeTreeBlockOutputStream output(*merge_tree);
output.write(*block);
output.write(std::move(*block));

write_part_cost = std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - start_time).count();
}

// remove data in region
Expand Down Expand Up @@ -237,7 +250,9 @@ void RegionTable::flushRegion(TableID table_id, RegionID region_id, size_t & cac
region->incDirtyFlag();
}

LOG_DEBUG(log, "[flushRegion] table " << table_id << ", [region " << region_id << "] after flush " << cache_size << " bytes");
LOG_DEBUG(log,
"[flushRegion] table " << table_id << ", [region " << region_id << "] after flush " << cache_size << " bytes, cost [mem read "
<< mem_read_cost << ", write part " << write_part_cost << "] ms");
}
}

Expand Down Expand Up @@ -331,7 +346,6 @@ void RegionTable::applySnapshotRegions(const RegionMap & region_map)
if (cache_bytes)
internal_region.updated = true;
}
doShrinkRegionRange(*region);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we dont need it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function will only be called during initiation, there will not be any redundancy.

}
}

Expand Down
9 changes: 6 additions & 3 deletions dbms/src/Storages/Transaction/RegionTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ class RegionTable : private boost::noncopyable

InternalRegion & insertRegion(Table & table, const Region & region);
InternalRegion & getOrInsertRegion(TableID table_id, const Region & region);
InternalRegion & insertRegion(Table & table, const TiKVKey & start, const TiKVKey & end, const RegionID region_id);

bool shouldFlush(const InternalRegion & region) const;

Expand Down Expand Up @@ -178,14 +179,15 @@ class RegionTable : private boost::noncopyable
void traverseInternalRegionsByTable(const TableID table_id, std::function<void(const InternalRegion &)> && callback);
std::vector<std::pair<RegionID, RegionPtr>> getRegionsByTable(const TableID table_id);

static std::tuple<std::optional<Block>, RegionReadStatus> getBlockInputStreamByRegion(TableID table_id,
using BlockOption = std::optional<Block>;
BlockOption getBlockInputStreamByRegion(TableID table_id,
RegionPtr region,
const TiDB::TableInfo & table_info,
const ColumnsDescription & columns,
const Names & ordered_columns,
RegionDataReadInfoList & data_list_for_remove);

static std::tuple<std::optional<Block>, RegionReadStatus> getBlockInputStreamByRegion(TableID table_id,
static std::tuple<BlockOption, RegionReadStatus> getBlockInputStreamByRegion(TableID table_id,
RegionPtr region,
const RegionVersion region_version,
const RegionVersion conf_version,
Expand All @@ -195,7 +197,8 @@ class RegionTable : private boost::noncopyable
bool learner_read,
bool resolve_locks,
Timestamp start_ts,
RegionDataReadInfoList * data_list_for_remove = nullptr);
RegionDataReadInfoList * data_list_for_remove = nullptr,
Logger * log = nullptr);

TableIDSet getAllMappedTables(const RegionID region_id) const;
};
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/Transaction/TiKVKeyValue.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ struct StringObject : std::string
}

const std::string & getStr() const { return *this; }
std::string & getStr() { return *this; }
size_t dataSize() const { return size(); }
size_t dataSize() const { return Base::size(); }
std::string toString() const { return *this; }

// For debug
Expand All @@ -67,6 +66,7 @@ struct StringObject : std::string
private:
StringObject(const Base & str_) : Base(str_) {}
StringObject(const StringObject & obj) = delete;
size_t size() const = delete;
};

using TiKVKey = StringObject<true>;
Expand Down
27 changes: 16 additions & 11 deletions dbms/src/Storages/Transaction/TiKVRange.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,14 @@ namespace TiKVRange

using Handle = TiKVHandle::Handle<HandleID>;

template <bool start, bool decoded = false>
inline Handle getRangeHandle(const TiKVKey & tikv_key, const TableID table_id)
template <bool start, bool decoded = false, typename KeyType = TiKVKey>
inline Handle getRangeHandle(const KeyType & tikv_key, const TableID table_id)
{
if constexpr (decoded)
static_assert(std::is_same_v<KeyType, std::string>);
else
static_assert(std::is_same_v<KeyType, TiKVKey>);

constexpr HandleID min = std::numeric_limits<HandleID>::min();
constexpr HandleID max = std::numeric_limits<HandleID>::max();

Expand All @@ -27,11 +32,17 @@ inline Handle getRangeHandle(const TiKVKey & tikv_key, const TableID table_id)
return Handle::max;
}

String key;
const std::string * raw_key_ptr = nullptr;
std::string decoded_raw_key;
if constexpr (decoded)
key = tikv_key.getStr();
raw_key_ptr = &tikv_key;
else
key = RecordKVFormat::decodeTiKVKey(tikv_key);
{
decoded_raw_key = RecordKVFormat::decodeTiKVKey(tikv_key);
raw_key_ptr = &decoded_raw_key;
}

const std::string & key = *raw_key_ptr;

if (key <= RecordKVFormat::genRawKey(table_id, min))
return Handle::normal_min;
Expand Down Expand Up @@ -62,12 +73,6 @@ inline Handle getRangeHandle(const TiKVKey & tikv_key, const TableID table_id)
}
}

template <bool start>
inline Handle getRangeHandle(const std::string & key, const TableID table_id)
{
return getRangeHandle<start, true>(static_cast<const TiKVKey &>(key), table_id);
}

inline HandleRange<HandleID> getHandleRangeByTable(const TiKVKey & start_key, const TiKVKey & end_key, TableID table_id)
{
auto start_handle = getRangeHandle<true>(start_key, table_id);
Expand Down
Loading