Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLASH-270] Delete Range Data At First while Applying Snapshot #75

Merged
merged 35 commits into from
May 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
350cd73
change getGCSafePoint in MockPDClient.
solotzg May 20, 2019
65c3037
Add del col to secondary sorting expression.
solotzg May 20, 2019
c92453a
Complete operation in memory.
solotzg May 20, 2019
9fbd896
check all table while applying snapshot.
solotzg May 20, 2019
6e9ded9
!!!tmp
solotzg May 21, 2019
3e34280
Merge remote-tracking branch 'pingcap/master' into FLASH-270
solotzg May 21, 2019
9abf554
Revert "!!!tmp"
solotzg May 21, 2019
93f7d9d
!!!tmp
solotzg May 21, 2019
3e85a92
Small fix.
solotzg May 22, 2019
f3569e6
rollback operation about deleting record.
solotzg May 22, 2019
0b10c65
Optimize executor.
solotzg May 22, 2019
7210284
optimize ReplacingTMTSortedBlockInputStream.
solotzg May 22, 2019
6db5de9
optimize for only need handle, tso, delmark.
solotzg May 23, 2019
acf76b6
import TMTSortCursor.
solotzg May 23, 2019
2a27711
finish.
solotzg May 23, 2019
b0c8399
fix.
solotzg May 23, 2019
7759b63
fix.
solotzg May 23, 2019
1019e44
fix.
solotzg May 23, 2019
146690b
Optimize for count(*).
solotzg May 23, 2019
c860f47
fix bug.
solotzg May 23, 2019
d3a77a5
Fix
solotzg May 24, 2019
dacb64c
optimize TMTSortedBlockInputStream.
solotzg May 24, 2019
3a13a98
Optimize log.
solotzg May 24, 2019
2ef4eaa
no need to report after persisted.
solotzg May 25, 2019
b4d3c12
Optimize memory copy in Region::onCommand.
solotzg May 25, 2019
acea86c
more mock test.
solotzg May 25, 2019
189f1b1
optimize snapshot.
solotzg May 26, 2019
92ee816
Fix Region::compareAndCompleteSnapshot.
solotzg May 26, 2019
e73b8c5
fix compile
solotzg May 26, 2019
2bca1d2
Optimize when pk is int64 or uint64.
solotzg May 27, 2019
68b450d
fix compile.
solotzg May 27, 2019
457c872
Small fix: add const to parameter.
solotzg May 29, 2019
5e0d2c8
optimize performance.
solotzg May 29, 2019
3270582
Format.
solotzg May 29, 2019
f2e51a8
address comment.
solotzg May 30, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion contrib/client-c/include/pd/MockPDClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class MockPDClient : public IClient {

// Mock GC safe point: the current Clock time minus a fixed number of seconds,
// returned as the raw tick count since the clock's epoch.
// NOTE(review): this listing is a rendered diff without +/- markers
// ("1 addition & 1 deletion"), so presumably the first return is the removed
// line and the second is its replacement — confirm against the merged file.
// Read literally as code, the second return is unreachable.
uint64_t getGCSafePoint() override
{
return (Clock::now() - Seconds(60 * 60)).time_since_epoch().count();
return (Clock::now() - Seconds(2)).time_since_epoch().count();
}

uint64_t getTS() override
Expand Down
197 changes: 197 additions & 0 deletions dbms/src/Core/TMTSortCursor.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
#pragma once

#include <Columns/ColumnsNumber.h>
#include <Core/SortCursor.h>

namespace DB
{

/// Result of a row comparison on the TMT sort key.
/// `diffs` holds four Int8 slots; `all` overlays the same four bytes so the
/// whole result can be reset (or inspected) as a single 32-bit value.
union TMTCmpOptimizedRes
{
    Int32 all;
    std::array<Int8, 4> diffs;
};

// `all` has to cover `diffs` exactly, with no padding.
static_assert(sizeof(TMTCmpOptimizedRes) == 4);

/// How the pk column is represented; used as a compile-time switch when comparing rows.
enum TMTPKType
{
    INT64 = 0,       // pk column is read as ColumnInt64
    UINT64 = 1,      // pk column is read as ColumnUInt64
    UNSPECIFIED = 2, // any other pk column; compared through compareAt
};

/// Compares one row of the left sort columns with one row of the right sort columns
/// on the TMT sort key, which is always ordered pk -> version -> delmark
/// (sort_columns[0], [1], [2]).
///
/// Column types:
///  - pk is int64, uint64 or something else (int32, int8, uint32 ...), selected by pk_type;
///  - version is uint64;
///  - delmark is uint8.
///
/// Template parameters:
///  - just_diff: for the typed comparisons store only 1 (differs) / 0 (equal) instead of
///    an ordering. The UNSPECIFIED pk path always stores the sign of compareAt.
///  - only_pk: stop after the pk column; the version and delmark slots stay 0.
///  - pk_type: how the pk column is read.
///
/// Returns the per-column results in diffs[0] (pk), diffs[1] (version), diffs[2] (delmark).
/// With just_diff == false each slot is -1, 0 or 1 (left less / equal / greater).
/// diffs[3] is never written and keeps its initial 0.
template <bool just_diff, bool only_pk, TMTPKType pk_type>
inline TMTCmpOptimizedRes cmpTMTCursor(
    const ColumnRawPtrs & lsort_columns, const size_t lhs_pos, const ColumnRawPtrs & rsort_columns, const size_t rhs_pos)
{
    // Start from "everything equal"; slots of columns that are not compared stay 0.
    TMTCmpOptimizedRes res;
    res.all = 0;

    // PK
    if constexpr (pk_type == TMTPKType::INT64)
    {
        const auto h1 = static_cast<const ColumnInt64 *>(lsort_columns[0])->getElement(lhs_pos);
        const auto h2 = static_cast<const ColumnInt64 *>(rsort_columns[0])->getElement(rhs_pos);

        if constexpr (just_diff)
        {
            res.diffs[0] = h1 != h2;
        }
        else
        {
            res.diffs[0] = h1 == h2 ? 0 : (h1 > h2 ? 1 : -1);
        }
    }
    else if constexpr (pk_type == TMTPKType::UINT64)
    {
        const auto h1 = static_cast<const ColumnUInt64 *>(lsort_columns[0])->getElement(lhs_pos);
        const auto h2 = static_cast<const ColumnUInt64 *>(rsort_columns[0])->getElement(rhs_pos);

        if constexpr (just_diff)
        {
            res.diffs[0] = h1 != h2;
        }
        else
        {
            res.diffs[0] = h1 == h2 ? 0 : (h1 > h2 ? 1 : -1);
        }
    }
    else
    {
        // Only the sign of compareAt's result is kept: clamp it to -1/0/1 before storing,
        // so that narrowing into the Int8 slot can neither flip the sign nor turn a
        // non-zero result into 0.
        const auto pk_cmp = lsort_columns[0]->compareAt(lhs_pos, rhs_pos, *(rsort_columns[0]), 0);
        res.diffs[0] = static_cast<Int8>((pk_cmp > 0) - (pk_cmp < 0));
    }

    if constexpr (only_pk)
    {
        return res;
    }
    else
    {
        // VERSION
        {
            const auto t1 = static_cast<const ColumnUInt64 *>(lsort_columns[1])->getElement(lhs_pos);
            const auto t2 = static_cast<const ColumnUInt64 *>(rsort_columns[1])->getElement(rhs_pos);

            if constexpr (just_diff)
            {
                res.diffs[1] = t1 != t2;
            }
            else
            {
                res.diffs[1] = t1 == t2 ? 0 : (t1 > t2 ? 1 : -1);
            }
        }

        // DELMARK
        {
            const auto d1 = static_cast<const ColumnUInt8 *>(lsort_columns[2])->getElement(lhs_pos);
            const auto d2 = static_cast<const ColumnUInt8 *>(rsort_columns[2])->getElement(rhs_pos);

            if constexpr (just_diff)
            {
                res.diffs[2] = d1 != d2;
            }
            else
            {
                res.diffs[2] = d1 == d2 ? 0 : (d1 > d2 ? 1 : -1);
            }
        }

        return res;
    }
}


/// Sort cursor specialised for the TMT engine, whose sort key always has exactly
/// three columns: PK, VERSION, DELMARK.
///
/// It is a thin, non-owning wrapper around a SortCursorImpl pointer; all comparisons
/// go through cmpTMTCursor.
///
/// Template parameters:
///  - only_pk: compare rows on the pk column only.
///  - pk_type: how the pk column is read (see TMTPKType).
template <bool only_pk = false, TMTPKType pk_type = TMTPKType::UNSPECIFIED>
struct TMTSortCursor
{
    SortCursorImpl * impl = nullptr;

    TMTSortCursor() = default;
    TMTSortCursor(SortCursorImpl * impl_) : impl(impl_) {}
    SortCursorImpl * operator->() { return impl; }
    const SortCursorImpl * operator->() const { return impl; }

    /// True when the cursor wraps no SortCursorImpl.
    bool none() const { return impl == nullptr; }

    /// Identity checks: do both cursors wrap the very same SortCursorImpl?
    bool isSame(const TMTSortCursor & other) const { return impl == other.impl; }
    bool notSame(const TMTSortCursor & other) const { return impl != other.impl; }

    /// Per-column ordering of row lhs_pos of this cursor against row rhs_pos of rhs,
    /// without looking at the cursors' `order`.
    TMTCmpOptimizedRes cmpIgnOrder(const TMTSortCursor & rhs, const size_t lhs_pos, const size_t rhs_pos) const
    {
        return cmpTMTCursor<false, only_pk, pk_type>(impl->sort_columns, lhs_pos, rhs.impl->sort_columns, rhs_pos);
    }

    /// True when row lhs_pos of this cursor sorts after row rhs_pos of rhs.
    /// Rows that are equal on every compared column are ordered by the cursors' `order`.
    bool greaterAt(const TMTSortCursor & rhs, const size_t lhs_pos, const size_t rhs_pos) const
    {
        const auto res = cmpIgnOrder(rhs, lhs_pos, rhs_pos);

        if constexpr (only_pk)
        {
            if (res.diffs[0] != 0)
                return res.diffs[0] > 0;
            return impl->order > rhs.impl->order;
        }
        else
        {
            return greaterAt(res, impl->order, rhs.impl->order);
        }
    }

    /// True when row lhs_pos of this cursor sorts strictly before row rhs_pos of rhs;
    /// equal rows are not "less" and `order` is not consulted.
    bool lessAtIgnOrder(const TMTSortCursor & rhs, const size_t lhs_pos, const size_t rhs_pos) const
    {
        const auto res = cmpIgnOrder(rhs, lhs_pos, rhs_pos);

        if constexpr (only_pk)
        {
            return res.diffs[0] < 0;
        }
        else
        {
            return lessAtIgnOrder(res);
        }
    }

    /// Lexicographic "greater" over the pk, version and delmark slots of res:
    /// the first non-zero slot decides, and a full tie falls back to lorder > rorder.
    static inline bool greaterAt(const TMTCmpOptimizedRes res, const size_t lorder, const size_t rorder)
    {
        if (res.diffs[0] != 0)
            return res.diffs[0] > 0;
        if (res.diffs[1] != 0)
            return res.diffs[1] > 0;
        if (res.diffs[2] != 0)
            return res.diffs[2] > 0;
        return lorder > rorder;
    }

    /// Lexicographic "less" over the pk, version and delmark slots of res;
    /// a full tie yields false.
    static inline bool lessAtIgnOrder(const TMTCmpOptimizedRes res)
    {
        if (res.diffs[0] != 0)
            return res.diffs[0] < 0;
        if (res.diffs[1] != 0)
            return res.diffs[1] < 0;
        return res.diffs[2] < 0;
    }

    /// False if either cursor has no rows. Otherwise true when the last row of this
    /// cursor is no larger than the first row of the other cursor.
    bool totallyLessOrEquals(const TMTSortCursor & rhs) const
    {
        if (impl->rows == 0 || rhs.impl->rows == 0)
            return false;

        return !greaterAt(rhs, impl->rows - 1, 0);
    }

    /// Compares the rows both cursors currently point at.
    bool greater(const TMTSortCursor & rhs) const { return greaterAt(rhs, impl->pos, rhs.impl->pos); }

    /// False if either cursor has no rows. Otherwise true when the last row of this
    /// cursor is strictly less than the first row of the other cursor, ignoring `order`.
    bool totallyLessIgnOrder(const TMTSortCursor & rhs) const
    {
        if (impl->rows == 0 || rhs.impl->rows == 0)
            return false;
        return lessAtIgnOrder(rhs, impl->rows - 1, 0);
    }

    /// Inverted on purpose: a < b holds when a's current row is greater than b's.
    /// Presumably this is so that std::priority_queue, which keeps the largest element
    /// on top, yields the smallest row first — confirm against the merging stream.
    bool operator<(const TMTSortCursor & rhs) const { return greater(rhs); }
};

// Cursors that compare rows on the pk column only (only_pk = true), one per pk representation.
using TMTSortCursorInt64PK = TMTSortCursor<true, TMTPKType::INT64>;
using TMTSortCursorUInt64PK = TMTSortCursor<true, TMTPKType::UINT64>;
using TMTSortCursorUnspecifiedPK = TMTSortCursor<true, TMTPKType::UNSPECIFIED>;

// Cursors that compare rows on pk, version and delmark (only_pk = false), one per pk representation.
using TMTSortCursorInt64 = TMTSortCursor<false, TMTPKType::INT64>;
using TMTSortCursorUInt64 = TMTSortCursor<false, TMTPKType::UINT64>;
using TMTSortCursorUnspecified = TMTSortCursor<false, TMTPKType::UNSPECIFIED>;

} // namespace DB
31 changes: 26 additions & 5 deletions dbms/src/DataStreams/MergingSortedBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#include <iomanip>

#include <DataStreams/MergingSortedBlockInputStream.h>

#include <Core/TMTSortCursor.hpp>

namespace DB
{
Expand Down Expand Up @@ -55,10 +55,7 @@ void MergingSortedBlockInputStream::init(MutableColumns & merged_columns)
has_collation |= cursors[i].has_collation;
}

if (has_collation)
initQueue(queue_with_collation);
else
initQueue(queue);
initQueue();
}

