Skip to content

Commit

Permalink
DM: support rough index in Storage level. (pingcap#233)
Browse files Browse the repository at this point in the history
* DM: support rough set index in Storage level.

* put delta_deletes into snapshot

* rename sort_column to handle_column

* remove useless code

* address comments
  • Loading branch information
flowbehappy authored and JaySon-Huang committed Oct 23, 2019
1 parent 5f0815f commit 1b2933c
Show file tree
Hide file tree
Showing 50 changed files with 2,110 additions and 327 deletions.
2 changes: 2 additions & 0 deletions dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ add_headers_and_sources(dbms src/Interpreters/ClusterProxy)
add_headers_and_sources(dbms src/Columns)
add_headers_and_sources(dbms src/Storages)
add_headers_and_sources(dbms src/Storages/DeltaMerge)
add_headers_and_sources(dbms src/Storages/DeltaMerge/Index)
add_headers_and_sources(dbms src/Storages/DeltaMerge/Filter)
add_headers_and_sources(dbms src/Storages/Distributed)
add_headers_and_sources(dbms src/Storages/MergeTree)
add_headers_and_sources(dbms src/Storages/Transaction)
Expand Down
20 changes: 17 additions & 3 deletions dbms/src/Storages/DeltaMerge/Chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,15 @@ void Chunk::serialize(WriteBuffer & buf) const
writeIntBinary(d.rows, buf);
writeIntBinary(d.bytes, buf);
writeStringBinary(d.type->getName(), buf);
if (d.minmax)
{
writePODBinary(true, buf);
d.minmax->write(*d.type, buf);
}
else
{
writePODBinary(false, buf);
}
}
}

Expand All @@ -46,8 +55,11 @@ Chunk Chunk::deserialize(ReadBuffer & buf)
readIntBinary(d.rows, buf);
readIntBinary(d.bytes, buf);
readStringBinary(type, buf);

d.type = DataTypeFactory::instance().get(type);
bool has_minmax;
readPODBinary(has_minmax, buf);
if (has_minmax)
d.minmax = MinMaxIndex::read(*d.type, buf);

chunk.columns.emplace(d.col_id, d);

