Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLASH-223] Fix Process When UInt64 Is Handle #47

Merged
merged 6 commits into from
Apr 18, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 6 additions & 9 deletions dbms/src/DataStreams/MvccTMTSortedBlockInputStream.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include <DataStreams/MvccTMTSortedBlockInputStream.h>
#include <Storages/MutableSupport.h>
#include <Columns/ColumnsNumber.h>

namespace DB
{
Expand Down Expand Up @@ -59,14 +60,10 @@ void MvccTMTSortedBlockInputStream::merge(MutableColumns & merged_columns, std::
current_key.swap(next_key);
}

if ((*(current->all_columns[version_column_number]))[current->pos].template get<UInt64>() <= read_tso &&
(selected_row.empty()
|| current->all_columns[version_column_number]->compareAt(
current->pos, selected_row.row_num,
*(*selected_row.columns)[version_column_number],
1 ) > 0))
if (auto cur_tso = static_cast<const ColumnUInt64 *>(current->all_columns[version_column_number])->getElement(current->pos); cur_tso <= read_tso)
{
setRowRef(selected_row, current);
if (selected_row.empty() || cur_tso > static_cast<const ColumnUInt64 *>((*selected_row.columns)[version_column_number])->getElement(selected_row.row_num))
setRowRef(selected_row, current);
}

if (!current->isLast()) {
Expand All @@ -88,8 +85,8 @@ void MvccTMTSortedBlockInputStream::merge(MutableColumns & merged_columns, std::

bool MvccTMTSortedBlockInputStream::hasDeleteFlag()
{
UInt8 val = (*(*selected_row.columns)[del_column_number])[selected_row.row_num].template get<UInt8>();
return MutableSupport::DelMark::isDel(val);
const ColumnUInt8 * column = static_cast<const ColumnUInt8 *>((*selected_row.columns)[del_column_number]);
return MutableSupport::DelMark::isDel(column->getElement(selected_row.row_num));
}

}
54 changes: 43 additions & 11 deletions dbms/src/DataStreams/RangesFilterBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,62 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}

Block RangesFilterBlockInputStream::readImpl()
template class RangesFilterBlockInputStream<Int64>;
template class RangesFilterBlockInputStream<UInt64>;

struct PKColumnIterator : public std::iterator<std::random_access_iterator_tag, UInt64, size_t>
{
PKColumnIterator & operator++()
{
++pos;
return *this;
}

PKColumnIterator operator=(const PKColumnIterator & itr)
{
pos = itr.pos;
column = itr.column;
return *this;
}

UInt64 operator*() const { return column->getUInt(pos); }

size_t operator-(const PKColumnIterator & itr) const { return pos - itr.pos; }

PKColumnIterator(const int pos_, const IColumn * column_) : pos(pos_), column(column_) {}

void operator+=(size_t n) { pos += n; }

size_t pos;
const IColumn * column;
};

template <typename HandleType>
Block RangesFilterBlockInputStream<HandleType>::readImpl()
{
static const auto func_cmp = [](const UInt64 & a, const Handle & b) -> bool { return static_cast<HandleType>(a) < b; };

while (true)
{
Block block = input->read();
if (!block)
return block;

if (!block.has(handle_col_name))
throw Exception("RangesFilterBlockInputStream: block without _tidb_rowid.", ErrorCodes::LOGICAL_ERROR);
throw Exception("RangesFilterBlockInputStream: block without " + handle_col_name, ErrorCodes::LOGICAL_ERROR);

const ColumnWithTypeAndName & handle_column = block.getByName(handle_col_name);
const ColumnInt64 * column = typeid_cast<const ColumnInt64 *>(handle_column.column.get());
const auto * column = handle_column.column.get();
if (!column)
{
throw Exception("RangesFilterBlockInputStream: _tidb_rowid column should be type ColumnInt64.", ErrorCodes::LOGICAL_ERROR);
throw Exception(
"RangesFilterBlockInputStream: " + handle_col_name + " column should be type numeric", ErrorCodes::LOGICAL_ERROR);
}

size_t rows = block.rows();

auto handle_begin = column->getElement(0);
auto handle_end = column->getElement(rows - 1);
auto handle_begin = static_cast<HandleType>(column->getUInt(0));
auto handle_end = static_cast<HandleType>(column->getUInt(rows - 1));

if (handle_begin >= ranges.second || ranges.first > handle_end)
continue;
Expand All @@ -44,8 +78,7 @@ Block RangesFilterBlockInputStream::readImpl()
}
else
{
size_t pos
= std::lower_bound(column->getData().cbegin(), column->getData().cend(), ranges.second) - column->getData().cbegin();
size_t pos = std::lower_bound(PKColumnIterator(0, column), PKColumnIterator(rows, column), ranges.second, func_cmp).pos;
size_t pop_num = rows - pos;
for (size_t i = 0; i < block.columns(); i++)
{
Expand All @@ -58,11 +91,10 @@ Block RangesFilterBlockInputStream::readImpl()
}
else
{
size_t pos_begin
= std::lower_bound(column->getData().cbegin(), column->getData().cend(), ranges.first) - column->getData().cbegin();
size_t pos_begin = std::lower_bound(PKColumnIterator(0, column), PKColumnIterator(rows, column), ranges.first, func_cmp).pos;
size_t pos_end = rows;
if (handle_end >= ranges.second)
pos_end = std::lower_bound(column->getData().cbegin(), column->getData().cend(), ranges.second) - column->getData().cbegin();
pos_end = std::lower_bound(PKColumnIterator(0, column), PKColumnIterator(rows, column), ranges.second, func_cmp).pos;

size_t len = pos_end - pos_begin;
if (!len)
Expand Down
10 changes: 7 additions & 3 deletions dbms/src/DataStreams/RangesFilterBlockInputStream.h
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
#pragma once

#include <DataStreams/IProfilingBlockInputStream.h>
#include <Storages/Transaction/Types.h>
#include <Storages/Transaction/TiKVHandle.h>
#include <common/logger_useful.h>

namespace DB
{

template <typename HandleType>
class RangesFilterBlockInputStream : public IProfilingBlockInputStream
{
using Handle = TiKVHandle::Handle<HandleType>;

public:
RangesFilterBlockInputStream(const BlockInputStreamPtr & input_, const HandleRange & ranges_, const String & handle_col_name_)
RangesFilterBlockInputStream(
const BlockInputStreamPtr & input_, const HandleRange<HandleType> & ranges_, const String & handle_col_name_)
: input(input_), ranges(ranges_), handle_col_name(handle_col_name_)
{}

Expand All @@ -29,7 +33,7 @@ class RangesFilterBlockInputStream : public IProfilingBlockInputStream

private:
BlockInputStreamPtr input;
const HandleRange ranges;
const HandleRange<HandleType> ranges;
const String handle_col_name;
Logger * log = &Logger::get("RangesFilterBlockInputStream");
};
Expand Down
50 changes: 34 additions & 16 deletions dbms/src/DataStreams/ReplacingTMTSortedBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,19 @@

namespace DB
{
template class ReplacingTMTSortedBlockInputStream<Int64>;
template class ReplacingTMTSortedBlockInputStream<UInt64>;

void ReplacingTMTSortedBlockInputStream::insertRow(MutableColumns & merged_columns, size_t & merged_rows)
template <typename HandleType>
void ReplacingTMTSortedBlockInputStream<HandleType>::insertRow(MutableColumns & merged_columns, size_t & merged_rows)
{
++merged_rows;
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*(*selected_row.columns)[i], selected_row.row_num);
}

Block ReplacingTMTSortedBlockInputStream::readImpl()
template <typename HandleType>
Block ReplacingTMTSortedBlockInputStream<HandleType>::readImpl()
{
if (finished)
return Block();
Expand All @@ -29,16 +33,24 @@ Block ReplacingTMTSortedBlockInputStream::readImpl()

if (deleted_by_range)
{
std::ostringstream ss;
std::stringstream ss;
for (size_t i = 0; i < begin_handle_ranges.size(); ++i)
ss << "(" << begin_handle_ranges[i] << "," << end_handle_ranges[i] << ") ";
LOG_TRACE(log, "deleted by handle range: " << deleted_by_range << " rows, handle ranges: " << ss.str());
{
ss << "[";
begin_handle_ranges[i].toString(ss);
ss << ",";
end_handle_ranges[i].toString(ss);
ss << ") ";
}
LOG_TRACE(log,
"deleted by handle range: " << deleted_by_range << " rows, " << begin_handle_ranges.size() << " handle ranges: " << ss.str());
}

return header.cloneWithColumns(std::move(merged_columns));
}

void ReplacingTMTSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue)
template <typename HandleType>
void ReplacingTMTSortedBlockInputStream<HandleType>::merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue)
{
size_t merged_rows = 0;

Expand Down Expand Up @@ -95,7 +107,8 @@ void ReplacingTMTSortedBlockInputStream::merge(MutableColumns & merged_columns,
finished = true;
}

bool ReplacingTMTSortedBlockInputStream::shouldOutput()
template <typename HandleType>
bool ReplacingTMTSortedBlockInputStream<HandleType>::shouldOutput()
{
if (isDefiniteDeleted())
{
Expand All @@ -120,33 +133,38 @@ bool ReplacingTMTSortedBlockInputStream::shouldOutput()
return false;
}

bool ReplacingTMTSortedBlockInputStream::behindGcTso()
template <typename HandleType>
bool ReplacingTMTSortedBlockInputStream<HandleType>::behindGcTso()
{
return (*(*selected_row.columns)[version_column_number])[selected_row.row_num].template get<UInt64>() < gc_tso;
return (*(*selected_row.columns)[version_column_number]).getUInt(selected_row.row_num) < gc_tso;
}

bool ReplacingTMTSortedBlockInputStream::nextHasDiffPk()
template <typename HandleType>
bool ReplacingTMTSortedBlockInputStream<HandleType>::nextHasDiffPk()
{
return (*(*selected_row.columns)[pk_column_number])[selected_row.row_num] != (*(*next_key.columns)[0])[next_key.row_num];
return (*(*selected_row.columns)[pk_column_number]).getUInt(selected_row.row_num)
!= (*(*next_key.columns)[0]).getUInt(next_key.row_num);
}

bool ReplacingTMTSortedBlockInputStream::isDefiniteDeleted()
template <typename HandleType>
bool ReplacingTMTSortedBlockInputStream<HandleType>::isDefiniteDeleted()
{
if (begin_handle_ranges.empty())
return true;
HandleID handle_id = (*(*selected_row.columns)[pk_column_number])[selected_row.row_num].template get<HandleID>();
int pa = std::upper_bound(begin_handle_ranges.begin(), begin_handle_ranges.end(), handle_id) - begin_handle_ranges.begin();
Handle pk_handle = static_cast<HandleType>((*(*selected_row.columns)[pk_column_number]).getUInt(selected_row.row_num));
int pa = std::upper_bound(begin_handle_ranges.begin(), begin_handle_ranges.end(), pk_handle) - begin_handle_ranges.begin();
if (pa == 0)
return true;
else
{
if (handle_id < end_handle_ranges[pa - 1])
if (pk_handle < end_handle_ranges[pa - 1])
return false;
return true;
}
}

void ReplacingTMTSortedBlockInputStream::logRowGoing(const std::string & msg, bool is_output)
template <typename HandleType>
void ReplacingTMTSortedBlockInputStream<HandleType>::logRowGoing(const std::string & msg, bool is_output)
{
// Disable debug log
return;
Expand Down
12 changes: 8 additions & 4 deletions dbms/src/DataStreams/ReplacingTMTSortedBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,19 @@
#include <common/logger_useful.h>

#include <DataStreams/MergingSortedBlockInputStream.h>
#include <Storages/Transaction/Types.h>
#include <Storages/Transaction/TiKVHandle.h>

namespace DB
{

// operation merge is optimized because pk is definite integer
template <typename HandleType>
class ReplacingTMTSortedBlockInputStream : public MergingSortedBlockInputStream
{
using Handle = TiKVHandle::Handle<HandleType>;

public:
ReplacingTMTSortedBlockInputStream(const std::vector<std::pair<HandleID, HandleID>> & ranges_,
ReplacingTMTSortedBlockInputStream(const std::vector<HandleRange<HandleType>> & ranges_,
const BlockInputStreams & inputs_,
const SortDescription & description_,
const String & version_column,
Expand Down Expand Up @@ -52,8 +56,8 @@ class ReplacingTMTSortedBlockInputStream : public MergingSortedBlockInputStream
void logRowGoing(const std::string & reason, bool is_output);

private:
std::vector<HandleID> begin_handle_ranges;
std::vector<HandleID> end_handle_ranges;
std::vector<Handle> begin_handle_ranges;
std::vector<Handle> end_handle_ranges;

size_t version_column_number;
size_t del_column_number;
Expand Down
7 changes: 1 addition & 6 deletions dbms/src/DataStreams/VersionFilterBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,7 @@ Block VersionFilterBlockInputStream::readImpl()
}

const ColumnWithTypeAndName & version_column = block.getByName(version_column_name);
const ColumnUInt64 * column = typeid_cast<const ColumnUInt64 *>(version_column.column.get());
if (!column)
{
throw Exception("VersionFilterBlockInputStream: Version column should be type ColumnUInt64.",
ErrorCodes::LOGICAL_ERROR);
}
const ColumnUInt64 * column = static_cast<const ColumnUInt64 *>(version_column.column.get());

size_t rows = block.rows();
IColumn::Filter filter(rows, 1);
Expand Down
13 changes: 7 additions & 6 deletions dbms/src/Debug/dbgFuncRegion.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,11 @@ void dbgFuncRegionSnapshot(Context & context, const ASTs & args, DBGInvoker::Pri
output(ss.str());
}

std::string getRegionKeyString(const HandleID s, const TiKVKey & k)
std::string getRegionKeyString(const TiKVRange::Handle s, const TiKVKey & k)
{
try
{
if (s == std::numeric_limits<HandleID>::min() || s == std::numeric_limits<HandleID>::max())
if (s.type != TiKVHandle::HandleIDType::NORMAL)
{
String raw_key = k.empty() ? "" : RecordKVFormat::decodeTiKVKey(k);
bool is_record = RecordKVFormat::isRecord(raw_key);
Expand All @@ -138,7 +138,7 @@ std::string getRegionKeyString(const HandleID s, const TiKVKey & k)
}
return ss.str();
}
return toString(s);
return toString(s.handle_id);
}
catch (...)
{
Expand All @@ -150,7 +150,7 @@ std::string getStartKeyString(TableID table_id, const TiKVKey & start_key)
{
try
{
HandleID start_handle = TiKVRange::getRangeHandle<true>(start_key, table_id);
auto start_handle = TiKVRange::getRangeHandle<true>(start_key, table_id);
return getRegionKeyString(start_handle, start_key);
}
catch (...)
Expand All @@ -163,7 +163,7 @@ std::string getEndKeyString(TableID table_id, const TiKVKey & end_key)
{
try
{
HandleID end_handle = TiKVRange::getRangeHandle<false>(end_key, table_id);
auto end_handle = TiKVRange::getRangeHandle<false>(end_key, table_id);
return getRegionKeyString(end_handle, end_key);
}
catch (...)
Expand All @@ -182,7 +182,8 @@ void dbgFuncDumpAllRegion(Context & context, const ASTs & args, DBGInvoker::Prin
auto range = region->getHandleRangeByTable(table_id);
size += 1;
std::stringstream ss;
ss << "table #" << table_id << " " << region->toString() << " ranges: " << range.first << ", " << range.second;
ss << "table #" << table_id << " " << region->toString() << " ranges: " << range.first.toString() << ", "
<< range.second.toString();
output(ss.str());
});
output("total size: " + toString(size));
Expand Down
Loading