Skip to content

Commit

Permalink
Refactor and encapsulate for pre-decode (#201)
Browse files Browse the repository at this point in the history
* refactor and encapsulate for pre-decode

* fix bug when throwing exception in RegionCFDataBase::insert

* fix bug in TMTStorages::getAllStorage
  • Loading branch information
solotzg authored Aug 26, 2019
1 parent 827de9f commit 3b181d8
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 67 deletions.
115 changes: 51 additions & 64 deletions dbms/src/Storages/Transaction/RegionBlockReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@
#include <Storages/Transaction/Region.h>
#include <Storages/Transaction/RegionBlockReader.h>
#include <Storages/Transaction/TiDB.h>
#include <sparsehash/dense_hash_map>
#include <sparsehash/dense_hash_set>
#include <Storages/Transaction/RegionBlockReaderHelper.hpp>

namespace DB
{
Expand Down Expand Up @@ -127,13 +126,12 @@ void setPKVersionDel(ColumnUInt8 & delmark_col,
}
}

using ColumnIdToInfoIndexMap = google::dense_hash_map<ColumnID, UInt16>;
using ColumnIdToInfoIndexMap = google::dense_hash_map<ColumnID, size_t>;
using SchemaAllColumnIds = google::dense_hash_set<ColumnID>;

/// DecodeRowSkip function will try to jump over unnecessary field.
bool DecodeRowSkip(const TiKVValue & value, const ColumnIdToInfoIndexMap & column_id_to_info_index,
const SchemaAllColumnIds & schema_all_column_ids, DecodedRow & additional_decoded_row,
std::vector<DecodedRow::const_iterator> & decoded_col_iter, const bool force_decode)
const SchemaAllColumnIds & schema_all_column_ids, DecodedRecordData & decoded_data, const bool force_decode)
{
const String & raw_value = value.getStr();
size_t cursor = 0;
Expand Down Expand Up @@ -162,8 +160,7 @@ bool DecodeRowSkip(const TiKVValue & value, const ColumnIdToInfoIndexMap & colum
}
else
{
additional_decoded_row.emplace_back(col_id, DecodeDatum(cursor, raw_value));
decoded_col_iter.emplace_back(additional_decoded_row.cend() - 1);
decoded_data.emplace_back(col_id, DecodeDatum(cursor, raw_value));
}
}