Expand Down Expand Up @@ -108,9 +120,9 @@ BufferAndSize serializeColumn(const IColumn & column, const DataTypePtr & type,

Chunk prepareChunkDataWrite(const DMContext & dm_context, const GenPageId & gen_data_page_id, WriteBatch & wb, const Block & block)
{
auto & handle_col_data = getColumnVectorData<Handle>(block, block.getPositionByName(dm_context.table_handle_define.name));
auto & handle_col_data = getColumnVectorData<Handle>(block, block.getPositionByName(dm_context.handle_column.name));
Chunk chunk(handle_col_data[0], handle_col_data[handle_col_data.size() - 1]);
for (const auto & col_define : dm_context.table_columns)
for (const auto & col_define : dm_context.store_columns)
{
auto col_id = col_define.id;
const IColumn & column = *(block.getByName(col_define.name).column);
Expand All @@ -122,6 +134,8 @@ Chunk prepareChunkDataWrite(const DMContext & dm_context, const GenPageId & gen_
d.rows = column.size();
d.bytes = size;
d.type = col_define.type;
d.minmax = std::make_shared<MinMaxIndex>(
*col_define.type, column, static_cast<const ColumnVector<UInt8> &>(*block.getByName(TAG_COLUMN_NAME).column), 0, column.size());

wb.putPage(d.page_id, 0, buf, size);
chunk.insert(d);
Expand Down
21 changes: 15 additions & 6 deletions dbms/src/Storages/DeltaMerge/Chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <DataTypes/IDataType.h>
#include <Storages/DeltaMerge/DMContext.h>
#include <Storages/DeltaMerge/DeltaMergeDefines.h>
#include <Storages/DeltaMerge/Index/RSIndex.h>
#include <Storages/DeltaMerge/Range.h>
#include <Storages/Page/PageStorage.h>

Expand All @@ -23,11 +24,12 @@ static constexpr size_t CHUNK_SERIALIZE_BUFFER_SIZE = 65536;

struct ColumnMeta
{
ColId col_id;
PageId page_id;
UInt32 rows;
UInt64 bytes;
DataTypePtr type;
ColId col_id;
PageId page_id;
UInt32 rows;
UInt64 bytes;
DataTypePtr type;
MinMaxIndexPtr minmax;
};
using ColumnMetas = std::vector<ColumnMeta>;

Expand All @@ -36,7 +38,6 @@ class Chunk
public:
using ColumnMetaMap = std::unordered_map<ColId, ColumnMeta>;


/// Default constructor: delegates to Chunk(0, 0).
Chunk()
    : Chunk(0, 0)
{
}
/// Creates a chunk that is not a delete range (is_delete_range is set to false).
/// handle_start / handle_end are taken from handle_first_ / handle_last_.
Chunk(Handle handle_first_, Handle handle_last_)
    : handle_start(handle_first_),
      handle_end(handle_last_),
      is_delete_range(false)
{
}
explicit Chunk(const HandleRange & delete_range) : handle_start(delete_range.start), handle_end(delete_range.end), is_delete_range(true)
Expand Down Expand Up @@ -78,6 +79,14 @@ class Chunk
return it->second;
}

/// Looks up the meta of column `col_id` in this chunk.
/// Returns a pointer to the entry stored in `columns`, or nullptr when no
/// entry with that id exists.
const ColumnMeta * tryGetColumn(ColId col_id) const
{
    const auto found = columns.find(col_id);
    // A missing column is treated as the unexpected case.
    return unlikely(found == columns.end()) ? nullptr : &found->second;
}

/// Read-only access to the column metas of this chunk, keyed by column id.
const ColumnMetaMap & getMetas() const
{
    return columns;
}

void insert(const ColumnMeta & c)
Expand Down
36 changes: 28 additions & 8 deletions dbms/src/Storages/DeltaMerge/ChunkBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <DataStreams/IProfilingBlockInputStream.h>

#include <Storages/DeltaMerge/Chunk.h>
#include <Storages/DeltaMerge/Filter/RSOperator.h>

namespace DB
{
Expand All @@ -12,28 +13,47 @@ namespace DM
class ChunkBlockInputStream final : public IBlockInputStream
{
public:
ChunkBlockInputStream(const Chunks & chunks_, const ColumnDefines & read_columns_, const PageReader & page_reader_)
: chunks(chunks_), read_columns(read_columns_), page_reader(page_reader_)
ChunkBlockInputStream(const Chunks & chunks_,
const RSOperatorPtr & filter,
const ColumnDefines & read_columns_,
const PageReader & page_reader_)
: chunks(chunks_), skip_chunks(chunks.size()), read_columns(read_columns_), page_reader(page_reader_)
{
for (size_t i = 0; i < chunks.size(); ++i)
{
if (!filter)
{
skip_chunks[i] = 0;
continue;
}
auto & chunk = chunks[i];
RSCheckParam param;
for (auto & [col_id, meta] : chunk.getMetas())
param.indexes.emplace(col_id, RSIndex(meta.type, meta.minmax));

skip_chunks[i] = filter->roughCheck(param) == None;
}
}

/// Name of this stream: "Chunk".
String getName() const override
{
    return "Chunk";
}
/// Header block, produced by toEmptyBlock() from the columns this stream reads.
Block getHeader() const override
{
    return toEmptyBlock(read_columns);
}

Block read() override
{
if (!hasNextBlock())
if (!hasNext())
return {};
return readChunk(chunks[chunk_index++], read_columns, page_reader);
}

bool hasNextBlock() { return chunk_index < chunks.size(); }
HandlePair nextBlockHandle() { return chunks[chunk_index].getHandleFirstLast(); }
size_t nextBlockRows() { return chunks[chunk_index].getRows(); }
void skipNextBlock() { ++chunk_index; }
/// True while `chunk_index` still points at a chunk that has not been consumed.
bool hasNext()
{
    return chunks.size() > chunk_index;
}
/// Row count (getRows()) of the chunk at `chunk_index`.
/// No bounds check is performed here; call hasNext() first.
size_t nextRows()
{
    auto & upcoming = chunks[chunk_index];
    return upcoming.getRows();
}
/// Whether the chunk at `chunk_index` is flagged in `skip_chunks`.
/// No bounds check is performed here; call hasNext() first.
bool shouldSkipNext()
{
    return skip_chunks[chunk_index] != 0;
}
/// Moves past the chunk at `chunk_index` without reading it.
void skipNext()
{
    chunk_index += 1;
}

private:
Chunks chunks;
Chunks chunks;
std::vector<UInt8> skip_chunks;

size_t chunk_index = 0;
ColumnDefines read_columns;
PageReader page_reader;
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/DMContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ struct DMContext

// The schema snapshot
// We need a consistent snapshot of columns, copy ColumnsDefines
const ColumnDefines table_columns;
const ColumnDefine table_handle_define;
const ColumnDefines store_columns;
const ColumnDefine handle_column;

const UInt64 min_version;

Expand Down
18 changes: 1 addition & 17 deletions dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,11 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
/// If handle_real_type_ is empty, means do not convert handle column back to real type.
DMSegmentThreadInputStream(const SegmentReadTaskPoolPtr & task_pool_,
const SegmentStreamCreator & stream_creator_,
const ColumnDefines & columns_to_read_,
const String & handle_name_,
const DataTypePtr & handle_real_type_,
const Context & context_)
const ColumnDefines & columns_to_read_)
: task_pool(task_pool_),
stream_creator(stream_creator_),
columns_to_read(columns_to_read_),
header(toEmptyBlock(columns_to_read)),
handle_name(handle_name_),
handle_real_type(handle_real_type_),
context(context_),
log(&Logger::get("DMSegmentThreadInputStream"))
{
}
Expand Down Expand Up @@ -74,13 +68,6 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
Block res;
for (auto & cd : columns_to_read)
res.insert(original_block.getByName(cd.name));

if (handle_real_type && res.has(handle_name))
{
auto pos = res.getPositionByName(handle_name);
convertColumn(res, pos, handle_real_type, context);
res.getByPosition(pos).type = handle_real_type;
}
return res;
}

Expand All @@ -89,9 +76,6 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
SegmentStreamCreator stream_creator;
ColumnDefines columns_to_read;
Block header;
String handle_name;
DataTypePtr handle_real_type;
const Context & context;

bool done = false;
BlockInputStreamPtr cur_stream;
Expand Down
Loading

0 comments on commit 1b2933c

Please sign in to comment.