Skip to content

Commit

Permalink
Revert "Implement decompression side of table packing" to make reviewing
Browse files Browse the repository at this point in the history
easier

This reverts commit 19590b1.
  • Loading branch information
gibber9809 committed Jul 2, 2024
1 parent 19590b1 commit 794a732
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 312 deletions.
114 changes: 38 additions & 76 deletions components/core/src/clp_s/ArchiveReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ void ArchiveReader::open(string_view archives_dir, string_view archive_id) {
m_schema_tree = ReaderUtils::read_schema_tree(archive_path_str);
m_schema_map = ReaderUtils::read_schemas(archive_path_str);

m_tables_file_reader.open(archive_path_str + constants::cArchiveTablesFile);
m_table_metadata_file_reader.open(archive_path_str + constants::cArchiveTableMetadataFile);
m_table_reader.open_tables(archive_path_str + constants::cArchiveTablesFile);
}

void ArchiveReader::read_metadata() {
Expand All @@ -38,74 +38,46 @@ void ArchiveReader::read_metadata() {
cDecompressorFileReadBufferCapacity
);

m_table_reader.read_metadata(m_table_metadata_decompressor);

size_t num_schemas;
if (auto error = m_table_metadata_decompressor.try_read_numeric_value(num_schemas);
ErrorCodeSuccess != error)
{
throw OperationFailed(error, __FILENAME__, __LINE__);
}

bool prev_metadata_initialized{false};
SchemaReader::SchemaMetadata prev_metadata{};
int32_t prev_schema_id{};
for (size_t i = 0; i < num_schemas; ++i) {
size_t table_id;
size_t table_offset;
for (size_t i = 0; i < num_schemas; i++) {
int32_t schema_id;
size_t num_messages;
uint64_t num_messages;
size_t table_offset;
size_t uncompressed_size;

if (auto error = m_table_metadata_decompressor.try_read_numeric_value(table_id);
if (auto error = m_table_metadata_decompressor.try_read_numeric_value(schema_id);
ErrorCodeSuccess != error)
{
throw OperationFailed(error, __FILENAME__, __LINE__);
}

if (auto error = m_table_metadata_decompressor.try_read_numeric_value(table_offset);
if (auto error = m_table_metadata_decompressor.try_read_numeric_value(num_messages);
ErrorCodeSuccess != error)
{
throw OperationFailed(error, __FILENAME__, __LINE__);
}

if (table_offset > m_table_reader.get_uncompressed_table_size(table_id)) {
throw OperationFailed(ErrorCodeCorrupt, __FILENAME__, __LINE__);
}

if (auto error = m_table_metadata_decompressor.try_read_numeric_value(schema_id);
if (auto error = m_table_metadata_decompressor.try_read_numeric_value(table_offset);
ErrorCodeSuccess != error)
{
throw OperationFailed(error, __FILENAME__, __LINE__);
}

if (auto error = m_table_metadata_decompressor.try_read_numeric_value(num_messages);
if (auto error = m_table_metadata_decompressor.try_read_numeric_value(uncompressed_size);
ErrorCodeSuccess != error)
{
throw OperationFailed(error, __FILENAME__, __LINE__);
}

if (prev_metadata_initialized) {
size_t uncompressed_size{0};
if (table_id != prev_metadata.table_id) {
uncompressed_size
= m_table_reader.get_uncompressed_table_size(prev_metadata.table_id)
- prev_metadata.table_offset;
} else {
uncompressed_size = table_offset - prev_metadata.table_offset;
}
prev_metadata.uncompressed_size = uncompressed_size;
m_id_to_schema_metadata[prev_schema_id] = prev_metadata;
} else {
prev_metadata_initialized = true;
}
prev_metadata = {table_id, table_offset, num_messages, 0};
prev_schema_id = schema_id;
m_id_to_table_metadata[schema_id] = {num_messages, table_offset, uncompressed_size};
m_schema_ids.push_back(schema_id);
}
prev_metadata.uncompressed_size
= m_table_reader.get_uncompressed_table_size(prev_metadata.table_id)
- prev_metadata.table_offset;
m_id_to_schema_metadata[prev_schema_id] = prev_metadata;
m_table_metadata_decompressor.close();
}

Expand All @@ -117,12 +89,14 @@ void ArchiveReader::read_dictionaries_and_metadata() {
read_metadata();
}

SchemaReader& ArchiveReader::read_schema_table(
SchemaReader& ArchiveReader::read_table(
int32_t schema_id,
bool should_extract_timestamp,
bool should_marshal_records
) {
if (m_id_to_schema_metadata.count(schema_id) == 0) {
constexpr size_t cDecompressorFileReadBufferCapacity = 64 * 1024; // 64 KB

if (m_id_to_table_metadata.count(schema_id) == 0) {
throw OperationFailed(ErrorCodeFileNotFound, __FILENAME__, __LINE__);
}

Expand All @@ -133,26 +107,30 @@ SchemaReader& ArchiveReader::read_schema_table(
should_marshal_records
);

auto& schema_metadata = m_id_to_schema_metadata[schema_id];
auto table_buffer = read_table(schema_metadata.table_id, true);
m_schema_reader
.load(table_buffer, schema_metadata.table_offset, schema_metadata.uncompressed_size);
m_tables_file_reader.try_seek_from_begin(m_id_to_table_metadata[schema_id].offset);
m_tables_decompressor.open(m_tables_file_reader, cDecompressorFileReadBufferCapacity);
m_schema_reader.load(
m_tables_decompressor,
m_id_to_table_metadata[schema_id].uncompressed_size
);
m_tables_decompressor.close_for_reuse();
return m_schema_reader;
}

std::vector<std::shared_ptr<SchemaReader>> ArchiveReader::read_all_tables() {
constexpr size_t cDecompressorFileReadBufferCapacity = 64 * 1024; // 64 KB

std::vector<std::shared_ptr<SchemaReader>> readers;
readers.reserve(m_id_to_schema_metadata.size());
for (auto schema_id : m_schema_ids) {
readers.reserve(m_id_to_table_metadata.size());
for (auto const& [id, table_metadata] : m_id_to_table_metadata) {
auto schema_reader = std::make_shared<SchemaReader>();
initialize_schema_reader(*schema_reader, schema_id, true, true);
auto& schema_metadata = m_id_to_schema_metadata[schema_id];
auto table_buffer = read_table(schema_metadata.table_id, false);
schema_reader->load(
table_buffer,
schema_metadata.table_offset,
schema_metadata.uncompressed_size
);
initialize_schema_reader(*schema_reader, id, true, true);

m_tables_file_reader.try_seek_from_begin(table_metadata.offset);
m_tables_decompressor.open(m_tables_file_reader, cDecompressorFileReadBufferCapacity);
schema_reader->load(m_tables_decompressor, table_metadata.uncompressed_size);
m_tables_decompressor.close_for_reuse();

readers.push_back(std::move(schema_reader));
}
return readers;
Expand Down Expand Up @@ -259,7 +237,7 @@ void ArchiveReader::initialize_schema_reader(
m_schema_tree,
schema_id,
schema.get_ordered_schema_view(),
m_id_to_schema_metadata[schema_id].num_messages,
m_id_to_table_metadata[schema_id].num_messages,
should_marshal_records
);
auto timestamp_column_ids = m_timestamp_dict->get_authoritative_timestamp_column_ids();
Expand Down Expand Up @@ -306,8 +284,9 @@ void ArchiveReader::initialize_schema_reader(

void ArchiveReader::store(FileWriter& writer) {
std::string message;
for (auto schema_id : m_schema_ids) {
auto& schema_reader = read_schema_table(schema_id, false, true);

for (auto& [id, table_metadata] : m_id_to_table_metadata) {
auto& schema_reader = read_table(id, false, true);
while (schema_reader.get_next_message(message)) {
writer.write(message.c_str(), message.length());
}
Expand All @@ -325,28 +304,11 @@ void ArchiveReader::close() {
m_array_dict->close();
m_timestamp_dict->close();

m_table_reader.close();
m_tables_file_reader.close();
m_table_metadata_file_reader.close();

m_id_to_schema_metadata.clear();
m_id_to_table_metadata.clear();
m_schema_ids.clear();
m_cur_table_id = 0;
m_table_buffer.reset();
m_table_buffer_size = 0ULL;
}

/**
 * Returns the buffer for the table identified by `table_id`, reading it through m_table_reader
 * unless the currently cached buffer already corresponds to that table.
 * @param table_id ID of the table to read
 * @param reuse_buffer when false, the currently held buffer is released (and its recorded size
 * zeroed) before the read; when true, the existing buffer and size are handed to the table reader
 * as-is
 * @return the shared buffer held in m_table_buffer after the read (or the cached one on a hit)
 */
std::shared_ptr<char[]> ArchiveReader::read_table(size_t table_id, bool reuse_buffer) {
// Cache hit: a buffer exists and it was last filled for this same table ID.
if (nullptr != m_table_buffer && m_cur_table_id == table_id) {
return m_table_buffer;
}

// Drop this object's reference to the current buffer and zero the recorded size before reading.
// NOTE(review): presumably this makes the table reader allocate fresh storage instead of
// overwriting a buffer that other holders of the shared_ptr may still be using -- confirm
// against TableReader::read_table.
if (false == reuse_buffer) {
m_table_buffer.reset();
m_table_buffer_size = 0;
}

// The buffer and its size are passed to the table reader, which is expected to fill them in
// (its implementation is not visible here).
m_table_reader.read_table(table_id, m_table_buffer, m_table_buffer_size);
// Remember which table the buffer now corresponds to so that the next call can hit the cache.
m_cur_table_id = table_id;
return m_table_buffer;
}
} // namespace clp_s
24 changes: 5 additions & 19 deletions components/core/src/clp_s/ArchiveReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
#include "DictionaryReader.hpp"
#include "ReaderUtils.hpp"
#include "SchemaReader.hpp"
#include "TableReader.hpp"
#include "TimestampDictionaryReader.hpp"
#include "Utils.hpp"

Expand Down Expand Up @@ -92,11 +91,8 @@ class ArchiveReader {
* @param should_marshal_records
* @return the schema reader
*/
SchemaReader& read_schema_table(
int32_t schema_id,
bool should_extract_timestamp,
bool should_marshal_records
);
SchemaReader&
read_table(int32_t schema_id, bool should_extract_timestamp, bool should_marshal_records);

/**
* Loads all of the tables in the archive and returns SchemaReaders for them.
Expand Down Expand Up @@ -175,14 +171,6 @@ class ArchiveReader {
bool should_marshal_records
);

/**
* Reads a table with given ID from the table reader. If read_table is called multiple times in
* a row for the same table_id a cached buffer is returned. This function allows the caller to
* ask for the same buffer to be reused to read multiple different tables: this can save memory
* allocations, but can only be used when tables are read one at a time.
*/
std::shared_ptr<char[]> read_table(size_t table_id, bool reuse_buffer);

bool m_is_open;
std::string m_archive_id;
std::shared_ptr<VariableDictionaryReader> m_var_dict;
Expand All @@ -193,15 +181,13 @@ class ArchiveReader {
std::shared_ptr<SchemaTree> m_schema_tree;
std::shared_ptr<ReaderUtils::SchemaMap> m_schema_map;
std::vector<int32_t> m_schema_ids;
std::map<int32_t, SchemaReader::SchemaMetadata> m_id_to_schema_metadata;
std::map<int32_t, SchemaReader::TableMetadata> m_id_to_table_metadata;

TableReader m_table_reader;
FileReader m_tables_file_reader;
FileReader m_table_metadata_file_reader;
ZstdDecompressor m_tables_decompressor;
ZstdDecompressor m_table_metadata_decompressor;
SchemaReader m_schema_reader;
std::shared_ptr<char[]> m_table_buffer{};
size_t m_table_buffer_size{0ULL};
size_t m_cur_table_id{0ULL};
};
} // namespace clp_s

Expand Down
2 changes: 0 additions & 2 deletions components/core/src/clp_s/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,6 @@ set(
SchemaTree.hpp
SchemaWriter.cpp
SchemaWriter.hpp
TableReader.cpp
TableReader.hpp
TimestampDictionaryReader.cpp
TimestampDictionaryReader.hpp
TimestampDictionaryWriter.cpp
Expand Down
18 changes: 11 additions & 7 deletions components/core/src/clp_s/SchemaReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,17 @@ void SchemaReader::mark_column_as_timestamp(BaseColumnReader* column_reader) {
}
}

void SchemaReader::load(
std::shared_ptr<char[]> table_buffer,
size_t offset,
size_t uncompressed_size
) {
m_table_buffer = table_buffer;
BufferViewReader buffer_reader{m_table_buffer.get() + offset, uncompressed_size};
void SchemaReader::load(ZstdDecompressor& decompressor, size_t uncompressed_size) {
if (uncompressed_size > m_table_buffer_size) {
m_table_buffer = std::make_unique<char[]>(uncompressed_size);
m_table_buffer_size = uncompressed_size;
}
auto error = decompressor.try_read_exact_length(m_table_buffer.get(), uncompressed_size);
if (ErrorCodeSuccess != error) {
throw OperationFailed(error, __FILENAME__, __LINE__);
}

BufferViewReader buffer_reader{m_table_buffer.get(), uncompressed_size};
for (auto& reader : m_columns) {
reader->load(buffer_reader, m_num_messages);
}
Expand Down
19 changes: 9 additions & 10 deletions components/core/src/clp_s/SchemaReader.hpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#ifndef CLP_S_SCHEMAREADER_HPP
#define CLP_S_SCHEMAREADER_HPP

#include <memory>
#include <span>
#include <string>
#include <type_traits>
Expand All @@ -12,6 +11,7 @@
#include "FileReader.hpp"
#include "JsonSerializer.hpp"
#include "SchemaTree.hpp"
#include "ZstdDecompressor.hpp"

namespace clp_s {
class SchemaReader;
Expand Down Expand Up @@ -47,10 +47,9 @@ class SchemaReader {
: TraceableException(error_code, filename, line_number) {}
};

struct SchemaMetadata {
size_t table_id;
size_t table_offset;
size_t num_messages;
struct TableMetadata {
uint64_t num_messages;
size_t offset;
size_t uncompressed_size;
};

Expand Down Expand Up @@ -131,12 +130,11 @@ class SchemaReader {
);

/**
* Loads the encoded messages from a shared buffer starting at a given offset
* @param table_buffer
* @param offset
* Loads the encoded messages
* @param decompressor
* @param uncompressed_size
*/
void load(std::shared_ptr<char[]> table_buffer, size_t offset, size_t uncompressed_size);
void load(ZstdDecompressor& decompressor, size_t uncompressed_size);

/**
* Gets next message
Expand Down Expand Up @@ -279,7 +277,8 @@ class SchemaReader {
std::unordered_map<int32_t, BaseColumnReader*> m_column_map;
std::vector<BaseColumnReader*> m_columns;
std::vector<BaseColumnReader*> m_reordered_columns;
std::shared_ptr<char[]> m_table_buffer;
std::unique_ptr<char[]> m_table_buffer;
size_t m_table_buffer_size{0};

BaseColumnReader* m_timestamp_column;
std::function<epochtime_t()> m_get_timestamp;
Expand Down
Loading

0 comments on commit 794a732

Please sign in to comment.