Expand All @@ -179,8 +176,7 @@ bool DecodeRowSkip(const TiKVValue & value, const ColumnIdToInfoIndexMap & colum

/// DecodeRow function will try to get pre-decoded fields from value, if is none, just decode its str.
bool DecodeRow(const TiKVValue & value, const ColumnIdToInfoIndexMap & column_id_to_info_index,
const SchemaAllColumnIds & schema_all_column_ids, DecodedRow & additional_decoded_row,
std::vector<DecodedRow::const_iterator> & decoded_col_iter, const bool force_decode)
const SchemaAllColumnIds & schema_all_column_ids, DecodedRecordData & decoded_data, const bool force_decode)
{
auto & decoded_row_info = value.extraInfo();
const DecodedRow * id_fields_ptr = decoded_row_info.load();
Expand All @@ -190,7 +186,7 @@ bool DecodeRow(const TiKVValue & value, const ColumnIdToInfoIndexMap & column_id

const DecodedRow & id_fields = *id_fields_ptr;

for (auto it = id_fields.begin(); it != id_fields.end(); ++it)
for (auto it = id_fields.cbegin(); it != id_fields.cend(); ++it)
{
const auto & ele = *it;
const auto & col_id = ele.col_id;
Expand All @@ -204,7 +200,7 @@ bool DecodeRow(const TiKVValue & value, const ColumnIdToInfoIndexMap & column_id

if (column_id_to_info_index.count(col_id))
{
decoded_col_iter.emplace_back(it);
decoded_data.push_back(it);
}
}

Expand All @@ -215,7 +211,7 @@ bool DecodeRow(const TiKVValue & value, const ColumnIdToInfoIndexMap & column_id
}
else
{
return DecodeRowSkip(value, column_id_to_info_index, schema_all_column_ids, additional_decoded_row, decoded_col_iter, force_decode);
return DecodeRowSkip(value, column_id_to_info_index, schema_all_column_ids, decoded_data, force_decode);
}
}

Expand All @@ -232,21 +228,19 @@ std::tuple<Block, bool> readRegionBlock(const TableInfo & table_info,

ColumnID handle_col_id = InvalidColumnID;

constexpr size_t MustHaveColCnt = 3; // pk, del, version
constexpr ColumnID EmptyColumnID = InvalidColumnID - 1;

using ColTypePair = std::pair<MutableColumnPtr, NameAndTypePair>;
google::dense_hash_map<ColumnID, std::shared_ptr<ColTypePair>> column_map;
column_map.set_empty_key(EmptyColumnID);
// column_map contains columns in column_names_to_read exclude del and version.
ColumnDataInfoMap column_map(column_names_to_read.size() - MustHaveColCnt + 1, EmptyColumnID);

// column_id_to_info_index contains columns in column_names_to_read exclude pk, del and version
ColumnIdToInfoIndexMap column_id_to_info_index;
column_id_to_info_index.set_empty_key(EmptyColumnID);

SchemaAllColumnIds schema_all_column_ids;
schema_all_column_ids.set_empty_key(EmptyColumnID);

if (table_info.columns.size() > std::numeric_limits<ColumnIdToInfoIndexMap::mapped_type>::max())
throw Exception("Too many columns in schema", ErrorCodes::LOGICAL_ERROR);

for (size_t i = 0; i < table_info.columns.size(); i++)
{
auto & column_info = table_info.columns[i];
Expand All @@ -257,26 +251,30 @@ std::tuple<Block, bool> readRegionBlock(const TableInfo & table_info,
{
continue;
}
auto ch_col = columns.getPhysical(col_name);
column_map.insert(std::make_pair(col_id, std::make_shared<ColTypePair>(ch_col.type->createColumn(), ch_col)));
column_map[col_id]->first->reserve(data_list.size());

{
auto ch_col = columns.getPhysical(col_name);
auto mut_col = ch_col.type->createColumn();
column_map.insert(col_id, std::move(mut_col), std::move(ch_col), i, data_list.size());
}

if (table_info.pk_is_handle && column_info.hasPriKeyFlag())
handle_col_id = col_id;
else
column_id_to_info_index.insert(std::make_pair(col_id, i));
}

if (column_names_to_read.size() - 3 != column_id_to_info_index.size())
if (column_names_to_read.size() - MustHaveColCnt != column_id_to_info_index.size())
throw Exception("schema doesn't contain needed columns.", ErrorCodes::LOGICAL_ERROR);

if (!table_info.pk_is_handle)
{
auto ch_col = columns.getPhysical(MutableSupport::tidb_pk_column_name);
column_map.insert(std::make_pair(handle_col_id, std::make_shared<ColTypePair>(ch_col.type->createColumn(), ch_col)));
column_map[handle_col_id]->first->reserve(data_list.size());
auto mut_col = ch_col.type->createColumn();
column_map.insert(handle_col_id, std::move(mut_col), std::move(ch_col), -1, data_list.size());
}

const TMTPKType pk_type = getTMTPKType(*column_map[handle_col_id]->second.type);
const TMTPKType pk_type = getTMTPKType(*column_map.getNameAndTypePair(handle_col_id).type);

if (pk_type == TMTPKType::UINT64)
ReorderRegionDataReadList(data_list);
Expand All @@ -296,26 +294,19 @@ std::tuple<Block, bool> readRegionBlock(const TableInfo & table_info,
break;
}

func(*delmark_col, *version_col, column_map[handle_col_id]->first, data_list, start_ts);
func(*delmark_col, *version_col, column_map.getMutableColumnPtr(handle_col_id), data_list, start_ts);
}

const size_t target_col_size = column_names_to_read.size() - 3;

Block block;
const size_t target_col_size = column_names_to_read.size() - MustHaveColCnt;

// optimize for only need handle, tso, delmark.
if (column_names_to_read.size() > 3)
if (column_names_to_read.size() > MustHaveColCnt)
{
google::dense_hash_set<ColumnID> decoded_col_ids_set;
decoded_col_ids_set.set_empty_key(EmptyColumnID);
DecodedRecordData decoded_data(column_id_to_info_index.size());

// TODO: optimize columns' insertion, use better implementation rather than Field, it's terrible.
DecodedRow additional_decoded_row;
std::vector<DecodedRow::const_iterator> decoded_col_iter;

/// Notice: iterator of std::vector will invalid after the capacity changed, so !!! must set the capacity of
/// additional_decoded_row big enough
additional_decoded_row.reserve(table_info.columns.size());

for (const auto & [handle, write_type, commit_ts, value_ptr] : data_list)
{
Expand All @@ -325,40 +316,36 @@ std::tuple<Block, bool> readRegionBlock(const TableInfo & table_info,
if (commit_ts > start_ts)
continue;

decoded_col_iter.clear();
additional_decoded_row.clear();
decoded_data.clear();

if (write_type == Region::DelFlag)
{
for (const auto & item : column_id_to_info_index)
{
const auto & column = table_info.columns[item.second];

additional_decoded_row.emplace_back(column.id, GenDecodeRow(column));
decoded_col_iter.emplace_back(additional_decoded_row.cend() - 1);
decoded_data.emplace_back(column.id, GenDecodeRow(column));
}
}
else
{
bool schema_matches = DecodeRow(
*value_ptr, column_id_to_info_index, schema_all_column_ids, additional_decoded_row, decoded_col_iter, force_decode);
bool schema_matches = DecodeRow(*value_ptr, column_id_to_info_index, schema_all_column_ids, decoded_data, force_decode);
if (!schema_matches && !force_decode)
return std::make_tuple(block, false);
return std::make_tuple(Block(), false);
}

/// Modify `row` by adding missing column values or removing useless column values.
if (unlikely(decoded_col_iter.size() > column_id_to_info_index.size()))
if (unlikely(decoded_data.size() > column_id_to_info_index.size()))
{
throw Exception("read unexpected columns.", ErrorCodes::LOGICAL_ERROR);
}

// redundant column values (column id not in current schema) has been dropped when decoding row
// this branch handles the case when the row doesn't contain all the needed column
if (decoded_col_iter.size() < column_id_to_info_index.size())
if (decoded_data.size() < column_id_to_info_index.size())
{
decoded_col_ids_set.clear_no_resize();
for (const auto & e : decoded_col_iter)
decoded_col_ids_set.insert(e->col_id);
for (size_t i = 0; i < decoded_data.size(); ++i)
decoded_col_ids_set.insert(decoded_data[i].col_id);

for (const auto & item : column_id_to_info_index)
{
Expand All @@ -367,26 +354,23 @@ std::tuple<Block, bool> readRegionBlock(const TableInfo & table_info,

const auto & column = table_info.columns[item.second];

additional_decoded_row.emplace_back(column.id,
decoded_data.emplace_back(column.id,
column.hasNoDefaultValueFlag() ? (column.hasNotNullFlag() ? GenDecodeRow(column) : Field())
: column.defaultValueToField());
decoded_col_iter.emplace_back(additional_decoded_row.cend() - 1);
}
}

if (decoded_col_iter.size() != target_col_size)
if (decoded_data.size() != target_col_size)
throw Exception("decode row error.", ErrorCodes::LOGICAL_ERROR);

/// Transform `row` to columnar format.
for (const auto & iter : decoded_col_iter)
for (size_t data_idx = 0; data_idx < decoded_data.size(); ++data_idx)
{
const ColumnID & col_id = iter->col_id;
const Field & field = iter->field;
const ColumnInfo & column_info = table_info.columns[column_id_to_info_index[col_id]];
const ColumnID & col_id = decoded_data[data_idx].col_id;
const Field & field = decoded_data[data_idx].field;

auto it = column_map.find(col_id);
if (it == column_map.end())
throw Exception("col_id not found in column_map", ErrorCodes::LOGICAL_ERROR);
auto & col_info = column_map[col_id];
const ColumnInfo & column_info = table_info.columns[ColumnDataInfoMap::getIndex(col_info)];

DatumFlat datum(field, column_info.tp);
const Field & unflattened = datum.field();
Expand All @@ -397,21 +381,23 @@ std::tuple<Block, bool> readRegionBlock(const TableInfo & table_info,
// Otherwise return false to outer, outer should sync schema and try again.
if (force_decode)
{
const auto & data_type = it->second->second.type;
const auto & data_type = ColumnDataInfoMap::getNameAndTypePair(col_info).type;
throw Exception("Detected overflow when decoding data " + std::to_string(unflattened.get<UInt64>()) + " of column "
+ column_info.name + " with type " + data_type->getName(),
ErrorCodes::LOGICAL_ERROR);
}

return std::make_tuple(block, false);
return std::make_tuple(Block(), false);
}
auto & mut_col = it->second->first;
auto & mut_col = ColumnDataInfoMap::getMutableColumnPtr(col_info);
mut_col->insert(unflattened);
}
}
}

decoded_data.checkValid();
}

Block block;
for (const auto & name : column_names_to_read)
{
if (name == MutableSupport::delmark_column_name)
Expand All @@ -424,11 +410,12 @@ std::tuple<Block, bool> readRegionBlock(const TableInfo & table_info,
}
else
{
Int64 col_id = table_info.getColumnID(name);
block.insert({std::move(column_map[col_id]->first), column_map[col_id]->second.type, name});
ColumnID col_id = table_info.getColumnID(name);
block.insert({std::move(column_map.getMutableColumnPtr(col_id)), column_map.getNameAndTypePair(col_id).type, name});
}
}

column_map.checkValid();
return std::make_tuple(std::move(block), true);
}

Expand Down
94 changes: 94 additions & 0 deletions dbms/src/Storages/Transaction/RegionBlockReaderHelper.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
#include <sparsehash/dense_hash_map>
#include <sparsehash/dense_hash_set>

namespace DB
{

namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}

/// Container of per-column output state, addressable by column id.
///
/// Every entry is a tuple of (mutable column being filled, its name/type pair,
/// caller-supplied index). Entries live in a vector whose capacity is reserved
/// up front; a hash map translates a column id into the entry's position.
struct ColumnDataInfoMap
{
    using ColTypeInfo = std::tuple<MutableColumnPtr, NameAndTypePair, size_t>;
    using ColTypeInfoData = std::vector<ColTypeInfo>;

    /// @param cap       number of entries the caller expects to insert.
    /// @param empty_id  key handed to dense_hash_map::set_empty_key.
    ColumnDataInfoMap(const size_t cap, const ColumnID empty_id)
    {
        column_data.reserve(cap);
        column_map.set_empty_key(empty_id);
        // Remember the capacity actually obtained so that checkValid() can detect growth.
        ori_cap = column_data.capacity();
    }

    /// Verifies that the entry storage never grew beyond the capacity reserved in the constructor.
    /// @throws Exception (LOGICAL_ERROR) if the capacity differs from the one recorded at construction.
    void checkValid() const
    {
        if (ori_cap != column_data.capacity())
            throw Exception("ColumnDataInfoMap capacity changes", ErrorCodes::LOGICAL_ERROR);
    }

    /// Appends a new entry, registers it under `col_id` and reserves room for `cap` values in the column.
    /// @param col_id     key under which the entry is registered.
    /// @param ptr        column to take ownership of.
    /// @param name_pair  name/type description stored next to the column.
    /// @param index      opaque index stored with the entry, returned by getIndex().
    /// @param cap        number of values to reserve in the column registered under `col_id`.
    void insert(const ColumnID col_id, MutableColumnPtr && ptr, NameAndTypePair && name_pair, size_t index, const size_t cap)
    {
        column_data.emplace_back(std::move(ptr), std::move(name_pair), index);
        // Store the position rather than a vector iterator: a position stays valid even if
        // the vector reallocates, whereas an iterator would dangle.
        column_map.insert(std::make_pair(col_id, column_data.size() - 1));
        getMutableColumnPtr(col_id)->reserve(cap);
    }

    /// Mutable column registered under `col_id`; throws like operator[] if it is unknown.
    MutableColumnPtr & getMutableColumnPtr(const ColumnID col_id) { return getMutableColumnPtr((*this)[col_id]); }
    /// Mutable column stored in an entry.
    static MutableColumnPtr & getMutableColumnPtr(ColTypeInfo & info) { return std::get<0>(info); }

    /// Name/type pair registered under `col_id`; throws like operator[] if it is unknown.
    NameAndTypePair & getNameAndTypePair(const ColumnID col_id) { return getNameAndTypePair((*this)[col_id]); }
    /// Name/type pair stored in an entry.
    static NameAndTypePair & getNameAndTypePair(ColTypeInfo & info) { return std::get<1>(info); }

    /// Index that was passed to insert() for this entry.
    static size_t getIndex(const ColTypeInfo & info) { return std::get<2>(info); }

    /// Looks up the entry registered under `col_id`.
    /// @throws Exception (LOGICAL_ERROR) if no entry was inserted for `col_id`.
    ColTypeInfo & operator[](const ColumnID col_id)
    {
        // Use find() instead of the map's operator[]: the latter would silently insert a
        // default value for an unknown key, which must not be used to address an entry.
        const auto it = column_map.find(col_id);
        if (it == column_map.end())
            throw Exception("col_id not found in column_map", ErrorCodes::LOGICAL_ERROR);
        return column_data[it->second];
    }

private:
    ColTypeInfoData column_data;
    /// column id -> position of the entry inside `column_data`.
    google::dense_hash_map<ColumnID, size_t> column_map;
    size_t ori_cap;
};

/// Sequence of decoded (column id, field) elements belonging to one record.
///
/// Elements come from two sources: emplace_back() constructs an element inside
/// this object's own storage, while push_back() only records an iterator to an
/// element stored elsewhere. Both kinds are exposed uniformly through size()
/// and operator[] in the order they were added.
struct DecodedRecordData
{
    /// @param cap  number of elements to reserve in the internal storage used by emplace_back().
    DecodedRecordData(const size_t cap)
    {
        additional_decoded_row.reserve(cap);
        // Remember the capacity actually obtained so that checkValid() can detect growth.
        ori_cap = additional_decoded_row.capacity();
    }

    /// Iterators into the internal storage are invalidated when its capacity changes, so the
    /// capacity reserved in the constructor must be big enough for every emplace_back().
    /// @throws Exception (LOGICAL_ERROR) if the capacity differs from the one recorded at construction.
    void checkValid() const
    {
        if (ori_cap != additional_decoded_row.capacity())
            throw Exception("DecodedRecordData capacity changes", ErrorCodes::LOGICAL_ERROR);
    }

    /// Number of recorded elements (internally constructed and externally referenced together).
    size_t size() const { return decoded_col_iter.size(); }

    /// Drops all recorded elements so the object can be reused for the next record.
    void clear()
    {
        additional_decoded_row.clear();
        decoded_col_iter.clear();
    }

    /// Element at position `index`; no bounds check is performed.
    const DecodedRow::value_type & operator[](const size_t index) const { return *decoded_col_iter[index]; }

    /// Constructs a new element in the internal storage from `args` and records it.
    template <class... Args>
    void emplace_back(Args &&... args)
    {
        additional_decoded_row.emplace_back(std::forward<Args>(args)...);
        decoded_col_iter.emplace_back(additional_decoded_row.cend() - 1);
    }

    /// Records an element stored outside of this object; only the iterator is kept.
    void push_back(const DecodedRow::const_iterator & iter) { decoded_col_iter.push_back(iter); }

private:
    /// Storage for elements created through emplace_back().
    DecodedRow additional_decoded_row;
    /// One iterator per recorded element, pointing either into `additional_decoded_row` or to external data.
    std::vector<DecodedRow::const_iterator> decoded_col_iter;
    size_t ori_cap;
};

} // namespace DB
3 changes: 1 addition & 2 deletions dbms/src/Storages/Transaction/RegionCFDataBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,8 @@ TableID RegionCFDataBase<Trait>::insert(const TableID table_id, std::pair<Key, V
{
auto & map = data[table_id];
auto [it, ok] = map.emplace(std::move(kv_pair));
std::ignore = it;
if (!ok)
throw Exception("Found existing key in hex: " + getTiKVKey(kv_pair.second).toHex(), ErrorCodes::LOGICAL_ERROR);
throw Exception("Found existing key in hex: " + getTiKVKey(it->second).toHex(), ErrorCodes::LOGICAL_ERROR);

if constexpr (std::is_same_v<Trait, RegionWriteCFDataTrait>)
extra.add(Trait::getRowRawValuePtr(it->second));
Expand Down
Loading

0 comments on commit 3b181d8

Please sign in to comment.