Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLASH-731] DMFile support Nullable columns #344

Merged
merged 7 commits
Dec 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions dbms/src/Storages/DeltaMerge/File/DMFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,12 @@ class DMFile : private boost::noncopyable
String chunkStatPath() { return path() + "/chunk"; }
// Do not gc me.
String ngcPath() { return path() + "/" + NGC_FILE_NAME; }
String colDataPath(ColId col_id) { return path() + "/" + DB::toString(col_id) + ".dat"; }
String colIndexPath(ColId col_id) { return path() + "/" + DB::toString(col_id) + ".idx"; }
String colEdgePath(ColId col_id) { return path() + "/" + DB::toString(col_id) + ".edge"; }
String colMarkPath(ColId col_id) { return path() + "/" + DB::toString(col_id) + ".mrk"; }
String colDataPath(const String & file_name_base) { return path() + "/" + file_name_base + ".dat"; }
String colIndexPath(const String & file_name_base) { return path() + "/" + file_name_base + ".idx"; }
String colEdgePath(const String & file_name_base) { return path() + "/" + file_name_base + ".edge"; }
String colMarkPath(const String & file_name_base) { return path() + "/" + file_name_base + ".mrk"; }

const auto & getColumnStat(ColId col_id)
const ColumnStat & getColumnStat(ColId col_id)
{
auto it = column_stats.find(col_id);
if (it == column_stats.end())
Expand Down Expand Up @@ -105,6 +105,11 @@ class DMFile : private boost::noncopyable
const ColumnStats & getColumnStats() { return column_stats; }
Status getStatus() { return status; }

/// Build the on-disk file name base for one stream of a column.
/// For a plain column this is simply the column id rendered as a string; when a
/// substream path is given (e.g. the null map of a Nullable column, see the
/// `isNullMap` handling in DMFileWriter::addStreams), the path is folded into
/// the name so each substream gets a distinct set of .dat/.mrk/.idx files.
static String getFileNameBase(ColId col_id, const IDataType::SubstreamPath & substream = {})
{
    const String id_as_name = DB::toString(col_id);
    return IDataType::getFileNameForStream(id_as_name, substream);
}

private:
DMFile(UInt64 file_id_, UInt64 ref_id_, const String & parent_path_, Status status_, Logger * log_)
: file_id(file_id_), ref_id(ref_id_), parent_path(parent_path_), status(status_), log(log_)
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/File/DMFileChunkFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class DMFileChunkFilter
if (param.indexes.count(col_id))
return;

auto index_path = dmfile->colIndexPath(col_id);
auto index_path = dmfile->colIndexPath(DMFile::getFileNameBase(col_id));
Poco::File index_file(index_path);
if (!index_file.exists())
return;
Expand Down
32 changes: 26 additions & 6 deletions dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,16 @@ namespace DB
namespace DM
{

DMFileReader::Stream::Stream(DMFileReader & reader, ColId col_id, size_t aio_threshold, size_t max_read_buffer_size, Logger * log)
DMFileReader::Stream::Stream(DMFileReader & reader, //
ColId col_id,
const String & file_name_base,
size_t aio_threshold,
size_t max_read_buffer_size,
Logger * log)
: avg_size_hint(reader.dmfile->getColumnStat(col_id).avg_size)
{
String mark_path = reader.dmfile->colMarkPath(col_id);
String data_path = reader.dmfile->colDataPath(col_id);
String mark_path = reader.dmfile->colMarkPath(file_name_base);
String data_path = reader.dmfile->colDataPath(file_name_base);

auto mark_load = [&]() -> MarksInCompressedFilePtr {
auto res = std::make_shared<MarksInCompressedFile>(reader.dmfile->getChunks());
Expand Down Expand Up @@ -122,7 +127,17 @@ DMFileReader::DMFileReader(bool enable_clean_read_,

for (auto & cd : read_columns)
{
column_streams.emplace(cd.id, std::make_unique<Stream>(*this, cd.id, aio_threshold, max_read_buffer_size, log));
auto callback = [&](const IDataType::SubstreamPath & substream) {
String stream_name = DMFile::getFileNameBase(cd.id, substream);
auto stream = std::make_unique<Stream>(*this, //
cd.id,
stream_name,
aio_threshold,
max_read_buffer_size,
log);
column_streams.emplace(stream_name, std::move(stream));
};
cd.type->enumerateStreams(callback, {});
}
}

Expand Down Expand Up @@ -216,7 +231,8 @@ Block DMFileReader::read()
}
else
{
auto & stream = column_streams.at(cd.id);
String stream_name = DMFile::getFileNameBase(cd.id);
auto & stream = column_streams.at(stream_name);
if (shouldSeek(start_chunk_id) || skip_chunks_by_column[i] > 0)
{
auto & mark = (*stream->marks)[start_chunk_id];
Expand All @@ -225,7 +241,11 @@ Block DMFileReader::read()

auto column = cd.type->createColumn();
cd.type->deserializeBinaryBulkWithMultipleStreams(*column, //
[&](const IDataType::SubstreamPath &) { return stream->buf.get(); },
[&](const IDataType::SubstreamPath & substream) {
String name = DMFile::getFileNameBase(cd.id, substream);
auto & stream = column_streams.at(name);
return stream->buf.get();
},
read_rows,
stream->avg_size_hint,
true,
Expand Down
10 changes: 8 additions & 2 deletions dbms/src/Storages/DeltaMerge/File/DMFileReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,20 @@ class DMFileReader
public:
struct Stream
{
Stream(DMFileReader & reader, ColId col_id, size_t aio_threshold, size_t max_read_buffer_size, Logger * log);
Stream(DMFileReader & reader, //
ColId col_id,
const String & file_name_base,
size_t aio_threshold,
size_t max_read_buffer_size,
Logger * log);

double avg_size_hint;
MarksInCompressedFilePtr marks;

std::unique_ptr<CompressedReadBufferFromFile> buf;
};
using StreamPtr = std::unique_ptr<Stream>;
using ColumnStreams = std::map<ColId, StreamPtr>;
using ColumnStreams = std::map<String, StreamPtr>;

DMFileReader(bool enable_clean_read_,
UInt64 max_data_version_,
Expand All @@ -53,6 +58,7 @@ class DMFileReader
private:
bool shouldSeek(size_t chunk_id);


private:
bool enable_clean_read;
UInt64 max_data_version;
Expand Down
93 changes: 62 additions & 31 deletions dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,27 @@ DMFileWriter::DMFileWriter(const DMFilePtr & dmfile_,
// TODO: currently we only generate index for Integers, Date, DateTime types, and this should be configurable by user.

bool do_index = !wal_mode && (cd.type->isInteger() || cd.type->isDateOrDateTime());
auto stream = std::make_unique<Stream>(dmfile, //
cd.id,
cd.type,
compression_settings,
max_compress_block_size,
do_index);
column_streams.emplace(cd.id, std::move(stream));
addStreams(cd.id, cd.type, do_index);
dmfile->column_stats.emplace(cd.id, ColumnStat{cd.id, cd.type, /*avg_size=*/0});
}
}

/// Create one output Stream per substream of the column and register it in
/// `column_streams`, keyed by the stream's file name base. A simple column has
/// exactly one substream; compound types have more (e.g. a Nullable column has
/// an extra null-map substream), and each gets its own Stream / set of files.
void DMFileWriter::addStreams(ColId col_id, DataTypePtr type, bool do_index)
{
    auto callback = [&](const IDataType::SubstreamPath & substream_path) {
        String stream_name = DMFile::getFileNameBase(col_id, substream_path);
        // A min-max index only makes sense on the actual data stream; never
        // build one for the null-map substream.
        bool substream_do_index = do_index && !IDataType::isNullMap(substream_path);
        auto stream = std::make_unique<Stream>(dmfile, //
                                               stream_name,
                                               type,
                                               compression_settings,
                                               max_compress_block_size,
                                               substream_do_index);
        column_streams.emplace(stream_name, std::move(stream));
    };

    type->enumerateStreams(callback, {});
}

void DMFileWriter::write(const Block & block, size_t not_clean_rows)
{
DMFile::ChunkStat stat;
Expand Down Expand Up @@ -74,48 +84,69 @@ void DMFileWriter::finalize()

void DMFileWriter::writeColumn(ColId col_id, const IDataType & type, const IColumn & column)
{
size_t rows = column.size();
auto & stream = column_streams.at(col_id);
if (stream->minmaxes)
stream->minmaxes->addChunk(column, nullptr);
size_t rows = column.size();

/// There could already be enough data to compress into the new block.
if (stream->original_hashing.offset() >= min_compress_block_size)
stream->original_hashing.next();
type.enumerateStreams(
[&](const IDataType::SubstreamPath & substream) {
String name = DMFile::getFileNameBase(col_id, substream);
auto & stream = column_streams.at(name);
if (stream->minmaxes)
stream->minmaxes->addChunk(column, nullptr);

auto offset_in_compressed_block = stream->original_hashing.offset();
if (unlikely(wal_mode && offset_in_compressed_block != 0))
throw Exception("Offset in compressed block is expected to be 0, now " + DB::toString(offset_in_compressed_block));
/// There could already be enough data to compress into the new block.
if (stream->original_hashing.offset() >= min_compress_block_size)
stream->original_hashing.next();

writeIntBinary(stream->plain_hashing.count(), stream->mark_file);
writeIntBinary(offset_in_compressed_block, stream->mark_file);
auto offset_in_compressed_block = stream->original_hashing.offset();
if (unlikely(wal_mode && offset_in_compressed_block != 0))
throw Exception("Offset in compressed block is expected to be 0, now " + DB::toString(offset_in_compressed_block));

writeIntBinary(stream->plain_hashing.count(), stream->mark_file);
writeIntBinary(offset_in_compressed_block, stream->mark_file);
},
{});

type.serializeBinaryBulkWithMultipleStreams(column, //
[&](const IDataType::SubstreamPath &) { return &(stream->original_hashing); },
[&](const IDataType::SubstreamPath & substream) {
String stream_name = DMFile::getFileNameBase(col_id, substream);
auto & stream = column_streams.at(stream_name);
return &(stream->original_hashing);
},
0,
rows,
true,
{});

if (wal_mode)
stream->flush();
else
stream->original_hashing.nextIfAtEnd();
type.enumerateStreams(
[&](const IDataType::SubstreamPath & substream) {
String name = DMFile::getFileNameBase(col_id, substream);
auto & stream = column_streams.at(name);
if (wal_mode)
stream->flush();
else
stream->original_hashing.nextIfAtEnd();
},
{});

auto & avg_size = dmfile->column_stats.at(col_id).avg_size;
IDataType::updateAvgValueSizeHint(column, avg_size);
}

void DMFileWriter::finalizeColumn(ColId col_id, const IDataType & type)
{
auto & stream = column_streams.at(col_id);
stream->flush();
auto callback = [&](const IDataType::SubstreamPath & substream) {
String stream_name = DMFile::getFileNameBase(col_id, substream);
auto & stream = column_streams.at(stream_name);
stream->flush();

if (stream->minmaxes)
{
WriteBufferFromFile buf(dmfile->colIndexPath(col_id));
stream->minmaxes->write(type, buf);
}
if (stream->minmaxes)
{
WriteBufferFromFile buf(dmfile->colIndexPath(stream_name));
stream->minmaxes->write(type, buf);
}
};

type.enumerateStreams(callback, {});
}

} // namespace DM
Expand Down
17 changes: 11 additions & 6 deletions dbms/src/Storages/DeltaMerge/File/DMFileWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,17 @@ class DMFileWriter
struct Stream
{
Stream(const DMFilePtr & dmfile,
ColId col_id,
const String & file_base_name,
const DataTypePtr & type,
CompressionSettings compression_settings,
size_t max_compress_block_size,
bool do_index)
: plain_file(createWriteBufferFromFileBase(dmfile->colDataPath(col_id), 0, 0, max_compress_block_size)),
: plain_file(createWriteBufferFromFileBase(dmfile->colDataPath(file_base_name), 0, 0, max_compress_block_size)),
plain_hashing(*plain_file),
compressed_buf(plain_hashing, compression_settings),
original_hashing(compressed_buf),
minmaxes(do_index ? std::make_shared<MinMaxIndex>(*type) : nullptr),
mark_file(dmfile->colMarkPath(col_id))
mark_file(dmfile->colMarkPath(file_base_name))
{
}

Expand All @@ -54,11 +54,11 @@ class DMFileWriter
CompressedWriteBuffer compressed_buf;
HashingWriteBuffer original_hashing;

MinMaxIndexPtr minmaxes;
WriteBufferFromFile mark_file;
MinMaxIndexPtr minmaxes;
WriteBufferFromFile mark_file;
};
using StreamPtr = std::unique_ptr<Stream>;
using ColumnStreams = std::map<ColId, StreamPtr>;
using ColumnStreams = std::map<String, StreamPtr>;

public:
DMFileWriter(const DMFilePtr & dmfile_,
Expand All @@ -75,6 +75,11 @@ class DMFileWriter
void writeColumn(ColId col_id, const IDataType & type, const IColumn & column);
void finalizeColumn(ColId col_id, const IDataType & type);

/// Add streams with specified column id. Since a single column may have more than one Stream,
/// for example Nullable column has a NullMap column, we would track them with a mapping
/// FileNameBase -> Stream.
void addStreams(ColId col_id, DataTypePtr type, bool do_index);

private:
DMFilePtr dmfile;
ColumnDefines write_columns;
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/DeltaMerge/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,6 @@ target_link_libraries(dm_test_delta_tree dbms gtest_main)

add_executable(dm_test_chunk EXCLUDE_FROM_ALL gtest_dm_chunk.cpp)
target_link_libraries(dm_test_chunk dbms gtest_main)

add_executable(dm_test_dmfile EXCLUDE_FROM_ALL gtest_dm_file.cpp)
target_link_libraries(dm_test_dmfile dbms gtest_main clickhouse_functions)
20 changes: 10 additions & 10 deletions dbms/src/Storages/DeltaMerge/tests/dm_basic_include.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ class DMTestEnv
Block block;
const size_t num_rows = (end - beg);
{
ColumnWithTypeAndName col1(std::make_shared<DataTypeInt64>(), pk_name);
{
IColumn::MutablePtr m_col = col1.type->createColumn();
ColumnWithTypeAndName col1({}, std::make_shared<DataTypeInt64>(), pk_name, EXTRA_HANDLE_COLUMN_ID);
IColumn::MutablePtr m_col = col1.type->createColumn();
// insert form large to small
for (size_t i = 0; i < num_rows; i++)
{
Expand All @@ -98,33 +98,33 @@ class DMTestEnv
m_col->insert(field);
}
col1.column = std::move(m_col);
block.insert(col1);
}
block.insert(col1);

ColumnWithTypeAndName version_col(VERSION_COLUMN_TYPE, VERSION_COLUMN_NAME);
{
IColumn::MutablePtr m_col = version_col.type->createColumn();
ColumnWithTypeAndName version_col({}, VERSION_COLUMN_TYPE, VERSION_COLUMN_NAME, VERSION_COLUMN_ID);
IColumn::MutablePtr m_col = version_col.type->createColumn();
for (size_t i = 0; i < num_rows; ++i)
{
Field field = tso;
m_col->insert(field);
}
version_col.column = std::move(m_col);
block.insert(version_col);
}
block.insert(version_col);

ColumnWithTypeAndName tag_col(TAG_COLUMN_TYPE, TAG_COLUMN_NAME);
{
IColumn::MutablePtr m_col = tag_col.type->createColumn();
auto & column_data = typeid_cast<ColumnVector<UInt8> &>(*m_col).getData();
ColumnWithTypeAndName tag_col({}, TAG_COLUMN_TYPE, TAG_COLUMN_NAME, TAG_COLUMN_ID);
IColumn::MutablePtr m_col = tag_col.type->createColumn();
auto & column_data = typeid_cast<ColumnVector<UInt8> &>(*m_col).getData();
column_data.resize(num_rows);
for (size_t i = 0; i < num_rows; ++i)
{
column_data[i] = 0;
}
tag_col.column = std::move(m_col);
block.insert(tag_col);
}
block.insert(tag_col);
}
return block;
}
Expand Down
Loading