/// Let's check that all source blocks have the same structure.
Expand All @@ -78,6 +75,13 @@ void MergingSortedBlockInputStream::init(MutableColumns & merged_columns)
}
}

/// Fills the priority queue that matches the cursors: the collation-aware queue when
/// has_collation is set, the plain queue otherwise.
void MergingSortedBlockInputStream::initQueue()
{
    if (!has_collation)
    {
        initQueue(queue);
        return;
    }

    initQueue(queue_with_collation);
}

template <typename TSortCursor>
void MergingSortedBlockInputStream::initQueue(std::priority_queue<TSortCursor> & queue)
Expand Down Expand Up @@ -136,6 +140,23 @@ void MergingSortedBlockInputStream::fetchNextBlock<SortCursor>(const SortCursor
// Explicit instantiations of fetchNextBlock, one per cursor type.
// NOTE(review): presumably needed because the template is defined in this .cpp and
// used from other translation units — confirm against the header.
template
void MergingSortedBlockInputStream::fetchNextBlock<SortCursorWithCollation>(const SortCursorWithCollation & current, std::priority_queue<SortCursorWithCollation> & queue);

// TMT cursors that compare on pk only.
template
void MergingSortedBlockInputStream::fetchNextBlock<TMTSortCursorInt64PK>(const TMTSortCursorInt64PK & current, std::priority_queue<TMTSortCursorInt64PK> & queue);

template
void MergingSortedBlockInputStream::fetchNextBlock<TMTSortCursorUInt64PK>(const TMTSortCursorUInt64PK & current, std::priority_queue<TMTSortCursorUInt64PK> & queue);

template
void MergingSortedBlockInputStream::fetchNextBlock<TMTSortCursorUnspecifiedPK>(const TMTSortCursorUnspecifiedPK & current, std::priority_queue<TMTSortCursorUnspecifiedPK> & queue);

// TMT cursors that compare on pk, version and delmark.
template
void MergingSortedBlockInputStream::fetchNextBlock<TMTSortCursorInt64>(const TMTSortCursorInt64 & current, std::priority_queue<TMTSortCursorInt64> & queue);

template
void MergingSortedBlockInputStream::fetchNextBlock<TMTSortCursorUInt64>(const TMTSortCursorUInt64 & current, std::priority_queue<TMTSortCursorUInt64> & queue);

template
void MergingSortedBlockInputStream::fetchNextBlock<TMTSortCursorUnspecified>(const TMTSortCursorUnspecified & current, std::priority_queue<TMTSortCursorUnspecified> & queue);

template <typename TSortCursor>
void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<TSortCursor> & queue)
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/DataStreams/MergingSortedBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,8 @@ class MergingSortedBlockInputStream : public IProfilingBlockInputStream
row_ref.columns = &row_ref.shared_block->sort_columns;
}

virtual void initQueue();

private:

/** We support two different cursors - with Collation and without.
Expand Down
Loading